Building Dynamic Pipelines

New to the Fluent API?

For a comprehensive guide including the pipeline lifecycle, common patterns, and best practices, see How to Use the Fluent API.

Getting Started

This step-by-step guide shows you how to build dynamic pipelines that adapt based on runtime conditions. Unlike static YAML configurations, the fluent API allows you to use Python logic to conditionally add or remove steps, making your pipelines flexible and configuration-driven.

What You'll Build

By the end of this tutorial, you'll have working examples of:

  1. Conditional step execution based on environment or configuration
  2. Dynamic column transformations applied iteratively
  3. Configuration-driven pipelines that adapt to settings
  4. Environment-specific logic (development vs production)
  5. Feature flag patterns for enabling/disabling transformations

What You'll Learn

  • How to use Python conditionals to add pipeline steps
  • How to build pipelines with loops for repeated operations
  • How to create configuration-driven pipeline builders
  • How to handle optional transformations
  • Best practices for dynamic pipeline construction

Prerequisites

Before starting, make sure you have:

  • Database Access: You can connect to Unity Catalog (Databricks' data governance and catalog layer)
  • Permissions: You can create schemas and tables in your workspace
  • Environment: Access to a Databricks notebook or similar Spark environment
  • Python Knowledge: Basic understanding of Python conditionals and loops

Setup and Installation

Install Nessy and restart the Python kernel:

%pip install cloe_nessy
%restart_python

Step 1: Create Sample Data

Let's create sample customer data with personally identifiable information (PII) that we'll conditionally mask based on environment:

from pyspark.sql import Row

# Configuration
CATALOG = "workspace"
SCHEMA = "demo"
SOURCE_TABLE = f"{CATALOG}.{SCHEMA}.customer_data"
OUTPUT_TABLE = f"{CATALOG}.{SCHEMA}.processed_customers"

# Create schema
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

# Create customer data with PII fields
customer_data = [
    Row(
        customer_id=1,
        first_name="Alice",
        last_name="Smith",
        email="alice.smith@email.com",
        phone="555-0101",
        ssn="123-45-6789",
        salary=75000.00,
        department="Engineering",
        join_date="2023-01-15",
        status="active"
    ),
    Row(
        customer_id=2,
        first_name="Bob",
        last_name="Johnson",
        email="bob.j@email.com",
        phone="555-0102",
        ssn="987-65-4321",
        salary=82000.00,
        department="Sales",
        join_date="2023-02-20",
        status="active"
    ),
    Row(
        customer_id=3,
        first_name="Charlie",
        last_name="Brown",
        email="cbrown@email.com",
        phone="555-0103",
        ssn="456-78-9012",
        salary=68000.00,
        department="Marketing",
        join_date="2023-03-10",
        status="inactive"
    ),
]

# Create and save DataFrame
df = spark.createDataFrame(customer_data)

# Clean up existing tables
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {OUTPUT_TABLE}")

df.write.saveAsTable(SOURCE_TABLE)

print(f"✓ Created source table with {df.count()} rows")
print("\n📊 Sample Data:")
df.display()

About the Sample Data

Our sample data includes PII fields (email, phone, SSN, salary) that should be handled differently in development vs production environments.

Example 1: Environment-Based Conditional Steps

One of the most common use cases is applying different transformations based on the environment:

from cloe_nessy.pipeline import PipelineBuilder
import os

# Configuration - typically set via environment variables
ENVIRONMENT = os.getenv("ENV", "development")  # "development" or "production"
MASK_PII = ENVIRONMENT == "production"
INCLUDE_SALARY = ENVIRONMENT == "development"

print("🔧 Configuration:")
print(f"  - Environment: {ENVIRONMENT}")
print(f"  - Mask PII: {MASK_PII}")
print(f"  - Include Salary: {INCLUDE_SALARY}")

# Start building the pipeline
builder = PipelineBuilder("Environment-Aware Pipeline")

# Add base steps - always executed
builder = builder.read_catalog_table(
    table_identifier=SOURCE_TABLE,
    step_name="Load Customer Data"
)

# Conditionally mask PII fields in production
if MASK_PII:
    print("✓ Adding PII masking steps (production mode)")
    builder = (
        builder
        .transform_with_column(
            column_name="email",
            expression="regexp_replace(email, '(.{3}).*(@.*)', '$1***$2')",
            step_name="Mask Email Addresses"
        )
        .transform_with_column(
            column_name="phone",
            # '[0-9]' avoids double-escaping '\d' through both Python and Spark SQL
            expression="regexp_replace(phone, '[0-9]', 'X')",
            step_name="Mask Phone Numbers"
        )
        .transform_with_column(
            column_name="ssn",
            expression="concat('XXX-XX-', substring(ssn, -4, 4))",
            step_name="Mask SSN"
        )
    )
else:
    print("โ„น๏ธ  Skipping PII masking (development mode)")

# Conditionally remove sensitive columns (SSN, salary) in production
if not INCLUDE_SALARY:
    print("✓ Removing sensitive columns (production mode)")
    builder = builder.transform_select_columns(
        exclude_columns=["ssn", "salary"],
        step_name="Remove Sensitive Columns"
    )

# Add final steps - always executed
builder = (
    builder
    .transform_filter(
        condition="status = 'active'",
        step_name="Filter Active Customers"
    )
    .write_catalog_table(
        table_identifier=OUTPUT_TABLE,
        mode="overwrite",
        step_name="Write Results"
    )
)

# Build and run
pipeline = builder.build()

print("\n📊 Pipeline Structure:")
pipeline.plot_graph()

print("\n🚀 Running pipeline...")
pipeline.run()
print("✅ Pipeline completed!")

# Check results
print("\n📋 Results:")
spark.table(OUTPUT_TABLE).display()

Understanding Environment-Based Logic

This pattern demonstrates:

  • Configuration Variables: Use environment variables or config files
  • Conditional Builder Assignment: Reassign builder when adding steps conditionally
  • Separate Concerns: PII masking logic is only added when needed
  • Audit Trail: Print statements show which steps were added
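Because a mistyped `ENV` value would silently fall through to the development branch, it can be worth failing fast on unknown environment names before any steps are added. A minimal sketch of such a guard (the allowed set and helper name are illustrative, not part of Nessy):

```python
import os

# Hypothetical guard: reject unknown environment names up front rather than
# silently treating them as "not production".
ALLOWED_ENVS = {"development", "production"}

def resolve_environment(default: str = "development") -> str:
    """Read ENV, normalize it, and fail fast on values outside ALLOWED_ENVS."""
    env = os.getenv("ENV", default).strip().lower()
    if env not in ALLOWED_ENVS:
        raise ValueError(f"Unknown ENV {env!r}; expected one of {sorted(ALLOWED_ENVS)}")
    return env

ENVIRONMENT = resolve_environment()
MASK_PII = ENVIRONMENT == "production"
```

With this in place, `ENV=staging` raises immediately instead of quietly producing an unmasked development pipeline.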

Example 2: Loop-Based Dynamic Transformations

Apply the same transformation to multiple columns programmatically:

# Configuration: columns to normalize
COLUMNS_TO_NORMALIZE = ["first_name", "last_name", "department"]
APPLY_NORMALIZATION = True

# Start pipeline
builder = (
    PipelineBuilder("Dynamic Column Normalization")
    .read_catalog_table(
        table_identifier=SOURCE_TABLE,
        step_name="Load Data"
    )
)

# Dynamically add normalization for each column
if APPLY_NORMALIZATION:
    print(f"✓ Adding normalization for {len(COLUMNS_TO_NORMALIZE)} columns")
    for column in COLUMNS_TO_NORMALIZE:
        builder = builder.transform_with_column(
            column_name=column,
            expression=f"upper(trim({column}))",
            step_name=f"Normalize {column}"
        )
else:
    print("โ„น๏ธ  Skipping normalization")

# Complete the pipeline
pipeline = (
    builder
    .write_catalog_table(
        table_identifier=OUTPUT_TABLE,
        mode="overwrite",
        step_name="Save Normalized Data"
    )
    .build()
)

print("\n📊 Pipeline Structure:")
pipeline.plot_graph()

pipeline.run()

Loop-Based Transformations

Using loops to add pipeline steps is perfect for:

  • Applying the same logic to multiple columns
  • Processing variable numbers of inputs
  • Configuration-driven column transformations
  • Reducing code duplication
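When different columns need different expressions, the same loop can iterate over a spec dictionary instead of a flat column list. A sketch of that variation (the spec names and the `expand_specs` helper are illustrative, not part of Nessy):

```python
# Map each column to an expression template; "{col}" is filled in per column.
NORMALIZATION_SPECS = {
    "first_name": "upper(trim({col}))",
    "last_name": "upper(trim({col}))",
    "email": "lower(trim({col}))",
}

def expand_specs(specs: dict[str, str]) -> list[tuple[str, str]]:
    """Turn the spec dict into (column, expression) pairs for the builder loop."""
    return [(col, template.format(col=col)) for col, template in specs.items()]

# The builder loop then stays identical for every column:
# for column, expression in expand_specs(NORMALIZATION_SPECS):
#     builder = builder.transform_with_column(
#         column_name=column,
#         expression=expression,
#         step_name=f"Normalize {column}",
#     )
```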

Example 3: Configuration-Driven Pipeline Builder

Create a reusable function that builds pipelines based on configuration dictionaries:

def build_configurable_pipeline(config: dict) -> PipelineBuilder:
    """
    Build a pipeline based on configuration dictionary.

    Args:
        config: Dictionary containing pipeline configuration

    Returns:
        Configured PipelineBuilder ready to build()
    """
    builder = PipelineBuilder(config.get("name", "Configurable Pipeline"))

    # Add source
    builder = builder.read_catalog_table(
        table_identifier=config["source_table"],
        step_name="Load Source Data"
    )

    # Apply filters if specified
    if "filters" in config:
        for i, filter_condition in enumerate(config["filters"], 1):
            builder = builder.transform_filter(
                condition=filter_condition,
                step_name=f"Filter {i}: {filter_condition[:30]}..."
            )

    # Apply column transformations if specified
    if "transformations" in config:
        for trans in config["transformations"]:
            builder = builder.transform_with_column(
                column_name=trans["column"],
                expression=trans["expression"],
                step_name=trans.get("name", f"Transform {trans['column']}")
            )

    # Select columns if specified
    if "select_columns" in config:
        builder = builder.transform_select_columns(
            include_columns=config["select_columns"],
            step_name="Select Final Columns"
        )
    elif "exclude_columns" in config:
        builder = builder.transform_select_columns(
            exclude_columns=config["exclude_columns"],
            step_name="Exclude Sensitive Columns"
        )

    # Clean column names if requested
    if config.get("clean_column_names", False):
        builder = builder.transform_clean_column_names()

    # Add destination
    builder = builder.write_catalog_table(
        table_identifier=config["target_table"],
        mode=config.get("write_mode", "overwrite"),
        step_name="Write to Target"
    )

    return builder


# Example configuration 1: Development mode
dev_config = {
    "name": "Development ETL",
    "source_table": SOURCE_TABLE,
    "target_table": OUTPUT_TABLE,
    "filters": [
        "status = 'active'",
        "department IS NOT NULL"
    ],
    "select_columns": [
        "customer_id",
        "first_name",
        "last_name",
        "email",
        "department",
        "salary"  # Include salary in dev
    ],
    "clean_column_names": True,
    "write_mode": "overwrite"
}

# Example configuration 2: Production mode
prod_config = {
    "name": "Production ETL",
    "source_table": SOURCE_TABLE,
    "target_table": OUTPUT_TABLE,
    "filters": [
        "status = 'active'"
    ],
    "transformations": [
        {
            "column": "email",
            "expression": "regexp_replace(email, '(.{3}).*(@.*)', '$1***$2')",
            "name": "Mask Email"
        }
    ],
    "exclude_columns": ["ssn", "salary"],  # Exclude sensitive fields
    "clean_column_names": True,
    "write_mode": "overwrite"
}

# Choose configuration based on environment
current_env = os.getenv("ENV", "development")
config = prod_config if current_env == "production" else dev_config

print(f"🔧 Using {config['name']} configuration")

# Build and run pipeline
pipeline = build_configurable_pipeline(config).build()

print("\n📊 Pipeline Structure:")
pipeline.plot_graph()

print("\n🚀 Running pipeline...")
pipeline.run()
print("✅ Pipeline completed!")

print("\n📋 Results:")
spark.table(OUTPUT_TABLE).display()

Configuration-Driven Patterns

This approach offers several benefits:

  • Reusability: One function handles multiple scenarios
  • Maintainability: Configuration changes don't require code changes
  • Testing: Easy to test different configurations
  • Documentation: Configuration dictionaries are self-documenting
  • Version Control: Store configurations separately from code
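Storing the configuration as JSON next to your code makes the last two benefits concrete: the dictionary round-trips cleanly through a file that lives in version control. A sketch of loading such a file and handing it to the builder function above (the file name and contents are assumptions for illustration):

```python
import json
from pathlib import Path

# Hypothetical config file checked into version control alongside the pipeline.
config_path = Path("pipeline_config.json")
config_path.write_text(json.dumps({
    "name": "Production ETL",
    "source_table": "workspace.demo.customer_data",
    "target_table": "workspace.demo.processed_customers",
    "filters": ["status = 'active'"],
    "exclude_columns": ["ssn", "salary"],
}, indent=2))

# Load the config and build the pipeline from it.
config = json.loads(config_path.read_text())
# pipeline = build_configurable_pipeline(config).build()  # function defined above
```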

Example 4: Feature Flags for Optional Steps

Use feature flags to enable/disable specific transformations:

# Feature flags - could come from a feature flag service
FEATURE_FLAGS = {
    "enable_data_quality_checks": True,
    "enable_advanced_transforms": True,
    "enable_audit_columns": True,
    "enable_deduplication": False,
}

print("🚩 Feature Flags:")
for flag, enabled in FEATURE_FLAGS.items():
    print(f"  - {flag}: {'✓ Enabled' if enabled else '✗ Disabled'}")

# Build pipeline with feature flags
builder = (
    PipelineBuilder("Feature-Flagged Pipeline")
    .read_catalog_table(
        table_identifier=SOURCE_TABLE,
        step_name="Load Data"
    )
)

# Optional: Data quality checks
if FEATURE_FLAGS["enable_data_quality_checks"]:
    print("\n✓ Adding data quality checks")
    builder = (
        builder
        .transform_filter(
            condition="email IS NOT NULL AND email LIKE '%@%'",
            step_name="Validate Email Format"
        )
        .transform_filter(
            condition="customer_id IS NOT NULL",
            step_name="Validate Required Fields"
        )
    )

# Optional: Advanced transformations
if FEATURE_FLAGS["enable_advanced_transforms"]:
    print("✓ Adding advanced transformations")
    builder = (
        builder
        .transform_with_column(
            column_name="full_name",
            expression="concat(first_name, ' ', last_name)",
            step_name="Create Full Name"
        )
        .transform_with_column(
            column_name="email_domain",
            expression="split(email, '@')[1]",
            step_name="Extract Email Domain"
        )
    )

# Optional: Audit columns
if FEATURE_FLAGS["enable_audit_columns"]:
    print("✓ Adding audit columns")
    builder = (
        builder
        .transform_with_column(
            column_name="processed_at",
            expression="current_timestamp()",
            step_name="Add Processing Timestamp"
        )
        .transform_with_column(
            column_name="processed_by",
            expression="'dynamic_pipeline'",
            step_name="Add Pipeline Name"
        )
    )

# Optional: Deduplication
if FEATURE_FLAGS["enable_deduplication"]:
    print("✓ Adding deduplication")
    builder = builder.transform_deduplicate(
        partition_by=["customer_id"],
        order_by=["join_date"],
        step_name="Remove Duplicates"
    )

# Complete pipeline
pipeline = (
    builder
    .write_catalog_table(
        table_identifier=OUTPUT_TABLE,
        mode="overwrite",
        step_name="Write Results"
    )
    .build()
)

print("\n📊 Pipeline Structure:")
pipeline.plot_graph()

print("\n🚀 Running pipeline...")
pipeline.run()
print("✅ Pipeline completed!")

Feature Flag Best Practices

  • Use feature flags for gradual rollouts of new transformations
  • Test new logic in production with a subset of data
  • Roll back quickly by toggling a flag instead of redeploying code
  • A/B test different transformation approaches
  • Control which features are available in each environment
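In a notebook the flags above are hard-coded; in practice they often come from environment variables or a feature flag service. A small sketch of an environment-variable override layer (the upper-cased naming convention and helper name are assumptions):

```python
import os

def load_feature_flags(defaults: dict[str, bool]) -> dict[str, bool]:
    """Override each default with an env var of the same name, upper-cased.

    e.g. exporting ENABLE_DEDUPLICATION=true flips enable_deduplication on.
    """
    truthy = {"1", "true", "yes", "on"}
    return {
        name: os.getenv(name.upper(), str(default)).strip().lower() in truthy
        for name, default in defaults.items()
    }

FEATURE_FLAGS = load_feature_flags({
    "enable_data_quality_checks": True,
    "enable_deduplication": False,
})
```

Unset variables fall back to the defaults, so the notebook still runs unchanged when no flags are exported.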

Example 5: Conditional Branches Based on Data

Apply different logic based on data characteristics (requires checking data first):

# First, check the data characteristics
source_df = spark.table(SOURCE_TABLE)
row_count = source_df.count()
has_inactive = source_df.filter("status = 'inactive'").count() > 0
has_nulls = source_df.filter("email IS NULL OR phone IS NULL").count() > 0

print("📊 Data Analysis:")
print(f"  - Total rows: {row_count}")
print(f"  - Has inactive records: {has_inactive}")
print(f"  - Has null values: {has_nulls}")

# Build pipeline based on data characteristics
builder = (
    PipelineBuilder("Data-Driven Pipeline")
    .read_catalog_table(
        table_identifier=SOURCE_TABLE,
        step_name="Load Data"
    )
)

# Large dataset optimization
if row_count > 1000000:
    print("\n✓ Large dataset detected - adding optimization steps")
    # In a real scenario, you might repartition or use broadcast joins
    builder = builder.transform_filter(
        condition="customer_id IS NOT NULL",
        step_name="Early Filter for Performance"
    )

# Handle inactive records if present
if has_inactive:
    print("✓ Inactive records detected - adding status filter")
    builder = builder.transform_filter(
        condition="status = 'active'",
        step_name="Filter Active Records"
    )

# Handle null values if present
if has_nulls:
    print("✓ Null values detected - adding null handling")
    builder = (
        builder
        .transform_with_column(
            column_name="email",
            expression="coalesce(email, 'unknown@example.com')",
            step_name="Handle Null Emails"
        )
        .transform_with_column(
            column_name="phone",
            expression="coalesce(phone, '000-0000')",
            step_name="Handle Null Phones"
        )
    )

# Complete pipeline
pipeline = (
    builder
    .write_catalog_table(
        table_identifier=OUTPUT_TABLE,
        mode="overwrite",
        step_name="Write Results"
    )
    .build()
)

pipeline.run()

Data-Dependent Logic Considerations

While powerful, checking data before building pipelines has trade-offs:

  • Performance: Requires reading the data twice (once to check, once to process)
  • Consistency: Data might change between check and execution
  • Complexity: Harder to predict pipeline behavior

Use this pattern when:

  • Data characteristics vary significantly between runs
  • You need adaptive error handling
  • The cost of the extra read is acceptable
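One way to soften the double-read cost is to compute every check in a single pass instead of one scan per condition. The idea in plain Python over sample rows (with Spark you would fold the same checks into one `.agg(...)` call on the DataFrame rather than three separate `.count()`s):

```python
# Sample rows standing in for the source table.
rows = [
    {"customer_id": 1, "status": "active",   "email": "alice@x.com", "phone": "555-0101"},
    {"customer_id": 2, "status": "inactive", "email": None,          "phone": "555-0102"},
]

# One pass computes every flag the builder logic needs.
row_count = 0
has_inactive = False
has_nulls = False
for row in rows:
    row_count += 1
    has_inactive = has_inactive or row["status"] == "inactive"
    has_nulls = has_nulls or row["email"] is None or row["phone"] is None
```

This does not fix the consistency caveat (the data can still change before the pipeline runs), but it halves the number of scans needed to build the pipeline.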

Step 2: Clean Up (Optional)

When you're done experimenting, you can remove the test tables:

# Remove the demo tables we created
print("🧹 Cleaning up demo tables...")
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {OUTPUT_TABLE}")
print("✅ Demo tables removed successfully")

Best Practices for Dynamic Pipelines

1. Always Reassign the Builder

# ✓ Correct - reassign builder
builder = builder.transform_filter(condition="status = 'active'")

# ✗ Incorrect - builder not updated
builder.transform_filter(condition="status = 'active'")

2. Use Descriptive Step Names

# ✓ Good - clear what this step does
builder = builder.transform_filter(
    condition=filter_condition,
    step_name=f"Filter: {filter_description}"
)

# ✗ Poor - generic name
builder = builder.transform_filter(
    condition=filter_condition,
    step_name="Filter"
)

3. Log Pipeline Construction

# ✓ Good - visibility into what's being added
if MASK_PII:
    print("Adding PII masking steps")
    builder = builder.transform_with_column(...)

4. Validate Configuration

# ✓ Good - validate config before using
def build_configurable_pipeline(config: dict) -> PipelineBuilder:
    required_keys = ["source_table", "target_table"]
    for key in required_keys:
        if key not in config:
            raise ValueError(f"Missing required config key: {key}")
    # ... build pipeline

5. Keep Core Logic Separate

# ✓ Good - optional steps are clearly separated
builder = (
    PipelineBuilder()
    .read_catalog_table(...)  # Core step
)

if optional_feature:
    builder = builder.transform_filter(...)  # Optional step

builder = builder.write_catalog_table(...)  # Core step

Common Patterns Summary

  • If/Else branching: environment-specific logic (e.g., dev vs. prod transformations)
  • Loops: repeated operations (e.g., applying the same transform to multiple columns)
  • Configuration dictionaries: externalized logic (e.g., JSON/YAML-driven pipelines)
  • Feature flags: gradual rollouts (e.g., enabling or disabling specific steps)
  • Helper functions: reusable pipeline builders (e.g., factory functions for common patterns)
  • Data-driven checks: adaptive processing (e.g., handling different data shapes)

Key Takeaways

  1. Python Power: The fluent API leverages Python's full programming capabilities
  2. Flexibility: Dynamically adapt pipelines to different scenarios
  3. Maintainability: Configuration-driven approaches reduce code duplication
  4. Testing: Easier to test different pipeline configurations
  5. Observability: Add logging to understand which steps are included

Need Help?

If you encounter any issues while following this tutorial, check out our comprehensive Common Issues guide which covers:

  • Pipeline configuration errors
  • Builder reassignment issues
  • Configuration validation problems
  • Installation and import problems
  • Debugging dynamic pipeline construction

The guide includes step-by-step solutions and code examples to help you troubleshoot quickly.