Skip to content

Examples

Metadata from SQL Comments

Automatically extract metadata from inline SQL comments:

from clpipe import Pipeline

sql = """
SELECT
  user_id,  -- User identifier [pii: false]
  email,    -- Email address [pii: true, owner: data-team]
  SUM(revenue) as total_revenue  /* Total revenue [tags: metric finance] */
FROM user_activity
GROUP BY user_id, email
"""

pipeline = Pipeline([("user_metrics", sql)], dialect="bigquery")

# Access metadata
email_col = pipeline.columns["user_metrics.email"]
print(email_col.description)  # "Email address"
print(email_col.pii)          # True
print(email_col.owner)        # "data-team"

# Find PII columns
pii_columns = pipeline.get_pii_columns()
for col in pii_columns:
    print(f"PII: {col.full_name} - {col.description}")

# Find columns by tag
metrics = pipeline.get_columns_by_tag("metric")
for col in metrics:
    print(f"Metric: {col.full_name} - {col.description}")

Learn more: Metadata from Comments


More Examples

Real-world use cases and patterns for clpipe.


Example 1: PII Compliance Audit

Scenario: You need to find all columns containing PII across your entire data warehouse.

from clpipe import Pipeline

# Parse your SQL pipeline
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Mark PII columns at source
pii_sources = {
    "raw.users.email": True,
    "raw.users.phone": True,
    "raw.users.ssn": True,
    "raw.orders.customer_id": True,
}

for column_path, is_pii in pii_sources.items():
    pipeline.columns[column_path].pii = is_pii
    pipeline.columns[column_path].tags = ["gdpr", "sensitive"]

# Propagate through entire pipeline
pipeline.propagate_all_metadata()

# Generate compliance report
pii_columns = pipeline.get_pii_columns()

print(f"Found {len(pii_columns)} columns containing PII:\n")

by_table = {}
for col in pii_columns:
    if col.table_name not in by_table:
        by_table[col.table_name] = []
    by_table[col.table_name].append(col.column_name)

for table, columns in sorted(by_table.items()):
    print(f"{table}:")
    for col in columns:
        print(f"  - {col}")

# Export for data catalog
with open("pii_audit.json", "w") as f:
    import json
    audit_data = {
        table: cols for table, cols in by_table.items()
    }
    json.dump(audit_data, f, indent=2)

print(f"\n✅ Audit complete: {len(by_table)} tables contain PII")

Result: Complete PII audit in seconds, automatically tracking propagation through joins and transformations.


Example 2: Breaking Change Impact Analysis

Scenario: Planning to rename raw.orders.amount to raw.orders.revenue. What breaks?

from clpipe import Pipeline

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Find all affected columns
table_name = "raw.orders"
column_name = "amount"

affected = pipeline.trace_column_forward(table_name, column_name)

print(f"Renaming {table_name}.{column_name} affects {len(affected)} downstream columns:\n")

# Group by table
by_table = {}
for col in affected:
    if col.table_name not in by_table:
        by_table[col.table_name] = []
    by_table[col.table_name].append({
        'column': col.column_name,
        'operation': col.operation
    })

# Print impact report
for table, columns in sorted(by_table.items()):
    print(f"📊 {table}:")
    for col in columns:
        print(f"   - {col['column']} ({col['operation']})")

# Find owners to notify
owners = set()
for col in affected:
    if col.owner:
        owners.add(col.owner)

if owners:
    print(f"\n👥 Notify these teams: {', '.join(owners)}")

print(f"\n⚠️  Total impact: {len(by_table)} tables need updates")

Result: Exact impact analysis with transformation types and owner notifications.


Example 3: Multi-Schedule Pipeline

Scenario: Different tables update at different frequencies (realtime vs daily).

# dags/multi_schedule_pipeline.py
# Place this file in your Airflow DAGs folder

