Joining Data with the Fluent API¶
New to the Fluent API?
For a comprehensive guide including the pipeline lifecycle, common patterns, and best practices, see How to Use the Fluent API.
Getting Started¶
This step-by-step guide shows you how to join data from multiple sources using the fluent API. Joining data is a fundamental operation in data engineering that allows you to combine information from different tables to create enriched datasets.
What You'll Build¶
By the end of this tutorial, you'll have working pipelines that:
- Load customer data from a dimension table
- Load order data from a fact table
- Filter both datasets to include only relevant records
- Join the data to enrich orders with customer information
- Save enriched results to a new table
- Perform different join types (inner and left joins)
What You'll Learn¶
- How to create related datasets for testing joins
- How to build reusable pipeline components
- How to pass one PipelineBuilder into another for joins
- How to configure different join types (inner, left, right, outer)
- How to specify join conditions with single or multiple keys
- How to verify join results and data enrichment
Prerequisites¶
Before starting, make sure you have:
- Database Access: You can connect to Unity Catalog (Databricks' data storage system)
- Permissions: You can create schemas and tables in your workspace
- Environment: Access to a Databricks notebook or similar Spark environment
- Basic Understanding: Familiarity with SQL joins (inner, left, right, outer)
Setup and Installation¶
Install Nessy and restart the Python kernel:
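The package name below is an assumption (the library imports as `cloe_nessy`); adjust it to whatever your environment uses:
# Install the library (package name assumed; adjust if it differs in your environment)
%pip install cloe-nessy
# Restart the Python process so the newly installed package is picked up (Databricks notebooks)
dbutils.library.restartPython()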
Step 1: Create Sample Data¶
First, let's create two related tables that we'll join together. We'll create a customer dimension table and an orders fact table:
from pyspark.sql import Row
# Configuration
CATALOG = "workspace"
SCHEMA = "demo"
CUSTOMERS_TABLE = f"{CATALOG}.{SCHEMA}.customers"
ORDERS_TABLE = f"{CATALOG}.{SCHEMA}.orders"
ENRICHED_TABLE = f"{CATALOG}.{SCHEMA}.enriched_orders"
# Create schema
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
# Create customer dimension data
customers_data = [
Row(customer_id=1, customer_name="Alice Smith", email="alice@example.com",
registration_date="2023-01-15", status="active"),
Row(customer_id=2, customer_name="Bob Johnson", email="bob@example.com",
registration_date="2023-02-20", status="active"),
Row(customer_id=3, customer_name="Charlie Brown", email="charlie@example.com",
registration_date="2023-03-10", status="inactive"),
Row(customer_id=4, customer_name="Diana Prince", email="diana@example.com",
registration_date="2023-04-05", status="active"),
]
# Create order fact data
orders_data = [
Row(order_id=101, customer_id=1, order_date="2024-01-15", total_amount=250.50),
Row(order_id=102, customer_id=2, order_date="2024-01-16", total_amount=189.99),
Row(order_id=103, customer_id=1, order_date="2024-01-17", total_amount=475.00),
Row(order_id=104, customer_id=4, order_date="2024-01-18", total_amount=325.75),
Row(order_id=105, customer_id=3, order_date="2024-01-19", total_amount=150.00), # Inactive customer
Row(order_id=106, customer_id=2, order_date="2024-01-20", total_amount=299.99),
]
# Create and save DataFrames
customers_df = spark.createDataFrame(customers_data)
orders_df = spark.createDataFrame(orders_data)
# Clean up any existing tables
spark.sql(f"DROP TABLE IF EXISTS {CUSTOMERS_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {ORDERS_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {ENRICHED_TABLE}")
# Save as tables
customers_df.write.saveAsTable(CUSTOMERS_TABLE)
orders_df.write.saveAsTable(ORDERS_TABLE)
print(f"โ Created customers table with {customers_df.count()} rows")
print(f"โ Created orders table with {orders_df.count()} rows")
print("\n๐ Customer Data:")
customers_df.display()
print("\n๐ Orders Data:")
orders_df.display()
!!! tip "Understanding the Sample Data" We created two related tables:
- **Customers table**: 4 customers (3 active, 1 inactive)
- **Orders table**: 6 orders linked to customers via `customer_id`
Notice that order #105 belongs to Charlie Brown, an inactive customer. This will be filtered out in our inner join example.
Step 2: Build Reusable Pipeline Components¶
One powerful feature of the fluent API is the ability to build pipeline components that can be reused and composed together. Let's build a customer dimension pipeline first:
from cloe_nessy.pipeline import PipelineBuilder
# Define customer dimension pipeline
customers_pipeline = (
PipelineBuilder("Customer Dimension")
.read_catalog_table(
table_identifier=CUSTOMERS_TABLE,
step_name="Load Customers"
)
.transform_filter(
condition="status = 'active'",
step_name="Filter Active Customers"
)
.transform_select_columns(
include_columns=["customer_id", "customer_name", "email", "registration_date"],
step_name="Select Customer Attributes"
)
)
!!! info "Reusable Pipeline Components" Notice that we don't call .build()
on the customer pipeline. This keeps it as a PipelineBuilder object that
can be passed into other pipelines for joining.
This approach allows you to:
- Define common data transformations once
- Reuse the same logic across multiple pipelines
- Keep your code DRY (Don't Repeat Yourself)
- Test components independently
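If you want to sanity-check the component on its own, here is a minimal sketch. It assumes a builder can also be built and run independently; a separate, disposable instance is used so the reusable `customers_pipeline` above stays un-built for joining:
# A separate builder with the same steps, built and run just for testing
customers_test = (
    PipelineBuilder("Customer Dimension (test run)")
    .read_catalog_table(table_identifier=CUSTOMERS_TABLE, step_name="Load Customers")
    .transform_filter(condition="status = 'active'", step_name="Filter Active Customers")
    .build()
)
customers_test.run()
# Inspect the filtered result of the last transformation step
customers_test.steps.get("Filter Active Customers").result.data.display()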
Step 3: Build the Main Pipeline with Join¶
Now let's build the main pipeline that reads orders and joins them with the customer data:
# Define order fact pipeline with join
enriched_orders_pipeline = (
PipelineBuilder("Enriched Orders")
.read_catalog_table(
table_identifier=ORDERS_TABLE,
step_name="Load Orders"
)
.transform_filter(
condition="order_date >= '2024-01-01'",
step_name="Filter Recent Orders"
)
.transform_join(
joined_data=customers_pipeline, # Pass the builder directly!
join_on="customer_id",
how="inner",
step_name="Join with Customer Data"
)
.transform_select_columns(
include_columns=[
"order_id",
"customer_id",
"customer_name",
"email",
"order_date",
"total_amount"
],
step_name="Select Final Columns"
)
.write_catalog_table(
table_identifier=ENRICHED_TABLE,
mode="overwrite",
step_name="Write Results"
)
.build()
)
!!! info "Understanding the Join Operation" The .transform_join() method
accepts several parameters:
- **`joined_data`**: Another `PipelineBuilder` object or a step name from the current pipeline
- **`join_on`**: The column(s) to join on. Can be:
- A string: `"customer_id"` (same column name in both tables)
- A dict: `{"left_col": "right_col"}` (different column names)
- A list: `["col1", "col2"]` (multiple columns)
- **`how`**: The join type - `"inner"`, `"left"`, `"right"`, `"outer"`, `"left_semi"`, `"left_anti"`
- **`step_name`**: Optional descriptive name for this join step
Step 4: Visualize the Pipeline¶
Before running, let's see the complete pipeline structure including the join:
# View the pipeline structure
print("๐ Pipeline Structure:")
enriched_orders_pipeline.plot_graph()
The diagram will show both the main pipeline and the customer dimension pipeline that gets joined in.
Step 5: Run the Pipeline¶
Now let's execute the complete pipeline:
# Run the pipeline
print("\n๐ Executing pipeline...")
enriched_orders_pipeline.run()
print("โ
Pipeline completed successfully!")
Step 6: Verify the Results¶
Let's check the enriched data to verify the join worked correctly:
# Load results
print("๐ Checking enriched orders...")
result_df = spark.table(ENRICHED_TABLE)
row_count = result_df.count()
print(f"\n๐ฏ Results Summary:")
print(f" - Original orders: 6")
print(f" - Active customers: 3 (filtered out 1 inactive)")
print(f" - Enriched orders: {row_count} (excludes order #105 from inactive customer)")
# Display the enriched data
print("\n๐ Enriched Orders with Customer Information:")
result_df.display()
# Verify we have customer information
print("\n๐ท๏ธ Available columns:", result_df.columns)
assert "customer_name" in result_df.columns, "Missing customer_name column!"
assert "email" in result_df.columns, "Missing email column!"
print("โ Customer data successfully joined!")
Expected Results: You should see 5 enriched orders in your result table. Order #105 is excluded because it belongs to Charlie Brown, whose status is "inactive" and was filtered out in the customer dimension pipeline. Each order now includes the customer's name and email address.
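If you want to confirm this programmatically, a quick sanity check like the following (a small sketch reusing `result_df` and `row_count` from Step 6) works:
# Sanity check: order #105 (inactive customer) should not appear in the enriched table
assert result_df.filter("order_id = 105").count() == 0, "Order #105 should have been excluded!"
assert row_count == 5, f"Expected 5 enriched orders, found {row_count}"
print("Inner join excluded the inactive customer's order as expected")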
Advanced: Using Different Join Types¶
Left Join - Keep All Orders¶
Sometimes you want to keep all records from the main table even if there's no match in the joined table. Here's how to use a left join:
all_orders = (
PipelineBuilder("All Orders with Optional Customer Data")
.read_catalog_table(table_identifier=ORDERS_TABLE)
.transform_join(
joined_data=(
PipelineBuilder("Customer Lookup")
.read_catalog_table(table_identifier=CUSTOMERS_TABLE)
.transform_select_columns(include_columns=["customer_id", "customer_name", "status"])
),
join_on={"customer_id": "customer_id"},
how="left", # LEFT JOIN keeps all orders
step_name="Left Join Customers"
)
.build()
)
all_orders.run()
# Display results
all_orders.steps.get("Left Join Customers").result.data.display()
!!! tip "Choosing the Right Join Type" - Inner Join (how="inner"): Only
keeps records that exist in both tables - Left Join (how="left"):
Keeps all records from the left (main) table - Right Join
(how="right"): Keeps all records from the right (joined) table - Outer
Join (how="outer"): Keeps all records from both tables - Left Semi
(how="left_semi"): Like inner join, but only returns columns from left
table - Left Anti (how="left_anti"): Returns records from left table
that DON'T match right table
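As an example of the last type, here is a minimal sketch (not part of the original tutorial) that uses a left anti join to surface orders whose customer is not in the active-customer dimension. It assumes the `customers_pipeline` builder from Step 2 can be reused here:
# Orders that have no matching active customer (left anti join)
unmatched_orders = (
    PipelineBuilder("Orders Without Active Customers")
    .read_catalog_table(table_identifier=ORDERS_TABLE)
    .transform_join(
        joined_data=customers_pipeline,  # active-customer builder from Step 2
        join_on="customer_id",
        how="left_anti",                 # keep only left rows with no match
        step_name="Anti Join Customers"
    )
    .build()
)
unmatched_orders.run()
unmatched_orders.steps.get("Anti Join Customers").result.data.display()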
Multiple Join Conditions¶
You can join on multiple columns by providing a list:
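For example (the column names here are illustrative):
.transform_join(
    joined_data=other_pipeline,
    join_on=["customer_id", "region_id"],  # every listed column must match in both tables
    how="inner"
)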
Different Column Names¶
If the join columns have different names in each table:
.transform_join(
joined_data=other_pipeline,
join_on={"customer_id": "cust_id"}, # left_column: right_column
how="inner"
)
Step 7: Clean Up (Optional)¶
When you're done experimenting, you can remove the test tables:
# Remove the demo tables we created
print("๐งน Cleaning up demo tables...")
spark.sql(f"DROP TABLE IF EXISTS {CUSTOMERS_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {ORDERS_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {ENRICHED_TABLE}")
print("โ
Demo tables removed successfully")
Key Takeaways¶
Here are the important concepts from this tutorial:
- **Composable Pipelines**: Build pipeline components without calling `.build()` to reuse them in joins
- **Fluent Join Syntax**: Pass `PipelineBuilder` objects directly into `.transform_join()`
- **Join Types**: Choose the appropriate join type based on whether you want to keep unmatched records
- **Join Conditions**: Support for simple, complex, and multi-column join conditions
- **Data Enrichment**: Joins are essential for combining dimensional and fact data
Need Help?¶
If you encounter any issues while following this tutorial, check out our comprehensive Common Issues guide which covers:
- Table and database access problems
- Join condition errors and column mismatches
- Pipeline configuration errors
- Installation and import problems
- Data filtering and output issues
The guide includes step-by-step solutions and code examples to help you troubleshoot quickly.