Data Catalog & AI Applications

Use clpipe's metadata capabilities to build data catalogs, enable AI applications, and maintain data governance.

clpipe extracts and propagates metadata through your SQL pipeline, making it the foundation for data documentation, PII tracking, and AI-powered data discovery.


The Challenge

Modern data teams struggle with:

  • Undocumented columns: No one knows what xfr_amt_adj_v2 means
  • PII scattered everywhere: Sensitive data flows through unknown paths
  • Manual data catalogs: Documentation is outdated before it's published
  • AI hallucinations: LLMs make up table names when generating SQL
  • Ownership confusion: No clear owner for data issues

The Solution: Metadata Built Into Lineage

clpipe treats metadata as a first-class citizen:

  1. Inline metadata: Extract descriptions, PII flags, and ownership from SQL comments
  2. Programmatic metadata: Set metadata on columns directly
  3. Propagation: Metadata flows through lineage automatically
  4. Persistence: Save and load metadata across sessions
  5. Export: JSON export for integration with any tool

Use Case 1: Inline SQL Comment Metadata

Scenario: Document your data where it's defined—in the SQL itself.

Metadata Format

clpipe parses structured metadata from SQL comments:

SELECT
    user_id,           -- User identifier [pii: false]
    email,             -- Email address [pii: true, owner: data-team]
    SUM(amount) as total_revenue  /* Total revenue [tags: metric finance] */
FROM user_activity
GROUP BY user_id, email

Format: <description> [key: value, key2: value2, ...]

Supported keys:

| Key | Type | Description |
|-----|------|-------------|
| pii | boolean | Flag for personally identifiable information |
| owner | string | Team or person responsible for the data |
| tags | space-separated | Categorical tags for the column |
| (any custom key) | string | Custom metadata fields |
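The comment format above can be parsed with a small amount of string handling. The sketch below is an illustration of the idea, not clpipe's actual parser:

```python
import re

# Matches "<description> [key: value, key2: value2]" at the end of a comment.
META_RE = re.compile(r"^\s*(?P<desc>[^\[]*?)\s*(?:\[(?P<meta>[^\]]*)\])?\s*$")

def parse_comment(comment: str) -> dict:
    """Split a SQL comment into a description plus key/value metadata."""
    match = META_RE.match(comment)
    result = {"description": match.group("desc").strip(), "pii": False, "tags": set()}
    if match.group("meta"):
        for pair in match.group("meta").split(","):
            key, _, value = pair.partition(":")
            key, value = key.strip(), value.strip()
            if key == "pii":
                result["pii"] = value.lower() == "true"   # boolean flag
            elif key == "tags":
                result["tags"] = set(value.split())       # space-separated tags
            else:
                result[key] = value                       # custom keys pass through
    return result

print(parse_comment("Email address [pii: true, owner: data-team]"))
```

A comment with no bracketed block simply yields a description with default metadata.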

Extracting Inline Metadata

from clpipe import Pipeline

sql = """
SELECT
    customer_id,     -- Unique customer identifier [owner: platform-team]
    email,           -- Customer email address [pii: true, owner: data-team]
    phone_number,    -- Phone number [pii: true, tags: contact sensitive]
    total_orders,    -- Lifetime order count [tags: metric customer]
    lifetime_value   -- Customer LTV [tags: metric finance]
FROM customers
"""

pipeline = Pipeline([("customers", sql)], dialect="duckdb")

# Access extracted metadata
email_col = pipeline.columns["customers.email"]
print(f"Description: {email_col.description}")  # "Customer email address"
print(f"PII: {email_col.pii}")                  # True
print(f"Owner: {email_col.owner}")              # "data-team"

phone_col = pipeline.columns["customers.phone_number"]
print(f"Tags: {phone_col.tags}")                # {"contact", "sensitive"}

Use Case 2: Programmatic Metadata Assignment

Scenario: Add metadata to columns that don't have inline comments.

Direct Assignment

from clpipe import Pipeline

pipeline = Pipeline(queries, dialect="duckdb")

# Set metadata directly on columns
for col in pipeline.columns.values():
    if col.table_name == "raw_customers":
        if col.column_name == "email":
            col.pii = True
            col.owner = "data-team"
            col.description = "Customer email address"
        elif col.column_name == "phone_number":
            col.pii = True
            col.owner = "data-team"
            col.tags.add("contact")

# Bulk assignment for order_items table
order_item_metadata = {
    "quantity": ("operations", "Number of units ordered", {"metric"}),
    "unit_price": ("finance", "Price per unit", {"metric", "revenue"}),
    "line_total": ("finance", "Total line item amount", {"metric", "revenue"}),
}

for col in pipeline.columns.values():
    if col.table_name == "raw_order_items" and col.column_name in order_item_metadata:
        owner, description, tags = order_item_metadata[col.column_name]
        col.owner = owner
        col.description = description
        col.tags.update(tags)

Custom Metadata

# Add any custom metadata
col.custom_metadata["sensitivity"] = "high"
col.custom_metadata["retention_days"] = 365
col.custom_metadata["data_classification"] = "confidential"

Use Case 3: Metadata Propagation

Scenario: Automatically track PII and ownership through transformations.

How Propagation Works

When metadata propagates through lineage:

  • PII: If any source column is PII, the derived column is PII
  • Owner: First owner found in sources is inherited
  • Tags: Union of all source tags

from clpipe import Pipeline

pipeline = Pipeline(queries, dialect="duckdb")

# Check PII before propagation
pii_before = len(list(pipeline.get_pii_columns()))
print(f"PII columns before propagation: {pii_before}")

# Propagate metadata through lineage
pipeline.propagate_all_metadata()

# Check PII after propagation
pii_after = len(list(pipeline.get_pii_columns()))
print(f"PII columns after propagation: {pii_after}")
print(f"New PII columns discovered: {pii_after - pii_before}")

Example output:

PII columns before propagation: 4
PII columns after propagation: 12
New PII columns discovered: 8

The 4 source PII columns (email, phone_number, ip_address, shipping_address) propagate to 8 derived columns in staging and mart tables.


Use Case 4: PII Tracking and Compliance

Scenario: Generate a PII audit report for GDPR/CCPA compliance.

Finding All PII Columns

# Get all PII columns
pii_columns = list(pipeline.get_pii_columns())

print(f"Found {len(pii_columns)} PII columns:\n")

# Group by table for cleaner output
pii_by_table = {}
for col in pii_columns:
    if col.table_name not in pii_by_table:
        pii_by_table[col.table_name] = []
    pii_by_table[col.table_name].append(col.column_name)

for table in sorted(pii_by_table.keys()):
    columns = sorted(set(pii_by_table[table]))
    print(f"{table}:")
    for col_name in columns:
        print(f"  - {col_name}")
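For an artifact auditors can consume, the same grouping can be written out as CSV. A minimal sketch, with the rows hard-coded in the shape you would collect from get_pii_columns():

```python
import csv
import io

# Hypothetical (table, column, owner) rows gathered from the PII scan above
pii_rows = [
    ("raw_customers", "email", "data-team"),
    ("raw_customers", "phone_number", "data-team"),
    ("raw_orders", "ip_address", "platform-team"),
]

buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(["table", "column", "owner"])
for table, column, owner in sorted(pii_rows):
    writer.writerow([table, column, owner])

print(buffer.getvalue())
```

Swap the StringIO for a real file handle to produce pii_audit.csv for compliance reviews.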

Tracing PII Flow

# Where does PII flow from source to marts?
print("Forward trace: Where does raw_customers.email go?")
impacts = pipeline.trace_column_forward("raw_customers", "email")

for impact in impacts:
    pii_flag = " [PII]" if impact.pii else ""
    print(f"  -> {impact.table_name}.{impact.column_name}{pii_flag}")

# Backward trace: Where did this PII originate?
print("\nBackward trace: Where did mart_customer_ltv.email come from?")
sources = pipeline.trace_column_backward("mart_customer_ltv", "email")

for source in sources:
    pii_flag = " [PII SOURCE]" if source.pii else ""
    print(f"  <- {source.table_name}.{source.column_name}{pii_flag}")

Use Case 5: Querying by Metadata

Scenario: Find columns by owner, tag, or other metadata.

By Owner

# Find all columns owned by the finance team
finance_columns = list(pipeline.get_columns_by_owner("finance"))

print(f"Columns owned by finance team: {len(finance_columns)}")
for col in finance_columns[:10]:
    print(f"  {col.table_name}.{col.column_name}")

By Tag

# Find all metric columns
metric_columns = list(pipeline.get_columns_by_tag("metric"))

print(f"Columns tagged as 'metric': {len(metric_columns)}")
for col in metric_columns[:10]:
    print(f"  {col.table_name}.{col.column_name}")

# Find revenue-related columns
revenue_columns = list(pipeline.get_columns_by_tag("revenue"))

All Owners and Tags

# Discover all owners in the pipeline
all_owners = set()
for col in pipeline.columns.values():
    if col.owner:
        all_owners.add(col.owner)

print(f"Owners: {sorted(all_owners)}")

# Discover all tags
all_tags = set()
for col in pipeline.columns.values():
    all_tags.update(col.tags)

print(f"Tags: {sorted(all_tags)}")

Use Case 6: LLM-Powered Description Generation

Scenario: Auto-generate descriptions for columns that don't have them.

Setup with Ollama (Local, Free)

from langchain_ollama import ChatOllama

# Connect to local Ollama
llm = ChatOllama(
    model="llama3:latest",  # or llama3.2, qwen3-coder:30b
    temperature=0.3,
)

pipeline.llm = llm

Setup with OpenAI

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="gpt-4",
    temperature=0,
)

pipeline.llm = llm

Generate Descriptions

# Generate descriptions for all columns without them
pipeline.generate_all_descriptions()

# Or generate for specific columns
from clpipe.column import generate_description

for col in pipeline.columns.values():
    if not col.description and col.table_name.startswith("mart_"):
        generate_description(col, pipeline.llm, pipeline)
        print(f"{col.full_name}: {col.description}")

Why LLM-generated descriptions are good:

The LLM sees lineage context including:

  • Source columns and their types
  • Transformations (SUM, JOIN, CASE)
  • Filter conditions (WHERE clauses)
  • Aggregation logic (GROUP BY)

This produces descriptions that understand the data flow, not just the column name.
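A prompt assembled from that lineage context might look like the following. This is a hypothetical sketch of the idea; clpipe's actual prompt wording is internal to the library:

```python
def build_description_prompt(table: str, column: str, sources: list[str], transform: str) -> str:
    """Assemble lineage context into a prompt for description generation."""
    source_list = "\n".join(f"- {s}" for s in sources)
    return (
        f"Describe the column {table}.{column} in one sentence.\n"
        f"It is derived from:\n{source_list}\n"
        f"Transformation: {transform}\n"
        "Base the description on the data flow, not just the column name."
    )

prompt = build_description_prompt(
    "mart_customer_ltv", "lifetime_value",
    ["raw_orders.amount (DECIMAL)", "raw_customers.customer_id (INTEGER)"],
    "SUM(amount) grouped by customer_id",
)
print(prompt)
```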


Use Case 7: AI-Powered Data Discovery (Text-to-SQL)

Scenario: Use the lineage graph to provide context for text-to-SQL queries.

The Problem with Traditional Text-to-SQL

LLMs often hallucinate table and column names when generating SQL:

User: "Show me total revenue by customer"
LLM: SELECT customer, SUM(revenue) FROM sales...

Problem: There's no "sales" table or "revenue" column!

The Solution: Graph-Grounded Context

import json

# Export the graph as context for the LLM
context = {
    "tables": list(pipeline.table_graph.tables.keys()),
    "columns_by_table": {},
}

for table_name in pipeline.table_graph.tables:
    context["columns_by_table"][table_name] = [
        col.column_name
        for col in pipeline.columns.values()
        if col.table_name == table_name
    ]

# Include in your LLM prompt
prompt = f"""
You are a SQL assistant. Generate SQL queries using ONLY the tables and columns
listed below. Never invent table or column names.

Available schema:
{json.dumps(context, indent=2)}

User question: Show me total revenue by customer
"""

# Grounded in the real schema, the LLM is far less likely to invent names
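Grounding reduces hallucinations but does not eliminate them, so it is worth validating generated SQL against the same schema before executing it. A deliberately naive keyword-based sketch (not part of clpipe; a real validator would parse the SQL properly):

```python
import re

def check_tables(sql: str, known_tables: set[str]) -> set[str]:
    """Return table names referenced after FROM/JOIN that aren't in the schema.

    Naive: scans for FROM/JOIN keywords rather than parsing the statement.
    """
    referenced = set(
        re.findall(r"\b(?:FROM|JOIN)\s+([A-Za-z_][A-Za-z0-9_]*)", sql, re.IGNORECASE)
    )
    return referenced - known_tables

known = {"customers", "orders", "mart_customer_ltv"}
unknown = check_tables("SELECT customer, SUM(revenue) FROM sales", known)
print(unknown)  # any table the LLM invented
```

An empty result means every referenced table exists; anything returned should block execution and trigger a retry with feedback.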

Rich Context with Metadata

# Include descriptions and relationships
rich_context = {
    "tables": {},
    "relationships": [],
}

for table_name, table in pipeline.table_graph.tables.items():
    rich_context["tables"][table_name] = {
        "columns": [],
        "is_source": table.is_source,
    }

    for col in pipeline.columns.values():
        if col.table_name == table_name:
            rich_context["tables"][table_name]["columns"].append({
                "name": col.column_name,
                "description": col.description or "",
                "pii": col.pii,
                "tags": list(col.tags),
            })

# Add lineage relationships
for edge in pipeline.edges:
    rich_context["relationships"].append({
        "from": edge.from_node.full_name,
        "to": edge.to_node.full_name,
        "type": edge.edge_type,
    })

Use Case 8: Persistence and Export

Scenario: Save metadata and share with external tools.

Save and Load Metadata

# Save metadata to file
pipeline.save("metadata.pkl")

# Later: Load metadata and apply to a fresh pipeline
loaded_metadata = Pipeline.load_metadata("metadata.pkl")

# Create new pipeline (e.g., from updated SQL files)
fresh_pipeline = Pipeline(queries, dialect="duckdb")

# Apply saved metadata
fresh_pipeline.apply_metadata(loaded_metadata)

print(f"Applied metadata for {len(loaded_metadata['columns'])} columns")

Export to JSON

import json

# Export full pipeline with metadata
json_data = pipeline.to_json(include_metadata=True)

print("Exported:")
print(f"  - {len(json_data['columns'])} columns")
print(f"  - {len(json_data['edges'])} edges")
print(f"  - {len(json_data['tables'])} tables")

# Save to file
with open("lineage.json", "w") as f:
    json.dump(json_data, f, indent=2)

# Sample column entry
sample_col = json_data["columns"][0]
print(f"\nSample column: {json.dumps(sample_col, indent=2)}")

Sample output:

{
  "table_name": "raw_customers",
  "column_name": "email",
  "pii": true,
  "owner": "data-team",
  "tags": ["contact"],
  "description": "Customer email address"
}

Integration with External Data Catalogs

# Export for DataHub, Atlan, or custom catalogs
def export_for_datahub(pipeline):
    """Generate DataHub-compatible metadata."""
    datasets = []

    for table_name in pipeline.table_graph.tables:
        fields = []
        for col in pipeline.columns.values():
            if col.table_name == table_name:
                fields.append({
                    "fieldPath": col.column_name,
                    "description": col.description or "",
                    "nativeDataType": "string",  # You could infer this
                    "glossaryTerms": list(col.tags),
                    "customProperties": {
                        "pii": str(col.pii),
                        "owner": col.owner or "",
                    }
                })

        datasets.append({
            "urn": f"urn:li:dataset:(urn:li:dataPlatform:bigquery,{table_name},PROD)",
            "fields": fields,
        })

    return datasets

Real-World Example Output

Running run_metadata.py produces output like:

================================================================================
E-Commerce Pipeline Metadata Management
================================================================================

Building pipeline...
  Built pipeline with 8 queries
  Found 127 columns

1. INLINE SQL COMMENT METADATA
--------------------------------------------------------------------------------
  Found 15 columns with inline metadata:
    raw_orders.order_id:
      description: Unique order identifier
      owner: platform-team
    raw_customers.email:
      description: Customer email address
      pii: true
      owner: data-team
    ...

3. METADATA PROPAGATION
--------------------------------------------------------------------------------
  PII columns before propagation: 4
  PII columns after propagation: 12
  New PII columns discovered: 8

5. QUERYING COLUMNS BY METADATA
--------------------------------------------------------------------------------
  5a. PII Columns
    raw_customers:
      - email
      - phone_number
    raw_orders:
      - ip_address
      - shipping_address
    stg_orders_enriched:
      - customer_email
      - ip_address
    mart_customer_ltv:
      - email

  5b. Columns by Owner
    data-team: 8 unique columns
    finance: 12 unique columns
    operations: 6 unique columns
    platform-team: 15 unique columns

================================================================================
METADATA SUMMARY
================================================================================
  Total columns: 127
  PII columns: 12 unique
  Owned columns: 41 unique
  Tagged columns: 23 unique

  Owners:
    - data-team: 8 columns
    - finance: 12 columns
    - operations: 6 columns
    - platform-team: 15 columns

  Tags:
    - contact: 4 columns
    - finance: 8 columns
    - metric: 15 columns
    - revenue: 6 columns

Try It Yourself

Run the metadata example:

cd clpipe
uv run python examples/sql_files/run_metadata.py

Key Benefits

| Traditional Approach | clpipe Metadata |
|----------------------|-----------------|
| Manual data catalog updates | Auto-extracted from SQL |
| PII tracking spreadsheets | Automatic propagation |
| Outdated documentation | Always in sync with code |
| LLM hallucinations | Graph-grounded context |
| Unknown data ownership | Clear accountability |

Next Steps