Reading and Writing from Unity Catalog¶
Getting Started¶
This step-by-step guide shows you how to create your first data pipeline that reads data from a database table, filters it, and saves the results to a new table. Think of it like creating a recipe that automatically processes your data.
What You'll Build¶
By the end of this tutorial, you'll have a working pipeline that:
- Reads data from a source table (like opening a spreadsheet)
- Filters the data to keep only the rows you want (like using a filter in Excel)
- Saves the results to a new table for later use
What You'll Learn¶
- How to create sample data for testing
- How to set up configuration variables
- How to write a data pipeline using simple YAML format
- How to run your pipeline and check the results
Prerequisites¶
Before starting, make sure you have:
- Database Access: You can connect to Unity Catalog (Databricks' catalog and data governance layer)
- Permissions: You can create new tables and read/write data in your assigned workspace
- Environment: Access to a Databricks notebook or similar Spark environment
Setup and Installation¶
Install Nessy and restart the Python kernel:
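The exact install command depends on how Nessy is distributed in your organization; the following is a minimal sketch that assumes the package is published as cloe-nessy on your configured package index (adjust the name if yours differs):

%pip install cloe-nessy

# Restart the Python kernel so the freshly installed package can be imported (Databricks)
dbutils.library.restartPython()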
Step 1: Create Sample Data¶
First, let's create some test data to work with. This step creates a table with sample data about mythical creatures:
import os
from pyspark.sql import Row
# Replace these with your own database object names
CATALOG = "nessy_dev_catalog"
SCHEMA = "demo"
SOURCE_TABLE = "simple_pipeline"
RESULT_TABLE = "simple_pipeline_result"
SOURCE_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{SOURCE_TABLE}"
RESULT_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{RESULT_TABLE}"
# create the schema if it doesn't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
# Store table names in variables that the pipeline can access later
# This makes it easy to use the same pipeline code in different environments
os.environ["TABLE_SOURCE"] = SOURCE_TABLE_NAME
os.environ["TABLE_RESULT"] = RESULT_TABLE_NAME
# Create sample data - each Row represents one record in your table
data = [
Row(id=1, name="Nessy", status="Mysterious", species="Lake Monster", location="Loch Ness"),
Row(id=2, name="Highland Kelpie", status="Legendary", species="Water Horse", location="Scottish Highlands"),
Row(id=3, name="Morag", status="Ancient", species="Sea Serpent", location="Loch Morar"),
Row(id=4, name="Each-uisge", status="Mythical", species="Water Spirit", location="Highland Lochs"),
Row(id=5, name="Beithir", status="Dormant", species="Cave Dragon", location="Ben Vair")
]
# Convert the data into a format that can be saved as a table
df = spark.createDataFrame(data)
# Clean up any existing tables with the same names (optional)
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {RESULT_TABLE_NAME}")
# Save the sample data as a table in the database
try:
    df.write.saveAsTable(SOURCE_TABLE_NAME)
    print(f"✅ Created source table '{SOURCE_TABLE_NAME}' with {df.count()} rows")
    spark.table(SOURCE_TABLE_NAME).show()
except Exception as e:
    print(f"❌ Error creating source table: {e}")
About Table Names and Variables
We store table names in environment variables (like TABLE_SOURCE). This allows you to:
- Use the same pipeline code in different environments (development, testing, production)
- Change table names without modifying the pipeline code
- Keep your configuration separate from your pipeline logic
You can also write table names directly in the pipeline if you prefer.
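For example, the read step in the pipeline definition shown in Step 2 could reference the demo table directly instead of the {{env:TABLE_SOURCE}} placeholder (a sketch using the names from this tutorial):

read_source_table:
  action: READ_CATALOG_TABLE
  options:
    table_identifier: nessy_dev_catalog.demo.simple_pipeline  # hardcoded instead of {{env:TABLE_SOURCE}}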
Step 2: Define Your Pipeline¶
Now let's create the pipeline configuration. Think of this as writing a recipe that tells the system exactly what steps to follow:
# Define the pipeline as a text string using YAML format
yaml_pipeline = """
name: read_and_write_from_unity_catalog
steps:
  read_source_table:
    action: READ_CATALOG_TABLE
    options:
      table_identifier: {{env:TABLE_SOURCE}}  # Get table name from the variable we set earlier
  filter_data:
    action: TRANSFORM_FILTER
    options:
      condition: "status == 'Mysterious' or status == 'Legendary'"  # Keep only these two types
  write_target_table:
    action: WRITE_CATALOG_TABLE
    options:
      table_identifier: {{env:TABLE_RESULT}}  # Save to the result table name we set
      mode: overwrite  # Replace any existing data in the result table
"""
Understanding the Pipeline Steps
This pipeline has 3 simple steps:
- read_source_table: Opens and reads all data from your source table
- filter_data: Keeps only rows where status is "Mysterious" OR "Legendary"
- write_target_table: Saves the filtered results to a new table
Mode options for saving:
- overwrite: Replace all existing data (like "Save As" in a document)
- append: Add new data to existing data (like adding rows to a spreadsheet)
- error: Stop if the table already exists
- ignore: Skip saving if the table already exists
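For example, if you wanted to add the filtered rows to an existing result table rather than replace it, only the mode of the write step changes (a sketch; everything else stays as shown above):

write_target_table:
  action: WRITE_CATALOG_TABLE
  options:
    table_identifier: {{env:TABLE_RESULT}}
    mode: append  # add new rows instead of replacing existing data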
Step 3: Run Your Pipeline¶
Now let's execute the pipeline and see it in action:
from cloe_nessy.pipeline import PipelineParsingService
# Convert the YAML text into a runnable pipeline
print("🔧 Setting up pipeline...")
pipeline = PipelineParsingService.parse(yaml_str=yaml_pipeline)
# Optional: Create a visual diagram of your pipeline steps
print("📊 Creating pipeline diagram...")
pipeline.plot_graph()
# Run the pipeline - this does all the work!
print("🚀 Running pipeline...")
pipeline.run()
print("✅ Pipeline completed successfully!")
Step 4: Check Your Results¶
Let's verify that the pipeline worked correctly and see the filtered data:
# Load the result table and check what we got
print("🔍 Checking results...")
result_df = spark.table(RESULT_TABLE_NAME)
row_count = result_df.count()
print(f"🎯 Result table contains {row_count} rows after filtering")
print(f"📊 Original table had 5 rows, filtered table has {row_count} rows")
# Show the actual data
print("\n📋 Here's what the filtered data looks like:")
result_df.display()
Expected Results: You should see 2 rows in your result table:
- Nessy (status: Mysterious)
- Highland Kelpie (status: Legendary)
The other 3 creatures were filtered out because their status was "Ancient", "Mythical", or "Dormant".
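If you prefer a programmatic sanity check over eyeballing the output, a small assertion like the following works (a sketch reusing the result_df defined above):

# Collect the surviving names and compare them against what the filter should keep
remaining = {row["name"] for row in result_df.select("name").collect()}
assert remaining == {"Nessy", "Highland Kelpie"}, f"Unexpected rows in result table: {remaining}"
print("✅ Filtered rows match the expected creatures")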
Step 5: Clean Up (Optional)¶
When you're done experimenting, you can remove the test tables:
# Remove the demo tables we created
print("🧹 Cleaning up demo tables...")
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {RESULT_TABLE_NAME}")
print("✅ Demo tables removed successfully")
Need Help?¶
If you encounter any issues while following this tutorial, check out our comprehensive Common Issues guide which covers:
- Table and database access problems
- Environment variable configuration issues
- YAML parsing and pipeline configuration errors
- Installation and import problems
- Data filtering and output issues
The guide includes step-by-step solutions and code examples to help you troubleshoot quickly.
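As a quick first check for the environment variable issues mentioned above, you can print the values the {{env:...}} placeholders will resolve to (a sketch using the variables set in Step 1):

import os
# Confirm the table names the pipeline will read from and write to
print("TABLE_SOURCE =", os.environ.get("TABLE_SOURCE"))
print("TABLE_RESULT =", os.environ.get("TABLE_RESULT"))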