From SQL to Lineage Graph

Overview

You write SQL files. clpipe reads them once and builds a complete lineage graph showing how your data flows, at both the table level and the column level.

No configuration. No annotations. Just point to your SQL.

from clpipe import Pipeline

# Point to your SQL files
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# That's it. Complete graph built automatically.
# - Table dependencies: pipeline.table_graph
# - Column lineage: pipeline.columns, pipeline.edges

The Three-Step Process

1. Your SQL Files (Input)

You have SQL files that create tables, transform data, join datasets:

-- queries/01_raw_orders.sql
CREATE TABLE raw.orders AS
SELECT order_id, customer_id, amount, status
FROM external.orders;

-- queries/02_staging_orders.sql
CREATE TABLE staging.orders AS
SELECT
    customer_id,
    SUM(amount) as total_amount,
    COUNT(*) as order_count
FROM raw.orders
WHERE status = 'completed'
GROUP BY customer_id;

-- queries/03_customer_metrics.sql
CREATE TABLE analytics.customer_metrics AS
SELECT
    customer_id,
    total_amount,
    order_count,
    total_amount / order_count as avg_order_value
FROM staging.orders;

2. SQL → AST (Parsing)

When you call Pipeline.from_sql_files(), clpipe uses sqlglot to parse each SQL statement into an AST (Abstract Syntax Tree).

What's an AST? A structured representation of your SQL that a program can understand.

# Your SQL (what you write)
"SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id"

# AST (what the parser sees)
SELECT
├─ customer_id (column reference)
├─ SUM(amount) (aggregate function)
│   └─ amount (column reference)
├─ FROM
│   └─ orders (table reference)
└─ GROUP BY
    └─ customer_id (grouping key)

The AST contains everything:

  • Table names (raw.orders, staging.orders)
  • Column names (customer_id, amount, total_amount)
  • Transformations (SUM, COUNT, division)
  • Relationships (WHERE, JOIN, GROUP BY)
  • Expressions (total_amount / order_count)
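
If you're curious, you can inspect the same AST yourself with sqlglot, the parser clpipe uses (a minimal sketch; the query and dialect here are just examples):

import sqlglot
from sqlglot import exp

ast = sqlglot.parse_one(
    "SELECT customer_id, SUM(amount) FROM orders GROUP BY customer_id",
    read="bigquery",
)

# Every table and column reference is a typed node in the tree
print([t.name for t in ast.find_all(exp.Table)])           # ['orders']
print(sorted({c.name for c in ast.find_all(exp.Column)}))  # ['amount', 'customer_id']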

You don't need to understand ASTs. clpipe handles all of this automatically.


3. AST → Lineage Graph (Graph Building)

clpipe walks through each AST and extracts:

Table-Level Graph

graph TD
    A[raw.orders] --> B[staging.orders]
    B --> C[analytics.customer_metrics]

What we extract:

  • Source tables: external.orders
  • Created tables: raw.orders, staging.orders, analytics.customer_metrics
  • Dependencies: staging.orders depends on raw.orders
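
The core of the table-level extraction fits in a few lines of sqlglot. This is a simplified sketch, not clpipe's actual implementation (the real one also handles CTEs, joins, and multiple statements):

import sqlglot
from sqlglot import exp

sql = """
CREATE TABLE staging.orders AS
SELECT customer_id, SUM(amount) AS total_amount
FROM raw.orders
GROUP BY customer_id
"""
ast = sqlglot.parse_one(sql, read="bigquery")

target = ast.this        # the table being created
select = ast.expression  # the SELECT that populates it
sources = sorted(t.sql() for t in select.find_all(exp.Table))

print(target.sql(), "<-", sources)  # staging.orders <- ['raw.orders']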

Column-Level Graph

graph LR
    A1[raw.orders.order_id] -.dropped.-> N1[ ]
    A2[raw.orders.customer_id] --> B2[staging.orders.customer_id] --> C2[analytics.customer_metrics.customer_id]
    A3[raw.orders.amount] -->|SUM| B3[staging.orders.total_amount] --> C3[analytics.customer_metrics.total_amount]
    B3 -->|DIVIDE| C4[analytics.customer_metrics.avg_order_value]
    A3 -->|COUNT| B4[staging.orders.order_count] --> C5[analytics.customer_metrics.order_count]
    B4 -->|DIVIDE| C4
    A5[raw.orders.status] -.filtered.-> N2[ ]

What we extract:

  • Column flows: amount → SUM(amount) → total_amount
  • Transformations: SUM(), COUNT(), DIVIDE()
  • Dropped columns: order_id (not in final output)
  • Filtered columns: status (used in WHERE but not stored)
  • Calculated columns: avg_order_value = total_amount / order_count
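
And a minimal sketch of the column-level idea (clpipe's real extraction additionally resolves aliases across statements and tracks WHERE/GROUP BY usage):

import sqlglot
from sqlglot import exp

ast = sqlglot.parse_one("""
CREATE TABLE staging.orders AS
SELECT customer_id, SUM(amount) AS total_amount, COUNT(*) AS order_count
FROM raw.orders
WHERE status = 'completed'
GROUP BY customer_id
""")

# Each projection is an output column; the Column nodes inside it are its inputs
for proj in ast.expression.expressions:
    inputs = [c.name for c in proj.find_all(exp.Column)]
    print(proj.alias_or_name, "<-", inputs)

# customer_id <- ['customer_id']
# total_amount <- ['amount']
# order_count <- []   (COUNT(*) reads rows, not a specific column)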


What's In The Graph

Table Graph

# Access table-level dependencies
table_graph = pipeline.table_graph

# Get all tables
print(table_graph.tables)
# Output: ['raw.orders', 'staging.orders', 'analytics.customer_metrics']

# Get dependencies for a specific table
deps = table_graph.get_dependencies('analytics.customer_metrics')
# Output: ['staging.orders']

# Get execution order (topological sort)
execution_order = table_graph.get_execution_order()
# Output: [['raw.orders'], ['staging.orders'], ['analytics.customer_metrics']]

Column Lineage Graph

# Access column-level lineage directly from Pipeline

# Get all columns in a table
table_columns = [col for col in pipeline.columns.values()
                 if col.table_name == 'analytics.customer_metrics']
# table_columns holds column objects; their names:
# customer_id, total_amount, order_count, avg_order_value

# Trace column backward to sources
sources = pipeline.trace_column_backward(
    'analytics.customer_metrics',
    'avg_order_value'
)
# Output shows: raw.orders.amount → SUM → total_amount → DIVIDE → avg_order_value

# Trace column forward to downstream
affected = pipeline.trace_column_forward(
    'raw.orders',
    'amount'
)
# Output: All columns that depend on raw.orders.amount

Why The AST Matters

The AST contains everything your SQL already describes:

Example: SUM Aggregation

SELECT customer_id, SUM(amount) as total_amount
FROM raw.orders
GROUP BY customer_id

What the AST tells us:

  • total_amount is a new column (not in the source)
  • It's created by SUM(amount) (an aggregate function)
  • Source column: raw.orders.amount
  • Transformation type: SUM
  • Grouping key: customer_id

Without parsing: You'd have to read the SQL manually and guess what happens.

With AST parsing: We extract exact relationships automatically.
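
You can verify this directly on the AST (a sketch using sqlglot; node classes like exp.Sum and exp.Group are sqlglot's, not clpipe's):

import sqlglot
from sqlglot import exp

ast = sqlglot.parse_one(
    "SELECT customer_id, SUM(amount) AS total_amount "
    "FROM raw.orders GROUP BY customer_id"
)

agg = ast.find(exp.Sum)
print(agg.find_ancestor(exp.Alias).alias)          # total_amount (the new column)
print([c.name for c in agg.find_all(exp.Column)])  # ['amount'] (its source)
print([c.name for c in ast.find(exp.Group).find_all(exp.Column)])  # ['customer_id']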

Example: Calculated Column

SELECT
    total_amount,
    order_count,
    total_amount / order_count as avg_order_value
FROM staging.orders

What the AST tells us:

  • avg_order_value depends on two columns: total_amount and order_count
  • Transformation: a DIVIDE operation
  • Both source columns flow into this calculation
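
The same inspection works for the division (sketch; exp.Div is sqlglot's node for the / operator):

import sqlglot
from sqlglot import exp

ast = sqlglot.parse_one(
    "SELECT total_amount / order_count AS avg_order_value FROM staging.orders"
)

div = ast.find(exp.Div)
print([c.name for c in div.find_all(exp.Column)])
# ['total_amount', 'order_count']  (both inputs of the calculated column)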

This is why clpipe can trace backward and say:

avg_order_value comes from:
  - total_amount (which comes from SUM(raw.orders.amount))
  - order_count (which comes from COUNT(*) on raw.orders)


Supported SQL Dialects

clpipe uses sqlglot, which supports many SQL dialects:

  • BigQuery
  • Snowflake
  • PostgreSQL
  • DuckDB
  • Redshift
  • Spark SQL
  • And many more

Just specify the dialect when creating the pipeline:

# BigQuery
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Snowflake
pipeline = Pipeline.from_sql_files("queries/", dialect="snowflake")

# DuckDB
pipeline = Pipeline.from_sql_files("queries/", dialect="duckdb")

What Gets Extracted

From your SQL, clpipe automatically extracts:

Tables

  • Source tables (FROM clause)
  • Created tables (CREATE TABLE statements)
  • Joined tables (JOIN clauses)
  • CTEs (WITH clauses)
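
CTEs, for instance, appear as their own nodes in the AST, so they can be treated as intermediate tables (a sqlglot sketch):

import sqlglot
from sqlglot import exp

ast = sqlglot.parse_one("""
WITH completed AS (
    SELECT customer_id, amount FROM raw.orders WHERE status = 'completed'
)
SELECT customer_id, SUM(amount) AS total_amount
FROM completed
GROUP BY customer_id
""")

print([cte.alias for cte in ast.find_all(exp.CTE)])      # ['completed']
print(sorted(t.sql() for t in ast.find_all(exp.Table)))  # ['completed', 'raw.orders']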

Columns

  • Selected columns (SELECT clause)
  • Renamed columns (AS aliases)
  • Calculated columns (expressions)
  • Aggregated columns (SUM, AVG, COUNT, etc.)
  • Joined columns (JOIN conditions)
  • Filtered columns (WHERE clause)

Transformations

  • Aggregate functions: SUM, COUNT, AVG, MAX, MIN
  • Window functions: ROW_NUMBER, RANK, LAG, LEAD
  • Conditional logic: CASE WHEN
  • Mathematical operations: +, -, *, /
  • String operations: CONCAT, SUBSTRING, UPPER, LOWER
  • Type conversions: CAST, SAFE_CAST
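
Each of these becomes a distinct node type in the parsed AST, which is what makes them detectable (sketch; the node classes are sqlglot's):

import sqlglot
from sqlglot import exp

ast = sqlglot.parse_one("""
SELECT
    CASE WHEN status = 'completed' THEN amount ELSE 0 END AS paid_amount,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_id) AS rn,
    CAST(amount AS STRING) AS amount_str
FROM raw.orders
""", read="bigquery")

print(ast.find(exp.Case) is not None)    # True: conditional logic
print(ast.find(exp.Window) is not None)  # True: window function
print(ast.find(exp.Cast) is not None)    # True: type conversion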

Relationships

  • Table dependencies (which table depends on which)
  • Column flows (which column comes from which source)
  • Join relationships (how tables connect)
  • Filter conditions (WHERE clauses)
  • Group by keys (aggregation dimensions)

Example: Complete Flow

Let's trace one column through the entire pipeline:

SQL

-- Step 1
CREATE TABLE raw.orders AS
SELECT order_id, customer_id, amount, status FROM external.orders;

-- Step 2
CREATE TABLE staging.orders AS
SELECT
    customer_id,
    SUM(amount) as total_amount
FROM raw.orders
WHERE status = 'completed'
GROUP BY customer_id;

-- Step 3
CREATE TABLE analytics.customer_metrics AS
SELECT
    customer_id,
    total_amount,
    total_amount * 1.1 as total_with_tax
FROM staging.orders;

Lineage Trace: analytics.customer_metrics.total_with_tax

sources = pipeline.trace_column_backward(
    'analytics.customer_metrics',
    'total_with_tax'
)

# Output:
# 1. external.orders.amount (SOURCE)
# 2. → raw.orders.amount (DIRECT copy)
# 3. → staging.orders.total_amount (SUM aggregation, WHERE status='completed')
# 4. → analytics.customer_metrics.total_amount (DIRECT copy)
# 5. → analytics.customer_metrics.total_with_tax (MULTIPLY by 1.1)

Complete provenance in 0.003 seconds.
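
The forward direction answers the impact question for the same pipeline, using the trace_column_forward API shown earlier (the expected output below is read off the SQL, not a captured run):

# Impact analysis: what is affected if external.orders.amount changes?
affected = pipeline.trace_column_forward('external.orders', 'amount')

# Expected downstream columns:
#   raw.orders.amount
#   staging.orders.total_amount
#   analytics.customer_metrics.total_amount
#   analytics.customer_metrics.total_with_tax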


Key Takeaways

  1. You write SQL - Just normal SQL files, no special syntax
  2. We parse to AST - Using sqlglot to understand structure
  3. We build the graph - Extracting tables, columns, transformations
  4. You get lineage - Complete table and column-level tracking

No configuration. No annotations. No manual mapping.

Your SQL already describes the lineage. We just extract it.


Next Steps