Skip to content

Quick Start

Get up and running with clpipe in 5 minutes.


Installation

pip install git+https://github.com/clpipe/clpipe.git

See Installation for more options.


Your First Pipeline

Step 1: Create SQL Files

Create a directory with your SQL files:

mkdir queries

Create queries/01_raw_orders.sql:

CREATE TABLE raw.orders AS
SELECT order_id, customer_id, amount, status
FROM external.orders;

Create queries/02_staging_orders.sql:

CREATE TABLE staging.orders AS
SELECT
    customer_id,
    SUM(amount) as total_amount,
    COUNT(*) as order_count
FROM raw.orders
WHERE status = 'completed'
GROUP BY customer_id;

Create queries/03_customer_metrics.sql:

CREATE TABLE analytics.customer_metrics AS
SELECT
    customer_id,
    total_amount,
    order_count,
    total_amount / order_count as avg_order_value
FROM staging.orders;

Step 2: Parse Your SQL

from clpipe import Pipeline

# Point to your SQL files
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# That's it! Complete graph built automatically
print(f"Found {len(pipeline.table_graph.tables)} tables")
# Output: Found 3 tables

Step 3: Explore Table Lineage

# Get all tables
tables = pipeline.table_graph.tables
print(tables)
# Output: ['raw.orders', 'staging.orders', 'analytics.customer_metrics']

# Get execution order
execution_order = pipeline.table_graph.get_execution_order()
print(execution_order)
# Output: [
#   ['raw.orders'],
#   ['staging.orders'],
#   ['analytics.customer_metrics']
# ]

# What depends on this table?
downstream = pipeline.table_graph.get_downstream('raw.orders')
print(downstream)
# Output: ['staging.orders', 'analytics.customer_metrics']

Step 4: Trace Column Lineage

# Where does this column come from?
sources = pipeline.trace_column_backward(
    'analytics.customer_metrics',
    'avg_order_value'
)

print(sources)
# Output shows:
# raw.orders.amount → SUM() → staging.orders.total_amount → DIVIDE → avg_order_value
# raw.orders.*      → COUNT() → staging.orders.order_count → DIVIDE → avg_order_value

# What breaks if I change this column?
affected = pipeline.trace_column_forward(
    'raw.orders',
    'amount'
)

print(affected)
# Output:
# - staging.orders.total_amount (SUM aggregation)
# - analytics.customer_metrics.total_amount (direct copy)
# - analytics.customer_metrics.avg_order_value (calculation input)

Step 5: Add Metadata

# Mark PII at source
pipeline.columns["raw.orders.customer_id"].pii = True
pipeline.columns["raw.orders.customer_id"].owner = "privacy_team"

# Propagate through entire pipeline
pipeline.propagate_all_metadata()

# Query anywhere
pii_columns = pipeline.get_pii_columns()
print(pii_columns)
# Output: All columns containing customer_id (automatically propagated)

Execute Your Pipeline

Local Execution

def my_executor(sql: str) -> None:
    """Your SQL execution function"""
    print(f"Executing: {sql[:50]}...")
    # Replace with your database client
    # client.execute(sql)

# Run the pipeline
results = pipeline.run(
    executor=my_executor,
    max_workers=4,
    verbose=True
)

print(f"Completed: {len(results['completed'])} queries")
print(f"Failed: {len(results['failed'])} queries")
print(f"Time: {results['elapsed_seconds']:.2f}s")

Generate Airflow DAG

from datetime import datetime

# Generate Airflow DAG
dag = pipeline.to_airflow_dag(
    executor=my_executor,
    dag_id="my_first_pipeline",
    schedule="@daily",
    start_date=datetime(2025, 1, 1)
)

# Save to file
with open("my_pipeline_dag.py", "w") as f:
    f.write(f"""
from airflow.decorators import dag, task
from datetime import datetime

{dag}
""")

print("✅ Airflow DAG generated: my_pipeline_dag.py")

Real-World Example

Let's put it all together with a realistic scenario:

from clpipe import Pipeline
from langchain_openai import ChatOpenAI

# 1. Parse your SQL pipeline
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# 2. Set metadata at source
pipeline.columns["raw.orders.customer_id"].pii = True
pipeline.columns["raw.orders.email"].pii = True
pipeline.propagate_all_metadata()

# 3. Generate documentation with AI
pipeline.llm = ChatOpenAI(model="gpt-4")
pipeline.generate_all_descriptions()

# 4. Export lineage for data catalog
lineage_json = pipeline.to_json()
with open("lineage.json", "w") as f:
    f.write(lineage_json)

# 5. Check for PII in final tables
final_tables = ["analytics.customer_metrics"]
for table in final_tables:
    pii_cols = [
        col.column_name
        for col in pipeline.get_pii_columns()
        if col.table_name == table
    ]
    if pii_cols:
        print(f"⚠️  {table} contains PII: {pii_cols}")

# 6. Generate production DAG
dag = pipeline.to_airflow_dag(
    executor=my_executor,
    dag_id="production_pipeline",
    schedule="0 2 * * *",  # Daily at 2 AM
    default_args={
        'owner': 'data-team',
        'retries': 3
    }
)

Common Patterns

Pattern 1: Check Dependencies Before Changes

# Before modifying a table
table_name = "staging.orders"
downstream = pipeline.table_graph.get_downstream(table_name)

if downstream:
    print(f"⚠️  Warning: {len(downstream)} tables depend on {table_name}")
    print(f"   Affected: {', '.join(downstream)}")
else:
    print(f"✅ Safe to modify {table_name} (no dependencies)")

Pattern 2: Find Source of Bad Data

# Trace problematic column back to source
bad_column = "analytics.customer_metrics.total_amount"
table, column = bad_column.rsplit(".", 1)

sources = pipeline.trace_column_backward(table, column)

print("Data flow:")
for source in sources:
    print(f"  → {source.table_name}.{source.column_name} ({source.operation})")

Pattern 3: Impact Analysis for Schema Changes

# Planning to rename raw.orders.amount → raw.orders.revenue
affected = pipeline.trace_column_forward("raw.orders", "amount")

print(f"Renaming will affect {len(affected)} columns:")
for col in affected:
    print(f"  - {col.table_name}.{col.column_name}")

Next Steps

Now that you've got the basics, skim the Quick Reference below, then head to the Examples page for more advanced patterns.


Quick Reference

from clpipe import Pipeline

# Parse SQL
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Table lineage
tables = pipeline.table_graph.tables
execution_order = pipeline.table_graph.get_execution_order()
downstream = pipeline.table_graph.get_downstream("table_name")

# Column lineage
sources = pipeline.trace_column_backward("table", "column")
affected = pipeline.trace_column_forward("table", "column")

# Metadata
pipeline.columns["table.column"].pii = True
pipeline.propagate_all_metadata()
pii_columns = pipeline.get_pii_columns()

# Execution
results = pipeline.run(executor=my_executor, max_workers=4)
results = await pipeline.async_run(executor=my_async_executor)

# Airflow
dag = pipeline.to_airflow_dag(executor=my_executor, dag_id="pipeline")

# Split
subpipelines = pipeline.split(sinks=[["table1"], ["table2"]])

Ready to dive deeper? Check out Examples for more patterns.