SQL Debugging & Lineage

Use lineage to understand, debug, and confidently modify your SQL pipelines.

When working with complex SQL transformations, you often need to answer questions like:

  • Where does this metric come from?
  • What happens if I change this column?
  • Why is this value wrong?

clpipe's lineage analysis answers these questions instantly, without executing a single query.


The Challenge

SQL pipelines grow organically. A simple 3-table pipeline becomes 50 tables across 8 layers. When something breaks or needs changing, you're left manually tracing through SQL files, hoping you don't miss a dependency.

Common pain points:

  • Debugging a wrong metric means reading dozens of SQL files
  • Renaming a column requires manual search across all files
  • Understanding data flow takes hours of SQL archaeology
  • Changes introduce unexpected downstream breakages

The Solution: Parse-Time Lineage

clpipe parses your SQL files and builds a complete dependency graph—tables, columns, and transformations—without executing anything.

from pathlib import Path
from clpipe import Pipeline

# Load SQL files from a directory
sql_dir = Path("examples/sql_files")
queries = []
for sql_file in sorted(sql_dir.glob("*.sql")):
    queries.append((sql_file.stem, sql_file.read_text()))

# Build the pipeline graph
pipeline = Pipeline(queries, dialect="duckdb")

print(f"Queries: {len(pipeline.table_graph.queries)}")
print(f"Columns: {len(pipeline.columns)}")
print(f"Edges: {len(pipeline.edges)}")

Output:

Queries: 8
Columns: 127
Edges: 89

In milliseconds, you have a complete map of your data pipeline.


Use Case 1: Root Cause Analysis

Scenario: Your mart_customer_ltv.lifetime_revenue metric shows unexpected values. Where does the data come from?

Backward Lineage: Trace to Source

# Find the ultimate sources of a column
sources = pipeline.trace_column_backward("mart_customer_ltv", "lifetime_revenue")

print("lifetime_revenue comes from:")
for source in sources:
    print(f"  <- {source.table_name}.{source.column_name}")

Output:

lifetime_revenue comes from:
  <- source_orders.total_amount
  <- source_orders.customer_id

Now you know exactly which source tables to investigate.
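Conceptually, a backward trace is a walk over the column graph until it reaches columns with no upstream. The toy sketch below illustrates the idea with a hand-built mapping (the `UPSTREAM` dict and `trace_backward` function are hypothetical illustrations, not clpipe's internals; column names mirror the example pipeline):

```python
# Illustrative only: a hand-built upstream mapping, not clpipe's API.
# Each column maps to the columns it is directly derived from.
UPSTREAM = {
    "mart_customer_ltv.lifetime_revenue": ["stg_orders_enriched.total_amount"],
    "stg_orders_enriched.total_amount": ["raw_orders.total_amount"],
    "raw_orders.total_amount": ["source_orders.total_amount"],
}

def trace_backward(column: str) -> set[str]:
    """Walk upstream edges; columns with no upstream are ultimate sources."""
    parents = UPSTREAM.get(column, [])
    if not parents:
        return {column}  # no upstream: this is a source column
    sources: set[str] = set()
    for parent in parents:
        sources |= trace_backward(parent)
    return sources

print(trace_backward("mart_customer_ltv.lifetime_revenue"))
# -> {'source_orders.total_amount'}
```

The real column graph can fan out (a metric derived from several columns), which is why `trace_column_backward` returns a list of sources rather than a single column.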

Full Lineage Path: See Every Step

For complex debugging, you may need to see every transformation:

# Get the complete path through tables
path = pipeline.get_table_lineage_path("mart_customer_ltv", "lifetime_revenue")

print("Complete lineage path:")
for table, column, query_id in path:
    print(f"  {table}.{column}")
    if query_id:
        print(f"    (transformed by {query_id})")

Output:

Complete lineage path:
  source_orders.total_amount
    (transformed by 01_raw_orders)
  raw_orders.total_amount
    (transformed by 05_stg_orders_enriched)
  stg_orders_enriched.total_amount
    (transformed by 07_mart_customer_ltv)
  mart_customer_ltv.lifetime_revenue

Full Transparency: Including CTEs

When a query builds its result through CTEs, you can include those intermediate nodes in the trace:

# See all nodes including CTEs
nodes, edges = pipeline.trace_column_backward_full(
    "mart_customer_ltv", "lifetime_revenue", include_ctes=True
)

print("All lineage nodes:")
for node in nodes:
    layer_tag = " [CTE]" if node.layer == "cte" else ""
    print(f"  {node.table_name}.{node.column_name}{layer_tag}")

print("\nTransformation edges:")
for edge in edges:
    print(f"  {edge.from_node.table_name}.{edge.from_node.column_name}")
    print(f"    -> {edge.to_node.table_name}.{edge.to_node.column_name}")
    print(f"    ({edge.edge_type})")

Use Case 2: Impact Analysis Before Changes

Scenario: You need to rename raw_orders.total_amount to raw_orders.order_total. What breaks?

Forward Lineage: See All Impacts

# Find everything affected by this column
impacts = pipeline.trace_column_forward("raw_orders", "total_amount")

print(f"Changing raw_orders.total_amount affects {len(impacts)} downstream columns:")
for impact in impacts:
    print(f"  -> {impact.table_name}.{impact.column_name}")

Output:

Changing raw_orders.total_amount affects 12 downstream columns:
  -> stg_orders_enriched.total_amount
  -> stg_orders_enriched.revenue_per_item
  -> int_daily_metrics.gross_revenue
  -> int_daily_metrics.avg_order_value
  -> mart_customer_ltv.lifetime_revenue
  -> mart_customer_ltv.avg_order_value
  -> mart_product_performance.total_revenue
  ...

Now you know every file that needs updating before making the change.
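Forward impact analysis is the mirror image of the backward trace: a breadth-first walk that collects every transitively affected column, not just the leaves. A toy sketch (the `DOWNSTREAM` dict and `trace_forward` function are hypothetical illustrations, not clpipe's implementation):

```python
from collections import deque

# Illustrative only: a hand-built downstream mapping, not clpipe's API.
# Each column maps to the columns computed directly from it.
DOWNSTREAM = {
    "raw_orders.total_amount": [
        "stg_orders_enriched.total_amount",
        "stg_orders_enriched.revenue_per_item",
    ],
    "stg_orders_enriched.total_amount": ["mart_customer_ltv.lifetime_revenue"],
}

def trace_forward(column: str) -> list[str]:
    """Breadth-first walk collecting every transitively affected column."""
    seen: set[str] = set()
    order: list[str] = []
    queue = deque(DOWNSTREAM.get(column, []))
    while queue:
        col = queue.popleft()
        if col in seen:
            continue
        seen.add(col)
        order.append(col)
        queue.extend(DOWNSTREAM.get(col, []))  # follow the impact further
    return order

print(trace_forward("raw_orders.total_amount"))
```

The `seen` set matters in real pipelines: a column can be reachable along several paths (e.g. via two different staging tables), and without deduplication the walk would report it twice.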

Table-Level Impact Path

# See the complete impact path through tables
path = pipeline.get_table_impact_path("raw_orders", "total_amount")

print("Impact flows through these tables:")
for table, column, query_id in path:
    print(f"  {table}.{column}")

Use Case 3: Understanding Query Dependencies

Scenario: You need to understand the execution order and dependencies of your queries.

Topological Sort: Correct Execution Order

# Get queries in dependency order
query_order = pipeline.table_graph.topological_sort()

print("Query execution order:")
for i, query_id in enumerate(query_order, 1):
    query = pipeline.table_graph.queries[query_id]
    dest = query.destination_table or "(no destination)"
    print(f"  {i}. {query_id} -> {dest}")

Output:

Query execution order:
  1. 01_raw_orders -> raw_orders
  2. 02_raw_customers -> raw_customers
  3. 03_raw_products -> raw_products
  4. 04_raw_order_items -> raw_order_items
  5. 05_stg_orders_enriched -> stg_orders_enriched
  6. 06_int_daily_metrics -> int_daily_metrics
  7. 07_mart_customer_ltv -> mart_customer_ltv
  8. 08_mart_product_perf -> mart_product_performance
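The ordering guarantee here is the standard topological-sort property: every table appears after the tables it reads from. Python's standard library can demonstrate the same idea on a toy dependency map (clpipe implements its own sort; `graphlib` is used here purely for illustration):

```python
from graphlib import TopologicalSorter

# Toy dependency map: each table maps to the set of tables it reads from.
deps = {
    "raw_orders": set(),
    "stg_orders_enriched": {"raw_orders"},
    "int_daily_metrics": {"stg_orders_enriched"},
    "mart_customer_ltv": {"stg_orders_enriched"},
}

# static_order() yields nodes so that dependencies always come first.
order = list(TopologicalSorter(deps).static_order())
print(order)  # upstream tables precede every table that reads them
```

If the graph contained a cycle (table A reading from B and B from A), `static_order()` would raise `graphlib.CycleError`, which is exactly the failure mode a pipeline scheduler needs to detect.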

Query Dependencies

# See what each query depends on
for query_id in query_order:
    query = pipeline.table_graph.queries[query_id]
    if query.destination_table is None:
        continue  # queries without a destination table have nothing to look up
    deps = pipeline.table_graph.get_dependencies(query.destination_table)

    if deps:
        dep_names = [d.table_name for d in deps]
        print(f"{query_id} depends on: {', '.join(dep_names)}")
    else:
        print(f"{query_id} has no dependencies (source)")

Use Case 4: Understanding Table Relationships

Source Tables (Pipeline Inputs)

# Find tables that are external inputs (not created by any query)
source_tables = pipeline.table_graph.get_source_tables()

print("Source tables (external inputs):")
for table in source_tables:
    print(f"  {table.table_name}")

Final Tables (Pipeline Outputs)

# Find tables that are final outputs (not read by any downstream query)
final_tables = pipeline.table_graph.get_final_tables()

print("Final tables (pipeline outputs):")
for table in final_tables:
    print(f"  {table.table_name}")

Downstream Dependencies

# See what tables depend on a specific source
downstream = pipeline.table_graph.get_downstream("raw_orders")

print("Tables that use raw_orders:")
for table in downstream:
    print(f"  -> {table.table_name}")

Use Case 5: Column-Level Deep Dive

Understanding Column Origins

# Get direct upstream columns (one step back)
upstream = pipeline.column_graph.get_upstream("mart_customer_ltv.lifetime_revenue")

print("Direct upstream columns:")
for col in upstream:
    print(f"  <- {col.full_name}")

Understanding Column Impact

# Get direct downstream columns (one step forward)
downstream = pipeline.column_graph.get_downstream("raw_orders.total_amount")

print("Direct downstream columns:")
for col in downstream:
    print(f"  -> {col.full_name}")

Source and Final Columns

# Columns that are inputs (no upstream)
source_cols = pipeline.column_graph.get_source_columns()
print(f"Source columns: {len(source_cols)}")

# Columns that are outputs (no downstream)
final_cols = pipeline.column_graph.get_final_columns()
print(f"Final columns: {len(final_cols)}")

Real-World Example: E-Commerce Pipeline

The examples in this documentation use an e-commerce pipeline with the following structure:

source_orders ─────┐
source_customers ──┼──> raw_* tables ──> stg_orders_enriched ──┬──> int_daily_metrics
source_products ───┤                                           ├──> mart_customer_ltv
source_order_items─┘                                           └──> mart_product_performance

Layers:

Layer        | Tables                                                   | Purpose
-------------|----------------------------------------------------------|---------------------------------------------------
Raw          | raw_orders, raw_customers, raw_products, raw_order_items | Direct source copies
Staging      | stg_orders_enriched                                      | Enriched orders with CTEs, JOINs, window functions
Intermediate | int_daily_metrics                                        | Daily aggregations with running totals
Mart         | mart_customer_ltv, mart_product_performance              | Business metrics (RFM scoring, rankings)

SQL Features Demonstrated:

  • CTEs (multi-stage transformations)
  • Window functions (LAG, ROW_NUMBER, RANK, PERCENT_RANK)
  • Aggregations (GROUP BY, COUNT DISTINCT, AVG, SUM)
  • Joins (INNER, LEFT, CROSS)
  • CASE statements (customer segmentation, scoring)

Try It Yourself

Run the lineage example from the clpipe repository:

cd clpipe
uv run python examples/sql_files/run_lineage.py

This will parse the example SQL files and demonstrate all the lineage capabilities described above.


Key Benefits

Without Lineage              | With clpipe Lineage
-----------------------------|------------------------------
Hours reading SQL files      | Seconds to trace any column
Manual dependency tracking   | Automatic graph building
Risky schema changes         | Confident impact analysis
"I think this is everything" | "I know this is everything"

Next Steps