Working with Nessy Metadata¶
Getting Started¶
This step-by-step guide shows you how to create a data pipeline that reads data from a database table and saves the results to a new table using Nessy Metadata. The core concepts are similar to the previous example, but here we focus on how to use Nessy's metadata features.
Setup and Installation¶
Install Nessy and restart the Python kernel:
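For example, in a Databricks notebook (the package name cloe-nessy is an assumption, inferred from the cloe_nessy import used later in this guide):

%pip install cloe-nessy

# Restart the Python process so the freshly installed package is importable
dbutils.library.restartPython()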
Step 1: Create Sample Data¶
First, let's create some test data to work with. This step creates a table with sample data about mythical creatures:
import os
from pyspark.sql import Row
# Replace these with your own database object names
CATALOG = "nessy_3_catalog"
SCHEMA = "demo"
SOURCE_TABLE = "metadata_example"
RESULT_TABLE = "metadata_example_result"
SOURCE_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{SOURCE_TABLE}"
RESULT_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{RESULT_TABLE}"
# create the schema if it doesn't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
# Store table names in variables that the pipeline can access later
# This makes it easy to use the same pipeline code in different environments
os.environ["TABLE_SOURCE"] = SOURCE_TABLE_NAME
os.environ["TABLE_RESULT"] = RESULT_TABLE_NAME
# Create sample data - each Row represents one record in your table
data = [
    Row(id=1, name="Nessy", status="Mysterious", species="Lake Monster", location="Loch Ness"),
    Row(id=2, name="Highland Kelpie", status="Legendary", species="Water Horse", location="Scottish Highlands"),
    Row(id=3, name="Morag", status="Ancient", species="Sea Serpent", location="Loch Morar"),
    Row(id=4, name="Each-uisge", status="Mythical", species="Water Spirit", location="Highland Lochs"),
    Row(id=5, name="Beithir", status="Dormant", species="Cave Dragon", location="Ben Vair"),
]
# Convert the data into a format that can be saved as a table
df = spark.createDataFrame(data)
# Clean up any existing tables with the same names (optional)
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {RESULT_TABLE_NAME}")
# Save the sample data as a table in the database
try:
    df.write.saveAsTable(SOURCE_TABLE_NAME)
    print(f"✓ Created source table '{SOURCE_TABLE_NAME}' with {df.count()} rows")
    spark.table(SOURCE_TABLE_NAME).show()
except Exception as e:
    print(f"❌ Error creating source table: {e}")
Step 2: Create Metadata for the Target Table¶
Next, we define the metadata for the target table where the results will be saved. The full metadata reference can be found here.
Create a YAML file named mythical_creatures_metadata.yaml with the following content:
name: mythical_creatures
is_external: false
partition_by:
  - "location"
liquid_clustering: true
data_source_format: "delta"
properties:
  "delta.autoOptimize.optimizeWrite": "true"
  "delta.enableDeletionVectors": "true"
constraints:
  validIdConstraint:
    expression: "(id > 0)"
    description: "Ensures ID is a positive number"
  statusConstraint:
    expression: "(status IN ('Mysterious', 'Legendary', 'Ancient', 'Mythical', 'Dormant'))"
    description: "Ensures status is one of the valid mythical creature statuses"
columns:
  - name: "id"
    data_type: "long"
    nullable: false
    description: "Unique identifier for the mythical creature"
  - name: "name"
    data_type: "string"
    nullable: false
    description: "Name of the mythical creature"
  - name: "status"
    data_type: "string"
    nullable: false
    description: "Current status of the creature (e.g., Mysterious, Legendary)"
  - name: "species"
    data_type: "string"
    nullable: false
    description: "Species classification of the creature"
  - name: "location"
    data_type: "string"
    nullable: false
    description: "Geographic location where the creature is found"
business_properties:
  responsible_manager: "Dr. Mythical Creatures Research Team"
  data_classification: "Public"
  purpose: "Tracking and cataloging mythical creatures for research purposes"
Read Metadata
You can deserialize the metadata into a Python object.
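The exact loader Nessy provides may differ; as a minimal sketch, the file written above can be parsed into a dictionary with PyYAML:

import yaml

# Load the metadata file into a plain Python dictionary.
# Note: Nessy may offer a dedicated metadata loader; this generic
# PyYAML approach is shown only as an illustration.
with open("mythical_creatures_metadata.yaml") as f:
    metadata = yaml.safe_load(f)

print(metadata["name"])  # mythical_creatures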
Step 3: Define the Data Pipeline¶
yaml_pipeline = """
name: metadata_pipeline
steps:
read_source_table:
action: READ_CATALOG_TABLE
options:
table_identifier: {{env:TABLE_SOURCE}} # Get table name from the variable we set
read_metadata:
action: READ_METADATA_YAML_ACTION
options:
file_path: "mythical_creatures_metadata.yaml" # Path to the metadata file
catalog_name: "nessy_3_catalog"
schema_name: "demo"
write_target_table:
action: WRITE_CATALOG_TABLE
# notice, how we use the metadata read in the previous step instead of
# passing the table identifier directly
options:
mode: append
"""
Step 4: Run Your Pipeline¶
Let's execute the pipeline:
from cloe_nessy.pipeline import PipelineParsingService
# Convert the YAML text into a runnable pipeline
print("📝 Setting up pipeline...")
pipeline = PipelineParsingService.parse(yaml_str=yaml_pipeline)
# Run the pipeline
pipeline.run()
Table Creation
The target table will be automatically created based on the metadata defined
in mythical_creatures_metadata.yaml if it does not already exist.
You can verify this by checking the logs:
2029-01-01 00:00:00,00 - Table [ 'nessy_3_catalog.demo.mythical_creatures' ] exists: False
2029-01-01 00:00:00,00 - Creating table: nessy_3_catalog.demo.mythical_creatures
Verify the table was created as expected by running:
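For example, using the table name defined in the metadata file (as shown in the logs above):

# Inspect the target table that the pipeline created from the metadata definition
spark.table("nessy_3_catalog.demo.mythical_creatures").show()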