From SQL to Lineage Graph
Overview
You write SQL files. clpipe reads them once and builds a complete lineage graph showing how your data flows, at both the table level and the column level.
No configuration. No annotations. Just point to your SQL.
from clpipe import Pipeline
# Point to your SQL files
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
# That's it. Complete graph built automatically.
# - Table dependencies: pipeline.table_graph
# - Column lineage: pipeline.columns, pipeline.edges
The Three-Step Process
1. Your SQL Files (Input)
You have SQL files that create tables, transform data, join datasets:
-- queries/01_raw_orders.sql
CREATE TABLE raw.orders AS
SELECT order_id, customer_id, amount, status
FROM external.orders;
-- queries/02_staging_orders.sql
CREATE TABLE staging.orders AS
SELECT
customer_id,
SUM(amount) as total_amount,
COUNT(*) as order_count
FROM raw.orders
WHERE status = 'completed'
GROUP BY customer_id;
-- queries/03_customer_metrics.sql
CREATE TABLE analytics.customer_metrics AS
SELECT
customer_id,
total_amount,
order_count,
total_amount / order_count as avg_order_value
FROM staging.orders;
2. SQL → AST (Parsing)
When you call Pipeline.from_sql_files(), clpipe uses sqlglot to parse each SQL statement into an AST (Abstract Syntax Tree).
What's an AST? A tree-structured representation of your SQL that a program can traverse and inspect.
# Your SQL (what you write)
"SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id"
# AST (what the parser sees)
SELECT
├─ customer_id (column reference)
├─ SUM(amount) (aggregate function)
│ └─ amount (column reference)
FROM
└─ orders (table reference)
GROUP BY
└─ customer_id (grouping key)
The AST contains everything:
- Table names (raw.orders, staging.orders)
- Column names (customer_id, amount, total_amount)
- Transformations (SUM, COUNT, division)
- Relationships (WHERE, JOIN, GROUP BY)
- Expressions (total_amount / order_count)
You don't need to understand ASTs. clpipe handles all of this automatically.
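If you do want to peek under the hood, you can parse the same query with sqlglot directly. This is plain sqlglot, independent of clpipe:
import sqlglot
from sqlglot import exp
ast = sqlglot.parse_one(
    "SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id",
    read="bigquery",
)
# Every table referenced anywhere in the statement
print([t.name for t in ast.find_all(exp.Table)])
# ['orders']
# Every distinct column reference, including the one inside SUM()
print(sorted({c.name for c in ast.find_all(exp.Column)}))
# ['amount', 'customer_id']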
3. AST → Lineage Graph (Graph Building)
clpipe walks through each AST and extracts:
Table-Level Graph
graph TD
A[raw.orders] --> B[staging.orders]
B --> C[analytics.customer_metrics]
What we extract:
- Source tables: external.orders (read but never created here)
- Created tables: raw.orders, staging.orders, analytics.customer_metrics
- Dependencies: staging.orders depends on raw.orders; analytics.customer_metrics depends on staging.orders
Column-Level Graph
graph LR
A1[raw.orders.order_id] -.dropped.-> N1[ ]
A2[raw.orders.customer_id] --> B2[staging.orders.customer_id] --> C2[analytics.customer_metrics.customer_id]
A3[raw.orders.amount] -->|SUM| B3[staging.orders.total_amount] --> C3[analytics.customer_metrics.total_amount]
B3 -->|DIVIDE| C4[analytics.customer_metrics.avg_order_value]
A3 -->|COUNT| B4[staging.orders.order_count] --> C5[analytics.customer_metrics.order_count]
B4 -->|DIVIDE| C4
A5[raw.orders.status] -.filtered.-> N2[ ]
What we extract:
- Column flows: amount → SUM(amount) → total_amount
- Transformations: SUM(), COUNT(), DIVIDE()
- Dropped columns: order_id (lands in raw.orders but is not carried further downstream)
- Filtered columns: status (used in the WHERE clause of staging.orders but not stored there)
- Calculated columns: avg_order_value = total_amount / order_count
What's In The Graph
Table Graph
# Access table-level dependencies
table_graph = pipeline.table_graph
# Get all tables
print(table_graph.tables)
# Output: ['raw.orders', 'staging.orders', 'analytics.customer_metrics']
# Get dependencies for a specific table
deps = table_graph.get_dependencies('analytics.customer_metrics')
# Output: ['staging.orders']
# Get execution order (topological sort)
execution_order = table_graph.get_execution_order()
# Output: [['raw.orders'], ['staging.orders'], ['analytics.customer_metrics']]
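Because get_execution_order() returns levels, every table within an inner list can run independently of the others. A minimal sketch of a sequential runner; execute_sql is a hypothetical helper standing in for whatever actually runs your SQL:
def execute_sql(table_name: str) -> None:
    # Hypothetical: submit the CREATE TABLE statement for this table
    print(f"running {table_name}")
for level in pipeline.table_graph.get_execution_order():
    # Tables in the same level don't depend on each other
    for table in level:
        execute_sql(table)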
Column Lineage Graph
# Access column-level lineage directly from Pipeline
# Get all columns in a table
table_columns = [col for col in pipeline.columns.values()
if col.table_name == 'analytics.customer_metrics']
# Yields four column objects: customer_id, total_amount, order_count, avg_order_value
# Trace column backward to sources
sources = pipeline.trace_column_backward(
'analytics.customer_metrics',
'avg_order_value'
)
# Output shows: raw.orders.amount → SUM → total_amount → DIVIDE → avg_order_value
# Trace column forward to downstream
affected = pipeline.trace_column_forward(
'raw.orders',
'amount'
)
# Output: All columns that depend on raw.orders.amount
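A common use for the forward trace is impact analysis before a schema change. A sketch, assuming the result is an iterable of column identifiers (the exact return type isn't shown here):
# Check the blast radius before altering raw.orders.amount
affected = pipeline.trace_column_forward('raw.orders', 'amount')
for col in affected:
    print(col)  # every downstream column that would be impacted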
Why The AST Matters
The AST contains everything your SQL already describes:
Example: SUM Aggregation
What the AST tells us:
- total_amount is a new column (not in source)
- It's created by SUM(amount) (aggregation function)
- Source column: raw.orders.amount
- Transformation type: SUM
- Grouping key: customer_id
Without parsing: You'd have to read the SQL manually and guess what happens.
With AST parsing: We extract exact relationships automatically.
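To make that concrete, here is how those facts surface in a raw sqlglot parse (again plain sqlglot, not clpipe's API):
import sqlglot
from sqlglot import exp
ast = sqlglot.parse_one(
    "SELECT customer_id, SUM(amount) AS total_amount "
    "FROM raw.orders GROUP BY customer_id",
    read="bigquery",
)
agg = ast.find(exp.Sum)
print(agg.this.sql())           # amount -- the source column
print(ast.selects[1].alias)     # total_amount -- the new column name
print(ast.args["group"].sql())  # GROUP BY customer_id -- the grouping key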
Example: Calculated Column
What the AST tells us:
- avg_order_value depends on two columns: total_amount and order_count
- Transformation: DIVIDE operation
- Both source columns flow into this calculation
This is why clpipe can trace backward and say:
avg_order_value comes from:
- total_amount (which comes from SUM(raw.orders.amount))
- order_count (which comes from COUNT(*) on raw.orders)
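Both inputs are visible as operands of the division node in the parse tree; a quick check with plain sqlglot:
import sqlglot
from sqlglot import exp
ast = sqlglot.parse_one(
    "SELECT total_amount / order_count AS avg_order_value FROM staging.orders"
)
div = ast.find(exp.Div)
print(div.left.sql())   # total_amount
print(div.right.sql())  # order_count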
Supported SQL Dialects
clpipe uses sqlglot, which supports many SQL dialects:
- ✅ BigQuery
- ✅ Snowflake
- ✅ PostgreSQL
- ✅ DuckDB
- ✅ Redshift
- ✅ Spark SQL
- ✅ And many more
Just specify the dialect when creating the pipeline:
# BigQuery
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
# Snowflake
pipeline = Pipeline.from_sql_files("queries/", dialect="snowflake")
# DuckDB
pipeline = Pipeline.from_sql_files("queries/", dialect="duckdb")
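PostgreSQL and Spark SQL follow the same pattern. Assuming the dialect string is passed straight through to sqlglot, use sqlglot's lowercase dialect identifiers:
# PostgreSQL
pipeline = Pipeline.from_sql_files("queries/", dialect="postgres")
# Spark SQL
pipeline = Pipeline.from_sql_files("queries/", dialect="spark")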
What Gets Extracted
From your SQL, clpipe automatically extracts:
Tables
- Source tables (FROM clause)
- Created tables (CREATE TABLE statements)
- Joined tables (JOIN clauses)
- CTEs (WITH clauses)
Columns
- Selected columns (SELECT clause)
- Renamed columns (AS aliases)
- Calculated columns (expressions)
- Aggregated columns (SUM, AVG, COUNT, etc.)
- Joined columns (JOIN conditions)
- Filtered columns (WHERE clause)
Transformations
- Aggregate functions: SUM, COUNT, AVG, MAX, MIN
- Window functions: ROW_NUMBER, RANK, LAG, LEAD
- Conditional logic: CASE WHEN
- Mathematical operations: +, -, *, /
- String operations: CONCAT, SUBSTRING, UPPER, LOWER
- Type conversions: CAST, SAFE_CAST
Relationships
- Table dependencies (which table depends on which)
- Column flows (which column comes from which source)
- Join relationships (how tables connect)
- Filter conditions (WHERE clauses)
- Group by keys (aggregation dimensions)
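Each of these constructs shows up as a distinct node type in the parse tree. For example, a window function can be located with plain sqlglot (independent of clpipe):
import sqlglot
from sqlglot import exp
ast = sqlglot.parse_one(
    "SELECT customer_id, "
    "RANK() OVER (PARTITION BY customer_id ORDER BY amount DESC) AS rnk "
    "FROM raw.orders",
    read="bigquery",
)
window = ast.find(exp.Window)
print(window.sql())
# RANK() OVER (PARTITION BY customer_id ORDER BY amount DESC)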
Example: Complete Flow
Let's trace one column through the entire pipeline:
SQL
-- Step 1
CREATE TABLE raw.orders AS
SELECT order_id, customer_id, amount, status FROM external.orders;
-- Step 2
CREATE TABLE staging.orders AS
SELECT
customer_id,
SUM(amount) as total_amount
FROM raw.orders
WHERE status = 'completed'
GROUP BY customer_id;
-- Step 3
CREATE TABLE analytics.customer_metrics AS
SELECT
customer_id,
total_amount,
total_amount * 1.1 as total_with_tax
FROM staging.orders;
Lineage Trace: analytics.customer_metrics.total_with_tax
sources = pipeline.trace_column_backward(
'analytics.customer_metrics',
'total_with_tax'
)
# Output:
# 1. external.orders.amount (SOURCE)
# 2. → raw.orders.amount (DIRECT copy)
# 3. → staging.orders.total_amount (SUM aggregation, WHERE status='completed')
# 4. → analytics.customer_metrics.total_amount (DIRECT copy)
# 5. → analytics.customer_metrics.total_with_tax (MULTIPLY by 1.1)
Complete provenance in 0.003 seconds.
Key Takeaways
- You write SQL - Just normal SQL files, no special syntax
- We parse to AST - Using sqlglot to understand structure
- We build the graph - Extracting tables, columns, transformations
- You get lineage - Complete table and column-level tracking
No configuration. No annotations. No manual mapping.
Your SQL already describes the lineage. We just extract it.
Next Steps
- Table Lineage & Orchestration - Learn what you can do with the graph
- Quick Start - Try it yourself
- API Documentation - Full API reference