How to Use the Fluent API

Quick Start

The Fluent API provides an intuitive, chainable interface for building data pipelines. This guide covers everything you need to know to use it effectively.

Overview

The Fluent API uses the PipelineBuilder class to create pipelines through method chaining. Understanding the lifecycle of a pipeline is crucial for avoiding common mistakes.


The Pipeline Lifecycle

The Fluent API involves three different object types, one per stage of the lifecycle:

graph LR
    A[PipelineBuilder] -->|.build| B[Pipeline]
    B -->|.run| C[PipelineContext]
    style A fill:#e1f5ff
    style B fill:#fff4e1
    style C fill:#e8f5e9
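The three stages can be sketched with a toy implementation. This is illustrative only, not cloe_nessy's actual code: the class names mirror the documented flow (builder → `.build()` → pipeline → `.run()` → context), but the steps here are plain Python functions over lists.

```python
from dataclasses import dataclass, field

@dataclass
class PipelineContext:
    """Toy result object: holds data and parameters, like the real context."""
    data: list = field(default_factory=list)
    params: dict = field(default_factory=dict)

@dataclass(frozen=True)
class Pipeline:
    """Toy immutable pipeline: frozen, so no steps can be added after build()."""
    steps: tuple

    def run(self, context: PipelineContext) -> PipelineContext:
        data = context.data
        for step in self.steps:
            data = step(data)
        return PipelineContext(data=data, params=context.params)

class PipelineBuilder:
    """Toy builder: chainable during composition, sealed by build()."""
    def __init__(self, name: str):
        self.name = name
        self._steps = []

    def add_step(self, fn):
        self._steps.append(fn)
        return self  # return self to allow chaining

    def build(self) -> Pipeline:
        return Pipeline(steps=tuple(self._steps))

# Stage 1: compose, Stage 2: build, Stage 3: run
builder = PipelineBuilder("Demo").add_step(lambda rows: [r * 2 for r in rows])
pipeline = builder.build()
result = pipeline.run(PipelineContext(data=[1, 2, 3]))
# result.data == [2, 4, 6]
```

Note how freezing the `Pipeline` dataclass is one way to get the immutability described below; the real library may enforce it differently.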

1. PipelineBuilder - Before .build()

This is the composition phase where you add and configure steps.

What you can do:

  • ✅ Add steps with .read_*(), .transform_*(), .write_*()
  • ✅ Chain multiple operations
  • ✅ Reuse and extend builders
  • ✅ Add steps conditionally

Example:

from cloe_nessy.pipeline.pipeline_builder import PipelineBuilder

builder = (
    PipelineBuilder("My Pipeline")
    .read_catalog_table(table_identifier="catalog.schema.customers")
    .transform_filter(condition="status = 'active'")
    .transform_select(columns=["id", "name", "email"])
)
# builder is still a PipelineBuilder - you can keep adding steps

2. Pipeline - After .build()

Once you call .build(), you get an immutable Pipeline object.

What you can do:

  • ✅ Execute with .run(context)
  • ✅ Run multiple times with different contexts
  • ✅ Inspect steps and metadata

What you CANNOT do:

  • ❌ Add more steps (it's immutable)
  • ❌ Modify existing steps

Example:

from cloe_nessy.pipeline.pipeline_context import PipelineContext

# Build the pipeline
pipeline = builder.build()

# Now you can execute it
context = PipelineContext()
result = pipeline.run(context)

3. PipelineContext - After .run()

The result of executing a pipeline is a PipelineContext containing your data and metadata.

What you can do:

  • ✅ Access the resulting DataFrame with .data
  • ✅ Access parameters with .params
  • ✅ Chain into another pipeline

Example:

result = pipeline.run(context)

# Access the data
df = result.data
df.show()

# Chain into another pipeline
next_result = next_pipeline.run(result)


Critical Concept: When to Call .build()

Most Common Mistake: Calling .build() Too Early

If you want to reuse or extend a pipeline builder, DON'T call .build() until you're completely done composing it.

Wrong - Building Too Early

# This creates an immutable Pipeline
base_pipeline = (
    PipelineBuilder("Base")
    .read_catalog_table(table_identifier="catalog.schema.customers")
    .build()  # ← Converts to Pipeline!
)

# This will FAIL - Pipeline has no .transform_filter() method
extended = base_pipeline.transform_filter(condition="age > 18")  # ❌ AttributeError!

Correct - Keep It as a Builder

# Keep it as a PipelineBuilder (no .build() yet)
base_builder = (
    PipelineBuilder("Base")
    .read_catalog_table(table_identifier="catalog.schema.customers")
    # Notice: NO .build() here!
)

# Now you can extend it
pipeline_a = base_builder.transform_filter(condition="age > 18").build()
pipeline_b = base_builder.transform_select(columns=["id", "name"]).build()
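Branching two pipelines from one base builder, as above, only works cleanly if extending a builder does not leak steps into its siblings. One way a builder can guarantee this is copy-on-extend; the sketch below is a hypothetical illustration of that idea, not cloe_nessy's actual implementation:

```python
class FilterBuilder:
    """Toy builder whose extension method returns a copy, so branching
    from a shared base never mutates the base (hypothetical names)."""

    def __init__(self, steps=None):
        self._steps = list(steps or [])

    def filter(self, predicate):
        new = FilterBuilder(self._steps)  # copy the current step list
        new._steps.append(predicate)
        return new  # the base builder is untouched

    def build(self):
        steps = tuple(self._steps)
        return lambda rows: [r for r in rows if all(p(r) for p in steps)]

base = FilterBuilder().filter(lambda r: r > 0)
branch_a = base.filter(lambda r: r % 2 == 0).build()  # base + "even" step
branch_b = base.filter(lambda r: r > 10).build()      # base + "> 10" step

# Each branch sees only its own extra step plus the shared base step:
# branch_a([-2, 2, 3, 12]) == [2, 12]
# branch_b([-2, 2, 3, 12]) == [12]
```

If your builder mutates in place instead, reuse each base for exactly one branch or copy it explicitly before extending.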

Common Patterns

Pattern 1: Simple Pipeline

Build and execute a straightforward pipeline:

from cloe_nessy.pipeline.pipeline_builder import PipelineBuilder
from cloe_nessy.pipeline.pipeline_context import PipelineContext

# Build the pipeline
pipeline = (
    PipelineBuilder("Simple ETL")
    .read_catalog_table(table_identifier="bronze.raw_customers")
    .transform_filter(condition="status = 'active'")
    .transform_select(columns=["id", "name", "email", "created_date"])
    .write_catalog_table(table_identifier="silver.customers")
    .build()  # Call .build() when done
)

# Execute it
context = PipelineContext()
result = pipeline.run(context)

Pattern 2: Reusable Components

Create base pipelines that can be extended for different use cases:

# Define a reusable base (DON'T call .build())
base_customers = (
    PipelineBuilder("Customer Base")
    .read_catalog_table(table_identifier="bronze.raw_customers")
    .transform_filter(condition="status = 'active'")
    .transform_filter(condition="email IS NOT NULL")
)

# Extend it for different purposes
pipeline_names_only = (
    base_customers
    .transform_select(columns=["id", "name"])
    .build()
)

pipeline_with_emails = (
    base_customers
    .transform_select(columns=["id", "name", "email"])
    .transform_with_column(
        column_name="email_domain",
        expression="substring_index(email, '@', -1)"
    )
    .build()
)

pipeline_aggregated = (
    base_customers
    .transform_group_by(
        group_by_columns=["country"],
        agg_columns={"id": "count"}
    )
    .build()
)

Pattern 3: Dynamic Pipeline Construction

Build pipelines based on runtime conditions or configuration:

def build_customer_pipeline(config: dict):
    """Build a customer pipeline based on configuration."""
    builder = (
        PipelineBuilder("Dynamic Customer Pipeline")
        .read_catalog_table(table_identifier=config["source_table"])
    )

    # Add filters based on config
    if config.get("filter_active"):
        builder = builder.transform_filter(condition="status = 'active'")

    if config.get("filter_country"):
        country = config["filter_country"]
        builder = builder.transform_filter(condition=f"country = '{country}'")

    # Add PII masking if required
    if config.get("mask_pii"):
        builder = builder.transform_with_column(
            column_name="email",
            expression="concat(substring(email, 1, 3), '***@', substring_index(email, '@', -1))"
        )

    # Select columns
    if config.get("columns"):
        builder = builder.transform_select(columns=config["columns"])

    # Add output
    builder = builder.write_catalog_table(table_identifier=config["target_table"])

    return builder.build()  # Build at the end

# Use it
config = {
    "source_table": "bronze.customers",
    "target_table": "silver.customers",
    "filter_active": True,
    "filter_country": "DE",
    "mask_pii": True,
    "columns": ["id", "name", "email", "country"]
}

pipeline = build_customer_pipeline(config)
result = pipeline.run(PipelineContext())
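The same config-driven idea works with any chainable composition, which makes it easy to unit-test in isolation. Here is a self-contained sketch using toy row-level functions instead of cloe_nessy calls (all names are illustrative):

```python
def build_row_pipeline(config: dict):
    """Compose a list of row-transform functions from a config dict."""
    steps = []

    # Conditionally add a filter step, mirroring config.get("filter_active")
    if config.get("filter_active"):
        steps.append(lambda rows: [r for r in rows if r["status"] == "active"])

    # Conditionally add a column-selection step
    if config.get("columns"):
        cols = config["columns"]
        steps.append(lambda rows: [{c: r[c] for c in cols} for r in rows])

    def run(rows):
        for step in steps:
            rows = step(rows)
        return rows

    return run  # "built" pipeline: a plain callable

pipeline = build_row_pipeline({"filter_active": True, "columns": ["id", "name"]})
rows = [
    {"id": 1, "name": "Ada", "status": "active", "email": "a@x.de"},
    {"id": 2, "name": "Bob", "status": "inactive", "email": "b@x.de"},
]
# pipeline(rows) == [{"id": 1, "name": "Ada"}]
```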

Pattern 4: Joining Pipelines

Create reusable data sources and join them:

# Define reusable data sources (no .build())
customers_pipeline = (
    PipelineBuilder("Customers")
    .read_catalog_table(table_identifier="bronze.customers")
    .transform_filter(condition="status = 'active'")
)

orders_pipeline = (
    PipelineBuilder("Orders")
    .read_catalog_table(table_identifier="bronze.orders")
    .transform_filter(condition="order_date >= '2024-01-01'")
)

# Join them together
joined_pipeline = (
    customers_pipeline
    .join_data(
        joined_data=orders_pipeline,
        join_condition="customers.id = orders.customer_id",
        join_type="inner"
    )
    .transform_select(columns=[
        "customers.id",
        "customers.name",
        "orders.order_id",
        "orders.amount"
    ])
    .build()
)

result = joined_pipeline.run(PipelineContext())

Common Pitfalls

Pitfall 1: Calling .build() on Reusable Components

# ❌ WRONG
base = PipelineBuilder("Base").read_catalog_table(...).build()
extended = base.transform_filter(...)  # Error!

# ✅ CORRECT
base = PipelineBuilder("Base").read_catalog_table(...)  # No .build()
extended = base.transform_filter(...).build()

Pitfall 2: Trying to Add Steps to a Pipeline

# ❌ WRONG
pipeline = builder.build()
pipeline.transform_filter(...)  # Error! Pipeline is immutable

# ✅ CORRECT - add steps before building
builder = builder.transform_filter(...)
pipeline = builder.build()

Pitfall 3: Forgetting to Call .build() Before .run()

# ❌ WRONG
builder = PipelineBuilder("Test").read_catalog_table(...)
result = builder.run(context)  # Error! Builder has no .run()

# ✅ CORRECT
pipeline = builder.build()
result = pipeline.run(context)

Pitfall 4: Mixing Up .data Access

# ❌ WRONG
builder = PipelineBuilder("Test").read_catalog_table(...)
df = builder.data  # Error! Builder has no .data

# ✅ CORRECT
pipeline = builder.build()
result = pipeline.run(context)
df = result.data  # PipelineContext has .data

Quick Reference

| Stage    | Object          | Add Steps? | Execute? | Access Data? |
|----------|-----------------|------------|----------|--------------|
| Building | PipelineBuilder | ✅ Yes     | ❌ No    | ❌ No        |
| Built    | Pipeline        | ❌ No      | ✅ Yes   | ❌ No        |
| Result   | PipelineContext | ❌ No      | ❌ No    | ✅ Yes       |

Key Methods:

  • PipelineBuilder.build() → creates Pipeline
  • Pipeline.run(context) → creates PipelineContext
  • PipelineContext.data → access resulting DataFrame


Examples and Further Reading