Working with Nessy Metadata

Getting Started

This step-by-step guide shows you how to create a data pipeline that reads data from a database table and writes the results to a new table using Nessy Metadata. The core concepts are similar to the previous example, but here we focus on Nessy's metadata features.

Setup and Installation

Install Nessy and restart the Python kernel:

%pip install cloe_nessy
%restart_python

Step 1: Create Sample Data

First, let's create some test data to work with. This step creates a table with sample data about mythical creatures:

import os
from pyspark.sql import Row

# Replace these with your own database object names
CATALOG = "nessy_3_catalog"
SCHEMA = "demo"
SOURCE_TABLE = "metadata_example"
RESULT_TABLE = "metadata_example_result"

SOURCE_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{SOURCE_TABLE}"
RESULT_TABLE_NAME = f"{CATALOG}.{SCHEMA}.{RESULT_TABLE}"

# create the schema if it doesn't exist
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

# Store table names in variables that the pipeline can access later
# This makes it easy to use the same pipeline code in different environments
os.environ["TABLE_SOURCE"] = SOURCE_TABLE_NAME
os.environ["TABLE_RESULT"] = RESULT_TABLE_NAME

# Create sample data - each Row represents one record in your table
data = [
    Row(id=1, name="Nessy", status="Mysterious", species="Lake Monster", location="Loch Ness"),
    Row(id=2, name="Highland Kelpie", status="Legendary", species="Water Horse", location="Scottish Highlands"),
    Row(id=3, name="Morag", status="Ancient", species="Sea Serpent", location="Loch Morar"),
    Row(id=4, name="Each-uisge", status="Mythical", species="Water Spirit", location="Highland Lochs"),
    Row(id=5, name="Beithir", status="Dormant", species="Cave Dragon", location="Ben Vair")
]

# Convert the data into a format that can be saved as a table
df = spark.createDataFrame(data)

# Clean up any existing tables with the same names (optional)
spark.sql(f"DROP TABLE IF EXISTS {SOURCE_TABLE_NAME}")
spark.sql(f"DROP TABLE IF EXISTS {RESULT_TABLE_NAME}")

# Save the sample data as a table in the database
try:
    df.write.saveAsTable(SOURCE_TABLE_NAME)
    print(f"✓ Created source table '{SOURCE_TABLE_NAME}' with {df.count()} rows")
    spark.table(SOURCE_TABLE_NAME).show()
except Exception as e:
    print(f"❌ Error creating source table: {e}")

Step 2: Create Metadata for the Target Table

Next, we define the metadata for the target table where the results will be saved. Refer to the full metadata reference for all available fields.

Create a YAML file named mythical_creatures_metadata.yaml with the following content:

name: mythical_creatures
is_external: false
partition_by:
    - "location"
liquid_clustering: false  # liquid clustering cannot be combined with partition_by on Delta tables
data_source_format: "delta"
properties:
    "delta.autoOptimize.optimizeWrite": "true"
    "delta.enableDeletionVectors": "true"
constraints:
  validIdConstraint:
      expression: "(id > 0)"
      description: "Ensures ID is a positive number"
  statusConstraint:
      expression: "(status IN ('Mysterious', 'Legendary', 'Ancient', 'Mythical', 'Dormant'))"
      description: "Ensures status is one of the valid mythical creature statuses"
columns:
    - name: "id"
      data_type: "long"
      nullable: false
      description: "Unique identifier for the mythical creature"
    - name: "name"
      data_type: "string"
      nullable: false
      description: "Name of the mythical creature"
    - name: "status"
      data_type: "string"
      nullable: false
      description: "Current status of the creature (e.g., Mysterious, Legendary)"
    - name: "species"
      data_type: "string"
      nullable: false
      description: "Species classification of the creature"
    - name: "location"
      data_type: "string"
      nullable: false
      description: "Geographic location where the creature is found"
business_properties:
  responsible_manager: "Dr. Mythical Creatures Research Team"
  data_classification: "Public"
  purpose: "Tracking and cataloging mythical creatures for research purposes"
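
If you are working in a notebook and cannot easily create workspace files, you can write the metadata file from Python instead. A minimal sketch using only the standard library (the `metadata_yaml` string is abbreviated here; paste the full YAML content from above between the triple quotes):

```python
from pathlib import Path

# Abbreviated for this sketch -- paste the full YAML content from above
metadata_yaml = """\
name: mythical_creatures
is_external: false
data_source_format: "delta"
"""

# Write the file next to the notebook so the pipeline can find it by relative path
metadata_path = Path("mythical_creatures_metadata.yaml")
metadata_path.write_text(metadata_yaml)
print(f"Wrote {metadata_path} ({metadata_path.stat().st_size} bytes)")
```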

Read Metadata

You can load the metadata from the YAML file into a Python object using the following code:

from cloe_nessy.models import Table

table, errors = Table.read_instance_from_file(
    "mythical_creatures_metadata.yaml",
    catalog_name=CATALOG,
    schema_name=SCHEMA
)
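
The call returns both the parsed table object and any validation errors, so it is worth checking `errors` before continuing. A small helper sketch (it assumes `errors` is a list of messages and that the table object exposes a `name` attribute; both are assumptions about the API):

```python
def report_metadata_result(table, errors) -> bool:
    """Return True if the metadata loaded cleanly, printing any validation errors."""
    if errors:
        for err in errors:
            print(f"❌ Metadata validation error: {err}")
        return False
    print(f"✓ Loaded metadata for table '{getattr(table, 'name', '<unknown>')}'")
    return True

# Usage: report_metadata_result(table, errors)
```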

Step 3: Define the Data Pipeline

Define the pipeline as a YAML string. The read_metadata step loads the table definition from the file, and write_target_table uses that metadata instead of an explicit table identifier:

yaml_pipeline = """
name: metadata_pipeline
steps:
    read_source_table:
        action: READ_CATALOG_TABLE
        options:
            table_identifier: {{env:TABLE_SOURCE}}  # Get table name from the variable we set

    read_metadata:
        action: READ_METADATA_YAML_ACTION
        options:
            file_path: "mythical_creatures_metadata.yaml"  # Path to the metadata file
            catalog_name: "nessy_3_catalog"
            schema_name: "demo"

    write_target_table:
        action: WRITE_CATALOG_TABLE
        # Notice how we use the metadata read in the previous step instead of
        # passing the table identifier directly
        options:
            mode: append
"""
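
The `{{env:...}}` placeholders are resolved from environment variables when the pipeline is parsed, which is why we exported TABLE_SOURCE and TABLE_RESULT in Step 1. Purely as an illustration of that substitution (this is not Nessy's actual implementation):

```python
import os
import re

def resolve_env_placeholders(text: str) -> str:
    """Replace each {{env:VAR}} token with the value of os.environ[VAR]."""
    return re.sub(r"\{\{env:(\w+)\}\}", lambda m: os.environ[m.group(1)], text)

os.environ["TABLE_SOURCE"] = "nessy_3_catalog.demo.metadata_example"
line = "table_identifier: {{env:TABLE_SOURCE}}"
print(resolve_env_placeholders(line))
# table_identifier: nessy_3_catalog.demo.metadata_example
```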

Step 4: Run Your Pipeline

Let's execute the pipeline:

from cloe_nessy.pipeline import PipelineParsingService

# Convert the YAML text into a runnable pipeline
print("📝 Setting up pipeline...")
pipeline = PipelineParsingService.parse(yaml_str=yaml_pipeline)

# Run the pipeline
pipeline.run()

Table Creation

The target table will be automatically created based on the metadata defined in mythical_creatures_metadata.yaml if it does not already exist.

You can verify this by checking the logs:

2029-01-01 00:00:00,00 - Table [ 'nessy_3_catalog.demo.mythical_creatures' ] exists: False
2029-01-01 00:00:00,00 - Creating table: nessy_3_catalog.demo.mythical_creatures

Verify the table was created as expected by running:

display(spark.sql(f"DESCRIBE DETAIL {RESULT_TABLE_NAME}"))
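
Because the table was created from the metadata, its CHECK constraints are now enforced: an append with a status outside the allowed list will be rejected. Conceptually, the statusConstraint accepts exactly this set:

```python
# Mirrors the statusConstraint expression from mythical_creatures_metadata.yaml
VALID_STATUSES = {"Mysterious", "Legendary", "Ancient", "Mythical", "Dormant"}

def satisfies_status_constraint(status: str) -> bool:
    """Plain-Python equivalent of: status IN ('Mysterious', 'Legendary', ...)."""
    return status in VALID_STATUSES

print(satisfies_status_constraint("Mysterious"))  # True
print(satisfies_status_constraint("Friendly"))    # False
```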