# SQL Debugging & Lineage

Use lineage to understand, debug, and confidently modify your SQL pipelines.
When working with complex SQL transformations, you often need to answer questions like:
- Where does this metric come from?
- What happens if I change this column?
- Why is this value wrong?
clpipe's lineage analysis answers these questions instantly, without executing a single query.
## The Challenge
SQL pipelines grow organically. A simple 3-table pipeline becomes 50 tables across 8 layers. When something breaks or needs changing, you're left manually tracing through SQL files, hoping you don't miss a dependency.
Common pain points:
- Debugging a wrong metric means reading dozens of SQL files
- Renaming a column requires manual search across all files
- Understanding data flow takes hours of SQL archaeology
- Changes introduce unexpected downstream breakages
## The Solution: Parse-Time Lineage
clpipe parses your SQL files and builds a complete dependency graph—tables, columns, and transformations—without executing anything.
```python
from pathlib import Path

from clpipe import Pipeline

# Load SQL files from a directory
sql_dir = Path("examples/sql_files")
queries = []
for sql_file in sorted(sql_dir.glob("*.sql")):
    queries.append((sql_file.stem, sql_file.read_text()))

# Build the pipeline graph
pipeline = Pipeline(queries, dialect="duckdb")

print(f"Queries: {len(pipeline.table_graph.queries)}")
print(f"Columns: {len(pipeline.columns)}")
print(f"Edges: {len(pipeline.edges)}")
```
In milliseconds, you have a complete map of your data pipeline.
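clpipe's own parser handles full SQL; purely to illustrate the parse-time idea, here is a toy dependency extractor over simplified `CREATE TABLE ... AS SELECT` statements. This is a regex sketch, not clpipe's implementation, and the example table names are hypothetical:

```python
import re

# Toy parse-time dependency extraction (illustrative only; clpipe uses a
# real SQL parser, not regexes). Maps each created table to the tables
# its query reads from -- no query is ever executed.
def table_deps(queries):
    deps = {}
    for name, sql in queries:
        target = re.search(r"CREATE TABLE (\w+)", sql, re.IGNORECASE)
        reads = re.findall(r"(?:FROM|JOIN)\s+(\w+)", sql, re.IGNORECASE)
        if target:
            deps[target.group(1)] = sorted(set(reads))
    return deps

queries = [
    ("01_raw_orders", "CREATE TABLE raw_orders AS SELECT * FROM source_orders"),
    ("05_stg", "CREATE TABLE stg_orders AS SELECT * FROM raw_orders JOIN raw_customers ON 1=1"),
]
print(table_deps(queries))
# {'raw_orders': ['source_orders'], 'stg_orders': ['raw_customers', 'raw_orders']}
```

Everything here comes from the SQL text alone, which is why the analysis is safe to run against any pipeline, at any time.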
## Use Case 1: Root Cause Analysis

**Scenario:** Your `mart_customer_ltv.lifetime_revenue` metric shows unexpected values. Where does the data come from?

### Backward Lineage: Trace to Source
```python
# Find the ultimate sources of a column
sources = pipeline.trace_column_backward("mart_customer_ltv", "lifetime_revenue")

print("lifetime_revenue comes from:")
for source in sources:
    print(f"  <- {source.table_name}.{source.column_name}")
```
Now you know exactly which source tables to investigate.
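Conceptually, backward tracing is a breadth-first walk over reverse edges in the column graph, stopping at columns with no upstream. A self-contained sketch with a hard-coded (hypothetical) edge map standing in for clpipe's parsed graph:

```python
from collections import deque

# upstream[column] = columns it is derived from; hard-coded here to
# mirror the e-commerce example (clpipe builds this by parsing SQL)
upstream = {
    "mart_customer_ltv.lifetime_revenue": ["stg_orders_enriched.total_amount"],
    "stg_orders_enriched.total_amount": ["raw_orders.total_amount"],
    "raw_orders.total_amount": ["source_orders.total_amount"],
}

def trace_backward(column):
    """Return the ultimate source columns (those with no upstream)."""
    sources, queue, seen = set(), deque([column]), {column}
    while queue:
        col = queue.popleft()
        parents = upstream.get(col, [])
        if not parents:
            sources.add(col)  # nothing feeds this column: it is a source
        for parent in parents:
            if parent not in seen:
                seen.add(parent)
                queue.append(parent)
    return sources

print(trace_backward("mart_customer_ltv.lifetime_revenue"))
# {'source_orders.total_amount'}
```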
### Full Lineage Path: See Every Step
For complex debugging, you may need to see every transformation:
```python
# Get the complete path through tables
path = pipeline.get_table_lineage_path("mart_customer_ltv", "lifetime_revenue")

print("Complete lineage path:")
for table, column, query_id in path:
    print(f"  {table}.{column}")
    if query_id:
        print(f"    (transformed by {query_id})")
```
Output:

```
Complete lineage path:
  source_orders.total_amount
    (transformed by 01_raw_orders)
  raw_orders.total_amount
    (transformed by 05_stg_orders_enriched)
  stg_orders_enriched.total_amount
    (transformed by 07_mart_customer_ltv)
  mart_customer_ltv.lifetime_revenue
```
### Full Transparency: Including CTEs
For debugging complex queries with CTEs, get complete transparency:
```python
# See all nodes including CTEs
nodes, edges = pipeline.trace_column_backward_full(
    "mart_customer_ltv", "lifetime_revenue", include_ctes=True
)

print("All lineage nodes:")
for node in nodes:
    layer_tag = " [CTE]" if node.layer == "cte" else ""
    print(f"  {node.table_name}.{node.column_name}{layer_tag}")

print("\nTransformation edges:")
for edge in edges:
    print(f"  {edge.from_node.table_name}.{edge.from_node.column_name}")
    print(f"    -> {edge.to_node.table_name}.{edge.to_node.column_name}")
    print(f"    ({edge.edge_type})")
```
## Use Case 2: Impact Analysis Before Changes

**Scenario:** You need to rename `raw_orders.total_amount` to `raw_orders.order_total`. What breaks?

### Forward Lineage: See All Impacts
```python
# Find everything affected by this column
impacts = pipeline.trace_column_forward("raw_orders", "total_amount")

print(f"Changing raw_orders.total_amount affects {len(impacts)} downstream columns:")
for impact in impacts:
    print(f"  -> {impact.table_name}.{impact.column_name}")
```
Output:

```
Changing raw_orders.total_amount affects 12 downstream columns:
  -> stg_orders_enriched.total_amount
  -> stg_orders_enriched.revenue_per_item
  -> int_daily_metrics.gross_revenue
  -> int_daily_metrics.avg_order_value
  -> mart_customer_ltv.lifetime_revenue
  -> mart_customer_ltv.avg_order_value
  -> mart_product_performance.total_revenue
  ...
```
Now you know every file that needs updating before making the change.
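To turn the impact list into a per-file change checklist, grouping the results by table is enough. A self-contained sketch using stand-in `(table, column)` tuples in place of the objects returned by `trace_column_forward` (which, as in the loop above, are assumed to expose `table_name` and `column_name`):

```python
from collections import defaultdict

# Stand-in data mirroring a slice of the example output above;
# in practice you would build these tuples from the real impact objects.
impacts = [
    ("stg_orders_enriched", "total_amount"),
    ("stg_orders_enriched", "revenue_per_item"),
    ("int_daily_metrics", "gross_revenue"),
    ("mart_customer_ltv", "lifetime_revenue"),
]

# Group impacted columns by table: one entry per SQL file to update
checklist = defaultdict(list)
for table, column in impacts:
    checklist[table].append(column)

for table, columns in checklist.items():
    print(f"{table}: {', '.join(columns)}")
```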
### Table-Level Impact Path
```python
# See the complete impact path through tables
path = pipeline.get_table_impact_path("raw_orders", "total_amount")

print("Impact flows through these tables:")
for table, column, query_id in path:
    print(f"  {table}.{column}")
```
## Use Case 3: Understanding Query Dependencies

**Scenario:** You need to understand the execution order and dependencies of your queries.

### Topological Sort: Correct Execution Order
```python
# Get queries in dependency order
query_order = pipeline.table_graph.topological_sort()

print("Query execution order:")
for i, query_id in enumerate(query_order, 1):
    query = pipeline.table_graph.queries[query_id]
    dest = query.destination_table or "(no destination)"
    print(f"  {i}. {query_id} -> {dest}")
```
Output:

```
Query execution order:
  1. 01_raw_orders -> raw_orders
  2. 02_raw_customers -> raw_customers
  3. 03_raw_products -> raw_products
  4. 04_raw_order_items -> raw_order_items
  5. 05_stg_orders_enriched -> stg_orders_enriched
  6. 06_int_daily_metrics -> int_daily_metrics
  7. 07_mart_customer_ltv -> mart_customer_ltv
  8. 08_mart_product_perf -> mart_product_performance
```
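Dependency-ordered execution is a standard topological sort of the table graph. As a minimal illustration (Kahn's algorithm over a hard-coded dependency map, not clpipe's internals):

```python
from collections import deque

# deps[table] = tables it reads from (a simplified slice of the example)
deps = {
    "raw_orders": [],
    "stg_orders_enriched": ["raw_orders"],
    "int_daily_metrics": ["stg_orders_enriched"],
    "mart_customer_ltv": ["stg_orders_enriched"],
}

def topo_sort(deps):
    """Kahn's algorithm: repeatedly emit tables whose inputs are all done."""
    indegree = {t: len(ds) for t, ds in deps.items()}
    readers = {t: [] for t in deps}
    for t, ds in deps.items():
        for d in ds:
            readers[d].append(t)
    queue = deque(sorted(t for t, n in indegree.items() if n == 0))
    order = []
    while queue:
        t = queue.popleft()
        order.append(t)
        for r in readers[t]:
            indegree[r] -= 1
            if indegree[r] == 0:
                queue.append(r)
    if len(order) != len(deps):
        raise ValueError("cycle detected")  # a valid pipeline must be acyclic
    return order

print(topo_sort(deps))
# ['raw_orders', 'stg_orders_enriched', 'int_daily_metrics', 'mart_customer_ltv']
```

The same property that makes the sort possible (the graph is acyclic) is what guarantees your pipeline has a valid execution order at all.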
### Query Dependencies
```python
# See what each query depends on
for query_id in query_order:
    query = pipeline.table_graph.queries[query_id]
    deps = pipeline.table_graph.get_dependencies(query.destination_table)
    if deps:
        dep_names = [d.table_name for d in deps]
        print(f"{query_id} depends on: {', '.join(dep_names)}")
    else:
        print(f"{query_id} has no dependencies (source)")
```
## Use Case 4: Understanding Table Relationships

### Source Tables (Pipeline Inputs)
```python
# Find tables that are external inputs (not created by any query)
source_tables = pipeline.table_graph.get_source_tables()

print("Source tables (external inputs):")
for table in source_tables:
    print(f"  {table.table_name}")
```
### Final Tables (Pipeline Outputs)
```python
# Find tables that are final outputs (not read by any downstream query)
final_tables = pipeline.table_graph.get_final_tables()

print("Final tables (pipeline outputs):")
for table in final_tables:
    print(f"  {table.table_name}")
```
### Downstream Dependencies
```python
# See what tables depend on a specific source
downstream = pipeline.table_graph.get_downstream("raw_orders")

print("Tables that use raw_orders:")
for table in downstream:
    print(f"  -> {table.table_name}")
```
## Use Case 5: Column-Level Deep Dive

### Understanding Column Origins
```python
# Get direct upstream columns (one step back)
upstream = pipeline.column_graph.get_upstream("mart_customer_ltv.lifetime_revenue")

print("Direct upstream columns:")
for col in upstream:
    print(f"  <- {col.full_name}")
```
### Understanding Column Impact
```python
# Get direct downstream columns (one step forward)
downstream = pipeline.column_graph.get_downstream("raw_orders.total_amount")

print("Direct downstream columns:")
for col in downstream:
    print(f"  -> {col.full_name}")
```
### Source and Final Columns
```python
# Columns that are inputs (no upstream)
source_cols = pipeline.column_graph.get_source_columns()
print(f"Source columns: {len(source_cols)}")

# Columns that are outputs (no downstream)
final_cols = pipeline.column_graph.get_final_columns()
print(f"Final columns: {len(final_cols)}")
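Both sets fall out of the edge set directly: a source column never appears as an edge target, and a final column never appears as an edge source. A self-contained sketch over hypothetical `(from_column, to_column)` pairs:

```python
# Hypothetical edges: a slice of the column graph from the example pipeline
edges = [
    ("source_orders.total_amount", "raw_orders.total_amount"),
    ("raw_orders.total_amount", "stg_orders_enriched.total_amount"),
    ("stg_orders_enriched.total_amount", "mart_customer_ltv.lifetime_revenue"),
]

froms = {f for f, _ in edges}
tos = {t for _, t in edges}

source_cols = froms - tos  # never produced by another column
final_cols = tos - froms   # never consumed by another column
print(source_cols)  # {'source_orders.total_amount'}
print(final_cols)   # {'mart_customer_ltv.lifetime_revenue'}
```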
## Real-World Example: E-Commerce Pipeline
The examples in this documentation use an e-commerce pipeline with the following structure:
```
source_orders ─────┐
source_customers ──┼──> raw_* tables ──> stg_orders_enriched ──┬──> int_daily_metrics
source_products ───┤                                           ├──> mart_customer_ltv
source_order_items─┘                                           └──> mart_product_performance
```
**Layers:**

| Layer | Tables | Purpose |
|---|---|---|
| Raw | `raw_orders`, `raw_customers`, `raw_products`, `raw_order_items` | Direct source copies |
| Staging | `stg_orders_enriched` | Enriched orders with CTEs, JOINs, window functions |
| Intermediate | `int_daily_metrics` | Daily aggregations with running totals |
| Mart | `mart_customer_ltv`, `mart_product_performance` | Business metrics (RFM scoring, rankings) |
**SQL Features Demonstrated:**
- CTEs (multi-stage transformations)
- Window functions (LAG, ROW_NUMBER, RANK, PERCENT_RANK)
- Aggregations (GROUP BY, COUNT DISTINCT, AVG, SUM)
- Joins (INNER, LEFT, CROSS)
- CASE statements (customer segmentation, scoring)
## Try It Yourself

Run the lineage example from the clpipe repository. It parses the example SQL files and demonstrates all the lineage capabilities described above.
## Key Benefits
| Without Lineage | With clpipe Lineage |
|---|---|
| Hours reading SQL files | Seconds to trace any column |
| Manual dependency tracking | Automatic graph building |
| Risky schema changes | Confident impact analysis |
| "I think this is everything" | "I know this is everything" |
## Next Steps
- Multi-Environment Execution - Execute your pipeline against different databases
- Data Catalog & AI - Use metadata for documentation and AI applications
- API Reference - Full lineage API documentation