Streaming Data from Unity Catalog¶
Getting Started¶
This guide shows you how to create a streaming data pipeline that incrementally reads new rows from a Unity Catalog table and writes them to another table.
What You'll Build¶
By the end of this tutorial, you'll have a working streaming pipeline that:
- Monitors a source table for new data (like watching a folder for new files)
- Saves results to a target table without duplicating previously processed data
- Remembers where it left off so it only processes truly new data
What You'll Learn¶
- How to use checkpoints to avoid reprocessing the same data
- How to test streaming functionality by adding new records
Setup and Installation¶
Install Nessy and restart the Python kernel:
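A minimal install cell might look like the following. The exact package name is an assumption based on the cloe_nessy import used later in this guide, so adjust it to match how Nessy is distributed in your environment:
# Assumed package name - adjust to your distribution channel
%pip install cloe-nessy
# Restart the Python kernel so the freshly installed package is picked up (Databricks)
dbutils.library.restartPython()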
Step 1: Create Sample Data¶
First, let's create some initial data and set up the checkpoint location for streaming:
import os
from pyspark.sql import Row
# Replace these with your own database object names
CATALOG = "nessy_dev_catalog"
SCHEMA = "demo"
SOURCE_TABLE = "streaming_pipeline"
RESULT_TABLE = "streaming_pipeline_result"
SOURCE_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{SOURCE_TABLE}"
RESULT_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{RESULT_TABLE}"
# create the schema if it doesn't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
CHECKPOINT_LOCATION = "abfss://landing@icsdp04nessysa.dfs.core.windows.net/demo/checkpoints"
# Store configuration in environment variables that the pipeline can read later
os.environ["TABLE_SOURCE"] = SOURCE_TABLE_NAME
os.environ["TABLE_RESULT"] = RESULT_TABLE_NAME
os.environ["CHECKPOINT_LOCATION"] = CHECKPOINT_LOCATION
# Create initial sample data - we'll add more later to test streaming
initial_data = [
Row(id=1, name="Nessy", status="Mysterious", species="Lake Monster", location="Loch Ness"),
Row(id=2, name="Highland Kelpie", status="Legendary", species="Water Horse", location="Scottish Highlands"),
Row(id=3, name="Morag", status="Ancient", species="Sea Serpent", location="Loch Morar"),
# We'll add these records later to demonstrate streaming:
# Row(id=4, name="Each-uisge", status="Mythical", species="Water Spirit", location="Highland Lochs"),
# Row(id=5, name="Beithir", status="Dormant", species="Cave Dragon", location="Ben Vair")
]
# Convert the data into a table format
df = spark.createDataFrame(initial_data)
# Clean up any existing tables (optional)
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {RESULT_TABLE_NAME}")
# Create the source table with initial data
try:
    df.write.saveAsTable(SOURCE_TABLE_NAME)
    print(f"Created source table '{SOURCE_TABLE_NAME}' with {df.count()} rows")
    print("Initial data:")
    display(spark.table(SOURCE_TABLE_NAME))
except Exception as e:
    print(f"Error creating source table: {e}")
Checkpoint Location
Important: The CHECKPOINT_LOCATION must point to an empty directory or a
location that doesn't exist yet. Checkpoints store information about what
data has already been processed. If you reuse a checkpoint location from a
previous run, the stream will skip records it considers already processed,
and your result table can silently miss data.
Replace the checkpoint location with a path that is accessible in your environment.
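Before running the pipeline, a quick check like the one below can confirm that the location is empty or absent. It uses dbutils, which is available in Databricks notebooks; adapt it for other environments:
# Verify the checkpoint location is empty or doesn't exist yet.
# dbutils.fs.ls raises an exception when the path does not exist.
try:
    contents = dbutils.fs.ls(CHECKPOINT_LOCATION)
    if contents:
        print(f"Warning: checkpoint location already contains {len(contents)} item(s)")
    else:
        print("Checkpoint location exists and is empty")
except Exception:
    print("Checkpoint location does not exist yet - it will be created on the first run")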
Step 2: Define Your Streaming Pipeline¶
Now let's create the streaming pipeline configuration. The key difference from batch processing is the stream: true setting:
# Define the streaming pipeline as a text string using YAML format
yaml_pipeline = """
name: streaming_from_unity_catalog
steps:
  read_source_table:
    action: READ_CATALOG_TABLE
    options:
      table_identifier: {{env:TABLE_SOURCE}}  # Get table name from the variable we set
      stream: true  # This enables streaming mode!
  select_columns:
    action: TRANSFORM_SELECT_COLUMNS
    options:
      include_columns:
        - id
        - name
        - location
  write_target_table:
    action: WRITE_CATALOG_TABLE
    options:
      table_identifier: {{env:TABLE_RESULT}}  # Save to the result table
      mode: append  # Always append in streaming (never overwrite)
      checkpoint_location: {{env:CHECKPOINT_LOCATION}}  # Track what's been processed
"""
Understanding Streaming Pipeline Steps
This streaming pipeline has 3 steps; a plain-PySpark sketch of the same flow follows the list:
- read_source_table: Checks the source table for new records. stream: true tells it to check for changes instead of reading the whole table every time.
- select_columns: Keeps only the columns we want (id, name, location) and drops the status and species columns from the output.
- write_target_table: Saves newly processed records to the result table. mode: append adds new data without removing existing data, and checkpoint_location remembers what data has already been processed.
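To build intuition for what these steps do, here is a rough plain-PySpark Structured Streaming equivalent. This is an illustrative sketch only, not Nessy's actual implementation, which may differ in details such as trigger configuration:
# Illustrative sketch: roughly what the three pipeline steps do in plain
# PySpark Structured Streaming (Nessy's internals may differ).
stream_df = (
    spark.readStream.table(SOURCE_TABLE_NAME)  # read_source_table with stream: true
    .select("id", "name", "location")          # select_columns
)
query = (
    stream_df.writeStream
    .outputMode("append")                                # mode: append
    .option("checkpointLocation", CHECKPOINT_LOCATION)   # checkpoint_location
    .trigger(availableNow=True)  # process all available data, then stop
    .toTable(RESULT_TABLE_NAME)                          # write_target_table
)
query.awaitTermination()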
Step 3: Run Your Streaming Pipeline¶
Let's execute the pipeline and process the initial data:
from cloe_nessy.pipeline import PipelineParsingService
# Convert the YAML text into a runnable pipeline
print("๐ Setting up streaming pipeline...")
pipeline = PipelineParsingService.parse(yaml_str=yaml_pipeline)
# Optional: Create a visual diagram of your pipeline steps
print("๐ Creating pipeline diagram...")
pipeline.plot_graph()
# Run the pipeline - this processes the initial data and sets up streaming
print("๐ Running streaming pipeline...")
pipeline.run()
print("โ
Initial streaming completed!")
Step 4: Check Initial Results¶
Let's verify that the initial data was processed correctly:
# Load the result table and check what we got
print("๐ Checking initial results...")
result_df = spark.table(RESULT_TABLE_NAME)
row_count = result_df.count()
print(f"๐ฏ Result table contains {row_count} rows after initial streaming")
print(f"๐ Original table had 3 rows, result table has {row_count} rows")
# Show the actual data (should have only id, name, location columns)
print("\n๐ Here's what the processed data looks like:")
result_df.display()
Step 5: Test Streaming with New Data¶
Now let's test the streaming functionality by adding new records to the source table:
from pyspark.sql import Row
# Create additional data to test streaming
print("๐ฅ Adding new data to source table...")
new_data = [
Row(id=4, name="Each-uisge", status="Mythical", species="Water Spirit", location="Highland Lochs"),
Row(id=5, name="Beithir", status="Dormant", species="Cave Dragon", location="Ben Vair")
]
new_df = spark.createDataFrame(new_data)
# Add the new data to the source table
new_df.write.mode("append").saveAsTable(SOURCE_TABLE_NAME)
print(f"โ Added {new_df.count()} new rows to source table")
Rerun the pipeline to process the new data:
print("๐ Running pipeline again to process new data...")
pipeline.run()
print("โ
Streaming update completed!")
Check the updated results:
# Display the updated results
print("๐ Checking updated results...")
result_df = spark.table(RESULT_TABLE_NAME)
new_row_count = result_df.count()
print(f"๐ฏ Result table now contains {new_row_count} rows")
print(f"๐ Added {new_row_count - row_count} new rows from streaming")
# Show all the data
print("\n๐ Complete result table:")
result_df.display()
Expected Results:
- After the first run: 3 rows (from initial data)
- After adding new data and rerunning: 5 rows total
- Each row should only have 3 columns: id, name, location
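If you want to verify these expectations programmatically, a quick check might look like this, reusing the result_df variable from the previous cell:
# Sanity-check the expectations listed above
expected_columns = {"id", "name", "location"}
assert set(result_df.columns) == expected_columns, f"Unexpected columns: {result_df.columns}"
assert result_df.count() == 5, "Expected 5 rows after processing the new data"
print("Result table matches the expected shape")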
How Checkpointing Works
Checkpoints are like bookmarks that remember what data has already been processed:
- First run: Processes all existing data and creates a checkpoint
- Subsequent runs: Only processes data added after the last checkpoint
- Location: Must be accessible by your Spark environment
- Reset: Delete the checkpoint directory to reprocess all data from the beginning
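For example, on Databricks you can peek at the checkpoint directory with dbutils to confirm that a run created or updated it:
# Inspect the checkpoint directory (Databricks dbutils; adapt for other environments)
for entry in dbutils.fs.ls(CHECKPOINT_LOCATION):
    print(entry.path)
# To reprocess all data from the beginning, delete the directory:
# dbutils.fs.rm(CHECKPOINT_LOCATION, recurse=True)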
Step 6: Clean Up (Optional)¶
When you're done experimenting, you can remove the test tables:
# Remove the demo tables we created
print("๐งน Cleaning up demo tables...")
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {RESULT_TABLE_NAME}")
print("โ
Demo tables removed successfully")
# Note: You may also want to clean up the checkpoint location if you're done testing
print("๐ก To reset streaming completely, we also delete the checkpoint directory")
print(f" Checkpoint location: {CHECKPOINT_LOCATION}")
dbutils.fs.rm(CHECKPOINT_LOCATION, recurse=True)
print("โ
Checkpoint directory removed successfully")
Need Help?¶
If you encounter any issues while following this streaming tutorial, check out our comprehensive Common Issues guide which covers:
- Streaming-specific problems like checkpoint issues
- Table and database access problems
- Environment variable configuration issues
- YAML parsing and pipeline configuration errors
The guide includes step-by-step solutions and code examples to help you troubleshoot quickly.