from clpipe import Pipeline
from datetime import datetime

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Split by update frequency
subpipelines = pipeline.split(
    sinks=[
        # Realtime dashboard (every 5 minutes)
        ["realtime.user_activity", "realtime.live_metrics"],

        # Hourly rollups
        ["analytics.hourly_summary", "analytics.hourly_revenue"],

        # Daily reports
        ["reports.daily_dashboard", "reports.daily_email"]
    ]
)

# Create DAGs directly - Airflow discovers them automatically
realtime_dag = subpipelines[0].to_airflow_dag(
    executor=my_executor,
    dag_id="realtime_pipeline",
    schedule="*/5 * * * *",  # Every 5 minutes
    start_date=datetime(2025, 1, 1)
)

hourly_dag = subpipelines[1].to_airflow_dag(
    executor=my_executor,
    dag_id="hourly_pipeline",
    schedule="0 * * * *",  # Every hour
    start_date=datetime(2025, 1, 1)
)

daily_dag = subpipelines[2].to_airflow_dag(
    executor=my_executor,
    dag_id="daily_pipeline",
    schedule="0 2 * * *",  # Daily at 2 AM
    start_date=datetime(2025, 1, 1)
)

Result: Three independent DAGs in a single file, automatically discovered by Airflow with optimal scheduling for each use case.


Example 4: LLM-Powered Documentation

Scenario: Auto-generate descriptions for your entire data warehouse.

from clpipe import Pipeline
from langchain_openai import ChatOpenAI

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Set up LLM
pipeline.llm = ChatOpenAI(
    model="gpt-4",
    temperature=0  # Deterministic output
)

# Generate descriptions for all columns
print("Generating AI-powered descriptions...")
pipeline.generate_all_descriptions()

# Export to markdown
with open("data_dictionary.md", "w") as f:
    f.write("# Data Dictionary\n\n")
    f.write("*Auto-generated with AI and column lineage context*\n\n")

    for table_name in sorted(pipeline.table_graph.tables):
        f.write(f"## {table_name}\n\n")

        columns = [
            col for col in pipeline.columns.values()
            if col.table_name == table_name
        ]

        for col in sorted(columns, key=lambda c: c.column_name):
            f.write(f"### `{col.column_name}`\n\n")
            f.write(f"{col.description}\n\n")

            # Add lineage info
            if col.pii:
                f.write(f"**⚠️  Contains PII**\n\n")
            if col.owner:
                f.write(f"**Owner:** {col.owner}\n\n")
            if col.tags:
                f.write(f"**Tags:** {', '.join(col.tags)}\n\n")

            f.write("---\n\n")

print("✅ Documentation generated: data_dictionary.md")

Result: Complete, context-aware documentation that stays in sync with your code.


Example 5: Root Cause Analysis

Scenario: Dashboard shows unexpected values. Trace back to find the issue.

from clpipe import Pipeline

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Problem: dashboard.metrics.total_revenue is 10% higher than expected
problem_table = "dashboard.metrics"
problem_column = "total_revenue"

# Trace backward to source
sources = pipeline.trace_column_backward(
    problem_table,
    problem_column
)

print(f"Tracing {problem_table}.{problem_column} back to source:\n")

for i, col in enumerate(sources, 1):
    print(f"{i}. {col.table_name}.{col.column_name}")
    if col.expression:
        print(f"   Expression: {col.expression}")
    if col.operation:
        print(f"   Operation: {col.operation}")
    print()

# Check for recent changes in source tables
source_tables = {col.table_name for col in sources if col.node_type == "source"}
print(f"Source tables to investigate:")
for table in source_tables:
    print(f"  - {table}")

Result: Complete provenance chain from dashboard to raw sources with expressions and operations.


Example 6: Team-Based Pipeline Split

Scenario: Three teams maintain different parts of the pipeline.

# dags/team_pipelines.py
# Place this file in your Airflow DAGs folder

from clpipe import Pipeline

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Split by team ownership
team_pipelines = pipeline.split(
    sinks=[
        # Finance team
        ["finance.revenue_report", "finance.expense_dashboard"],

        # Product team
        ["product.user_metrics", "product.feature_adoption"],

        # ML team
        ["ml.training_data", "ml.feature_store"]
    ]
)

