Quick Start
Get up and running with clpipe in 5 minutes.
Installation
See Installation for more options.
Your First Pipeline
Step 1: Create SQL Files
Create a directory with your SQL files:
Create queries/01_raw_orders.sql:
Create queries/02_staging_orders.sql:
CREATE TABLE staging.orders AS
SELECT
customer_id,
SUM(amount) as total_amount,
COUNT(*) as order_count
FROM raw.orders
WHERE status = 'completed'
GROUP BY customer_id;
Create queries/03_customer_metrics.sql:
CREATE TABLE analytics.customer_metrics AS
SELECT
customer_id,
total_amount,
order_count,
total_amount / order_count as avg_order_value
FROM staging.orders;
Step 2: Parse Your SQL
from clpipe import Pipeline
# Point to your SQL files
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
# That's it! Complete graph built automatically
print(f"Found {len(pipeline.table_graph.tables)} tables")
# Output: Found 3 tables
Step 3: Explore Table Lineage
# Get all tables
tables = pipeline.table_graph.tables
print(tables)
# Output: ['raw.orders', 'staging.orders', 'analytics.customer_metrics']
# Get execution order
execution_order = pipeline.table_graph.get_execution_order()
print(execution_order)
# Output: [
# ['raw.orders'],
# ['staging.orders'],
# ['analytics.customer_metrics']
# ]
# What depends on this table?
downstream = pipeline.table_graph.get_downstream('raw.orders')
print(downstream)
# Output: ['staging.orders', 'analytics.customer_metrics']
Step 4: Trace Column Lineage
# Where does this column come from?
sources = pipeline.trace_column_backward(
'analytics.customer_metrics',
'avg_order_value'
)
print(sources)
# Output shows:
# raw.orders.amount → SUM() → staging.orders.total_amount → DIVIDE → avg_order_value
# raw.orders.* → COUNT() → staging.orders.order_count → DIVIDE → avg_order_value
# What breaks if I change this column?
affected = pipeline.trace_column_forward(
'raw.orders',
'amount'
)
print(affected)
# Output:
# - staging.orders.total_amount (SUM aggregation)
# - analytics.customer_metrics.total_amount (direct copy)
# - analytics.customer_metrics.avg_order_value (calculation input)
Step 5: Add Metadata
# Mark PII at source
pipeline.columns["raw.orders.customer_id"].pii = True
pipeline.columns["raw.orders.customer_id"].owner = "privacy_team"
# Propagate through entire pipeline
pipeline.propagate_all_metadata()
# Query anywhere
pii_columns = pipeline.get_pii_columns()
print(pii_columns)
# Output: All columns containing customer_id (automatically propagated)
Execute Your Pipeline
Local Execution
def my_executor(sql: str) -> None:
"""Your SQL execution function"""
print(f"Executing: {sql[:50]}...")
# Replace with your database client
# client.execute(sql)
# Run the pipeline
results = pipeline.run(
executor=my_executor,
max_workers=4,
verbose=True
)
print(f"Completed: {len(results['completed'])} queries")
print(f"Failed: {len(results['failed'])} queries")
print(f"Time: {results['elapsed_seconds']:.2f}s")
Generate Airflow DAG
from datetime import datetime
# Generate Airflow DAG
dag = pipeline.to_airflow_dag(
executor=my_executor,
dag_id="my_first_pipeline",
schedule="@daily",
start_date=datetime(2025, 1, 1)
)
# Save to file
with open("my_pipeline_dag.py", "w") as f:
f.write(f"""
from airflow.decorators import dag, task
from datetime import datetime
{dag}
""")
print("✅ Airflow DAG generated: my_pipeline_dag.py")
Real-World Example
Let's put it all together with a realistic scenario:
from clpipe import Pipeline
from langchain_openai import ChatOpenAI
# 1. Parse your SQL pipeline
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
# 2. Set metadata at source
pipeline.columns["raw.orders.customer_id"].pii = True
pipeline.columns["raw.orders.email"].pii = True
pipeline.propagate_all_metadata()
# 3. Generate documentation with AI
pipeline.llm = ChatOpenAI(model="gpt-4")
pipeline.generate_all_descriptions()
# 4. Export lineage for data catalog
lineage_json = pipeline.to_json()
with open("lineage.json", "w") as f:
f.write(lineage_json)
# 5. Check for PII in final tables
final_tables = ["analytics.customer_metrics"]
for table in final_tables:
pii_cols = [
col.column_name
for col in pipeline.get_pii_columns()
if col.table_name == table
]
if pii_cols:
print(f"⚠️ {table} contains PII: {pii_cols}")
# 6. Generate production DAG
dag = pipeline.to_airflow_dag(
executor=my_executor,
dag_id="production_pipeline",
schedule="0 2 * * *", # Daily at 2 AM
default_args={
'owner': 'data-team',
'retries': 3
}
)
Common Patterns
Pattern 1: Check Dependencies Before Changes
# Before modifying a table
table_name = "staging.orders"
downstream = pipeline.table_graph.get_downstream(table_name)
if downstream:
print(f"⚠️ Warning: {len(downstream)} tables depend on {table_name}")
print(f" Affected: {', '.join(downstream)}")
else:
print(f"✅ Safe to modify {table_name} (no dependencies)")
Pattern 2: Find Source of Bad Data
# Trace problematic column back to source
bad_column = "analytics.customer_metrics.total_amount"
table, column = bad_column.rsplit(".", 1)  # split off column name, keep "schema.table" intact
sources = pipeline.trace_column_backward(table, column)
print("Data flow:")
for source in sources:
print(f" → {source.table_name}.{source.column_name} ({source.operation})")
Pattern 3: Impact Analysis for Schema Changes
# Planning to rename raw.orders.amount → raw.orders.revenue
affected = pipeline.trace_column_forward("raw.orders", "amount")
print(f"Renaming will affect {len(affected)} columns:")
for col in affected:
print(f" - {col.table_name}.{col.column_name}")
Next Steps
Now that you've got the basics:
- Examples - See more real-world use cases
- Concepts - Understand how it works
- API Documentation - Full API reference
Quick Reference
from clpipe import Pipeline
# Parse SQL
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
# Table lineage
tables = pipeline.table_graph.tables
execution_order = pipeline.table_graph.get_execution_order()
downstream = pipeline.table_graph.get_downstream("table_name")
# Column lineage
sources = pipeline.trace_column_backward("table", "column")
affected = pipeline.trace_column_forward("table", "column")
# Metadata
pipeline.columns["table.column"].pii = True
pipeline.propagate_all_metadata()
pii_columns = pipeline.get_pii_columns()
# Execution
results = pipeline.run(executor=my_executor, max_workers=4)
results = await pipeline.async_run(executor=my_async_executor)
# Airflow
dag = pipeline.to_airflow_dag(executor=my_executor, dag_id="pipeline")
# Split
subpipelines = pipeline.split(sinks=[["table1"], ["table2"]])
Ready to dive deeper? Check out Examples for more patterns.