
Streaming Data from Unity Catalog

Getting Started

This guide shows you how to build a streaming pipeline with Nessy that reads from a Unity Catalog table and writes the results to another, processing only new data on each run.

What You'll Build

By the end of this tutorial, you'll have a working streaming pipeline that:

  1. Monitors a source table for new data (like watching a folder for new files)
  2. Saves results to a target table without duplicating previously processed data
  3. Remembers where it left off so it only processes truly new data

What You'll Learn

  • How to use checkpoints to avoid reprocessing the same data
  • How to test streaming functionality by adding new records

Setup and Installation

Install Nessy and restart the Python kernel:

%pip install cloe_nessy
%restart_python

Step 1: Create Sample Data

First, let's create some initial data and set up the checkpoint location for streaming:

import os
from pyspark.sql import Row

# Replace these with your own database object names
CATALOG = "nessy_dev_catalog"
SCHEMA = "demo"
SOURCE_TABLE = "streaming_pipeline"
RESULT_TABLE = "streaming_pipeline_result"

SOURCE_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{SOURCE_TABLE}"
RESULT_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{RESULT_TABLE}"

# create the schema if it doesn't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

CHECKPOINT_LOCATION = "abfss://landing@icsdp04nessysa.dfs.core.windows.net/demo/checkpoints"

# Store configuration in environment variables so the pipeline definition can reference them later via {{env:...}}
os.environ["TABLE_SOURCE"] = SOURCE_TABLE_NAME
os.environ["TABLE_RESULT"] = RESULT_TABLE_NAME
os.environ["CHECKPOINT_LOCATION"] = CHECKPOINT_LOCATION

# Create initial sample data - we'll add more later to test streaming
initial_data = [
    Row(id=1, name="Nessy", status="Mysterious", species="Lake Monster", location="Loch Ness"),
    Row(id=2, name="Highland Kelpie", status="Legendary", species="Water Horse", location="Scottish Highlands"),
    Row(id=3, name="Morag", status="Ancient", species="Sea Serpent", location="Loch Morar"),
    # We'll add these records later to demonstrate streaming:
    # Row(id=4, name="Each-uisge", status="Mythical", species="Water Spirit", location="Highland Lochs"),
    # Row(id=5, name="Beithir", status="Dormant", species="Cave Dragon", location="Ben Vair")
]

# Convert the rows into a Spark DataFrame
df = spark.createDataFrame(initial_data)

# Clean up any existing tables (optional)
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {RESULT_TABLE_NAME}")

# Create the source table with initial data
try:
    df.write.saveAsTable(SOURCE_TABLE_NAME)
    print(f"✓ Created source table '{SOURCE_TABLE_NAME}' with {df.count()} rows")
    print("📊 Initial data:")
    display(spark.table(SOURCE_TABLE_NAME))
except Exception as e:
    print(f"❌ Error creating source table: {e}")

Checkpoint Location

Important: The CHECKPOINT_LOCATION must point to an empty directory or a location that doesn't exist yet. Checkpoints store the offsets of data that has already been processed. If you reuse a checkpoint location from a previous run, the stream resumes from those recorded offsets, so rows you expect to be processed may be silently skipped; pointing a different query at an old checkpoint can also fail outright.

Replace the checkpoint location with a path accessible in your environment.
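
To guard against accidentally reusing an old checkpoint, a quick pre-flight check helps. The following is a minimal sketch for Databricks, where dbutils is available; adapt the listing call for other environments:

# Sketch: warn if the checkpoint location already contains state
# (assumes Databricks, where `dbutils` is available)
try:
    existing = dbutils.fs.ls(CHECKPOINT_LOCATION)
    if existing:
        print(f"⚠️ Checkpoint location has {len(existing)} entries - the stream will resume from those offsets")
    else:
        print("✓ Checkpoint location exists but is empty - starting fresh")
except Exception:
    # dbutils.fs.ls raises if the path doesn't exist yet - also a clean start
    print("✓ Checkpoint location doesn't exist yet - starting fresh")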

Step 2: Define Your Streaming Pipeline

Now let's create the streaming pipeline configuration. The key difference from batch processing is the stream: true setting:

# Define the streaming pipeline as a text string using YAML format
yaml_pipeline = """
name: streaming_from_unity_catalog
steps:
    read_source_table:
        action: READ_CATALOG_TABLE
        options:
            table_identifier: {{env:TABLE_SOURCE}}  # Get table name from the variable we set
            stream: true  # This enables streaming mode!

    select_columns:
        action: TRANSFORM_SELECT_COLUMNS
        options:
            include_columns:
                - id
                - name
                - location

    write_target_table:
        action: WRITE_CATALOG_TABLE
        options:
            table_identifier: {{env:TABLE_RESULT}}  # Save to the result table
            mode: append  # Always append in streaming (never overwrite)
            checkpoint_location: {{env:CHECKPOINT_LOCATION}}  # Track what's been processed
"""

Understanding Streaming Pipeline Steps

This streaming pipeline has 3 steps (a rough plain-PySpark equivalent follows the list):

  1. read_source_table: Checks the source table for new records
    • stream: true tells it to check for changes instead of reading the whole table every time
  2. select_columns: Keeps only the columns we want (id, name, location)
    • Drops the 'status' and 'species' columns from the output
  3. write_target_table: Saves new processed records to the result table
    • mode: append adds new data without removing existing data
    • checkpoint_location remembers what data has already been processed
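
Under the hood, this kind of streaming read/select/write maps onto PySpark Structured Streaming. The sketch below is not Nessy's actual implementation, just a rough hand-written equivalent (assuming Spark 3.3+ for the availableNow trigger) to show what the three steps amount to:

# Rough plain-PySpark equivalent of the three steps above - an
# illustration only, not how Nessy implements the pipeline.
stream_df = (
    spark.readStream.table(SOURCE_TABLE_NAME)  # read_source_table (stream: true)
         .select("id", "name", "location")     # select_columns
)

query = (
    stream_df.writeStream
    .option("checkpointLocation", CHECKPOINT_LOCATION)  # remember progress
    .outputMode("append")                               # mode: append
    .trigger(availableNow=True)                         # process new data, then stop
    .toTable(RESULT_TABLE_NAME)                         # write_target_table
)
query.awaitTermination()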

Step 3: Run Your Streaming Pipeline

Let's execute the pipeline and process the initial data:

from cloe_nessy.pipeline import PipelineParsingService

# Convert the YAML text into a runnable pipeline
print("📝 Setting up streaming pipeline...")
pipeline = PipelineParsingService.parse(yaml_str=yaml_pipeline)

# Optional: Create a visual diagram of your pipeline steps
print("📊 Creating pipeline diagram...")
pipeline.plot_graph()

# Run the pipeline - this processes the initial data and sets up streaming
print("🚀 Running streaming pipeline...")
pipeline.run()
print("✅ Initial streaming completed!")
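
If you'd rather keep the pipeline definition in a file than in an inline string, you can read the file yourself and pass its contents to the same parser. A small sketch; pipeline.yaml is a hypothetical path:

# Sketch: load the same definition from a file ("pipeline.yaml" is hypothetical)
with open("pipeline.yaml", encoding="utf-8") as f:
    pipeline = PipelineParsingService.parse(yaml_str=f.read())
pipeline.run()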

Step 4: Check Initial Results

Let's verify that the initial data was processed correctly:

# Load the result table and check what we got
print("📋 Checking initial results...")
result_df = spark.table(RESULT_TABLE_NAME)
row_count = result_df.count()

print(f"🎯 Result table contains {row_count} rows after initial streaming")
print(f"📊 Original table had 3 rows, result table has {row_count} rows")

# Show the actual data (should have only id, name, location columns)
print("\n📖 Here's what the processed data looks like:")
result_df.display()

Step 5: Test Streaming with New Data

Now let's test the streaming functionality by adding new records to the source table:

from pyspark.sql import Row

# Create additional data to test streaming
print("📥 Adding new data to source table...")
new_data = [
    Row(id=4, name="Each-uisge", status="Mythical", species="Water Spirit", location="Highland Lochs"),
    Row(id=5, name="Beithir", status="Dormant", species="Cave Dragon", location="Ben Vair")
]

new_df = spark.createDataFrame(new_data)

# Add the new data to the source table
new_df.write.mode("append").saveAsTable(SOURCE_TABLE_NAME)
print(f"✓ Added {new_df.count()} new rows to source table")

Rerun the pipeline to process the new data:

print("🔄 Running pipeline again to process new data...")
pipeline.run()
print("✅ Streaming update completed!")

Check the updated results:

# Display the updated results
print("📋 Checking updated results...")
result_df = spark.table(RESULT_TABLE_NAME)
new_row_count = result_df.count()

print(f"🎯 Result table now contains {new_row_count} rows")
print(f"📈 Added {new_row_count - row_count} new rows from streaming")

# Show all the data
print("\n📖 Complete result table:")
result_df.display()

Expected Results:

  • After the first run: 3 rows (from initial data)
  • After adding new data and rerunning: 5 rows total
  • Each row should only have 3 columns: id, name, location
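
If you want to turn those expectations into quick programmatic checks, a couple of plain assertions will do (a small sketch):

# Sketch: assert the expected shape of the result table after both runs
result_df = spark.table(RESULT_TABLE_NAME)

assert result_df.count() == 5, "expected 5 rows after the second run"
assert set(result_df.columns) == {"id", "name", "location"}, "expected only the selected columns"
print("✅ Result table matches expectations")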

How Checkpointing Works

Checkpoints are like bookmarks that remember what data has already been processed:

  • First run: Processes all existing data and creates a checkpoint
  • Subsequent runs: Only processes data added after the last checkpoint
  • Location: Must be accessible by your Spark environment
  • Reset: Delete the checkpoint directory to reprocess all data from the beginning
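
If you're curious what a checkpoint actually contains, you can list it: a Structured Streaming checkpoint keeps subdirectories such as offsets/ (one numbered file per planned micro-batch) and commits/ (the batches that finished). A small sketch for Databricks, where dbutils is available:

# Sketch: peek inside the checkpoint directory (Databricks dbutils assumed)
for entry in dbutils.fs.ls(CHECKPOINT_LOCATION):
    print(entry.path)

# offsets/ holds one file per planned micro-batch; commits/ records
# the micro-batches that completed successfully
for entry in dbutils.fs.ls(f"{CHECKPOINT_LOCATION}/offsets"):
    print(entry.path)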

Step 6: Clean Up (Optional)

When you're done experimenting, you can remove the test tables:

# Remove the demo tables we created
print("🧹 Cleaning up demo tables...")
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {RESULT_TABLE_NAME}")
print("✅ Demo tables removed successfully")

# To reset streaming completely, also delete the checkpoint directory
print("💡 Removing the checkpoint directory to fully reset streaming")
print(f"   Checkpoint location: {CHECKPOINT_LOCATION}")
dbutils.fs.rm(CHECKPOINT_LOCATION, recurse=True)
print("✅ Checkpoint directory removed successfully")

Need Help?

If you encounter any issues while following this streaming tutorial, check out our comprehensive Common Issues guide, which covers:

  • Streaming-specific problems like checkpoint issues
  • Table and database access problems
  • Environment variable configuration issues
  • YAML parsing and pipeline configuration errors

The guide includes step-by-step solutions and code examples to help you troubleshoot quickly.