Building Dynamic Pipelines¶
New to the Fluent API?
For a comprehensive guide including the pipeline lifecycle, common patterns, and best practices, see How to Use the Fluent API.
Getting Started¶
This step-by-step guide shows you how to build dynamic pipelines that adapt based on runtime conditions. Unlike static YAML configurations, the fluent API allows you to use Python logic to conditionally add or remove steps, making your pipelines flexible and configuration-driven.
What You'll Build¶
By the end of this tutorial, you'll have working examples of:
- Conditional step execution based on environment or configuration
- Dynamic column transformations applied iteratively
- Configuration-driven pipelines that adapt to settings
- Environment-specific logic (development vs production)
- Feature flag patterns for enabling/disabling transformations
What You'll Learn¶
- How to use Python conditionals to add pipeline steps
- How to build pipelines with loops for repeated operations
- How to create configuration-driven pipeline builders
- How to handle optional transformations
- Best practices for dynamic pipeline construction
Prerequisites¶
Before starting, make sure you have:
- Database Access: You can connect to Unity Catalog (Databricks' unified data catalog and governance layer)
- Permissions: You can create schemas and tables in your workspace
- Environment: Access to a Databricks notebook or similar Spark environment
- Python Knowledge: Basic understanding of Python conditionals and loops
Setup and Installation¶
Install Nessy and restart the Python kernel:
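In a Databricks notebook this is typically a %pip install followed by a kernel restart. A minimal sketch; the package name cloe-nessy is an assumption here, so substitute the name used in your environment:

# Package name assumed for illustration; use your environment's actual package
%pip install cloe-nessy

# Restart the Python kernel so the freshly installed package is importable
dbutils.library.restartPython()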
Step 1: Create Sample Data¶
Let's create sample customer data with personally identifiable information (PII) that we'll conditionally mask based on environment:
from pyspark.sql import Row
# Configuration
CATALOG = "workspace"
SCHEMA = "demo"
SOURCE_TABLE = f"{CATALOG}.{SCHEMA}.customer_data"
OUTPUT_TABLE = f"{CATALOG}.{SCHEMA}.processed_customers"
# Create schema
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
# Create customer data with PII fields
customer_data = [
Row(
customer_id=1,
first_name="Alice",
last_name="Smith",
email="alice.smith@email.com",
phone="555-0101",
ssn="123-45-6789",
salary=75000.00,
department="Engineering",
join_date="2023-01-15",
status="active"
),
Row(
customer_id=2,
first_name="Bob",
last_name="Johnson",
email="bob.j@email.com",
phone="555-0102",
ssn="987-65-4321",
salary=82000.00,
department="Sales",
join_date="2023-02-20",
status="active"
),
Row(
customer_id=3,
first_name="Charlie",
last_name="Brown",
email="cbrown@email.com",
phone="555-0103",
ssn="456-78-9012",
salary=68000.00,
department="Marketing",
join_date="2023-03-10",
status="inactive"
),
]
# Create and save DataFrame
df = spark.createDataFrame(customer_data)
# Clean up existing tables
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {OUTPUT_TABLE}")
df.write.saveAsTable(SOURCE_TABLE)
print(f"โ Created source table with {df.count()} rows")
print("\n๐ Sample Data:")
df.display()
About the Sample Data
Our sample data includes PII fields (email, phone, SSN, salary) that should be handled differently in development vs production environments.
Example 1: Environment-Based Conditional Steps¶
One of the most common use cases is applying different transformations based on the environment:
from cloe_nessy.pipeline import PipelineBuilder
import os
# Configuration - typically set via environment variables
ENVIRONMENT = os.getenv("ENV", "development") # "development" or "production"
MASK_PII = ENVIRONMENT == "production"
INCLUDE_SALARY = ENVIRONMENT == "development"
print(f"๐ง Configuration:")
print(f" - Environment: {ENVIRONMENT}")
print(f" - Mask PII: {MASK_PII}")
print(f" - Include Salary: {INCLUDE_SALARY}")
# Start building the pipeline
builder = PipelineBuilder("Environment-Aware Pipeline")
# Add base steps - always executed
builder = builder.read_catalog_table(
table_identifier=SOURCE_TABLE,
step_name="Load Customer Data"
)
# Conditionally mask PII fields in production
if MASK_PII:
print("โ Adding PII masking steps (production mode)")
builder = (
builder
.transform_with_column(
column_name="email",
expression="regexp_replace(email, '(.{3}).*(@.*)', '$1***$2')",
step_name="Mask Email Addresses"
)
.transform_with_column(
column_name="phone",
expression="regexp_replace(phone, '\\d', 'X')",
step_name="Mask Phone Numbers"
)
.transform_with_column(
column_name="ssn",
expression="concat('XXX-XX-', substring(ssn, -4, 4))",
step_name="Mask SSN"
)
)
else:
print("โน๏ธ Skipping PII masking (development mode)")
# Conditionally drop sensitive columns in production
if not INCLUDE_SALARY:
print("✅ Removing sensitive columns (production mode)")
builder = builder.transform_select_columns(
exclude_columns=["ssn", "salary"],
step_name="Remove Sensitive Columns"
)
# Add final steps - always executed
builder = (
builder
.transform_filter(
condition="status = 'active'",
step_name="Filter Active Customers"
)
.write_catalog_table(
table_identifier=OUTPUT_TABLE,
mode="overwrite",
step_name="Write Results"
)
)
# Build and run
pipeline = builder.build()
print("\n๐ Pipeline Structure:")
pipeline.plot_graph()
print("\n๐ Running pipeline...")
pipeline.run()
print("โ
Pipeline completed!")
# Check results
print("\n๐ Results:")
spark.table(OUTPUT_TABLE).display()
Understanding Environment-Based Logic
This pattern demonstrates:
- Configuration Variables: Use environment variables or config files
- Conditional Builder Assignment: Reassign builder when adding steps conditionally
- Separate Concerns: PII masking logic is only added when needed
- Audit Trail: Print statements show which steps were added
Example 2: Loop-Based Dynamic Transformations¶
Apply the same transformation to multiple columns programmatically:
# Configuration: columns to normalize
COLUMNS_TO_NORMALIZE = ["first_name", "last_name", "department"]
APPLY_NORMALIZATION = True
# Start pipeline
builder = (
PipelineBuilder("Dynamic Column Normalization")
.read_catalog_table(
table_identifier=SOURCE_TABLE,
step_name="Load Data"
)
)
# Dynamically add normalization for each column
if APPLY_NORMALIZATION:
print(f"โ Adding normalization for {len(COLUMNS_TO_NORMALIZE)} columns")
for column in COLUMNS_TO_NORMALIZE:
builder = builder.transform_with_column(
column_name=column,
expression=f"upper(trim({column}))",
step_name=f"Normalize {column}"
)
else:
print("โน๏ธ Skipping normalization")
# Complete the pipeline
pipeline = (
builder
.write_catalog_table(
table_identifier=OUTPUT_TABLE,
mode="overwrite",
step_name="Save Normalized Data"
)
.build()
)
print("\n๐ Pipeline Structure:")
pipeline.plot_graph()
pipeline.run()
Loop-Based Transformations
Using loops to add pipeline steps is perfect for:
- Applying the same logic to multiple columns
- Processing variable numbers of inputs
- Configuration-driven column transformations (see the sketch below)
- Reducing code duplication
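To make the configuration-driven point concrete, here is a minimal sketch that drives the loop from a dictionary so each column can carry its own expression. The NORMALIZATION_RULES mapping and its expression templates are illustrative, not part of the library:

# Hypothetical mapping: column name -> Spark SQL expression template
NORMALIZATION_RULES = {
    "first_name": "initcap(trim({col}))",  # names in Title Case
    "last_name": "initcap(trim({col}))",
    "department": "upper(trim({col}))",    # departments in UPPERCASE
}

builder = PipelineBuilder("Rule-Driven Normalization").read_catalog_table(
    table_identifier=SOURCE_TABLE,
    step_name="Load Data",
)

# One pipeline step per (column, rule) pair, each with a unique step name
for column, template in NORMALIZATION_RULES.items():
    builder = builder.transform_with_column(
        column_name=column,
        expression=template.format(col=column),
        step_name=f"Normalize {column}",
    )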
Example 3: Configuration-Driven Pipeline Builder¶
Create a reusable function that builds pipelines based on configuration dictionaries:
def build_configurable_pipeline(config: dict) -> PipelineBuilder:
"""
Build a pipeline based on a configuration dictionary.
Args:
config: Dictionary containing pipeline configuration
Returns:
Configured PipelineBuilder ready to build()
"""
builder = PipelineBuilder(config.get("name", "Configurable Pipeline"))
# Add source
builder = builder.read_catalog_table(
table_identifier=config["source_table"],
step_name="Load Source Data"
)
# Apply filters if specified
if "filters" in config:
for i, filter_condition in enumerate(config["filters"], 1):
builder = builder.transform_filter(
condition=filter_condition,
step_name=f"Filter {i}: {filter_condition[:30]}..."
)
# Apply column transformations if specified
if "transformations" in config:
for trans in config["transformations"]:
builder = builder.transform_with_column(
column_name=trans["column"],
expression=trans["expression"],
step_name=trans.get("name", f"Transform {trans['column']}")
)
# Select columns if specified
if "select_columns" in config:
builder = builder.transform_select_columns(
include_columns=config["select_columns"],
step_name="Select Final Columns"
)
elif "exclude_columns" in config:
builder = builder.transform_select_columns(
exclude_columns=config["exclude_columns"],
step_name="Exclude Sensitive Columns"
)
# Clean column names if requested
if config.get("clean_column_names", False):
builder = builder.transform_clean_column_names()
# Add destination
builder = builder.write_catalog_table(
table_identifier=config["target_table"],
mode=config.get("write_mode", "overwrite"),
step_name="Write to Target"
)
return builder
# Example configuration 1: Development mode
dev_config = {
"name": "Development ETL",
"source_table": SOURCE_TABLE,
"target_table": OUTPUT_TABLE,
"filters": [
"status = 'active'",
"department IS NOT NULL"
],
"select_columns": [
"customer_id",
"first_name",
"last_name",
"email",
"department",
"salary" # Include salary in dev
],
"clean_column_names": True,
"write_mode": "overwrite"
}
# Example configuration 2: Production mode
prod_config = {
"name": "Production ETL",
"source_table": SOURCE_TABLE,
"target_table": OUTPUT_TABLE,
"filters": [
"status = 'active'"
],
"transformations": [
{
"column": "email",
"expression": "regexp_replace(email, '(.{3}).*(@.*)', '$1***$2')",
"name": "Mask Email"
}
],
"exclude_columns": ["ssn", "salary"], # Exclude sensitive fields
"clean_column_names": True,
"write_mode": "overwrite"
}
# Choose configuration based on environment
current_env = os.getenv("ENV", "development")
config = prod_config if current_env == "production" else dev_config
print(f"๐ง Using {config['name']} configuration")
# Build and run pipeline
pipeline = build_configurable_pipeline(config).build()
print("\n๐ Pipeline Structure:")
pipeline.plot_graph()
print("\n๐ Running pipeline...")
pipeline.run()
print("โ
Pipeline completed!")
print("\n๐ Results:")
spark.table(OUTPUT_TABLE).display()
Configuration-Driven Patterns
This approach offers several benefits:
- Reusability: One function handles multiple scenarios
- Maintainability: Configuration changes don't require code changes
- Testing: Easy to test different configurations
- Documentation: Configuration dictionaries are self-documenting
- Version Control: Store configurations separately from code (see the sketch below)
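Because the configuration is plain data, it can live outside the code entirely, for example as one JSON file per environment under version control. A minimal sketch, assuming a hypothetical configs/production.json that mirrors the dictionaries above:

import json

# Hypothetical path; keep one config file per environment in version control
with open("configs/production.json") as f:
    config = json.load(f)

pipeline = build_configurable_pipeline(config).build()
pipeline.run()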
Example 4: Feature Flags for Optional Steps¶
Use feature flags to enable/disable specific transformations:
# Feature flags - could come from a feature flag service
FEATURE_FLAGS = {
"enable_data_quality_checks": True,
"enable_advanced_transforms": True,
"enable_audit_columns": True,
"enable_deduplication": False,
}
print("๐ฉ Feature Flags:")
for flag, enabled in FEATURE_FLAGS.items():
print(f" - {flag}: {'โ Enabled' if enabled else 'โ Disabled'}")
# Build pipeline with feature flags
builder = (
PipelineBuilder("Feature-Flagged Pipeline")
.read_catalog_table(
table_identifier=SOURCE_TABLE,
step_name="Load Data"
)
)
# Optional: Data quality checks
if FEATURE_FLAGS["enable_data_quality_checks"]:
print("\nโ Adding data quality checks")
builder = (
builder
.transform_filter(
condition="email IS NOT NULL AND email LIKE '%@%'",
step_name="Validate Email Format"
)
.transform_filter(
condition="customer_id IS NOT NULL",
step_name="Validate Required Fields"
)
)
# Optional: Advanced transformations
if FEATURE_FLAGS["enable_advanced_transforms"]:
print("โ Adding advanced transformations")
builder = (
builder
.transform_with_column(
column_name="full_name",
expression="concat(first_name, ' ', last_name)",
step_name="Create Full Name"
)
.transform_with_column(
column_name="email_domain",
expression="split(email, '@')[1]",
step_name="Extract Email Domain"
)
)
# Optional: Audit columns
if FEATURE_FLAGS["enable_audit_columns"]:
print("โ Adding audit columns")
builder = (
builder
.transform_with_column(
column_name="processed_at",
expression="current_timestamp()",
step_name="Add Processing Timestamp"
)
.transform_with_column(
column_name="processed_by",
expression="'dynamic_pipeline'",
step_name="Add Pipeline Name"
)
)
# Optional: Deduplication
if FEATURE_FLAGS["enable_deduplication"]:
print("โ Adding deduplication")
builder = builder.transform_deduplicate(
partition_by=["customer_id"],
order_by=["join_date"],
step_name="Remove Duplicates"
)
# Complete pipeline
pipeline = (
builder
.write_catalog_table(
table_identifier=OUTPUT_TABLE,
mode="overwrite",
step_name="Write Results"
)
.build()
)
print("\n๐ Pipeline Structure:")
pipeline.plot_graph()
print("\n๐ Running pipeline...")
pipeline.run()
print("โ
Pipeline completed!")
Feature Flag Best Practices
- Use feature flags for gradual rollouts of new transformations
- Test new logic in production with a subset of data
- Roll back quickly by toggling a flag off
- A/B test different transformation approaches
- Control feature availability per environment (see the sketch below)
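A lightweight way to get environment-specific flags without a dedicated service is to read them from environment variables. A minimal sketch; the FF_<NAME> naming convention is an assumption, not an established standard:

import os

def load_feature_flags(defaults: dict) -> dict:
    # Overlay each default with an FF_* environment variable ("true"/"false")
    flags = {}
    for name, default in defaults.items():
        raw = os.getenv(f"FF_{name.upper()}")
        flags[name] = default if raw is None else raw.strip().lower() == "true"
    return flags

FEATURE_FLAGS = load_feature_flags({
    "enable_data_quality_checks": True,
    "enable_deduplication": False,
})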
Example 5: Conditional Branches Based on Data¶
Apply different logic based on data characteristics (requires checking data first):
# First, check the data characteristics
source_df = spark.table(SOURCE_TABLE)
row_count = source_df.count()
has_inactive = source_df.filter("status = 'inactive'").count() > 0
has_nulls = source_df.filter("email IS NULL OR phone IS NULL").count() > 0
print(f"๐ Data Analysis:")
print(f" - Total rows: {row_count}")
print(f" - Has inactive records: {has_inactive}")
print(f" - Has null values: {has_nulls}")
# Build pipeline based on data characteristics
builder = (
PipelineBuilder("Data-Driven Pipeline")
.read_catalog_table(
table_identifier=SOURCE_TABLE,
step_name="Load Data"
)
)
# Large dataset optimization
if row_count > 1_000_000:
print("\n✅ Large dataset detected - adding optimization steps")
# In a real scenario, you might repartition or use broadcast joins
builder = builder.transform_filter(
condition="customer_id IS NOT NULL",
step_name="Early Filter for Performance"
)
# Handle inactive records if present
if has_inactive:
print("โ Inactive records detected - adding status filter")
builder = builder.transform_filter(
condition="status = 'active'",
step_name="Filter Active Records"
)
# Handle null values if present
if has_nulls:
print("โ Null values detected - adding null handling")
builder = (
builder
.transform_with_column(
column_name="email",
expression="coalesce(email, 'unknown@example.com')",
step_name="Handle Null Emails"
)
.transform_with_column(
column_name="phone",
expression="coalesce(phone, '000-0000')",
step_name="Handle Null Phones"
)
)
# Complete pipeline
pipeline = (
builder
.write_catalog_table(
table_identifier=OUTPUT_TABLE,
mode="overwrite",
step_name="Write Results"
)
.build()
)
pipeline.run()
Data-Dependent Logic Considerations
While powerful, checking data before building pipelines has trade-offs:
- Performance: Requires reading the data twice (once to check, once to process; see the single-pass sketch below)
- Consistency: Data might change between check and execution
- Complexity: Harder to predict pipeline behavior
Use this pattern when:
- Data characteristics vary significantly between runs
- You need adaptive error handling
- The cost of the extra read is acceptable
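The extra-read overhead can also be trimmed: the three separate count() calls above each trigger their own scan, but the same profile can be computed in a single aggregation pass. A sketch in plain PySpark (not a Nessy API):

from pyspark.sql import functions as F

# One scan instead of three: gather all profiling metrics in a single agg
profile = source_df.agg(
    F.count("*").alias("row_count"),
    F.sum(F.when(F.col("status") == "inactive", 1).otherwise(0)).alias("inactive_count"),
    F.sum(F.when(F.col("email").isNull() | F.col("phone").isNull(), 1).otherwise(0)).alias("null_count"),
).first()

row_count = profile["row_count"]
has_inactive = profile["inactive_count"] > 0
has_nulls = profile["null_count"] > 0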
Step 2: Clean Up (Optional)¶
When you're done experimenting, you can remove the test tables:
# Remove the demo tables we created
print("๐งน Cleaning up demo tables...")
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE}")
spark.sql(f"DROP TABLE IF EXISTS {OUTPUT_TABLE}")
print("โ
Demo tables removed successfully")
Best Practices for Dynamic Pipelines¶
1. Always Reassign the Builder¶
# ✅ Correct - reassign builder
builder = builder.transform_filter(condition="status = 'active'")
# ❌ Incorrect - builder not updated
builder.transform_filter(condition="status = 'active'")
2. Use Descriptive Step Names¶
# ✅ Good - clear what this step does
builder = builder.transform_filter(
condition=filter_condition,
step_name=f"Filter: {filter_description}"
)
# ❌ Poor - generic name
builder = builder.transform_filter(
condition=filter_condition,
step_name="Filter"
)
3. Log Pipeline Construction¶
# ✅ Good - visibility into what's being added
if MASK_PII:
print("Adding PII masking steps")
builder = builder.transform_with_column(...)
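In a scheduled job you would typically route these messages through Python's logging module instead of print, so they land in the job logs with timestamps and levels. A minimal sketch (the logger name is illustrative):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline_builder")

if MASK_PII:
    logger.info("Adding PII masking steps")  # appears in job logs with a timestamp
    builder = builder.transform_with_column(...)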
4. Validate Configuration¶
# ✅ Good - validate config before using
def build_configurable_pipeline(config: dict) -> PipelineBuilder:
required_keys = ["source_table", "target_table"]
for key in required_keys:
if key not in config:
raise ValueError(f"Missing required config key: {key}")
# ... build pipeline
5. Keep Core Logic Separate¶
# ✅ Good - optional steps are clearly separated
builder = (
PipelineBuilder()
.read_catalog_table(...) # Core step
)
if optional_feature:
builder = builder.transform_filter(...) # Optional step
builder = builder.write_catalog_table(...) # Core step
Common Patterns Summary¶
| Pattern | Use Case | Example |
|---|---|---|
| If/Else Branching | Environment-specific logic | Dev vs Prod transformations |
| Loops | Repeated operations | Apply same transform to multiple columns |
| Configuration Dictionaries | Externalized logic | JSON/YAML-driven pipelines |
| Feature Flags | Gradual rollouts | Enable/disable specific steps |
| Helper Functions | Reusable pipeline builders | Factory functions for common patterns (see sketch below) |
| Data-Driven | Adaptive processing | Handle different data shapes |
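Of these, the helper-function pattern is the only one not shown end-to-end in this tutorial. A minimal factory sketch, assuming the same builder methods used above; the function name, the sha2 masking choice, and the defaults are illustrative:

def make_masked_reader(table: str, pii_columns: list[str]) -> PipelineBuilder:
    # Factory: every caller gets a reader that masks the given PII columns
    builder = PipelineBuilder(f"Masked read of {table}").read_catalog_table(
        table_identifier=table,
        step_name="Load Data",
    )
    for column in pii_columns:
        builder = builder.transform_with_column(
            column_name=column,
            expression=f"sha2(cast({column} as string), 256)",  # irreversible hash
            step_name=f"Mask {column}",
        )
    return builder

# Callers layer their own steps on top of the shared base
pipeline = (
    make_masked_reader(SOURCE_TABLE, ["email", "phone", "ssn"])
    .write_catalog_table(table_identifier=OUTPUT_TABLE, mode="overwrite", step_name="Write Results")
    .build()
)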
Key Takeaways¶
- Python Power: The fluent API leverages Python's full programming capabilities
- Flexibility: Dynamically adapt pipelines to different scenarios
- Maintainability: Configuration-driven approaches reduce code duplication
- Testing: Easier to test different pipeline configurations
- Observability: Add logging to understand which steps are included
Need Help?¶
If you encounter any issues while following this tutorial, check out our comprehensive Common Issues guide which covers:
- Pipeline configuration errors
- Builder reassignment issues
- Configuration validation problems
- Installation and import problems
- Debugging dynamic pipeline construction
The guide includes step-by-step solutions and code examples to help you troubleshoot quickly.