
Multi-Environment Execution

Execute your SQL pipeline against any database for testing, isolation, and portability.

One of clpipe's most powerful features is the separation of SQL parsing from SQL execution. Your pipeline logic is defined once but can be executed against different backends: BigQuery in production, DuckDB for local testing, or any other database that can run your SQL.


The Challenge

Data engineers face several execution-related challenges:

  • Testing is slow and expensive: Running tests against production BigQuery costs money and takes time
  • Local development is hard: No easy way to test SQL transformations locally
  • Vendor lock-in: Your pipeline is tightly coupled to a specific data warehouse
  • Environment isolation: Testing in production risks affecting real data
  • CI/CD limitations: Hard to run pipeline tests in automated workflows

The Solution: Bring Your Own Executor

clpipe doesn't execute SQL directly. Instead, it:

  1. Parses your SQL and builds a dependency graph
  2. Determines the correct execution order
  3. Calls your executor function for each query in order
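To make those three steps concrete, here is a toy orchestrator. It is a hypothetical sketch, not clpipe's implementation: the regex-based dependency detection (`CREATE_RE`, `READS_RE`) is a crude stand-in for real SQL parsing, and `run_in_levels` is a name made up for this example. It uses the standard-library `graphlib` module to group queries into levels and `sqlite3` as a stand-in backend.

```python
import re
import sqlite3
from graphlib import TopologicalSorter
from typing import Callable

# Crude stand-ins for real SQL parsing (hypothetical, for illustration only):
# the table a query creates, and the tables it reads after FROM/JOIN.
CREATE_RE = re.compile(r"CREATE\s+(?:OR\s+REPLACE\s+)?TABLE\s+(\w+)", re.IGNORECASE)
READS_RE = re.compile(r"\b(?:FROM|JOIN)\s+(\w+)", re.IGNORECASE)


def run_in_levels(
    queries: list[tuple[str, str]], executor: Callable[[str], object]
) -> list[list[str]]:
    """Build a dependency graph, then call executor level by level."""
    # 1. Parse: map each created table back to the query that creates it
    producer = {CREATE_RE.search(sql).group(1): name for name, sql in queries}

    # 2. Order: a query depends on the queries that create the tables it reads
    sorter = TopologicalSorter()
    for name, sql in queries:
        deps = {producer[t] for t in READS_RE.findall(sql) if t in producer}
        deps.discard(name)
        sorter.add(name, *deps)
    sorter.prepare()

    # 3. Execute: every query in a level already has all of its inputs built
    sql_by_name = dict(queries)
    levels = []
    while sorter.is_active():
        level = sorted(sorter.get_ready())
        for name in level:
            executor(sql_by_name[name])
        sorter.done(*level)
        levels.append(level)
    return levels


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source_orders (order_id INTEGER, amount REAL)")

# Deliberately listed out of order; the graph decides the run order
queries = [
    ("mart_revenue", "CREATE TABLE mart_revenue AS SELECT SUM(amount) AS revenue FROM stg_orders"),
    ("raw_orders", "CREATE TABLE raw_orders AS SELECT * FROM source_orders"),
    ("stg_orders", "CREATE TABLE stg_orders AS SELECT * FROM raw_orders WHERE amount > 0"),
]
levels = run_in_levels(queries, conn.execute)
print(levels)  # [['raw_orders'], ['stg_orders'], ['mart_revenue']]
```

A real implementation would use a proper SQL parser (clpipe relies on sqlglot) and could run the queries within a level concurrently; this sketch runs them one at a time.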

Because clpipe only hands each query to your executor, you can swap execution backends without changing your SQL:

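import duckdb
from clpipe import Pipeline

# Your SQL is the same everywhere: a list of (query_name, sql) tuples
# (a one-query pipeline, just for illustration)
queries = [("example", "CREATE OR REPLACE TABLE example AS SELECT 1 AS id")]
pipeline = Pipeline(queries, dialect="duckdb")

# Only the executor changes
conn = duckdb.connect(":memory:")

def execute_sql(sql: str):
    conn.execute(sql)

result = pipeline.run(executor=execute_sql, max_workers=1)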
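Since the executor is just a callable that takes one SQL string, even "no database at all" can serve as a backend. A minimal sketch, assuming clpipe only needs the executor to accept a string (as every example on this page suggests): a hypothetical `RecordingExecutor` that collects each statement instead of running it, which is handy for reviewing the SQL a run would issue.

```python
from dataclasses import dataclass, field


@dataclass
class RecordingExecutor:
    """Dry-run executor: stores each SQL statement instead of running it."""

    statements: list[str] = field(default_factory=list)

    def __call__(self, sql: str) -> None:
        self.statements.append(sql)


dry_run = RecordingExecutor()
# With clpipe: pipeline.run(executor=dry_run, max_workers=1)
# max_workers=1 should keep calls sequential, so the list reflects the run order.
dry_run("CREATE TABLE example AS SELECT 1 AS id")
print(dry_run.statements)  # ['CREATE TABLE example AS SELECT 1 AS id']
```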

Use Case 1: Local Testing with DuckDB

Scenario: You want to test your BigQuery pipeline locally without incurring cloud costs.

Full Local Execution

import duckdb
from pathlib import Path
from clpipe import Pipeline

# Create in-memory DuckDB connection
conn = duckdb.connect(":memory:")

# Load SQL files
sql_dir = Path("examples/sql_files")
queries = []
for sql_file in sorted(sql_dir.glob("*.sql")):
    queries.append((sql_file.stem, sql_file.read_text()))

# Build pipeline (with DuckDB dialect for local testing)
pipeline = Pipeline(queries, dialect="duckdb")

# Define DuckDB executor
def execute_sql(sql: str):
    conn.execute(sql)

# Run the pipeline
result = pipeline.run(executor=execute_sql, max_workers=1, verbose=True)

print(f"Completed: {len(result['completed'])} queries")
print(f"Failed: {len(result['failed'])} queries")
print(f"Time: {result['elapsed_seconds']:.2f}s")

Output:

Level 1: 4 queries
  raw_orders
  raw_customers
  raw_products
  raw_order_items

Level 2: 1 queries
  stg_orders_enriched

Level 3: 3 queries
  int_daily_metrics
  mart_customer_ltv
  mart_product_performance

Completed: 8 queries
Failed: 0 queries
Time: 0.06s

With Fake Data Generation

For realistic testing, generate fake data before execution:

def generate_fake_data(conn, num_customers=100, num_orders=500):
    """Generate fake e-commerce data in DuckDB source tables."""

    # Generate customers
    customers_sql = """
    CREATE OR REPLACE TABLE source_customers AS
    SELECT
        row_number() OVER () as customer_id,
        'customer' || row_number() OVER () || '@example.com' as email,
        'First' || row_number() OVER () as first_name,
        'Last' || row_number() OVER () as last_name,
        '+1-555-' || lpad(cast(row_number() OVER () as varchar), 4, '0') as phone_number,
        current_date - (random() * 1500)::int as registration_date,
        (array['US', 'CA', 'UK', 'DE', 'FR'])[1 + (random() * 4)::int] as country_code,
        (array['New York', 'LA', 'Chicago', 'Toronto', 'London'])[1 + (random() * 4)::int] as city,
        (array['Bronze', 'Silver', 'Gold', 'Platinum'])[1 + (random() * 3)::int] as loyalty_tier,
        current_timestamp as created_at
    FROM generate_series(1, {num_customers})
    """.format(num_customers=num_customers)

    conn.execute(customers_sql)
    print(f"  Created source_customers: {num_customers} rows")

    # Similar patterns for orders, products, order_items...
    # (See run_with_duckdb.py for full implementation)

# Generate data, then run pipeline
generate_fake_data(conn, num_customers=100, num_orders=500)
result = pipeline.run(executor=execute_sql, max_workers=1, verbose=True)
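SQL `random()` produces different data on every run, which can make a failing test hard to reproduce. One alternative, sketched here under assumptions, is to generate rows in Python with a seeded `random.Random` and load them with DuckDB's `executemany`. The helper name `fake_order_rows` and the `source_orders` column layout are hypothetical; match them to the columns your `raw_orders` query actually reads.

```python
import random
from datetime import date, timedelta


def fake_order_rows(num_orders: int, num_customers: int, seed: int = 42):
    """Return (order_id, customer_id, order_date, amount) tuples.

    The same seed always yields the same rows, so test runs are repeatable.
    """
    rng = random.Random(seed)  # private generator; doesn't touch global state
    start = date(2024, 1, 1)
    return [
        (
            order_id,
            rng.randint(1, num_customers),  # customer IDs run 1..num_customers
            start + timedelta(days=rng.randint(0, 364)),
            round(rng.uniform(5.0, 500.0), 2),
        )
        for order_id in range(1, num_orders + 1)
    ]


# Hypothetical usage with the DuckDB connection from above:
# conn.execute("""
#     CREATE OR REPLACE TABLE source_orders (
#         order_id INTEGER, customer_id INTEGER, order_date DATE, amount DECIMAL(10, 2)
#     )
# """)
# conn.executemany(
#     "INSERT INTO source_orders VALUES (?, ?, ?, ?)",
#     fake_order_rows(num_orders=500, num_customers=100),
# )
```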

Use Case 2: Production Execution with BigQuery

Scenario: Deploy the same pipeline to BigQuery in production.

from google.cloud import bigquery
from clpipe import Pipeline

# Build pipeline (BigQuery dialect)
pipeline = Pipeline(queries, dialect="bigquery")

# BigQuery executor
client = bigquery.Client(project="my-project")

def execute_sql(sql: str):
    job = client.query(sql)
    job.result()  # Wait for completion

# Run the pipeline
result = pipeline.run(
    executor=execute_sql,
    max_workers=4,  # Parallel execution
    verbose=True
)
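In production it often helps to know which queries are slow. Because the executor is a plain function, you can wrap it. A minimal sketch of a hypothetical `timed_executor` wrapper; the lock is there on the assumption that `max_workers=4` means the executor may be called from several threads at once.

```python
import threading
import time
from typing import Callable


def timed_executor(execute: Callable[[str], object]):
    """Wrap an executor; return (wrapped, timings) where timings collects
    (sql_preview, seconds) for every call, including calls that raise."""
    timings: list[tuple[str, float]] = []
    lock = threading.Lock()  # calls may arrive from several worker threads

    def wrapped(sql: str) -> None:
        start = time.perf_counter()
        try:
            execute(sql)
        finally:
            elapsed = time.perf_counter() - start
            with lock:
                # Collapse whitespace so multi-line SQL gives a readable preview
                timings.append((" ".join(sql.split())[:60], elapsed))

    return wrapped, timings


# Hypothetical usage with the BigQuery executor above:
# executor, timings = timed_executor(execute_sql)
# result = pipeline.run(executor=executor, max_workers=4)
# for preview, seconds in sorted(timings, key=lambda t: t[1], reverse=True)[:5]:
#     print(f"{seconds:6.1f}s  {preview}")
```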

With Error Handling

from google.api_core import retry
from google.api_core.exceptions import ServiceUnavailable

# Retry the whole submit-and-wait step when BigQuery returns a transient
# 503 ServiceUnavailable; Retry backs off exponentially until its default timeout.
@retry.Retry(predicate=retry.if_exception_type(ServiceUnavailable))
def execute_sql_with_retry(sql: str):
    """Execute SQL with retry logic for transient errors."""
    job = client.query(sql)
    return job.result()

result = pipeline.run(
    executor=execute_sql_with_retry,
    max_workers=4
)

Use Case 3: Environment Isolation

Scenario: Run the same pipeline in different environments (dev, staging, prod) with different configurations.

Environment-Specific Executors

import os
from clpipe import Pipeline

BIGQUERY_PROJECTS = {
    "dev": "my-project-dev",
    "staging": "my-project-staging",
    "prod": "my-project-prod",
}

def get_executor(environment: str):
    """Return the appropriate executor for the environment."""

    if environment == "local":
        import duckdb
        conn = duckdb.connect(":memory:")
        return lambda sql: conn.execute(sql)

    if environment in BIGQUERY_PROJECTS:
        # dev, staging and prod differ only in the GCP project they target
        from google.cloud import bigquery
        client = bigquery.Client(project=BIGQUERY_PROJECTS[environment])
        return lambda sql: client.query(sql).result()

    raise ValueError(f"Unknown environment: {environment}")

# Usage
env = os.getenv("ENVIRONMENT", "local")
executor = get_executor(env)

pipeline = Pipeline(queries, dialect="duckdb" if env == "local" else "bigquery")
result = pipeline.run(executor=executor, max_workers=1)

Dataset/Schema Isolation

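For testing in shared environments, keep each test run's tables separate from everyone else's. The executor below does this by prefixing pipeline table names with a unique test ID (pointing each run at a dedicated dataset is another option):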

import re
import uuid

# Pipeline table prefixes, matched only at the start of a name
# (so identifiers like "smart_score" or "print_date" are left alone)
PIPELINE_TABLE_PREFIX = re.compile(r"\b(raw|stg|int|mart)_")

def create_isolated_executor(client, table_prefix: str):
    """Create an executor that prefixes pipeline table names with a per-run ID."""

    def execute_sql(sql: str):
        # e.g., raw_orders -> test_1a2b3c4d_raw_orders
        # This is a plain text rewrite: a column named like raw_amount
        # would be prefixed too, so reserve these prefixes for table names.
        isolated_sql = PIPELINE_TABLE_PREFIX.sub(rf"{table_prefix}\1_", sql)

        job = client.query(isolated_sql)
        return job.result()

    return execute_sql

# Create isolated executor for this test run
# (client is the bigquery.Client from Use Case 2)
test_id = f"test_{uuid.uuid4().hex[:8]}_"
executor = create_isolated_executor(client, test_id)

result = pipeline.run(executor=executor)

# Clean up after tests
# (delete tables with test_id prefix)
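The cleanup step left as a comment above could look like the following sketch. It assumes the prefixed tables land in a single BigQuery dataset whose ID you know (the snippet above doesn't show how that dataset is configured), and it relies on `Client.list_tables` and `Client.delete_table` from `google-cloud-bigquery`. `drop_prefixed_tables` is a hypothetical helper; the client is passed in rather than imported so the function is easy to test with a stub.

```python
def drop_prefixed_tables(client, dataset_id: str, prefix: str) -> list[str]:
    """Delete every table in dataset_id whose name starts with prefix.

    `client` is expected to be a google.cloud.bigquery.Client (or anything
    with the same list_tables / delete_table methods).
    """
    # Materialize the listing first, to be safe, so deletions can't interfere with paging
    tables = list(client.list_tables(dataset_id))
    dropped = []
    for table in tables:
        if table.table_id.startswith(prefix):
            # not_found_ok=True makes re-running the cleanup harmless
            client.delete_table(table, not_found_ok=True)
            dropped.append(table.table_id)
    return dropped


# Hypothetical usage, e.g. in a pytest fixture's teardown
# ("my-project-dev.pipeline_tests" is a made-up dataset ID):
# dropped = drop_prefixed_tables(client, "my-project-dev.pipeline_tests", test_id)
```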

Use Case 4: CI/CD Integration

Scenario: Run pipeline tests in GitHub Actions or other CI systems.

GitHub Actions Workflow

name: Pipeline Tests

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install clpipe duckdb pandas pytest

      - name: Run pipeline tests
        run: |
          python -m pytest tests/test_pipeline.py -v

Test File

# tests/test_pipeline.py
import pytest
import duckdb
from pathlib import Path
from clpipe import Pipeline

@pytest.fixture
def pipeline():
    """Load the pipeline from SQL files."""
    sql_dir = Path("examples/sql_files")
    queries = []
    for sql_file in sorted(sql_dir.glob("*.sql")):
        queries.append((sql_file.stem, sql_file.read_text()))
    return Pipeline(queries, dialect="duckdb")

@pytest.fixture
def duckdb_connection():
    """Create an in-memory DuckDB connection with test data."""
    conn = duckdb.connect(":memory:")
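    # Generate minimal test data (generate_test_data is your own helper,
    # for example a trimmed-down generate_fake_data from Use Case 1)
    generate_test_data(conn)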
    yield conn
    conn.close()

def test_pipeline_executes_successfully(pipeline, duckdb_connection):
    """Test that the entire pipeline executes without errors."""
    def execute_sql(sql):
        duckdb_connection.execute(sql)

    result = pipeline.run(executor=execute_sql, max_workers=1)

    assert len(result['failed']) == 0
    assert len(result['completed']) == 8

def test_mart_customer_ltv_has_expected_columns(pipeline, duckdb_connection):
    """Test that mart_customer_ltv has expected output columns."""
    def execute_sql(sql):
        duckdb_connection.execute(sql)

    pipeline.run(executor=execute_sql, max_workers=1)

    # Verify output table structure
    result = duckdb_connection.execute(
        "SELECT * FROM mart_customer_ltv LIMIT 1"
    ).fetchdf()

    expected_columns = [
        'customer_id', 'customer_full_name', 'email',
        'total_orders', 'lifetime_revenue', 'customer_segment'
    ]
    for col in expected_columns:
        assert col in result.columns

def test_lineage_traces_correctly(pipeline):
    """Test that lineage tracing works as expected."""
    sources = pipeline.trace_column_backward("mart_customer_ltv", "lifetime_revenue")

    source_tables = {s.table_name for s in sources}
    assert "source_orders" in source_tables or "raw_orders" in source_tables
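`fetchdf()` needs pandas. If you'd rather keep pandas out of the test dependencies, the standard DB-API `cursor.description` attribute also lists a result's column names. The sketch below uses the stdlib `sqlite3` module so it runs anywhere; DuckDB's Python connection exposes `description` as well. `table_columns` and `assert_has_columns` are hypothetical helper names.

```python
import sqlite3


def table_columns(conn, table: str) -> list[str]:
    """Return a table's column names from the DB-API cursor.description."""
    # LIMIT 0 fetches no rows but still populates the column metadata.
    # The table name is interpolated directly, so only pass trusted names.
    cursor = conn.execute(f"SELECT * FROM {table} LIMIT 0")
    return [column[0] for column in cursor.description]


def assert_has_columns(conn, table: str, expected: list[str]) -> None:
    """Fail with a readable message listing any missing columns."""
    missing = sorted(set(expected) - set(table_columns(conn, table)))
    assert not missing, f"{table} is missing columns: {missing}"


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mart_customer_ltv (customer_id INTEGER, email TEXT)")
print(table_columns(conn, "mart_customer_ltv"))  # ['customer_id', 'email']
assert_has_columns(conn, "mart_customer_ltv", ["customer_id", "email"])
```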

Use Case 5: Preventing Vendor Lock-in

Scenario: You want to write SQL that can run on multiple platforms.

Multi-Dialect Support

clpipe uses sqlglot for SQL parsing, which supports transpilation between dialects:

from clpipe import Pipeline

# The same SQL files can be parsed with different dialects, as long as
# the SQL is valid in each; the dialect tells the parser which flavor to expect

# For DuckDB (local testing)
local_pipeline = Pipeline(queries, dialect="duckdb")

# For BigQuery (production)
prod_pipeline = Pipeline(queries, dialect="bigquery")

# For Snowflake
snowflake_pipeline = Pipeline(queries, dialect="snowflake")

# For PostgreSQL
pg_pipeline = Pipeline(queries, dialect="postgres")

Executor Abstraction Pattern

Create an abstraction layer for executors:

from abc import ABC, abstractmethod

class SQLExecutor(ABC):
    @abstractmethod
    def execute(self, sql: str) -> None:
        pass

    @abstractmethod
    def query(self, sql: str):
        pass

class DuckDBExecutor(SQLExecutor):
    def __init__(self, connection):
        self.conn = connection

    def execute(self, sql: str) -> None:
        self.conn.execute(sql)

    def query(self, sql: str):
        return self.conn.execute(sql).fetchdf()

class BigQueryExecutor(SQLExecutor):
    def __init__(self, client):
        self.client = client

    def execute(self, sql: str) -> None:
        self.client.query(sql).result()

    def query(self, sql: str):
        return self.client.query(sql).to_dataframe()

# Usage
import duckdb
from google.cloud import bigquery

executor = DuckDBExecutor(duckdb.connect(":memory:"))
# or
executor = BigQueryExecutor(bigquery.Client())

result = pipeline.run(executor=executor.execute, max_workers=1)
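An `ABC` requires every backend to subclass `SQLExecutor`. A design alternative, shown here as a sketch, is `typing.Protocol`: any class with matching `execute` and `query` methods qualifies structurally, which is convenient when wrapping clients you don't control. `SQLiteExecutor` is a hypothetical example backend built on the stdlib `sqlite3` module.

```python
import sqlite3
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SQLExecutor(Protocol):
    def execute(self, sql: str) -> None: ...

    def query(self, sql: str) -> Any: ...


class SQLiteExecutor:
    """Matches SQLExecutor structurally; no inheritance required."""

    def __init__(self, connection: sqlite3.Connection):
        self.conn = connection

    def execute(self, sql: str) -> None:
        self.conn.execute(sql)

    def query(self, sql: str) -> list[tuple]:
        return self.conn.execute(sql).fetchall()


executor = SQLiteExecutor(sqlite3.connect(":memory:"))
# With runtime_checkable, isinstance() only checks that the methods exist
print(isinstance(executor, SQLExecutor))  # True
executor.execute("CREATE TABLE t AS SELECT 1 AS x")
print(executor.query("SELECT x FROM t"))  # [(1,)]
```

As with the ABC version, you would pass the bound method, e.g. `pipeline.run(executor=executor.execute, max_workers=1)`.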

Execution Results

The pipeline.run() method returns a detailed result dictionary:

result = pipeline.run(executor=execute_sql, max_workers=1, verbose=True)

# Result structure
{
    'completed': ['01_raw_orders', '02_raw_customers', ...],
    'failed': [],  # List of (query_id, error) tuples
    'elapsed_seconds': 0.06
}

# Check results
if result['failed']:
    print("Failed queries:")
    for query_id, error in result['failed']:
        print(f"  {query_id}: {error}")
else:
    print(f"All {len(result['completed'])} queries completed successfully")
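In CI or a scheduler you usually want a failed query to fail the whole job. A small sketch that turns the result dictionary described above into an exception; `raise_on_failure` is a hypothetical helper, and it only assumes the `failed` key and the `(query_id, error)` tuple shape shown above.

```python
def raise_on_failure(result: dict) -> None:
    """Raise RuntimeError if pipeline.run() reported any failed queries."""
    failed = result.get("failed", [])
    if not failed:
        return
    details = "\n".join(f"  {query_id}: {error}" for query_id, error in failed)
    raise RuntimeError(f"{len(failed)} queries failed:\n{details}")


# An uncaught exception makes `python script.py` exit with a non-zero status,
# which CI systems treat as a failed step.
raise_on_failure({"completed": ["01_raw_orders"], "failed": [], "elapsed_seconds": 0.06})
```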

Try It Yourself

Run the DuckDB execution example:

cd clpipe
uv run python examples/sql_files/run_with_duckdb.py

Expected output:

Generating fake data...
  Created source_customers: 100 rows
  Created source_products: 50 rows
  Created source_orders: 500 rows
  Created source_order_items: 1247 rows

Loaded 8 SQL files

Building pipeline...
  8 queries
  127 columns tracked

Executing pipeline...
  Level 1: raw_orders, raw_customers, raw_products, raw_order_items
  Level 2: stg_orders_enriched
  Level 3: int_daily_metrics, mart_customer_ltv, mart_product_performance

EXECUTION RESULTS
  Completed: 8 queries
  Failed: 0 queries
  Time: 0.06 seconds

Sample data from mart_customer_ltv:
  customer_id  customer_full_name  total_orders  lifetime_revenue  customer_segment
           1        First1 Last1            12          1234.56              Gold
           2        First2 Last2             8           987.65            Silver
           ...


Key Benefits

| Traditional Approach                | clpipe Portable Execution       |
|-------------------------------------|---------------------------------|
| Test against production (expensive) | Test locally with DuckDB (free) |
| Tightly coupled to one warehouse    | Execute anywhere                |
| Slow CI/CD feedback                 | Fast local tests                |
| Vendor lock-in                      | Dialect-agnostic SQL            |
| Risk testing in prod                | Safe isolated environments      |

Next Steps