
Table Lineage and Pipeline Orchestration

Overview

Once you have the lineage graph, you can use it to:

  1. Understand dependencies - Which tables depend on which others
  2. Execute pipelines - Run SQL in the correct order
  3. Generate DAGs - Deploy to Airflow or other orchestrators
  4. Split pipelines - Break large graphs into smaller subpipelines

The graph becomes your execution engine.


Table Lineage

What Is Table Lineage?

Table lineage tracks which tables depend on which other tables. This is the foundation for:

  • Determining execution order
  • Impact analysis (what breaks if I change this table?)
  • Dependency visualization
  • Pipeline orchestration
Build the pipeline from a directory of SQL files, then access its table-level graph:

from clpipe import Pipeline

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Access table-level graph
table_graph = pipeline.table_graph

Querying Table Dependencies

Get All Tables

# List all tables in the pipeline
tables = table_graph.tables
print(tables)
# Output: ['raw.orders', 'staging.orders', 'analytics.customer_metrics']

Get Dependencies

# What does this table depend on?
deps = table_graph.get_dependencies('analytics.customer_metrics')
print(deps)
# Output: ['staging.orders']

# What depends on this table?
downstream = table_graph.get_downstream('raw.orders')
print(downstream)
# Output: ['staging.orders', 'analytics.customer_metrics']

Get Execution Order

The graph provides topological ordering - the correct execution order respecting all dependencies:

execution_order = table_graph.get_execution_order()
print(execution_order)
# Output: [
#   ['raw.orders'],                    # Level 0: No dependencies
#   ['staging.orders'],                # Level 1: Depends on raw.orders
#   ['analytics.customer_metrics']     # Level 2: Depends on staging.orders
# ]

Each level can be executed in parallel - queries within the same level have no dependencies on each other.
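
Internally, levels like these fall out of a topological sort. As a rough illustration, here is a minimal sketch over a plain dependency dict (not clpipe's actual implementation):

from collections import defaultdict

def execution_levels(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group tables into levels: each table sits one level deeper
    than its deepest dependency."""
    level: dict[str, int] = {}

    def resolve(table: str) -> int:
        # Assumes deps describes a DAG; a cycle would recurse forever
        if table not in level:
            level[table] = 1 + max((resolve(p) for p in deps.get(table, ())), default=-1)
        return level[table]

    for table in deps:
        resolve(table)

    grouped = defaultdict(list)
    for table, lvl in level.items():
        grouped[lvl].append(table)
    return [sorted(grouped[lvl]) for lvl in sorted(grouped)]

deps = {
    "raw.orders": set(),
    "staging.orders": {"raw.orders"},
    "analytics.customer_metrics": {"staging.orders"},
}
print(execution_levels(deps))
# [['raw.orders'], ['staging.orders'], ['analytics.customer_metrics']]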


Example: Pipeline with Multiple Branches

-- queries/01_raw_orders.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;

-- queries/02_raw_customers.sql
CREATE TABLE raw.customers AS SELECT * FROM external.customers;

-- queries/03_enriched_orders.sql
CREATE TABLE enriched.orders AS
SELECT o.*, c.name
FROM raw.orders o
JOIN raw.customers c USING(customer_id);

-- queries/04_customer_summary.sql
CREATE TABLE analytics.customer_summary AS
SELECT customer_id, name, SUM(amount) AS total_spent
FROM enriched.orders
GROUP BY customer_id, name;

Table Graph:

graph TD
    A[raw.orders] --> C[enriched.orders]
    B[raw.customers] --> C
    C --> D[analytics.customer_summary]

Execution Order:

execution_order = table_graph.get_execution_order()
# [
#   ['raw.orders', 'raw.customers'],     # Level 0: Parallel execution
#   ['enriched.orders'],                 # Level 1: Waits for both raw tables
#   ['analytics.customer_summary']       # Level 2: Waits for enriched
# ]

Pipeline Execution

The table graph determines what to run and in what order. Now you can execute it.

Synchronous Execution

Run your pipeline with concurrent execution within each level:

def my_executor(sql: str) -> None:
    """Your SQL execution function"""
    # Execute against your database (client is whatever DB client you've set up)
    client.execute(sql)

# Run the pipeline
results = pipeline.run(
    executor=my_executor,
    max_workers=4,      # Parallel execution within levels
    verbose=True        # Print progress
)

print(results)
# {
#   'completed': ['raw.orders', 'staging.orders', 'analytics.customer_metrics'],
#   'failed': [],
#   'elapsed_seconds': 12.5,
#   'total_queries': 3
# }

How it works:

  1. Groups queries into execution levels (based on dependencies)
  2. Executes each level sequentially
  3. Within each level, runs queries in parallel (up to max_workers)
  4. If any query fails, execution stops and the failed queries are returned (see the sketch below)
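
Conceptually, the loop behind pipeline.run looks something like the following. This is a simplified sketch of the behavior listed above, using hypothetical names (levels, sql_by_table), not clpipe's actual source:

from concurrent.futures import ThreadPoolExecutor

def run_levels(levels, sql_by_table, executor_fn, max_workers=4):
    """Run each level in order; queries within a level run in parallel."""
    completed, failed = [], []
    for level in levels:
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # One future per query in this level, capped at max_workers threads
            futures = {pool.submit(executor_fn, sql_by_table[t]): t for t in level}
            for future, table in futures.items():
                try:
                    future.result()
                    completed.append(table)
                except Exception:
                    failed.append(table)
        if failed:
            break  # don't start the next level
    return {"completed": completed, "failed": failed}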

Asynchronous Execution

For async database clients:

import asyncio

async def my_async_executor(sql: str) -> None:
    """Your async SQL execution function"""
    await async_client.execute(sql)

# Run async (await this inside an async function, driven by e.g. asyncio.run)
results = await pipeline.async_run(
    executor=my_async_executor,
    max_workers=4,
    verbose=True
)

Same logic as sync execution, but:

  • Uses asyncio.Semaphore for concurrency control
  • Supports async/await patterns
  • Ideal for async database drivers
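
A minimal sketch of that semaphore pattern, reusing the hypothetical levels and sql_by_table from the sync sketch above (again, not the library's actual code):

import asyncio

async def run_levels_async(levels, sql_by_table, executor_fn, max_workers=4):
    """Run levels in order; a semaphore caps in-flight queries per level."""
    semaphore = asyncio.Semaphore(max_workers)

    async def run_one(table):
        async with semaphore:
            await executor_fn(sql_by_table[table])

    for level in levels:
        # gather waits for the whole level before the next one starts
        await asyncio.gather(*(run_one(t) for t in level))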


Execution Levels & Parallelism

Example with 6 queries:

# Execution order
# Level 0: [query_a, query_b, query_c]    ← 3 queries, no dependencies
# Level 1: [query_d, query_e]              ← 2 queries, depend on Level 0
# Level 2: [query_f]                       ← 1 query, depends on Level 1

results = pipeline.run(executor=my_executor, max_workers=4)

# Execution timeline (with max_workers=4):
# t=0:  query_a, query_b, query_c start (parallel, 3 workers)
# t=5:  All Level 0 complete
# t=5:  query_d, query_e start (parallel, 2 workers)
# t=10: All Level 1 complete
# t=10: query_f starts (1 worker)
# t=12: Complete

Sequential execution (one query at a time, ~5s per query, 2s for query_f): ~27 seconds
Parallel execution (per the timeline above): 5s + 5s + 2s = 12 seconds
Speedup: ~56% faster
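
You can estimate this yourself: sequential runtime is the sum of every query's duration, while level-parallel runtime (given enough workers) is the sum of each level's slowest query. A toy check with the durations assumed above:

# Illustrative durations only (seconds)
durations = {"query_a": 5, "query_b": 5, "query_c": 5,
             "query_d": 5, "query_e": 5, "query_f": 2}
levels = [["query_a", "query_b", "query_c"],
          ["query_d", "query_e"],
          ["query_f"]]

sequential = sum(durations.values())                                   # 27
parallel = sum(max(durations[q] for q in level) for level in levels)   # 12
print(f"{sequential}s -> {parallel}s ({1 - parallel / sequential:.0%} faster)")
# 27s -> 12s (56% faster)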


Airflow Integration

Generate Airflow DAGs automatically from your pipeline:

from clpipe import Pipeline

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# Generate Airflow DAG
dag = pipeline.to_airflow_dag(
    executor=my_executor,
    dag_id="my_data_pipeline",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    default_args={
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5)
    }
)

What gets generated:

  1. One task per query - Each SQL query becomes an Airflow task
  2. Dependencies wired automatically - Based on table lineage
  3. Full DAG customization - All Airflow DAG parameters supported

Airflow DAG Example

Your SQL:

-- query_a.sql
CREATE TABLE raw.orders AS SELECT * FROM external.orders;

-- query_b.sql
CREATE TABLE staging.orders AS SELECT * FROM raw.orders;

-- query_c.sql
CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders;

Generated Airflow DAG:

from datetime import datetime
from airflow.decorators import dag, task

@dag(
    dag_id="my_data_pipeline",
    schedule="@daily",
    start_date=datetime(2025, 1, 1),
    default_args={'owner': 'data-team', 'retries': 3}
)
def my_data_pipeline():
    @task
    def query_a():
        executor("CREATE TABLE raw.orders AS SELECT * FROM external.orders")

    @task
    def query_b():
        executor("CREATE TABLE staging.orders AS SELECT * FROM raw.orders")

    @task
    def query_c():
        executor("CREATE TABLE analytics.metrics AS SELECT * FROM staging.orders")

    # Dependencies wired automatically
    query_a() >> query_b() >> query_c()

dag = my_data_pipeline()

Deploy to Airflow:

# Copy to Airflow DAGs folder
cp my_pipeline.py $AIRFLOW_HOME/dags/

# Airflow picks it up automatically
airflow dags list | grep my_data_pipeline

Advanced DAG Parameters

All Airflow DAG parameters are supported:

from datetime import datetime, timedelta

dag = pipeline.to_airflow_dag(
    executor=my_executor,
    dag_id="complex_pipeline",
    schedule="0 2 * * *",                    # Daily at 2 AM
    start_date=datetime(2025, 1, 1),
    catchup=False,                           # Don't backfill
    max_active_runs=1,                       # One run at a time
    dagrun_timeout=timedelta(hours=2),       # Timeout after 2 hours
    default_args={
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'email': ['alerts@company.com'],
        'email_on_failure': True,
        'depends_on_past': False,
    },
    tags=['production', 'daily', 'analytics']
)

Pipeline Manipulation

Splitting Large Pipelines

For large pipelines, you may want to split into smaller subpipelines based on update frequency or ownership:

# Split pipeline into 3 subpipelines
subpipelines = pipeline.split(
    sinks=[
        "final_table_1",              # Subpipeline 1: Everything needed for this table
        ["table_2", "table_3"],       # Subpipeline 2: Everything for these 2 tables
        "final_table_4"               # Subpipeline 3: Everything for this table
    ]
)

# Each subpipeline is independent
for i, subpipeline in enumerate(subpipelines):
    print(f"Subpipeline {i}: {len(subpipeline.table_graph.tables)} tables")

How it works:

  1. Backward traversal - From each sink, trace backward to find all dependencies
  2. Non-overlapping assignment - Shared dependencies go to the first subpipeline that needs them
  3. Complete subgraphs - Each subpipeline, combined with the ones before it, has everything needed to build its sinks (see the sketch below)
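
A sketch of that assignment over a plain dependency dict (deps maps each table to the tables it reads from); split_by_sinks is a hypothetical helper, not clpipe internals:

def split_by_sinks(deps, sink_groups):
    """Assign each table to the first sink group that (transitively) needs it."""
    assigned = set()
    subgraphs = []
    for sinks in sink_groups:
        tables, stack = set(), list(sinks)
        while stack:  # backward traversal from the sinks
            table = stack.pop()
            if table in tables or table in assigned:
                continue  # already claimed by this or an earlier subpipeline
            tables.add(table)
            stack.extend(deps.get(table, ()))
        assigned |= tables
        subgraphs.append(tables)
    return subgraphs

deps = {"a": [], "b": ["a"], "c": ["a"], "d": ["b"], "e": ["c"]}
print(split_by_sinks(deps, [["d"], ["e"]]))
# [{'a', 'b', 'd'}, {'c', 'e'}] - the shared table 'a' goes to the first group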

Example: Splitting by Update Frequency

# Your pipeline has:
# - Real-time tables (update every 5 min)
# - Hourly tables
# - Daily tables

subpipelines = pipeline.split(
    sinks=[
        ["realtime_dashboard", "realtime_alerts"],    # Every 5 min
        ["hourly_summary", "hourly_metrics"],         # Hourly
        ["daily_report", "daily_aggregates"]          # Daily
    ]
)

# Schedule each independently
realtime_dag = subpipelines[0].to_airflow_dag(
    executor=my_executor,
    dag_id="realtime_pipeline",
    schedule="*/5 * * * *"  # Every 5 minutes
)

hourly_dag = subpipelines[1].to_airflow_dag(
    executor=my_executor,
    dag_id="hourly_pipeline",
    schedule="0 * * * *"    # Every hour
)

daily_dag = subpipelines[2].to_airflow_dag(
    executor=my_executor,
    dag_id="daily_pipeline",
    schedule="0 2 * * *"    # Daily at 2 AM
)

Example: Splitting by Team Ownership

# Split by team
subpipelines = pipeline.split(
    sinks=[
        ["finance_report", "revenue_metrics"],        # Finance team
        ["user_analytics", "engagement_dashboard"],   # Product team
        ["ml_features", "training_data"]              # ML team
    ]
)

# Each team maintains their own DAG
for team, subpipeline in [
    ("finance", subpipelines[0]),
    ("product", subpipelines[1]),
    ("ml", subpipelines[2])
]:
    dag = subpipeline.to_airflow_dag(
        executor=my_executor,
        dag_id=f"{team}_pipeline",
        schedule="@daily",
        default_args={'owner': team}
    )

Complete Example

Putting it all together:

from clpipe import Pipeline
from datetime import datetime, timedelta

# 1. Parse SQL files
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

# 2. Understand table lineage
print(f"Total tables: {len(pipeline.table_graph.tables)}")
print(f"Execution levels: {len(pipeline.table_graph.get_execution_order())}")

# 3. Execute locally (for testing)
def bigquery_executor(sql: str) -> None:
    from google.cloud import bigquery
    client = bigquery.Client()
    client.query(sql).result()

results = pipeline.run(
    executor=bigquery_executor,
    max_workers=4,
    verbose=True
)

if results['failed']:
    print(f"Failed queries: {results['failed']}")
else:
    print(f"✅ All {results['total_queries']} queries completed in {results['elapsed_seconds']}s")

# 4. Generate Airflow DAG (for production)
dag = pipeline.to_airflow_dag(
    executor=bigquery_executor,
    dag_id="production_pipeline",
    schedule="0 2 * * *",
    start_date=datetime(2025, 1, 1),
    default_args={
        'owner': 'data-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'email_on_failure': True
    }
)

# 5. Optional: Split for different schedules
subpipelines = pipeline.split(
    sinks=[
        ["realtime_dashboard"],     # Every 5 min
        ["daily_report"]            # Daily
    ]
)

Key Takeaways

Table Lineage

  • Dependency tracking - Know what depends on what
  • Execution order - Topological sort respects all dependencies
  • Impact analysis - See downstream effects of changes

Pipeline Execution

  • Sync & async - Both execution modes supported
  • Parallel execution - Queries in the same level run concurrently
  • Error handling - Stops on failure, returns results

Airflow Integration

  • Automatic DAG generation - One task per query
  • Full customization - All Airflow parameters supported
  • Dependencies wired - Based on table lineage

Pipeline Manipulation

  • Split large pipelines - Create non-overlapping subpipelines
  • Schedule independently - Different cadences for different parts
  • Team ownership - Separate DAGs for different teams

Next Steps