Building Pipelines with the Fluent API¶
New to the Fluent API?
For a comprehensive guide including the pipeline lifecycle, common patterns, and best practices, see How to Use the Fluent API.
Getting Started¶
This step-by-step guide shows you how to create data pipelines using the fluent API. Instead of writing YAML configurations, you build pipelines programmatically through an intuitive, chainable interface.
What You'll Build¶
By the end of this tutorial, you'll have a working pipeline that:
- Reads data from CSV files in a volume
- Cleans column names to standardize formatting
- Filters the data to remove invalid records
- Selects specific columns to keep only what you need
- Saves the results to a Unity Catalog table
What You'll Learn¶
- How to create sample data for testing
- How to use the PipelineBuilder class for method chaining
- How to configure read, transform, and write operations programmatically
- How to visualize and run your pipeline
- How to verify the pipeline results
Prerequisites¶
Before starting, make sure you have:
- Database Access: You can connect to Unity Catalog (Databricks' data governance and catalog system)
- Permissions: You can create schemas, volumes, and tables in your workspace
- Environment: Access to a Databricks notebook or similar Spark environment
Setup and Installation¶
Install Nessy and restart the Python kernel:
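In a Databricks notebook, the installation typically looks like the following. The PyPI package name is assumed to be cloe-nessy, matching the cloe_nessy import used later; adjust it to your environment:
%pip install cloe-nessy
# Restart the Python process so the newly installed package is importable
dbutils.library.restartPython()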
Step 1: Create Sample Data¶
First, let's create some test data to work with. This step creates sample sales transaction data with intentionally invalid records that we'll filter out:
import os
from pyspark.sql import Row
# Configuration
CATALOG = "workspace"
SCHEMA = "demo"
VOLUME = "demo_data"
DATA_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}"
TARGET_TABLE = f"{CATALOG}.{SCHEMA}.sales_transactions"
# Create the schema and volume if they don't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.{VOLUME}")
# Create sample sales data with some invalid records
sales_data = [
    Row(customer_id=1, product_name="Widget A", sale_date="2024-01-15", amount=150.50, quantity=2.0),
    Row(customer_id=2, product_name="Gadget B", sale_date="2024-01-16", amount=0.0, quantity=0.0),  # Invalid - zero amount
    Row(customer_id=3, product_name="Doohickey C", sale_date="2024-01-17", amount=299.99, quantity=1.0),
    Row(customer_id=4, product_name="Thingamajig D", sale_date=None, amount=75.25, quantity=3.0),  # Invalid - missing date
    Row(customer_id=5, product_name="Whatsit E", sale_date="2024-01-19", amount=425.00, quantity=5.0),
]
df = spark.createDataFrame(sales_data)
print(f"📊 Sample data has {df.count()} rows (2 are invalid)")
df.display()
# Save the sample data as CSV files
df.coalesce(1).write.mode("overwrite").option("header", "true").csv(DATA_PATH)
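You can confirm the CSV files landed in the volume before moving on (dbutils.fs.ls is available in Databricks notebooks):
# List the files written to the volume to confirm the write succeeded
display(dbutils.fs.ls(DATA_PATH))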
About Test Data
We intentionally included 2 invalid records in our sample data:
- One record with a zero amount and quantity
- One record with a missing sale date
Our pipeline will filter these out, showing you how data validation works in practice.
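If you want to preview exactly which rows the pipeline will drop, you can invert the filter condition with plain PySpark before running anything:
# Rows that fail the pipeline's validity condition
df.filter("sale_date IS NULL OR amount <= 0").display()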
Step 2: Build Your Pipeline¶
Now let's create the pipeline using the fluent API. The PipelineBuilder class
allows you to chain method calls to define each step:
from cloe_nessy.pipeline import PipelineBuilder
# Build the pipeline using method chaining
pipeline = (
    PipelineBuilder("Basic ETL Pipeline")
    .read_files(
        location=DATA_PATH,
        extension="csv",
        add_metadata_column=False,
        step_name="Load Raw Sales Data",
        options={"header": True},
    )
    .transform_clean_column_names()
    .transform_filter(
        condition="sale_date IS NOT NULL AND amount > 0",
        step_name="Filter Valid Records",
    )
    .transform_select_columns(
        include_columns=["customer_id", "product_name", "sale_date", "amount", "quantity"],
        step_name="Select Relevant Columns",
    )
    .write_catalog_table(
        table_identifier=TARGET_TABLE,
        mode="overwrite",
        step_name="Write to Production",
    )
    .build()
)
Understanding the Fluent API
The fluent API uses method chaining to build pipelines step by step:
PipelineBuilder("Basic ETL Pipeline"): Creates a new pipeline with a name.read_files(...): Reads CSV files from the specified volume location.transform_clean_column_names(): Standardizes column names (lowercase, underscores).transform_filter(...): Keeps only valid records based on your condition.transform_select_columns(...): Selects specific columns to include in the output.write_catalog_table(...): Saves results to a Unity Catalog table.build(): Finalizes and returns the executable pipeline object
Each method returns the builder, allowing you to chain the next method call.
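Because each call returns the builder, you can also assemble the same pipeline step by step without chaining, which is useful when steps depend on runtime logic. A minimal sketch (FILTER_INVALID is a hypothetical flag for illustration):
# Equivalent, non-chained form: every method returns the builder itself
builder = PipelineBuilder("Basic ETL Pipeline")
builder = builder.read_files(location=DATA_PATH, extension="csv", options={"header": True})
builder = builder.transform_clean_column_names()
FILTER_INVALID = True  # hypothetical flag to illustrate a conditional step
if FILTER_INVALID:
    builder = builder.transform_filter(condition="sale_date IS NOT NULL AND amount > 0")
pipeline = builder.build()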
Step 3: Visualize Your Pipeline¶
Before running the pipeline, you can create a visual diagram to see the flow of data through each step:
# Optional: Visualize the pipeline DAG
print("📊 Creating pipeline diagram...")
pipeline.plot_graph()
Step 4: Run Your Pipeline¶
Now let's execute the pipeline and see it in action:
# Run the pipeline
print("🚀 Running pipeline...")
pipeline.run()
print("✅ Pipeline completed successfully!")
Step 5: Check Your Results¶
Let's verify that the pipeline worked correctly and see the filtered data:
# Display the result table
spark.table(TARGET_TABLE).display()
# Check detailed results
print("📋 Checking results...")
result_df = spark.table(TARGET_TABLE)
row_count = result_df.count()
print(f"🎯 Result table contains {row_count} rows")
print(f"📊 Original: 5 rows → Filtered: {row_count} rows (2 invalid removed)")
# Display the cleaned data
print("\n📖 Cleaned and filtered data:")
result_df.show()
# Verify column names are cleaned
print("\n🏷️ Column names:", result_df.columns)
Expected Results: You should see 3 rows in your result table - the valid sales records where the sale date is not null and the amount is greater than zero. The column names should be standardized (lowercase with underscores).
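If you run this tutorial as an automated check, you can turn these expectations into assertions:
# Fail fast if the pipeline output does not match expectations
assert row_count == 3, f"Expected 3 valid rows, got {row_count}"
assert set(result_df.columns) == {"customer_id", "product_name", "sale_date", "amount", "quantity"}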
Step 6: Clean Up (Optional)¶
When you're done experimenting, you can remove the test data:
import shutil
# Remove the demo data from the volume
shutil.rmtree(DATA_PATH, ignore_errors=True)
# Optionally drop the result table
# spark.sql(f"DROP TABLE IF EXISTS {TARGET_TABLE}")
Advantages of the Fluent API¶
The fluent API offers several benefits over YAML-based pipeline definitions:
- Type Safety: IDE autocomplete and type checking help catch errors early
- Programmatic Control: Use variables, loops, and conditionals to build dynamic pipelines (see the sketch after this list)
- Familiar Syntax: Python developers can use a natural, object-oriented approach
- Debugging: Easier to debug with standard Python tools and stack traces
- Flexibility: Combine with other Python code for advanced scenarios
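For example, you can generate one pipeline per data source with an ordinary loop. A minimal sketch, assuming the builder API shown above; the folder and table names are illustrative:
# Build and run one pipeline per source folder (names are illustrative)
sources = ["sales", "returns"]
pipelines = [
    PipelineBuilder(f"ETL {name}")
    .read_files(location=f"{DATA_PATH}/{name}", extension="csv", options={"header": True})
    .transform_clean_column_names()
    .write_catalog_table(table_identifier=f"{CATALOG}.{SCHEMA}.{name}", mode="overwrite")
    .build()
    for name in sources
]
for p in pipelines:
    p.run()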
Need Help?¶
If you encounter any issues while following this tutorial, check out our comprehensive Common Issues guide which covers:
- Table and database access problems
- File reading and volume access issues
- Pipeline configuration errors
- Installation and import problems
- Data filtering and output issues
The guide includes step-by-step solutions and code examples to help you troubleshoot quickly.