# Helper to set ownership metadata
def set_team_ownership(subpipeline, team):
    for table in subpipeline.table_graph.tables:
        for col in subpipeline.columns.values():
            if col.table_name == table:
                col.owner = f"{team}-team"

# Finance team DAG
set_team_ownership(team_pipelines[0], "finance")
finance_dag = team_pipelines[0].to_airflow_dag(
    executor=my_executor,
    dag_id="finance_pipeline",
    schedule="@daily",
    default_args={
        'owner': 'finance-team',
        'email': ['finance@company.com'],
        'email_on_failure': True
    }
)

# Product team DAG
set_team_ownership(team_pipelines[1], "product")
product_dag = team_pipelines[1].to_airflow_dag(
    executor=my_executor,
    dag_id="product_pipeline",
    schedule="@daily",
    default_args={
        'owner': 'product-team',
        'email': ['product@company.com'],
        'email_on_failure': True
    }
)

# ML team DAG
set_team_ownership(team_pipelines[2], "ml")
ml_dag = team_pipelines[2].to_airflow_dag(
    executor=my_executor,
    dag_id="ml_pipeline",
    schedule="@daily",
    default_args={
        'owner': 'ml-team',
        'email': ['ml@company.com'],
        'email_on_failure': True
    }
)

Result: Three team-specific DAGs in a single file, automatically discovered by Airflow with proper ownership and notifications.


Example 7: Data Quality Checks

Scenario: Add validation before pipeline execution.

from clpipe import Pipeline

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Define quality checks
quality_checks = {
    "raw.orders": [
        "SELECT COUNT(*) FROM raw.orders WHERE amount < 0",  # No negative amounts
        "SELECT COUNT(*) FROM raw.orders WHERE customer_id IS NULL",  # No nulls
    ],
    "staging.orders": [
        "SELECT COUNT(*) FROM staging.orders WHERE total_amount != (SELECT SUM(amount) FROM raw.orders)",
    ]
}

def executor_with_validation(sql: str) -> None:
    """Execute SQL with pre-flight validation"""
    # Extract table being created
    if "CREATE TABLE" in sql:
        table_name = sql.split("CREATE TABLE")[1].split("AS")[0].strip()

        # Run quality checks for this table
        if table_name in quality_checks:
            print(f"Running quality checks for {table_name}...")
            for check in quality_checks[table_name]:
                result = client.query(check).result()
                count = list(result)[0][0]
                if count > 0:
                    raise ValueError(f"Quality check failed for {table_name}: {count} issues found")
            print(f"✅ Quality checks passed for {table_name}")

    # Execute main SQL
    client.execute(sql)

# Run with validation
results = pipeline.run(
    executor=executor_with_validation,
    max_workers=4,
    verbose=True
)

if results['failed']:
    print(f"❌ Pipeline failed quality checks: {results['failed']}")
else:
    print(f"✅ All quality checks passed")

Result: Built-in data quality validation during execution.


Example 8: Incremental Processing

Scenario: Only process new data since last run.

from clpipe import Pipeline
from datetime import datetime, timedelta

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Get last run timestamp
last_run = datetime.now() - timedelta(hours=24)

def incremental_executor(sql: str) -> None:
    """Add incremental logic to SQL"""
    # Add WHERE clause for incremental processing
    if "FROM raw.orders" in sql:
        # Modify SQL to only process new records
        sql = sql.replace(
            "FROM raw.orders",
            f"FROM raw.orders WHERE created_at > TIMESTAMP('{last_run}')"
        )

    print(f"Executing (incremental): {sql[:100]}...")
    client.execute(sql)

# Run incrementally
results = pipeline.run(
    executor=incremental_executor,
    max_workers=4,
    verbose=True
)

print(f"✅ Processed data since {last_run}")

Result: Efficient incremental processing for large datasets.


Next Steps


More Patterns

Looking for specific patterns? Check out: