Merging Data Using Delta Merge
Getting Started
This step-by-step guide shows you how to merge data between tables using the
WRITE_DELTA_MERGE action. We'll start with a basic merge where source and target
have the same column names, then progress to handling tables with different
column names.
What You'll Build
By the end of this tutorial, you'll have working pipelines that:
- Basic Merge: Merges data when source and target have the same column names
- Advanced Merge with Column Mapping: Merges data when source and target have different column names
- Updates existing records and inserts new ones based on key columns
What You'll Learn
- How to perform basic delta merge operations (UPDATE and INSERT)
- How to merge data when source and target have different column names
- How to use column mapping to translate between naming conventions
- How to work with the WRITE_DELTA_MERGE action
Prerequisites
Before starting, make sure you have:
- Database Access: You can connect to Unity Catalog (Databricks' data catalog)
- Permissions: You can create new tables and read/write data in your assigned workspace
- Environment: Access to a Databricks notebook or similar Spark environment
Setup and Installation
Install Nessy and restart the Python kernel:
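The exact install command depends on how Nessy is distributed in your environment. As a sketch, assuming the package is published as cloe-nessy (the import name used later in this tutorial is cloe_nessy):

```python
# Databricks notebook cell. Assumption: the package is published as
# "cloe-nessy" -- substitute your internal index or package name if it differs.
%pip install cloe-nessy

# In the next cell, restart the Python kernel so the new package is picked up:
dbutils.library.restartPython()
```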
Part 1: Basic Merge (Same Column Names)
Let's start with a simple merge where source and target tables have identical column names. This is the most common use case.
Step 1: Create Target and Source Tables
First, create both target and source tables with the same structure:
import os
from pyspark.sql import Row
# Replace these with your own database object names
CATALOG = "nessy_dev_catalog"
SCHEMA = "demo"
BASIC_TARGET_TABLE = "creatures"
BASIC_SOURCE_TABLE = "creatures_updates"
BASIC_TARGET_NAME = f"{CATALOG}.{SCHEMA}.{BASIC_TARGET_TABLE}"
BASIC_SOURCE_NAME = f"{CATALOG}.{SCHEMA}.{BASIC_SOURCE_TABLE}"
# Create the schema if it doesn't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
# Store table names in environment variables
os.environ["BASIC_TARGET"] = BASIC_TARGET_NAME
os.environ["BASIC_SOURCE"] = BASIC_SOURCE_NAME
# Create target table with existing data
target_data = [
Row(creature_id=1, name="Nessy", species="Lake Monster", status="Active", location="Loch Ness"),
Row(creature_id=2, name="Highland Kelpie", species="Water Horse", status="Active", location="Highlands"),
]
# Create source table with updates and new records
source_data = [
# Update existing record (creature_id=1)
Row(creature_id=1, name="Nessy", species="Lake Monster", status="Legendary", location="Loch Ness"),
# Insert new record (creature_id=3)
Row(creature_id=3, name="Morag", species="Sea Serpent", status="Active", location="Loch Morar"),
]
# Create DataFrames
target_df = spark.createDataFrame(target_data)
source_df = spark.createDataFrame(source_data)
# Clean up and create tables
spark.sql(f"DROP TABLE IF EXISTS {BASIC_TARGET_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {BASIC_SOURCE_NAME}")
target_df.write.saveAsTable(BASIC_TARGET_NAME)
source_df.write.saveAsTable(BASIC_SOURCE_NAME)
print(f"✅ Created target table with {target_df.count()} rows")
print("\n📋 Target table:")
spark.table(BASIC_TARGET_NAME).show()
print(f"\n✅ Created source table with {source_df.count()} rows")
print("📋 Source table:")
spark.table(BASIC_SOURCE_NAME).show()
Step 2: Create Basic Merge Pipeline
Now create a pipeline to merge the source data into the target:
basic_yaml_pipeline = """
name: basic_delta_merge
steps:
  read_source:
    action: READ_CATALOG_TABLE
    options:
      table_identifier: {{env:BASIC_SOURCE}}
  merge_data:
    action: WRITE_DELTA_MERGE
    options:
      table_identifier: {{env:BASIC_TARGET}}
      key_columns:
        - creature_id
      when_matched_update: true
      when_not_matched_insert: true
      use_partition_pruning: false
"""
Understanding the Basic Merge
- key_columns: [creature_id]: Matches records on this column
- when_matched_update: true: Updates the record with creature_id=1 (status changes to "Legendary")
- when_not_matched_insert: true: Inserts the new record with creature_id=3
- No column_mapping is needed because source and target have identical column names
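To make the matched/not-matched semantics concrete, here is a plain-Python sketch of the same upsert logic. This is illustrative only: merge_rows is a hypothetical helper, not part of the Nessy API — the real action runs a Delta Lake MERGE on Spark tables.

```python
# Illustrative model of WRITE_DELTA_MERGE semantics (hypothetical helper,
# not the Nessy API).

def merge_rows(target, source, key):
    """Upsert source rows into target, matching on `key`."""
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        if row[key] in merged:
            merged[row[key]].update(row)   # when_matched_update
        else:
            merged[row[key]] = dict(row)   # when_not_matched_insert
    return [merged[k] for k in sorted(merged)]

target = [
    {"creature_id": 1, "name": "Nessy", "status": "Active"},
    {"creature_id": 2, "name": "Highland Kelpie", "status": "Active"},
]
source = [
    {"creature_id": 1, "name": "Nessy", "status": "Legendary"},  # update
    {"creature_id": 3, "name": "Morag", "status": "Active"},     # insert
]

result = merge_rows(target, source, key="creature_id")
for row in result:
    print(row)
```

The matched row is updated in place, the unmatched source row is appended, and the target row absent from the source is left untouched — exactly the three outcomes you will verify below.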
Step 3: Run the Basic Merge
Execute the pipeline and verify results:
from cloe_nessy.pipeline import PipelineParsingService
print("🚀 Running basic merge pipeline...")
basic_pipeline = PipelineParsingService.parse(yaml_str=basic_yaml_pipeline)
basic_pipeline.run()
print("✅ Merge completed!")
# Check results
print("\n📋 Results after merge:")
result_df = spark.table(BASIC_TARGET_NAME)
print(f"Total rows: {result_df.count()}")
result_df.orderBy("creature_id").show(truncate=False)
# Verify what happened
print("\n🔍 What the merge did:")
print("   ✏️ Updated: creature_id=1 (status: Active → Legendary)")
print("   ➕ Inserted: creature_id=3 (Morag)")
print("   ⏸️ Unchanged: creature_id=2 (not in source)")
Expected Results:
- 3 total rows
- creature_id=1: status updated to "Legendary"
- creature_id=2: unchanged
- creature_id=3: newly inserted
Step 4: Clean Up Basic Example
print("🧹 Cleaning up basic example tables...")
spark.sql(f"DROP TABLE IF EXISTS {BASIC_TARGET_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {BASIC_SOURCE_NAME}")
print("✅ Clean up completed")
Part 2: Advanced Merge with Column Mapping
Now let's handle the more complex scenario where source and target have different column names. This is common when integrating legacy systems or standardizing naming conventions.
Step 1: Create Target Table with Standardized Column Names
First, let's create a target table with modern, standardized column names. This represents your production database with clean naming conventions:
import os
from pyspark.sql import Row
# Replace these with your own database object names
CATALOG = "nessy_dev_catalog"
SCHEMA = "demo"
TARGET_TABLE = "customers_standardized"
SOURCE_TABLE = "customers_legacy"
TARGET_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{TARGET_TABLE}"
SOURCE_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{SOURCE_TABLE}"
# Create the schema if it doesn't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
# Store table names in environment variables for the pipeline
os.environ["TABLE_TARGET"] = TARGET_TABLE_NAME
os.environ["TABLE_SOURCE"] = SOURCE_TABLE_NAME
# Create initial target data with standardized column names
# These represent existing customers in your modern system
target_data = [
Row(
customer_id=1,
full_name="Nessy Monster",
email_address="nessy@lochness.com",
phone_number="+44-1234-567890",
registration_date="2024-01-15"
),
Row(
customer_id=2,
full_name="Highland Kelpie",
email_address="kelpie@highlands.com",
phone_number="+44-2345-678901",
registration_date="2024-02-20"
),
]
# Create the target DataFrame
target_df = spark.createDataFrame(target_data)
# Clean up any existing tables
spark.sql(f"DROP TABLE IF EXISTS {TARGET_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
# Save the target table
try:
    target_df.write.saveAsTable(TARGET_TABLE_NAME)
    print(f"✅ Created target table '{TARGET_TABLE_NAME}' with {target_df.count()} rows")
    print("\n📋 Target table structure (standardized column names):")
    spark.table(TARGET_TABLE_NAME).show()
except Exception as e:
    print(f"❌ Error creating target table: {e}")
Step 2: Create Source Data with Legacy Column Names
Now let's create source data that uses different column names. This simulates data coming from a legacy system or external source:
# Create source data with legacy column names
# Notice the different column names: cust_id, name, email, phone, reg_date
source_data = [
# This record exists in target (customer_id=1) - will UPDATE
Row(
cust_id=1,
name="Nessy Monster Updated", # Updated name
email="nessy@lochness.com",
phone="+44-1234-999999", # Updated phone
reg_date="2024-01-15"
),
# This is a new record (cust_id=3) - will INSERT
Row(
cust_id=3,
name="Morag Serpent",
email="morag@lochmorar.com",
phone="+44-3456-789012",
reg_date="2024-03-10"
),
# This is also new (cust_id=4) - will INSERT
Row(
cust_id=4,
name="Each-uisge Spirit",
email="eachuisge@lochs.com",
phone="+44-4567-890123",
reg_date="2024-04-05"
),
]
# Create the source DataFrame
source_df = spark.createDataFrame(source_data)
# Save the source table
try:
    source_df.write.saveAsTable(SOURCE_TABLE_NAME)
    print(f"✅ Created source table '{SOURCE_TABLE_NAME}' with {source_df.count()} rows")
    print("\n📋 Source table structure (legacy column names):")
    spark.table(SOURCE_TABLE_NAME).show()
except Exception as e:
    print(f"❌ Error creating source table: {e}")
Understanding the Column Name Differences
Notice how the column names differ between source and target:
| Target (Standardized) | Source (Legacy) | Purpose |
|---|---|---|
| customer_id | cust_id | Unique identifier |
| full_name | name | Customer name |
| email_address | email | Email contact |
| phone_number | phone | Phone contact |
| registration_date | reg_date | Registration date |
Without column mapping, the merge would fail because the names don't match!
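Conceptually, the column mapping translates each source row into the target schema before matching and writing. A minimal plain-Python sketch of that translation (remap is a hypothetical helper for illustration, not the Nessy API):

```python
# How a column_mapping of the form {target_column: source_column}
# rewrites a legacy row into the target schema (illustrative helper only).

column_mapping = {
    "customer_id": "cust_id",
    "full_name": "name",
    "email_address": "email",
    "phone_number": "phone",
    "registration_date": "reg_date",
}

def remap(source_row, mapping):
    """Build a target-schema row from a legacy source row."""
    return {target: source_row[source] for target, source in mapping.items()}

legacy_row = {
    "cust_id": 3,
    "name": "Morag Serpent",
    "email": "morag@lochmorar.com",
    "phone": "+44-3456-789012",
    "reg_date": "2024-03-10",
}

print(remap(legacy_row, column_mapping))
```

After this translation step, the merge itself sees only target column names, which is why key_columns uses the target name customer_id.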
Step 3: Define Your Pipeline with Column Mapping
Now let's create the pipeline that merges the data using column mapping to translate between the different naming conventions:
yaml_pipeline = """
name: merge_with_column_mapping
steps:
  read_source_data:
    action: READ_CATALOG_TABLE
    options:
      table_identifier: {{env:TABLE_SOURCE}}
  merge_to_target:
    action: WRITE_DELTA_MERGE
    options:
      table_identifier: {{env:TABLE_TARGET}}
      key_columns:
        - customer_id  # Use target column name
      column_mapping:
        # Map target column names to source column names
        # Format: target_column: source_column
        customer_id: cust_id
        full_name: name
        email_address: email
        phone_number: phone
        registration_date: reg_date
      when_matched_update: true
      when_not_matched_insert: true
      use_partition_pruning: false
"""
Understanding the Pipeline Steps
This pipeline has 2 steps:
1. read_source_data: Reads data from the legacy source table with old column names
2. merge_to_target: Merges data into the target table using column mapping
Key parameters explained:
- key_columns: Uses customer_id (the target name) to match records
- column_mapping: Maps each target column name to its source equivalent
- when_matched_update: true: Updates existing records (customer_id=1)
- when_not_matched_insert: true: Inserts new records (customer_id=3, 4)
Step 4: Run Your Pipeline
Now let's execute the pipeline and see the merge in action:
from cloe_nessy.pipeline import PipelineParsingService
# Convert the YAML text into a runnable pipeline
print("🚀 Setting up pipeline...")
pipeline = PipelineParsingService.parse(yaml_str=yaml_pipeline)
# Optional: Create a visual diagram of your pipeline steps
print("📊 Creating pipeline diagram...")
pipeline.plot_graph()
# Run the pipeline - this merges the data!
print("🚀 Running merge pipeline...")
pipeline.run()
print("✅ Merge completed successfully!")
Step 5: Verify the Merge Results
Let's check the target table to see how the data was merged:
print("🔍 Checking merge results...")
result_df = spark.table(TARGET_TABLE_NAME)
row_count = result_df.count()
print(f"🎯 Target table now contains {row_count} rows")
print("   - Started with: 2 rows")
print("   - Updated: 1 row (customer_id=1)")
print("   - Inserted: 2 new rows (customer_id=3,4)")
print(f"   - Total: {row_count} rows")
print("\n📋 Here's the merged data:")
result_df.orderBy("customer_id").show(truncate=False)
Expected Results:
You should see 4 rows in your target table:
- customer_id=1: Updated with new name and phone number from source
- customer_id=2: Unchanged (not in source data)
- customer_id=3: New record inserted from source
- customer_id=4: New record inserted from source
Notice that all data is stored using the standardized column names
(customer_id, full_name, etc.) even though the source used different names!
Step 6: Understand What Happened
Let's examine the merge results in detail:
# Show the changes that were made
print("🔍 Detailed analysis of the merge:")
# Check the updated record
updated_record = result_df.filter("customer_id = 1").first()
print("\n✏️ Updated record (customer_id=1):")
print(f"   Name: {updated_record.full_name}")
print(f"   Phone: {updated_record.phone_number}")
# Check the new records
new_records = result_df.filter("customer_id IN (3, 4)")
print(f"\n➕ New records inserted: {new_records.count()}")
for row in new_records.collect():
    print(f"   - customer_id={row.customer_id}: {row.full_name}")
# Check unchanged records
unchanged = result_df.filter("customer_id = 2").first()
print("\n⏸️ Unchanged record (customer_id=2):")
print(f"   Name: {unchanged.full_name} (not in source, so not modified)")
Advanced: Partial Column Updates
You can also exclude certain columns from updates. For example, to preserve the
original registration_date and only update contact information:
yaml_pipeline_partial = """
name: merge_with_partial_update
steps:
  read_source_data:
    action: READ_CATALOG_TABLE
    options:
      table_identifier: {{env:TABLE_SOURCE}}
  merge_to_target:
    action: WRITE_DELTA_MERGE
    options:
      table_identifier: {{env:TABLE_TARGET}}
      key_columns:
        - customer_id
      column_mapping:
        customer_id: cust_id
        full_name: name
        email_address: email
        phone_number: phone
        registration_date: reg_date
      cols_to_exclude_from_update:
        - registration_date  # Don't update this field
      when_matched_update: true
      when_not_matched_insert: true
"""
When to Use Column Exclusions
Use cols_to_exclude_from_update when you want to:
- Preserve original timestamps (created_at, registration_date)
- Keep audit fields unchanged (created_by, original_source)
- Protect certain fields from being overwritten (customer_tier, account_status)
Note: Excluded columns are still included in INSERT operations for new records.
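The update-versus-insert asymmetry can be sketched in plain Python. This is illustrative only (merge_row is a hypothetical helper, not the Nessy API); it just shows which values land where:

```python
# cols_to_exclude_from_update skips listed columns on UPDATE,
# but they are still written on INSERT for new records.

def merge_row(existing, incoming, exclude_from_update):
    if existing is not None:                  # matched -> UPDATE
        updated = dict(existing)
        for col, value in incoming.items():
            if col not in exclude_from_update:
                updated[col] = value
        return updated
    return dict(incoming)                     # not matched -> INSERT

existing = {"customer_id": 1, "phone_number": "old", "registration_date": "2024-01-15"}
incoming = {"customer_id": 1, "phone_number": "new", "registration_date": "2099-01-01"}

# UPDATE path: registration_date is preserved, phone_number is refreshed
print(merge_row(existing, incoming, {"registration_date"}))
# INSERT path: a brand-new record keeps its incoming registration_date
print(merge_row(None, incoming, {"registration_date"}))
```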
Step 7: Clean Up (Optional)
When you're done experimenting, remove the demo tables:
# Remove the demo tables we created
print("🧹 Cleaning up demo tables...")
spark.sql(f"DROP TABLE IF EXISTS {TARGET_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
print("✅ Demo tables removed successfully")
Common Use Cases
Column mapping is useful in many scenarios:
1. Legacy System Integration
Merge data from old systems with abbreviated column names into modern tables:
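For example, a merge step for a hypothetical legacy orders feed might map abbreviated names like this (the column names are invented for illustration; the option layout matches the pipelines above):

```yaml
merge_to_target:
  action: WRITE_DELTA_MERGE
  options:
    table_identifier: {{env:TABLE_TARGET}}
    key_columns:
      - order_id
    column_mapping:
      order_id: ord_no       # legacy abbreviations (hypothetical)
      order_total: amt
      order_date: order_dt
    when_matched_update: true
    when_not_matched_insert: true
```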
2. Multi-Source Data Integration
Combine data from different sources with varying conventions:
# Source A uses: user_id, user_name
# Source B uses: customer_id, customer_name
# Target uses:   account_id, account_holder
# Mapping for Source A (Source B would need its own mapping):
column_mapping:
  account_id: user_id
  account_holder: user_name
3. Data Standardization
Migrate from inconsistent naming to organizational standards:
column_mapping:
  email_address: email   # Standardize to email_address
  phone_number: phone    # Standardize to phone_number
  date_of_birth: dob     # Standardize to date_of_birth
Key Takeaways
Basic Merge:
- ✅ WRITE_DELTA_MERGE performs both UPDATE and INSERT operations in one step
- ✅ key_columns defines which columns to match records on
- ✅ when_matched_update: true updates existing records
- ✅ when_not_matched_insert: true inserts new records
Advanced Merge with Column Mapping:
- ✅ Column mapping lets you merge data between tables with different column names
- ✅ Use target column names in key_columns and cols_to_exclude_from_update
- ✅ The column_mapping dictionary maps target → source column names
- ✅ Columns not in the mapping are assumed to have the same name in both tables
- ✅ You can combine column mapping with partial updates using cols_to_exclude_from_update
Need Help?
If you encounter any issues while following this tutorial, check out our comprehensive Common Issues guide which covers:
- Table and database access problems
- Column mapping configuration errors
- Merge operation failures
- Data type mismatches between source and target columns