# How to Use the Fluent API
**Quick Start**

The Fluent API provides an intuitive, chainable interface for building data pipelines. This guide covers everything you need to know to use it effectively.
## Overview
The Fluent API uses the `PipelineBuilder` class to create pipelines through
method chaining. Understanding the lifecycle of a pipeline is crucial for
avoiding common mistakes.
## The Pipeline Lifecycle
When working with the Fluent API, you'll encounter three different object types, one at each stage of a pipeline's life:
```mermaid
graph LR
    A[PipelineBuilder] -->|.build| B[Pipeline]
    B -->|.run| C[PipelineContext]
    style A fill:#e1f5ff
    style B fill:#fff4e1
    style C fill:#e8f5e9
```
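In code, the three stages map onto three objects; here is a minimal end-to-end sketch using the same calls covered in detail below:

```python
from cloe_nessy.pipeline.pipeline_builder import PipelineBuilder
from cloe_nessy.pipeline.pipeline_context import PipelineContext

# 1. PipelineBuilder: compose steps through chaining
builder = PipelineBuilder("Lifecycle Demo").read_catalog_table(
    table_identifier="catalog.schema.customers"
)

# 2. Pipeline: immutable and executable after .build()
pipeline = builder.build()

# 3. PipelineContext: holds the resulting data and metadata
result = pipeline.run(PipelineContext())
```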
### 1. `PipelineBuilder` - Before `.build()`
This is the composition phase where you add and configure steps.
What you can do:
- ✅ Add steps with `.read_*()`, `.transform_*()`, `.write_*()`
- ✅ Chain multiple operations
- ✅ Reuse and extend builders
- ✅ Add steps conditionally (see Pattern 3 below)
Example:
```python
from cloe_nessy.pipeline.pipeline_builder import PipelineBuilder

builder = (
    PipelineBuilder("My Pipeline")
    .read_catalog_table(table_identifier="catalog.schema.customers")
    .transform_filter(condition="status = 'active'")
    .transform_select(columns=["id", "name", "email"])
)

# builder is still a PipelineBuilder - you can keep adding steps
```
### 2. `Pipeline` - After `.build()`

Once you call `.build()`, you get an immutable `Pipeline` object.
What you can do:
- ✅ Execute with `.run(context)`
- ✅ Run multiple times with different contexts
- ✅ Inspect steps and metadata
What you CANNOT do:
- ❌ Add more steps (it's immutable)
- ❌ Modify existing steps
Example:
```python
from cloe_nessy.pipeline.pipeline_context import PipelineContext

# Build the pipeline
pipeline = builder.build()

# Now you can execute it
context = PipelineContext()
result = pipeline.run(context)
```
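Because a built `Pipeline` is immutable, it is safe to execute repeatedly; a minimal sketch, giving each run its own fresh context:

```python
# Each run gets its own context, so executions don't interfere with each other
result_a = pipeline.run(PipelineContext())
result_b = pipeline.run(PipelineContext())
```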
### 3. `PipelineContext` - After `.run()`

The result of executing a pipeline is a `PipelineContext` containing your data and metadata.
What you can do:
- ✅ Access the resulting DataFrame with `.data`
- ✅ Access parameters with `.params`
- ✅ Chain into another pipeline
Example:
```python
result = pipeline.run(context)

# Access the data
df = result.data
df.show()

# Chain into another pipeline
next_result = next_pipeline.run(result)
```
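The `.params` mapping travels with the context in the same way; a minimal sketch (which keys are present depends on the steps your pipeline ran):

```python
# Inspect parameters carried along in the context
print(result.params)
```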
## Critical Concept: When to Call `.build()`

**Most Common Mistake: Calling `.build()` Too Early**

If you want to reuse or extend a pipeline builder, DON'T call `.build()` until you're completely done composing it.
### Wrong - Building Too Early
```python
# This creates an immutable Pipeline
base_pipeline = (
    PipelineBuilder("Base")
    .read_catalog_table(table_identifier="catalog.schema.customers")
    .build()  # ← Converts to Pipeline!
)

# This will FAIL - Pipeline has no .transform_filter() method
extended = base_pipeline.transform_filter(condition="age > 18")  # ❌ AttributeError!
```
### Correct - Keep It as a Builder
```python
# Keep it as a PipelineBuilder (no .build() yet)
base_builder = (
    PipelineBuilder("Base")
    .read_catalog_table(table_identifier="catalog.schema.customers")
    # Notice: NO .build() here!
)

# Now you can extend it
pipeline_a = base_builder.transform_filter(condition="age > 18").build()
pipeline_b = base_builder.transform_select(columns=["id", "name"]).build()
```
## Common Patterns
### Pattern 1: Simple Pipeline
Build and execute a straightforward pipeline:
```python
from cloe_nessy.pipeline.pipeline_builder import PipelineBuilder
from cloe_nessy.pipeline.pipeline_context import PipelineContext

# Build the pipeline
pipeline = (
    PipelineBuilder("Simple ETL")
    .read_catalog_table(table_identifier="bronze.raw_customers")
    .transform_filter(condition="status = 'active'")
    .transform_select(columns=["id", "name", "email", "created_date"])
    .write_catalog_table(table_identifier="silver.customers")
    .build()  # Call .build() when done
)

# Execute it
context = PipelineContext()
result = pipeline.run(context)
```
### Pattern 2: Reusable Components
Create base pipelines that can be extended for different use cases:
```python
# Define a reusable base (DON'T call .build())
base_customers = (
    PipelineBuilder("Customer Base")
    .read_catalog_table(table_identifier="bronze.raw_customers")
    .transform_filter(condition="status = 'active'")
    .transform_filter(condition="email IS NOT NULL")
)

# Extend it for different purposes
pipeline_names_only = (
    base_customers
    .transform_select(columns=["id", "name"])
    .build()
)

pipeline_with_emails = (
    base_customers
    .transform_select(columns=["id", "name", "email"])
    .transform_with_column(
        column_name="email_domain",
        expression="substring_index(email, '@', -1)",
    )
    .build()
)

pipeline_aggregated = (
    base_customers
    .transform_group_by(
        group_by_columns=["country"],
        agg_columns={"id": "count"},
    )
    .build()
)
```
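Each variant built above is an independent `Pipeline` and runs like any other:

```python
# Run one of the derived pipelines
result = pipeline_with_emails.run(PipelineContext())
result.data.show()
```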
### Pattern 3: Dynamic Pipeline Construction
Build pipelines based on runtime conditions or configuration:
```python
def build_customer_pipeline(config: dict):
    """Build a customer pipeline based on configuration."""
    builder = (
        PipelineBuilder("Dynamic Customer Pipeline")
        .read_catalog_table(table_identifier=config["source_table"])
    )

    # Add filters based on config
    if config.get("filter_active"):
        builder = builder.transform_filter(condition="status = 'active'")

    if config.get("filter_country"):
        country = config["filter_country"]
        builder = builder.transform_filter(condition=f"country = '{country}'")

    # Add PII masking if required
    if config.get("mask_pii"):
        builder = builder.transform_with_column(
            column_name="email",
            expression="concat(substring(email, 1, 3), '***@', substring_index(email, '@', -1))",
        )

    # Select columns
    if config.get("columns"):
        builder = builder.transform_select(columns=config["columns"])

    # Add output
    builder = builder.write_catalog_table(table_identifier=config["target_table"])

    return builder.build()  # Build at the end


# Use it
config = {
    "source_table": "bronze.customers",
    "target_table": "silver.customers",
    "filter_active": True,
    "filter_country": "DE",
    "mask_pii": True,
    "columns": ["id", "name", "email", "country"],
}

pipeline = build_customer_pipeline(config)
result = pipeline.run(PipelineContext())
```
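The same builder function covers any combination of options; for example, a slimmer (hypothetical) configuration that skips country filtering and PII masking:

```python
# A second configuration reusing the same function (table names are illustrative)
names_config = {
    "source_table": "bronze.customers",
    "target_table": "silver.customer_names",
    "filter_active": True,
    "columns": ["id", "name"],
}
names_pipeline = build_customer_pipeline(names_config)
```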
### Pattern 4: Joining Pipelines
Create reusable data sources and join them:
```python
# Define reusable data sources (no .build())
customers_pipeline = (
    PipelineBuilder("Customers")
    .read_catalog_table(table_identifier="bronze.customers")
    .transform_filter(condition="status = 'active'")
)

orders_pipeline = (
    PipelineBuilder("Orders")
    .read_catalog_table(table_identifier="bronze.orders")
    .transform_filter(condition="order_date >= '2024-01-01'")
)

# Join them together
joined_pipeline = (
    customers_pipeline
    .join_data(
        joined_data=orders_pipeline,
        join_condition="customers.id = orders.customer_id",
        join_type="inner",
    )
    .transform_select(columns=[
        "customers.id",
        "customers.name",
        "orders.order_id",
        "orders.amount",
    ])
    .build()
)

result = joined_pipeline.run(PipelineContext())
```
## Common Pitfalls
### Pitfall 1: Calling `.build()` on Reusable Components
```python
# ❌ WRONG
base = PipelineBuilder("Base").read_catalog_table(...).build()
extended = base.transform_filter(...)  # Error!

# ✅ CORRECT
base = PipelineBuilder("Base").read_catalog_table(...)  # No .build()
extended = base.transform_filter(...).build()
```
### Pitfall 2: Trying to Add Steps to a `Pipeline`
```python
# ❌ WRONG
pipeline = builder.build()
pipeline.transform_filter(...)  # Error! Pipeline is immutable

# ✅ CORRECT - add steps before building
builder = builder.transform_filter(...)
pipeline = builder.build()
```
### Pitfall 3: Forgetting to Call `.build()` Before `.run()`
```python
# ❌ WRONG
builder = PipelineBuilder("Test").read_catalog_table(...)
result = builder.run(context)  # Error! Builder has no .run()

# ✅ CORRECT
pipeline = builder.build()
result = pipeline.run(context)
```
### Pitfall 4: Mixing Up `.data` Access
```python
# ❌ WRONG
builder = PipelineBuilder("Test").read_catalog_table(...)
df = builder.data  # Error! Builder has no .data

# ✅ CORRECT
pipeline = builder.build()
result = pipeline.run(context)
df = result.data  # PipelineContext has .data
```
## Quick Reference
| Stage | Object | Add Steps? | Execute? | Access Data? |
|---|---|---|---|---|
| Building | `PipelineBuilder` | ✅ Yes | ❌ No | ❌ No |
| Built | `Pipeline` | ❌ No | ✅ Yes | ❌ No |
| Result | `PipelineContext` | ❌ No | ❌ No | ✅ Yes |
**Key Methods:**

- `PipelineBuilder.build()` → creates a `Pipeline`
- `Pipeline.run(context)` → creates a `PipelineContext`
- `PipelineContext.data` → access the resulting DataFrame
## Examples and Further Reading
- Basic Fluent API Example - Simple pipeline walkthrough
- Joining Data with Fluent API - Working with joins
- Dynamic Pipelines - Building pipelines from runtime configuration
- Common Issues - Troubleshooting guide