Reading and Writing from Unity Catalog

Getting Started

This step-by-step guide shows you how to create your first data pipeline that reads data from a database table, filters it, and saves the results to a new table. Think of it like creating a recipe that automatically processes your data.

What You'll Build

By the end of this tutorial, you'll have a working pipeline that:

  1. Reads data from a source table (like opening a spreadsheet)
  2. Filters the data to keep only the rows you want (like using a filter in Excel)
  3. Saves the results to a new table for later use

What You'll Learn

  • How to create sample data for testing
  • How to set up configuration variables
  • How to write a data pipeline using simple YAML format
  • How to run your pipeline and check the results

Prerequisites

Before starting, make sure you have:

  • Database Access: You can connect to Unity Catalog (Databricks' catalog and governance layer for data)
  • Permissions: You can create new tables and read/write data in your assigned workspace
  • Environment: Access to a Databricks notebook or similar Spark environment
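
Not sure whether you have access? A quick way to check from a notebook is to list the catalogs your session can see (this only confirms connectivity, not table-level permissions):

# List the catalogs your notebook session can see in Unity Catalog
spark.sql("SHOW CATALOGS").show()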

Setup and Installation

Install Nessy and restart the Python kernel:

%pip install cloe_nessy
%restart_python

Step 1: Create Sample Data

First, let's create some test data to work with. This step creates a table with sample data about mythical creatures:

import os
from pyspark.sql import Row

# Replace these with your own database object names
CATALOG = "nessy_dev_catalog"
SCHEMA = "demo"
SOURCE_TABLE = "simple_pipeline"
RESULT_TABLE = "simple_pipeline_result"

SOURCE_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{SOURCE_TABLE}"
RESULT_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{RESULT_TABLE}"

# create the schema if it doesn't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

# Store table names in variables that the pipeline can access later
# This makes it easy to use the same pipeline code in different environments
os.environ["TABLE_SOURCE"] = SOURCE_TABLE_NAME
os.environ["TABLE_RESULT"] = RESULT_TABLE_NAME

# Create sample data - each Row represents one record in your table
data = [
    Row(id=1, name="Nessy", status="Mysterious", species="Lake Monster", location="Loch Ness"),
    Row(id=2, name="Highland Kelpie", status="Legendary", species="Water Horse", location="Scottish Highlands"),
    Row(id=3, name="Morag", status="Ancient", species="Sea Serpent", location="Loch Morar"),
    Row(id=4, name="Each-uisge", status="Mythical", species="Water Spirit", location="Highland Lochs"),
    Row(id=5, name="Beithir", status="Dormant", species="Cave Dragon", location="Ben Vair")
]

# Convert the list of Rows into a Spark DataFrame so it can be saved as a table
df = spark.createDataFrame(data)

# Clean up any existing tables with the same names (optional)
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {RESULT_TABLE_NAME}")

# Save the sample data as a table in the database
try:
    df.write.saveAsTable(SOURCE_TABLE_NAME)
    print(f"โœ“ Created source table '{SOURCE_TABLE_NAME}' with {df.count()} rows")
    spark.table(SOURCE_TABLE_NAME).show()
except Exception as e:
    print(f"โŒ Error creating source table: {e}")

About Table Names and Variables

We store table names in environment variables (like TABLE_SOURCE). This allows you to:

  • Use the same pipeline code in different environments (development, testing, production)
  • Change table names without modifying the pipeline code
  • Keep your configuration separate from your pipeline logic

You can also write table names directly in the pipeline if you prefer.
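
For example, a read step with a hard-coded table identifier would look like this (the name below is a placeholder; substitute your own catalog, schema, and table):

    read_source_table:
        action: READ_CATALOG_TABLE
        options:
            table_identifier: nessy_dev_catalog.demo.simple_pipeline  # hard-coded instead of {{env:TABLE_SOURCE}}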

Step 2: Define Your Pipeline

Now let's create the pipeline configuration. Think of this as writing a recipe that tells the system exactly what steps to follow:

# Define the pipeline as a text string using YAML format
yaml_pipeline = """
name: read_and_write_from_unity_catalog
steps:
    read_source_table:
        action: READ_CATALOG_TABLE
        options:
            table_identifier: {{env:TABLE_SOURCE}}  # Get table name from the variable we set earlier

    filter_data:
        action: TRANSFORM_FILTER
        options:
            condition: "status == 'Mysterious' or status == 'Legendary'"  # Keep only these two types

    write_target_table:
        action: WRITE_CATALOG_TABLE
        options:
            table_identifier: {{env:TABLE_RESULT}}  # Save to the result table name we set
            mode: overwrite  # Replace any existing data in the result table
"""

Understanding the Pipeline Steps

This pipeline has 3 simple steps:

  1. read_source_table: Opens and reads all data from your source table
  2. filter_data: Keeps only rows where status is "Mysterious" OR "Legendary" (see the PySpark sketch after this list)
  3. write_target_table: Saves the filtered results to a new table
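
If you're curious what the filter step is doing with that condition, it reads like a Spark SQL expression. Here's a rough PySpark equivalent (a sketch of the behavior, not how TRANSFORM_FILTER is actually implemented):

# Keep only the rows whose status matches either value
filtered_df = spark.table(SOURCE_TABLE_NAME).filter(
    "status == 'Mysterious' or status == 'Legendary'"
)
filtered_df.show()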

Mode options for saving:

  • overwrite: Replace all existing data (like "Save As" in a document)
  • append: Add new data to existing data (like adding rows to a spreadsheet)
  • error: Stop if the table already exists
  • ignore: Skip saving if the table already exists
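
For example, if you wanted each run to add rows to the result table instead of replacing them, only the write step's mode needs to change (same step as above, with overwrite swapped for append):

    write_target_table:
        action: WRITE_CATALOG_TABLE
        options:
            table_identifier: {{env:TABLE_RESULT}}
            mode: append  # Add to any existing data instead of replacing it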

Step 3: Run Your Pipeline

Now let's execute the pipeline and see it in action:

from cloe_nessy.pipeline import PipelineParsingService

# Convert the YAML text into a runnable pipeline
print("๐Ÿ“ Setting up pipeline...")
pipeline = PipelineParsingService.parse(yaml_str=yaml_pipeline)

# Optional: Create a visual diagram of your pipeline steps
print("๐Ÿ“Š Creating pipeline diagram...")
pipeline.plot_graph()

# Run the pipeline - this does all the work!
print("๐Ÿš€ Running pipeline...")
pipeline.run()
print("โœ… Pipeline completed successfully!")

Step 4: Check Your Results

Let's verify that the pipeline worked correctly and see the filtered data:

# Load the result table and check what we got
print("๐Ÿ“‹ Checking results...")
result_df = spark.table(RESULT_TABLE_NAME)
row_count = result_df.count()

print(f"๐ŸŽฏ Result table contains {row_count} rows after filtering")
print(f"๐Ÿ“Š Original table had 5 rows, filtered table has {row_count} rows")

# Show the actual data
print("\n๐Ÿ“– Here's what the filtered data looks like:")
result_df.display()

Expected Results: You should see 2 rows in your result table:

  • Nessy (status: Mysterious)
  • Highland Kelpie (status: Legendary)

The other 3 creatures were filtered out because their status was "Ancient", "Mythical", or "Dormant".
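
If you'd rather have the notebook verify this automatically instead of checking by eye, a small assertion does the job (a sketch; it reuses the result_df from the previous cell):

# Compare the names in the result table against what the filter should keep
expected = {"Nessy", "Highland Kelpie"}
actual = {row["name"] for row in result_df.select("name").collect()}
assert actual == expected, f"Unexpected result rows: {actual}"
print("✅ Filtered rows match the expected creatures")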

Step 5: Clean Up (Optional)

When you're done experimenting, you can remove the test tables:

# Remove the demo tables we created
print("๐Ÿงน Cleaning up demo tables...")
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {RESULT_TABLE_NAME}")
print("โœ… Demo tables removed successfully")

Need Help?

If you encounter any issues while following this tutorial, check out our comprehensive Common Issues guide which covers:

  • Table and database access problems
  • Environment variable configuration issues
  • YAML parsing and pipeline configuration errors
  • Installation and import problems
  • Data filtering and output issues

The guide includes step-by-step solutions and code examples to help you troubleshoot quickly.