Joining Data with the Fluent API

New to the Fluent API?

For a comprehensive guide including the pipeline lifecycle, common patterns, and best practices, see How to Use the Fluent API.

Getting Started

This step-by-step guide shows you how to join data from multiple sources using the fluent API. Joining data is a fundamental operation in data engineering that allows you to combine information from different tables to create enriched datasets.

What You'll Build

By the end of this tutorial, you'll have working pipelines that:

  1. Load customer data from a dimension table
  2. Load order data from a fact table
  3. Filter both datasets to include only relevant records
  4. Join the data to enrich orders with customer information
  5. Save enriched results to a new table
  6. Perform different join types (inner and left joins)

What You'll Learn

  • How to create related datasets for testing joins
  • How to build reusable pipeline components
  • How to pass one PipelineBuilder into another for joins
  • How to configure different join types (inner, left, right, outer)
  • How to specify join conditions with single or multiple keys
  • How to verify join results and data enrichment

Prerequisites

Before starting, make sure you have:

  • Database Access: You can connect to Unity Catalog (Databricks' governance layer for catalogs, schemas, and tables)
  • Permissions: You can create schemas and tables in your workspace
  • Environment: Access to a Databricks notebook or similar Spark environment
  • Basic Understanding: Familiarity with SQL joins (inner, left, right, outer)

Setup and Installation

Install Nessy and restart the Python kernel:

%pip install cloe_nessy
%restart_python

Step 1: Create Sample Data

First, let's create two related tables that we'll join together. We'll create a customer dimension table and an orders fact table:

from pyspark.sql import Row

# Configuration
CATALOG = "workspace"
SCHEMA = "demo"
CUSTOMERS_TABLE = f"{CATALOG}.{SCHEMA}.customers"
ORDERS_TABLE = f"{CATALOG}.{SCHEMA}.orders"
ENRICHED_TABLE = f"{CATALOG}.{SCHEMA}.enriched_orders"

# Create schema
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

# Create customer dimension data
customers_data = [
    Row(customer_id=1, customer_name="Alice Smith", email="alice@example.com",
        registration_date="2023-01-15", status="active"),
    Row(customer_id=2, customer_name="Bob Johnson", email="bob@example.com",
        registration_date="2023-02-20", status="active"),
    Row(customer_id=3, customer_name="Charlie Brown", email="charlie@example.com",
        registration_date="2023-03-10", status="inactive"),
    Row(customer_id=4, customer_name="Diana Prince", email="diana@example.com",
        registration_date="2023-04-05", status="active"),
]

# Create order fact data
orders_data = [
    Row(order_id=101, customer_id=1, order_date="2024-01-15", total_amount=250.50),
    Row(order_id=102, customer_id=2, order_date="2024-01-16", total_amount=189.99),
    Row(order_id=103, customer_id=1, order_date="2024-01-17", total_amount=475.00),
    Row(order_id=104, customer_id=4, order_date="2024-01-18", total_amount=325.75),
    Row(order_id=105, customer_id=3, order_date="2024-01-19", total_amount=150.00),  # Inactive customer
    Row(order_id=106, customer_id=2, order_date="2024-01-20", total_amount=299.99),
]

# Create and save DataFrames
customers_df = spark.createDataFrame(customers_data)
orders_df = spark.createDataFrame(orders_data)

# Clean up any existing tables
spark.sql(f"DROP TABLE IF EXISTS {CUSTOMERS_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {ORDERS_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {ENRICHED_TABLE}")

# Save as tables
customers_df.write.saveAsTable(CUSTOMERS_TABLE)
orders_df.write.saveAsTable(ORDERS_TABLE)

print(f"✓ Created customers table with {customers_df.count()} rows")
print(f"✓ Created orders table with {orders_df.count()} rows")

print("\n📊 Customer Data:")
customers_df.display()

print("\n📊 Orders Data:")
orders_df.display()

!!! tip "Understanding the Sample Data"
    We created two related tables:

    - **Customers table**: 4 customers (3 active, 1 inactive)
    - **Orders table**: 6 orders linked to customers via `customer_id`

    Notice that order #105 belongs to Charlie Brown, an inactive customer. This order will be filtered out in our inner join example.
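To see why the inner join will drop order #105, here is a minimal plain-Python sketch of the same logic. The dict-based join below is only an illustration of the semantics, not how Nessy implements joins:

```python
# Active customers only -- Charlie Brown (customer_id=3) is inactive and excluded.
active_customers = {1: "Alice Smith", 2: "Bob Johnson", 4: "Diana Prince"}

# (order_id, customer_id) pairs mirroring the sample data above
orders = [(101, 1), (102, 2), (103, 1), (104, 4), (105, 3), (106, 2)]

# Inner join: keep only orders whose customer_id appears in the active set
enriched = [
    (oid, cid, active_customers[cid])
    for oid, cid in orders
    if cid in active_customers
]

print(len(enriched))                      # 5 orders survive
print([oid for oid, _, _ in enriched])    # order 105 is gone
```

Order #105 has no match among the active customers, so the inner join discards it; the remaining five orders each pick up their customer's name.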

Step 2: Build Reusable Pipeline Components

One powerful feature of the fluent API is the ability to build pipeline components that can be reused and composed together. Let's build a customer dimension pipeline first:

from cloe_nessy.pipeline import PipelineBuilder

# Define customer dimension pipeline
customers_pipeline = (
    PipelineBuilder("Customer Dimension")
    .read_catalog_table(
        table_identifier=CUSTOMERS_TABLE,
        step_name="Load Customers"
    )
    .transform_filter(
        condition="status = 'active'",
        step_name="Filter Active Customers"
    )
    .transform_select_columns(
        include_columns=["customer_id", "customer_name", "email", "registration_date"],
        step_name="Select Customer Attributes"
    )
)

!!! info "Reusable Pipeline Components"
    Notice that we don't call `.build()` on the customer pipeline. This keeps it as a `PipelineBuilder` object that can be passed into other pipelines for joining.

    This approach allows you to:

    - Define common data transformations once
    - Reuse the same logic across multiple pipelines
    - Keep your code DRY (Don't Repeat Yourself)
    - Test components independently

Step 3: Build the Main Pipeline with Join

Now let's build the main pipeline that reads orders and joins them with the customer data:

# Define order fact pipeline with join
enriched_orders_pipeline = (
    PipelineBuilder("Enriched Orders")
    .read_catalog_table(
        table_identifier=ORDERS_TABLE,
        step_name="Load Orders"
    )
    .transform_filter(
        condition="order_date >= '2024-01-01'",
        step_name="Filter Recent Orders"
    )
    .transform_join(
        joined_data=customers_pipeline,  # Pass the builder directly!
        join_on="customer_id",
        how="inner",
        step_name="Join with Customer Data"
    )
    .transform_select_columns(
        include_columns=[
            "order_id",
            "customer_id",
            "customer_name",
            "email",
            "order_date",
            "total_amount"
        ],
        step_name="Select Final Columns"
    )
    .write_catalog_table(
        table_identifier=ENRICHED_TABLE,
        mode="overwrite",
        step_name="Write Results"
    )
    .build()
)

!!! info "Understanding the Join Operation"
    The `.transform_join()` method accepts several parameters:

    - **`joined_data`**: Another `PipelineBuilder` object or a step name from the current pipeline
    - **`join_on`**: The column(s) to join on. Can be:
        - A string: `"customer_id"` (same column name in both tables)
        - A dict: `{"left_col": "right_col"}` (different column names)
        - A list: `["col1", "col2"]` (multiple columns)
    - **`how`**: The join type: `"inner"`, `"left"`, `"right"`, `"outer"`, `"left_semi"`, or `"left_anti"`
    - **`step_name`**: Optional descriptive name for this join step
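To make the three accepted `join_on` shapes concrete, here is a hypothetical helper (not part of the Nessy API) that normalizes each shape into explicit `(left_column, right_column)` pairs, the way a join implementation might interpret them:

```python
# Hypothetical normalization helper -- for illustration only, not Nessy's internals.
def normalize_join_on(join_on):
    if isinstance(join_on, str):
        # "customer_id" -> same column name on both sides
        return [(join_on, join_on)]
    if isinstance(join_on, dict):
        # {"customer_id": "cust_id"} -> left column maps to right column
        return list(join_on.items())
    if isinstance(join_on, list):
        # ["customer_id", "region_id"] -> same names on both sides, multiple keys
        return [(col, col) for col in join_on]
    raise TypeError(f"Unsupported join_on type: {type(join_on).__name__}")

print(normalize_join_on("customer_id"))               # [('customer_id', 'customer_id')]
print(normalize_join_on({"customer_id": "cust_id"}))  # [('customer_id', 'cust_id')]
print(normalize_join_on(["customer_id", "region_id"]))
```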

Step 4: Visualize the Pipeline

Before running, let's see the complete pipeline structure including the join:

# View the pipeline structure
print("📋 Pipeline Structure:")
enriched_orders_pipeline.plot_graph()

The diagram will show both the main pipeline and the customer dimension pipeline that gets joined in.

Step 5: Run the Pipeline

Now let's execute the complete pipeline:

# Run the pipeline
print("\n🚀 Executing pipeline...")
enriched_orders_pipeline.run()
print("✅ Pipeline completed successfully!")

Step 6: Verify the Results

Let's check the enriched data to verify the join worked correctly:

# Load results
print("📋 Checking enriched orders...")
result_df = spark.table(ENRICHED_TABLE)
row_count = result_df.count()

print("\n🎯 Results Summary:")
print("  - Original orders: 6")
print("  - Active customers: 3 (filtered out 1 inactive)")
print(f"  - Enriched orders: {row_count} (excludes order #105 from inactive customer)")

# Display the enriched data
print("\n📖 Enriched Orders with Customer Information:")
result_df.display()

# Verify we have customer information
print("\n🏷️  Available columns:", result_df.columns)
assert "customer_name" in result_df.columns, "Missing customer_name column!"
assert "email" in result_df.columns, "Missing email column!"
print("✓ Customer data successfully joined!")

Expected Results: You should see 5 enriched orders in your result table. Order #105 is excluded because it belongs to Charlie Brown, who was filtered out of the customer dimension pipeline because his status is "inactive". Each remaining order now includes the customer's name and email address.

Advanced: Using Different Join Types

Left Join - Keep All Orders

Sometimes you want to keep all records from the main table even if there's no match in the joined table. Here's how to use a left join:

all_orders = (
    PipelineBuilder("All Orders with Optional Customer Data")
    .read_catalog_table(table_identifier=ORDERS_TABLE)
    .transform_join(
        joined_data=(
            PipelineBuilder("Customer Lookup")
            .read_catalog_table(table_identifier=CUSTOMERS_TABLE)
            .transform_select_columns(include_columns=["customer_id", "customer_name", "status"])
        ),
        join_on={"customer_id": "customer_id"},
        how="left",  # LEFT JOIN keeps all orders
        step_name="Left Join Customers"
    )
    .build()
)

all_orders.run()

# Display results
all_orders.steps.get("Left Join Customers").result.data.display()

!!! tip "Choosing the Right Join Type"
    - **Inner Join** (`how="inner"`): Only keeps records that exist in both tables
    - **Left Join** (`how="left"`): Keeps all records from the left (main) table
    - **Right Join** (`how="right"`): Keeps all records from the right (joined) table
    - **Outer Join** (`how="outer"`): Keeps all records from both tables
    - **Left Semi** (`how="left_semi"`): Like an inner join, but only returns columns from the left table
    - **Left Anti** (`how="left_anti"`): Returns records from the left table that DON'T match the right table
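The differences between these join types are easy to see with a minimal plain-Python sketch. Tiny dicts stand in for the two tables here; this only illustrates the semantics, not Nessy's implementation:

```python
# Left table: order_id -> customer_id (order 105 belongs to an inactive customer)
left = {101: 1, 105: 3}
# Right table: customer_id -> name (active customers only)
right = {1: "Alice Smith"}

inner = {o: c for o, c in left.items() if c in right}       # matched rows only
left_join = {o: right.get(c) for o, c in left.items()}      # all left rows; None where unmatched
left_semi = [o for o, c in left.items() if c in right]      # matched rows, left columns only
left_anti = [o for o, c in left.items() if c not in right]  # unmatched left rows only

print(inner)      # {101: 1}
print(left_join)  # {101: 'Alice Smith', 105: None}
print(left_semi)  # [101]
print(left_anti)  # [105]
```

A left anti join is particularly handy for data-quality checks, e.g. finding orders that reference no known customer.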

Multiple Join Conditions

You can join on multiple columns by providing a list:

.transform_join(
    joined_data=other_pipeline,
    join_on=["customer_id", "region_id"],
    how="inner"
)

Different Column Names

If the join columns have different names in each table:

.transform_join(
    joined_data=other_pipeline,
    join_on={"customer_id": "cust_id"},  # left_column: right_column
    how="inner"
)

Step 7: Clean Up (Optional)

When you're done experimenting, you can remove the test tables:

# Remove the demo tables we created
print("🧹 Cleaning up demo tables...")
spark.sql(f"DROP TABLE IF EXISTS {CUSTOMERS_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {ORDERS_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {ENRICHED_TABLE}")
print("✅ Demo tables removed successfully")

Key Takeaways

Here are the important concepts from this tutorial:

  1. Composable Pipelines: Build pipeline components without calling .build() to reuse them in joins
  2. Fluent Join Syntax: Pass PipelineBuilder objects directly into .transform_join()
  3. Join Types: Choose the appropriate join type based on whether you want to keep unmatched records
  4. Join Conditions: Support for simple, complex, and multi-column join conditions
  5. Data Enrichment: Joins are essential for combining dimensional and fact data

Need Help?

If you encounter any issues while following this tutorial, check out our comprehensive Common Issues guide which covers:

  • Table and database access problems
  • Join condition errors and column mismatches
  • Pipeline configuration errors
  • Installation and import problems
  • Data filtering and output issues

The guide includes step-by-step solutions and code examples to help you troubleshoot quickly.