
Merging Data Using Delta Merge


This step-by-step guide shows you how to merge data between tables using the WRITE_DELTA_MERGE action. We'll start with a basic merge where source and target have the same column names, then progress to handling tables with different column names.

What You'll Build

By the end of this tutorial, you'll have two working pipelines:

  1. Basic Merge: merges data when source and target share the same column names
  2. Advanced Merge with Column Mapping: merges data when source and target use different column names

Both pipelines update existing records and insert new ones based on key columns.

What You'll Learn

  • How to perform basic delta merge operations (UPDATE and INSERT)
  • How to merge data when source and target have different column names
  • How to use column mapping to translate between naming conventions
  • How to work with the WRITE_DELTA_MERGE action

Prerequisites

Before starting, make sure you have:

  • Database Access: You can connect to Unity Catalog (Databricks' data catalog and governance layer)
  • Permissions: You can create new tables and read/write data in your assigned workspace
  • Environment: Access to a Databricks notebook or similar Spark environment
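
If you want a quick sanity check before starting, you can run the snippet below. It assumes a Databricks notebook, where spark is predefined:

# Optional sanity check: confirm the Spark session and Unity Catalog are reachable
print(spark.version)
spark.sql("SHOW CATALOGS").show()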

Setup and Installation

Install Nessy and restart the Python kernel:

%pip install cloe_nessy
%restart_python

Part 1: Basic Merge (Same Column Names)

Let's start with a simple merge where source and target tables have identical column names. This is the most common use case.

Step 1: Create Target and Source Tables

First, create both target and source tables with the same structure:

import os
from pyspark.sql import Row

# Replace these with your own database object names
CATALOG = "nessy_dev_catalog"
SCHEMA = "demo"
BASIC_TARGET_TABLE = "creatures"
BASIC_SOURCE_TABLE = "creatures_updates"

BASIC_TARGET_NAME = f"{CATALOG}.{SCHEMA}.{BASIC_TARGET_TABLE}"
BASIC_SOURCE_NAME = f"{CATALOG}.{SCHEMA}.{BASIC_SOURCE_TABLE}"

# Create the schema if it doesn't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

# Store table names in environment variables
os.environ["BASIC_TARGET"] = BASIC_TARGET_NAME
os.environ["BASIC_SOURCE"] = BASIC_SOURCE_NAME

# Create target table with existing data
target_data = [
    Row(creature_id=1, name="Nessy", species="Lake Monster", status="Active", location="Loch Ness"),
    Row(creature_id=2, name="Highland Kelpie", species="Water Horse", status="Active", location="Highlands"),
]

# Create source table with updates and new records
source_data = [
    # Update existing record (creature_id=1)
    Row(creature_id=1, name="Nessy", species="Lake Monster", status="Legendary", location="Loch Ness"),
    # Insert new record (creature_id=3)
    Row(creature_id=3, name="Morag", species="Sea Serpent", status="Active", location="Loch Morar"),
]

# Create DataFrames
target_df = spark.createDataFrame(target_data)
source_df = spark.createDataFrame(source_data)

# Clean up and create tables
spark.sql(f"DROP TABLE IF EXISTS {BASIC_TARGET_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {BASIC_SOURCE_NAME}")

target_df.write.saveAsTable(BASIC_TARGET_NAME)
source_df.write.saveAsTable(BASIC_SOURCE_NAME)

print(f"βœ“ Created target table with {target_df.count()} rows")
print("\nπŸ“Š Target table:")
spark.table(BASIC_TARGET_NAME).show()

print(f"\nβœ“ Created source table with {source_df.count()} rows")
print("πŸ“Š Source table:")
spark.table(BASIC_SOURCE_NAME).show()

Step 2: Create Basic Merge Pipeline

Now create a pipeline to merge the source data into the target:

basic_yaml_pipeline = """
name: basic_delta_merge
steps:
    read_source:
        action: READ_CATALOG_TABLE
        options:
            table_identifier: {{env:BASIC_SOURCE}}

    merge_data:
        action: WRITE_DELTA_MERGE
        options:
            table_identifier: {{env:BASIC_TARGET}}
            key_columns:
                - creature_id
            when_matched_update: true
            when_not_matched_insert: true
            use_partition_pruning: false
"""

Understanding the Basic Merge

  • key_columns: [creature_id]: Matches records based on this column
  • when_matched_update: true: Updates record with creature_id=1 (status changes to "Legendary")
  • when_not_matched_insert: true: Inserts new record with creature_id=3
  • No column_mapping needed since source and target have identical column names
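
For intuition, this configuration behaves roughly like the Delta Lake MERGE statement below, using the table names from Step 1. This is a sketch only; the WRITE_DELTA_MERGE action builds the actual merge internally:

# Conceptual equivalent of the basic merge (sketch only)
spark.sql(f"""
    MERGE INTO {BASIC_TARGET_NAME} AS t
    USING {BASIC_SOURCE_NAME} AS s
    ON t.creature_id = s.creature_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")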

Step 3: Run the Basic Merge

Execute the pipeline and verify results:

from cloe_nessy.pipeline import PipelineParsingService

print("πŸš€ Running basic merge pipeline...")
basic_pipeline = PipelineParsingService.parse(yaml_str=basic_yaml_pipeline)
basic_pipeline.run()
print("βœ… Merge completed!")

# Check results
print("\nπŸ“‹ Results after merge:")
result_df = spark.table(BASIC_TARGET_NAME)
print(f"Total rows: {result_df.count()}")
result_df.orderBy("creature_id").show(truncate=False)

# Verify what happened
print("\nπŸ” What the merge did:")
print("   ✏️  Updated: creature_id=1 (status: Active β†’ Legendary)")
print("   βž• Inserted: creature_id=3 (Morag)")
print("   ⏸️  Unchanged: creature_id=2 (not in source)")

Expected Results:

  • 3 total rows
  • creature_id=1: status updated to "Legendary"
  • creature_id=2: unchanged
  • creature_id=3: newly inserted

Step 4: Clean Up Basic Example

print("🧹 Cleaning up basic example tables...")
spark.sql(f"DROP TABLE IF EXISTS {BASIC_TARGET_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {BASIC_SOURCE_NAME}")
print("βœ… Clean up completed")

Part 2: Advanced Merge with Column Mapping

Now let's handle the more complex scenario where source and target have different column names. This is common when integrating legacy systems or standardizing naming conventions.

Step 1: Create Target Table with Standardized Column Names

First, let's create a target table with modern, standardized column names. This represents your production database with clean naming conventions:

import os
from pyspark.sql import Row

# Replace these with your own database object names
CATALOG = "nessy_dev_catalog"
SCHEMA = "demo"
TARGET_TABLE = "customers_standardized"
SOURCE_TABLE = "customers_legacy"

TARGET_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{TARGET_TABLE}"
SOURCE_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{SOURCE_TABLE}"

# Create the schema if it doesn't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

# Store table names in environment variables for the pipeline
os.environ["TABLE_TARGET"] = TARGET_TABLE_NAME
os.environ["TABLE_SOURCE"] = SOURCE_TABLE_NAME

# Create initial target data with standardized column names
# These represent existing customers in your modern system
target_data = [
    Row(
        customer_id=1,
        full_name="Nessy Monster",
        email_address="nessy@lochness.com",
        phone_number="+44-1234-567890",
        registration_date="2024-01-15"
    ),
    Row(
        customer_id=2,
        full_name="Highland Kelpie",
        email_address="kelpie@highlands.com",
        phone_number="+44-2345-678901",
        registration_date="2024-02-20"
    ),
]

# Create the target DataFrame
target_df = spark.createDataFrame(target_data)

# Clean up any existing tables
spark.sql(f"DROP TABLE IF EXISTS {TARGET_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")

# Save the target table
try:
    target_df.write.saveAsTable(TARGET_TABLE_NAME)
    print(f"βœ“ Created target table '{TARGET_TABLE_NAME}' with {target_df.count()} rows")
    print("\nπŸ“Š Target table structure (standardized column names):")
    spark.table(TARGET_TABLE_NAME).show()
except Exception as e:
    print(f"❌ Error creating target table: {e}")

Step 2: Create Source Data with Legacy Column Names

Now let's create source data that uses different column names. This simulates data coming from a legacy system or external source:

# Create source data with legacy column names
# Notice the different column names: cust_id, name, email, phone, reg_date
source_data = [
    # This record exists in target (customer_id=1) - will UPDATE
    Row(
        cust_id=1,
        name="Nessy Monster Updated",  # Updated name
        email="nessy@lochness.com",
        phone="+44-1234-999999",  # Updated phone
        reg_date="2024-01-15"
    ),
    # This is a new record (cust_id=3) - will INSERT
    Row(
        cust_id=3,
        name="Morag Serpent",
        email="morag@lochmorar.com",
        phone="+44-3456-789012",
        reg_date="2024-03-10"
    ),
    # This is also new (cust_id=4) - will INSERT
    Row(
        cust_id=4,
        name="Each-uisge Spirit",
        email="eachuisge@lochs.com",
        phone="+44-4567-890123",
        reg_date="2024-04-05"
    ),
]

# Create the source DataFrame
source_df = spark.createDataFrame(source_data)

# Save the source table
try:
    source_df.write.saveAsTable(SOURCE_TABLE_NAME)
    print(f"βœ“ Created source table '{SOURCE_TABLE_NAME}' with {source_df.count()} rows")
    print("\nπŸ“Š Source table structure (legacy column names):")
    spark.table(SOURCE_TABLE_NAME).show()
except Exception as e:
    print(f"❌ Error creating source table: {e}")

Understanding the Column Name Differences

Notice how the column names differ between source and target:

Target (Standardized)   Source (Legacy)   Purpose
---------------------   ---------------   -----------------
customer_id             cust_id           Unique identifier
full_name               name              Customer name
email_address           email             Email contact
phone_number            phone             Phone contact
registration_date       reg_date          Registration date

Without column mapping, the merge would fail because the names don't match!
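
You can see the mismatch directly by comparing the two schemas (an illustrative check using the tables created above):

# Illustrative check: list the columns that differ between the two tables
target_cols = set(spark.table(TARGET_TABLE_NAME).columns)
source_cols = set(spark.table(SOURCE_TABLE_NAME).columns)
print("Only in target:", sorted(target_cols - source_cols))
print("Only in source:", sorted(source_cols - target_cols))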

Step 3: Define Your Pipeline with Column Mapping

Now let's create the pipeline that merges the data using column mapping to translate between the different naming conventions:

yaml_pipeline = """
name: merge_with_column_mapping
steps:
    read_source_data:
        action: READ_CATALOG_TABLE
        options:
            table_identifier: {{env:TABLE_SOURCE}}

    merge_to_target:
        action: WRITE_DELTA_MERGE
        options:
            table_identifier: {{env:TABLE_TARGET}}
            key_columns:
                - customer_id  # Use target column name
            column_mapping:
                # Map target column names to source column names
                # Format: target_column: source_column
                customer_id: cust_id
                full_name: name
                email_address: email
                phone_number: phone
                registration_date: reg_date
            when_matched_update: true
            when_not_matched_insert: true
            use_partition_pruning: false
"""

Understanding the Pipeline Steps

This pipeline has 2 steps:

  1. read_source_data: Reads data from the legacy source table with old column names
  2. merge_to_target: Merges data into the target table using column mapping

Key parameters explained:

  • key_columns: Uses customer_id (target name) to match records
  • column_mapping: Maps each target column name to its source equivalent
  • when_matched_update: true: Updates existing records (customer_id=1)
  • when_not_matched_insert: true: Inserts new records (customer_id=3,4)
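
Conceptually, this mapping produces a merge along the lines of the SQL below. Again, this is just a sketch for intuition; the action generates the actual merge logic:

# Conceptual equivalent of the mapped merge (sketch only)
spark.sql(f"""
    MERGE INTO {TARGET_TABLE_NAME} AS t
    USING {SOURCE_TABLE_NAME} AS s
    ON t.customer_id = s.cust_id
    WHEN MATCHED THEN UPDATE SET
        t.full_name = s.name,
        t.email_address = s.email,
        t.phone_number = s.phone,
        t.registration_date = s.reg_date
    WHEN NOT MATCHED THEN INSERT
        (customer_id, full_name, email_address, phone_number, registration_date)
        VALUES (s.cust_id, s.name, s.email, s.phone, s.reg_date)
""")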

Step 4: Run Your Pipeline

Now let's execute the pipeline and see the merge in action:

from cloe_nessy.pipeline import PipelineParsingService

# Convert the YAML text into a runnable pipeline
print("πŸ“ Setting up pipeline...")
pipeline = PipelineParsingService.parse(yaml_str=yaml_pipeline)

# Optional: Create a visual diagram of your pipeline steps
print("πŸ“Š Creating pipeline diagram...")
pipeline.plot_graph()

# Run the pipeline - this merges the data!
print("πŸš€ Running merge pipeline...")
pipeline.run()
print("βœ… Merge completed successfully!")

Step 5: Verify the Merge Results

Let's check the target table to see how the data was merged:

print("πŸ“‹ Checking merge results...")
result_df = spark.table(TARGET_TABLE_NAME)
row_count = result_df.count()

print(f"🎯 Target table now contains {row_count} rows")
print(f"   - Started with: 2 rows")
print(f"   - Updated: 1 row (customer_id=1)")
print(f"   - Inserted: 2 new rows (customer_id=3,4)")
print(f"   - Total: {row_count} rows")

print("\nπŸ“– Here's the merged data:")
result_df.orderBy("customer_id").show(truncate=False)

Expected Results:

You should see 4 rows in your target table:

  1. customer_id=1: Updated with new name and phone number from source
  2. customer_id=2: Unchanged (not in source data)
  3. customer_id=3: New record inserted from source
  4. customer_id=4: New record inserted from source

Notice that all data is stored using the standardized column names (customer_id, full_name, etc.) even though the source used different names!
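
A quick way to confirm this is to inspect the merged table's columns:

# Confirm the merged table kept its standardized column names
print(spark.table(TARGET_TABLE_NAME).columns)
# Expected: ['customer_id', 'full_name', 'email_address', 'phone_number', 'registration_date']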

Step 6: Understand What Happened

Let's examine the merge results in detail:

# Show the changes that were made
print("πŸ” Detailed analysis of the merge:")

# Check the updated record
updated_record = result_df.filter("customer_id = 1").first()
print(f"\n✏️ Updated record (customer_id=1):")
print(f"   Name: {updated_record.full_name}")
print(f"   Phone: {updated_record.phone_number}")

# Check the new records
new_records = result_df.filter("customer_id IN (3, 4)")
print(f"\nβž• New records inserted: {new_records.count()}")
for row in new_records.collect():
    print(f"   - customer_id={row.customer_id}: {row.full_name}")

# Check unchanged records
unchanged = result_df.filter("customer_id = 2").first()
print(f"\n⏸️ Unchanged record (customer_id=2):")
print(f"   Name: {unchanged.full_name} (not in source, so not modified)")

Advanced: Partial Column Updates

You can also exclude certain columns from updates. For example, to preserve the original registration_date and only update contact information:

yaml_pipeline_partial = """
name: merge_with_partial_update
steps:
    read_source_data:
        action: READ_CATALOG_TABLE
        options:
            table_identifier: {{env:TABLE_SOURCE}}

    merge_to_target:
        action: WRITE_DELTA_MERGE
        options:
            table_identifier: {{env:TABLE_TARGET}}
            key_columns:
                - customer_id
            column_mapping:
                customer_id: cust_id
                full_name: name
                email_address: email
                phone_number: phone
                registration_date: reg_date
            cols_to_exclude_from_update:
                - registration_date  # Don't update this field
            when_matched_update: true
            when_not_matched_insert: true
"""

When to Use Column Exclusions

Use cols_to_exclude_from_update when you want to:

  • Preserve original timestamps (created_at, registration_date)
  • Keep audit fields unchanged (created_by, original_source)
  • Protect certain fields from being overwritten (customer_tier, account_status)

Note: Excluded columns are still included in INSERT operations for new records.
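
To make this concrete, here is an illustrative Python sketch (not the action's internal implementation) of how the mapping and the exclusion list combine:

# Illustrative sketch: how column_mapping and cols_to_exclude_from_update combine
column_mapping = {
    "customer_id": "cust_id",
    "full_name": "name",
    "email_address": "email",
    "phone_number": "phone",
    "registration_date": "reg_date",
}
cols_to_exclude_from_update = {"registration_date"}

# Excluded columns are dropped from the UPDATE assignments only
update_set = {t: f"source.{s}" for t, s in column_mapping.items()
              if t not in cols_to_exclude_from_update}
# ...but remain part of the INSERT for new records
insert_values = {t: f"source.{s}" for t, s in column_mapping.items()}

print("UPDATE SET:", update_set)        # registration_date is excluded
print("INSERT VALUES:", insert_values)  # registration_date is still inserted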

Step 7: Clean Up (Optional)

When you're done experimenting, remove the demo tables:

# Remove the demo tables we created
print("🧹 Cleaning up demo tables...")
spark.sql(f"DROP TABLE IF EXISTS {TARGET_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
print("βœ… Demo tables removed successfully")

Common Use Cases

Column mapping is useful in many scenarios:

1. Legacy System Integration

Merge data from old systems with abbreviated column names into modern tables:

column_mapping:
    customer_id: cust_id
    order_number: ord_no
    order_date: ord_dt

2. Multi-Source Data Integration

Combine data from different sources with varying conventions:

# Source A uses: user_id, user_name
# Source B uses: customer_id, customer_name
# Target uses: account_id, account_holder
column_mapping:
    account_id: user_id
    account_holder: user_name

3. Data Standardization

Migrate from inconsistent naming to organizational standards:

column_mapping:
    email_address: email      # Standardize to email_address
    phone_number: phone       # Standardize to phone_number
    date_of_birth: dob        # Standardize to date_of_birth

Key Takeaways

Basic Merge:

βœ… WRITE_DELTA_MERGE performs both UPDATE and INSERT operations in one step

βœ… key_columns defines which columns to match records on

βœ… when_matched_update: true updates existing records

βœ… when_not_matched_insert: true inserts new records

Advanced Merge with Column Mapping:

βœ… Column mapping lets you merge data between tables with different column names

βœ… Use target column names in key_columns and cols_to_exclude_from_update

βœ… The column_mapping dictionary maps target β†’ source column names

βœ… Columns not in the mapping are assumed to have the same name in both tables

βœ… You can combine column mapping with partial updates using cols_to_exclude_from_update

Need Help?

If you encounter any issues while following this tutorial, check out our comprehensive Common Issues guide which covers:

  • Table and database access problems
  • Column mapping configuration errors
  • Merge operation failures
  • Data type mismatches between source and target columns