Multi-Environment Execution
Execute your SQL pipeline against any database for testing, isolation, and portability.
One of clpipe's most powerful features is the separation of SQL parsing from SQL execution. Your pipeline logic is defined once and can be executed against any backend your executor can connect to: BigQuery in production, DuckDB for local testing, or another SQL database.
The Challenge
Data engineers face several execution-related challenges:
- Testing is slow and expensive: Running tests against production BigQuery costs money and takes time
- Local development is hard: No easy way to test SQL transformations locally
- Vendor lock-in: Your pipeline is tightly coupled to a specific data warehouse
- Environment isolation: Testing in production risks affecting real data
- CI/CD limitations: Hard to run pipeline tests in automated workflows
The Solution: Bring Your Own Executor
clpipe doesn't execute SQL directly. Instead, it:
- Parses your SQL and builds a dependency graph
- Determines the correct execution order
- Calls your executor function for each query in order
This means you can swap execution backends without changing your SQL:
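import duckdb
from clpipe import Pipeline
# `queries` is a list of (name, sql) tuples, e.g. loaded from .sql files
# Your SQL is the same everywhere
pipeline = Pipeline(queries, dialect="duckdb")
# Only the executor changes
conn = duckdb.connect(":memory:")
def execute_sql(sql: str):
    conn.execute(sql)
result = pipeline.run(executor=execute_sql, max_workers=1)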
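The three steps described above can be illustrated with a minimal, library-independent sketch. This is not clpipe's implementation: the hand-written `dependencies` mapping stands in for the graph clpipe builds by parsing your SQL, `run_in_levels` is a hypothetical helper, and Python's standard-library `sqlite3` stands in for a real warehouse. Queries whose upstreams have all finished form one level, which is also why queries within a level could run in parallel.

```python
import sqlite3
from graphlib import TopologicalSorter
from typing import Callable

def run_in_levels(
    queries: dict[str, str],
    dependencies: dict[str, set[str]],
    executor: Callable[[str], object],
) -> list[list[str]]:
    """Execute queries level by level and return the level groupings."""
    sorter = TopologicalSorter(dependencies)
    # Make sure queries without dependencies are part of the graph too
    for name in queries:
        sorter.add(name)
    sorter.prepare()
    levels = []
    while sorter.is_active():
        # Everything returned by get_ready() has all of its upstreams done
        ready = sorted(sorter.get_ready())
        for name in ready:
            executor(queries[name])
        sorter.done(*ready)
        levels.append(ready)
    return levels

queries = {
    "raw_orders": "CREATE TABLE raw_orders AS SELECT 1 AS order_id, 10.0 AS amount",
    "stg_orders": "CREATE TABLE stg_orders AS SELECT order_id, amount * 2 AS amount FROM raw_orders",
    "mart_revenue": "CREATE TABLE mart_revenue AS SELECT SUM(amount) AS revenue FROM stg_orders",
}
# Hypothetical hand-written graph: each query maps to the queries it reads from
dependencies = {
    "stg_orders": {"raw_orders"},
    "mart_revenue": {"stg_orders"},
}

conn = sqlite3.connect(":memory:")
levels = run_in_levels(queries, dependencies, executor=conn.execute)
print(levels)  # [['raw_orders'], ['stg_orders'], ['mart_revenue']]
print(conn.execute("SELECT revenue FROM mart_revenue").fetchone())  # (20.0,)
```

Because the orchestration only ever sees a callable, swapping `conn.execute` for a DuckDB or BigQuery function changes nothing else.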
Use Case 1: Local Testing with DuckDB
Scenario: You want to test your BigQuery pipeline locally without incurring cloud costs.
Full Local Execution
import duckdb
from pathlib import Path
from clpipe import Pipeline
# Create in-memory DuckDB connection
conn = duckdb.connect(":memory:")
# Load SQL files
sql_dir = Path("examples/sql_files")
queries = []
for sql_file in sorted(sql_dir.glob("*.sql")):
    queries.append((sql_file.stem, sql_file.read_text()))
# Build pipeline (with DuckDB dialect for local testing)
pipeline = Pipeline(queries, dialect="duckdb")
# Define DuckDB executor
def execute_sql(sql: str):
    conn.execute(sql)
# Run the pipeline
result = pipeline.run(executor=execute_sql, max_workers=1, verbose=True)
print(f"Completed: {len(result['completed'])} queries")
print(f"Failed: {len(result['failed'])} queries")
print(f"Time: {result['elapsed_seconds']:.2f}s")
Output:
Level 1: 4 queries
raw_orders
raw_customers
raw_products
raw_order_items
Level 2: 1 queries
stg_orders_enriched
Level 3: 3 queries
int_daily_metrics
mart_customer_ltv
mart_product_performance
Completed: 8 queries
Failed: 0 queries
Time: 0.06s
With Fake Data Generation
For realistic testing, generate fake data before execution:
import random
from datetime import date, timedelta
def generate_fake_data(conn, num_customers=100, num_orders=500):
    """Generate fake e-commerce data in DuckDB source tables."""
    # Generate customers
    customers_sql = """
    CREATE OR REPLACE TABLE source_customers AS
    SELECT
        row_number() OVER () as customer_id,
        'customer' || row_number() OVER () || '@example.com' as email,
        'First' || row_number() OVER () as first_name,
        'Last' || row_number() OVER () as last_name,
        '+1-555-' || lpad(cast(row_number() OVER () as varchar), 4, '0') as phone_number,
        current_date - (random() * 1500)::int as registration_date,
        (array['US', 'CA', 'UK', 'DE', 'FR'])[1 + (random() * 4)::int] as country_code,
        (array['New York', 'LA', 'Chicago', 'Toronto', 'London'])[1 + (random() * 4)::int] as city,
        (array['Bronze', 'Silver', 'Gold', 'Platinum'])[1 + (random() * 3)::int] as loyalty_tier,
        current_timestamp as created_at
    FROM generate_series(1, {num_customers})
    """.format(num_customers=num_customers)
    conn.execute(customers_sql)
    print(f" Created source_customers: {num_customers} rows")
    # Similar patterns for orders, products, order_items...
    # (See run_with_duckdb.py for full implementation)
# Generate data, then run pipeline
generate_fake_data(conn, num_customers=100, num_orders=500)
result = pipeline.run(executor=execute_sql, max_workers=1, verbose=True)
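If you want the generated data to be identical on every run (handy when tests assert on exact values), one option is to generate rows in Python with a seeded random number generator and insert them with parameterized statements. The sketch below uses the standard-library `sqlite3` module so it runs anywhere; the `generate_seeded_customers` name and the reduced column set are illustrative assumptions. A DuckDB connection also offers `executemany`, so the same pattern should carry over.

```python
import random
import sqlite3
from datetime import date, timedelta

def generate_seeded_customers(conn, num_customers: int = 100, seed: int = 42) -> None:
    """Create source_customers with reproducible fake rows."""
    rng = random.Random(seed)  # Same seed -> same rows on every run
    countries = ["US", "CA", "UK", "DE", "FR"]
    tiers = ["Bronze", "Silver", "Gold", "Platinum"]
    # Fixed anchor date so registration dates don't drift from day to day
    anchor = date(2024, 1, 1)
    rows = [
        (
            i,
            f"customer{i}@example.com",
            f"First{i}",
            f"Last{i}",
            (anchor - timedelta(days=rng.randint(0, 1500))).isoformat(),
            rng.choice(countries),
            rng.choice(tiers),
        )
        for i in range(1, num_customers + 1)
    ]
    conn.execute(
        "CREATE TABLE source_customers (customer_id INTEGER, email TEXT, "
        "first_name TEXT, last_name TEXT, registration_date TEXT, "
        "country_code TEXT, loyalty_tier TEXT)"
    )
    conn.executemany(
        "INSERT INTO source_customers VALUES (?, ?, ?, ?, ?, ?, ?)", rows
    )

conn_a, conn_b = sqlite3.connect(":memory:"), sqlite3.connect(":memory:")
generate_seeded_customers(conn_a)
generate_seeded_customers(conn_b)
query = "SELECT * FROM source_customers ORDER BY customer_id"
print(conn_a.execute(query).fetchall() == conn_b.execute(query).fetchall())  # True
```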
Use Case 2: Production Execution with BigQuery
Scenario: Deploy the same pipeline to BigQuery in production.
from google.cloud import bigquery
from clpipe import Pipeline
# Build pipeline (BigQuery dialect)
pipeline = Pipeline(queries, dialect="bigquery")
# BigQuery executor
client = bigquery.Client(project="my-project")
def execute_sql(sql: str):
    job = client.query(sql)
    job.result()  # Wait for completion
# Run the pipeline
result = pipeline.run(
    executor=execute_sql,
    max_workers=4,  # Parallel execution
    verbose=True
)
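With `max_workers` greater than 1, it is reasonable to assume clpipe calls your executor from several worker threads at once (an assumption; check the API reference for the exact concurrency model). That matters for backends where one connection object should not be shared across threads, which is one reason the DuckDB examples here use `max_workers=1`. A minimal sketch of one workaround is to serialize access with a lock; `make_serialized_executor` is a hypothetical helper, and the standard-library `ThreadPoolExecutor` and `sqlite3` stand in for clpipe and the real backend.

```python
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

def make_serialized_executor(conn) -> Callable[[str], None]:
    """Wrap a connection so only one thread uses it at a time."""
    lock = threading.Lock()

    def execute_sql(sql: str) -> None:
        with lock:  # Other worker threads wait until this statement finishes
            conn.execute(sql)

    return execute_sql

# check_same_thread=False allows other threads to use the connection;
# the lock above is what makes that safe
conn = sqlite3.connect(":memory:", check_same_thread=False)
execute_sql = make_serialized_executor(conn)

# Simulate one level of independent queries dispatched to 4 worker threads
statements = [f"CREATE TABLE t{i} AS SELECT {i} AS v" for i in range(8)]
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(execute_sql, statements))  # list() re-raises any worker error

count = conn.execute(
    "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table'"
).fetchone()[0]
print(count)  # 8
```

Serializing gives up parallelism on that backend. For DuckDB specifically, creating a separate cursor per thread with `conn.cursor()` is the commonly recommended alternative.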
With Error Handling
from google.api_core import retry
from google.api_core.exceptions import ServiceUnavailable
def execute_sql_with_retry(sql: str):
    """Execute SQL, retrying transient ServiceUnavailable errors."""
    # Retries with exponential backoff until the Retry object's default deadline
    @retry.Retry(predicate=retry.if_exception_type(ServiceUnavailable))
    def _execute():
        job = client.query(sql)
        return job.result()
    return _execute()
result = pipeline.run(
    executor=execute_sql_with_retry,
    max_workers=4
)
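If you want a hard cap on the number of attempts, or retries for a backend other than BigQuery, a small backend-agnostic wrapper is enough. This is a sketch, not part of clpipe: `with_retries`, `TransientError`, and the backoff values are hypothetical, and in practice you would pass the exception types your driver raises for transient failures.

```python
import time
from typing import Callable, Tuple, Type

class TransientError(Exception):
    """Hypothetical stand-in for a driver's retryable error type."""

def with_retries(
    executor: Callable[[str], object],
    max_retries: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (TransientError,),
) -> Callable[[str], object]:
    """Return an executor that retries transient failures with exponential backoff."""

    def execute_sql(sql: str):
        for attempt in range(max_retries + 1):
            try:
                return executor(sql)
            except retry_on:
                if attempt == max_retries:
                    raise  # Out of retries: let the pipeline record the failure
                time.sleep(base_delay * 2 ** attempt)  # base, 2x base, 4x base, ...

    return execute_sql

# Usage: a fake executor that fails twice before succeeding
calls = []

def flaky_executor(sql: str) -> str:
    calls.append(sql)
    if len(calls) < 3:
        raise TransientError("service unavailable")
    return "ok"

execute_sql = with_retries(flaky_executor, max_retries=3, base_delay=0.01)
print(execute_sql("SELECT 1"), len(calls))  # ok 3
```

Retrying re-submits the statement, so this is safest for idempotent statements such as `CREATE OR REPLACE TABLE ... AS SELECT`.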
Use Case 3: Environment Isolation
Scenario: Run the same pipeline in different environments (dev, staging, prod) with different configurations.
Environment-Specific Executors
import os
from clpipe import Pipeline
# BigQuery project for each cloud environment
BIGQUERY_PROJECTS = {
    "dev": "my-project-dev",
    "staging": "my-project-staging",
    "prod": "my-project-prod",
}
def get_executor(environment: str):
    """Return the appropriate executor for the environment."""
    if environment == "local":
        import duckdb
        conn = duckdb.connect(":memory:")
        return lambda sql: conn.execute(sql)
    if environment in BIGQUERY_PROJECTS:
        from google.cloud import bigquery
        client = bigquery.Client(project=BIGQUERY_PROJECTS[environment])
        return lambda sql: client.query(sql).result()
    raise ValueError(f"Unknown environment: {environment}")
# Usage
env = os.getenv("ENVIRONMENT", "local")
executor = get_executor(env)
pipeline = Pipeline(queries, dialect="duckdb" if env == "local" else "bigquery")
result = pipeline.run(executor=executor, max_workers=1)
Dataset/Schema Isolation
For testing in shared environments, give each test run its own set of tables. The executor below rewrites table references by adding a unique per-run prefix:
import re
import uuid
def create_isolated_executor(client, table_prefix: str):
    """Create an executor that prefixes pipeline table names for an isolated run."""
    # Match raw_, stg_, int_ and mart_ only at the start of an identifier,
    # e.g. raw_orders -> test_1a2b3c4d_raw_orders. This is still a plain
    # text rewrite, so column names with these prefixes are renamed too.
    layer_prefix = re.compile(r"\b(raw|stg|int|mart)_")
    def execute_sql(sql: str):
        isolated_sql = layer_prefix.sub(rf"{table_prefix}\1_", sql)
        job = client.query(isolated_sql)
        return job.result()
    return execute_sql
# Create isolated executor for this test run
test_id = f"test_{uuid.uuid4().hex[:8]}_"
executor = create_isolated_executor(client, test_id)
result = pipeline.run(executor=executor)
# Clean up after tests
# (delete tables with test_id prefix)
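The cleanup step can be sketched with the BigQuery client's `list_tables` and `delete_table` methods. The helper name and its `dataset_id` argument are assumptions for illustration (pass whichever dataset your isolated tables land in), and a tiny fake client stands in for `bigquery.Client` so the logic runs without credentials.

```python
from types import SimpleNamespace

def drop_tables_with_prefix(client, dataset_id: str, prefix: str) -> list[str]:
    """Delete every table in dataset_id whose name starts with prefix."""
    dropped = []
    for table in client.list_tables(dataset_id):
        if table.table_id.startswith(prefix):
            # not_found_ok avoids an error if the table is already gone
            client.delete_table(table, not_found_ok=True)
            dropped.append(table.table_id)
    return dropped

class FakeClient:
    """Stand-in for bigquery.Client with only the two methods used above."""

    def __init__(self, table_ids):
        self.tables = set(table_ids)

    def list_tables(self, dataset_id):
        return [SimpleNamespace(table_id=t) for t in sorted(self.tables)]

    def delete_table(self, table, not_found_ok=False):
        self.tables.discard(table.table_id)

fake = FakeClient([
    "test_1a2b3c4d_raw_orders",
    "test_1a2b3c4d_mart_customer_ltv",
    "raw_orders",
])
dropped = drop_tables_with_prefix(fake, "analytics", "test_1a2b3c4d_")
print(dropped)  # ['test_1a2b3c4d_mart_customer_ltv', 'test_1a2b3c4d_raw_orders']
print(fake.tables)  # {'raw_orders'}
```

In a pytest suite, calling a helper like this from the teardown part of a `yield` fixture ensures cleanup runs even when a test fails.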
Use Case 4: CI/CD Integration
Scenario: Run pipeline tests in GitHub Actions or other CI systems.
GitHub Actions Workflow
name: Pipeline Tests
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install clpipe duckdb pytest
      - name: Run pipeline tests
        run: |
          python -m pytest tests/test_pipeline.py -v
Test File
# tests/test_pipeline.py
import pytest
import duckdb
from pathlib import Path
from clpipe import Pipeline
@pytest.fixture
def pipeline():
    """Load the pipeline from SQL files."""
    sql_dir = Path("examples/sql_files")
    queries = []
    for sql_file in sorted(sql_dir.glob("*.sql")):
        queries.append((sql_file.stem, sql_file.read_text()))
    return Pipeline(queries, dialect="duckdb")
@pytest.fixture
def duckdb_connection():
    """Create an in-memory DuckDB connection with test data."""
    conn = duckdb.connect(":memory:")
    # Generate minimal test data. generate_test_data is your own helper
    # (e.g. a smaller generate_fake_data); import it from your test utilities.
    generate_test_data(conn)
    yield conn
    conn.close()
def test_pipeline_executes_successfully(pipeline, duckdb_connection):
    """Test that the entire pipeline executes without errors."""
    def execute_sql(sql):
        duckdb_connection.execute(sql)
    result = pipeline.run(executor=execute_sql, max_workers=1)
    assert len(result['failed']) == 0
    assert len(result['completed']) == 8
def test_mart_customer_ltv_has_expected_columns(pipeline, duckdb_connection):
    """Test that mart_customer_ltv has expected output columns."""
    def execute_sql(sql):
        duckdb_connection.execute(sql)
    pipeline.run(executor=execute_sql, max_workers=1)
    # Read column names from the cursor description (no pandas needed)
    cursor = duckdb_connection.execute("SELECT * FROM mart_customer_ltv LIMIT 0")
    actual_columns = {column[0] for column in cursor.description}
    expected_columns = {
        'customer_id', 'customer_full_name', 'email',
        'total_orders', 'lifetime_revenue', 'customer_segment'
    }
    # On failure, pytest shows which expected columns are missing
    assert expected_columns <= actual_columns, expected_columns - actual_columns
def test_lineage_traces_correctly(pipeline):
    """Test that lineage tracing works as expected."""
    sources = pipeline.trace_column_backward("mart_customer_ltv", "lifetime_revenue")
    source_tables = {s.table_name for s in sources}
    assert "source_orders" in source_tables or "raw_orders" in source_tables
Use Case 5: Preventing Vendor Lock-in
Scenario: You want to write SQL that can run on multiple platforms.
Multi-Dialect Support
clpipe uses sqlglot for SQL parsing, which supports transpilation between dialects:
from clpipe import Pipeline
# The same SQL files can be loaded under different dialects, provided the
# SQL is valid in each one; the dialect controls how sqlglot parses it
# For DuckDB (local testing)
local_pipeline = Pipeline(queries, dialect="duckdb")
# For BigQuery (production)
prod_pipeline = Pipeline(queries, dialect="bigquery")
# For Snowflake
snowflake_pipeline = Pipeline(queries, dialect="snowflake")
# For PostgreSQL
pg_pipeline = Pipeline(queries, dialect="postgres")
Executor Abstraction Pattern
Create an abstraction layer for executors:
from abc import ABC, abstractmethod
import duckdb
from google.cloud import bigquery
class SQLExecutor(ABC):
    @abstractmethod
    def execute(self, sql: str) -> None:
        pass
    @abstractmethod
    def query(self, sql: str):
        pass
class DuckDBExecutor(SQLExecutor):
    def __init__(self, connection):
        self.conn = connection
    def execute(self, sql: str) -> None:
        self.conn.execute(sql)
    def query(self, sql: str):
        # fetchdf() returns a pandas DataFrame (requires pandas)
        return self.conn.execute(sql).fetchdf()
class BigQueryExecutor(SQLExecutor):
    def __init__(self, client):
        self.client = client
    def execute(self, sql: str) -> None:
        self.client.query(sql).result()
    def query(self, sql: str):
        # to_dataframe() also requires pandas
        return self.client.query(sql).to_dataframe()
# Usage
executor = DuckDBExecutor(duckdb.connect(":memory:"))
# or
executor = BigQueryExecutor(bigquery.Client())
result = pipeline.run(executor=executor.execute, max_workers=1)
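With this abstraction, adding a backend means writing one more subclass. As an illustration, here is a hypothetical `SQLiteExecutor` built on Python's standard-library `sqlite3` module; the ABC is repeated so the block runs on its own, and `query` returns plain row tuples instead of a DataFrame to avoid a pandas dependency.

```python
import sqlite3
from abc import ABC, abstractmethod

class SQLExecutor(ABC):
    @abstractmethod
    def execute(self, sql: str) -> None: ...

    @abstractmethod
    def query(self, sql: str): ...

class SQLiteExecutor(SQLExecutor):
    def __init__(self, connection: sqlite3.Connection):
        self.conn = connection

    def execute(self, sql: str) -> None:
        self.conn.execute(sql)

    def query(self, sql: str) -> list[tuple]:
        return self.conn.execute(sql).fetchall()

executor = SQLiteExecutor(sqlite3.connect(":memory:"))
# executor.execute is the callable you would pass to pipeline.run(executor=...)
executor.execute("CREATE TABLE raw_orders AS SELECT 1 AS order_id, 9.5 AS amount")
print(executor.query("SELECT order_id, amount FROM raw_orders"))  # [(1, 9.5)]
```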
Execution Results
The pipeline.run() method returns a detailed result dictionary:
result = pipeline.run(executor=execute_sql, max_workers=1, verbose=True)
# Result structure
{
    'completed': ['01_raw_orders', '02_raw_customers', ...],
    'failed': [],  # List of (query_id, error) tuples
    'elapsed_seconds': 0.06
}
# Check results
if result['failed']:
    print("Failed queries:")
    for query_id, error in result['failed']:
        print(f"  {query_id}: {error}")
else:
    print(f"All {len(result['completed'])} queries completed successfully")
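In CI it helps to turn this result into a process exit code so that a failed query fails the job. A minimal sketch that relies only on the `completed` and `failed` keys shown above; `exit_code_for` is a hypothetical helper name, and the sample dictionary (including its error message) is made up for the example.

```python
import sys

def exit_code_for(result: dict) -> int:
    """Print a summary and return 0 on success, 1 if any query failed."""
    for query_id, error in result["failed"]:
        print(f"FAILED {query_id}: {error}", file=sys.stderr)
    print(f"{len(result['completed'])} completed, {len(result['failed'])} failed")
    return 1 if result["failed"] else 0

sample = {
    "completed": ["01_raw_orders", "02_raw_customers"],
    "failed": [("05_stg_orders_enriched", "Table raw_orders not found")],
    "elapsed_seconds": 0.06,
}
code = exit_code_for(sample)  # prints "2 completed, 1 failed"
# In a script: sys.exit(exit_code_for(pipeline.run(executor=execute_sql)))
```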
Try It Yourself
Run the DuckDB execution example (run_with_duckdb.py):
Expected output:
Generating fake data...
Created source_customers: 100 rows
Created source_products: 50 rows
Created source_orders: 500 rows
Created source_order_items: 1247 rows
Loaded 8 SQL files
Building pipeline...
8 queries
127 columns tracked
Executing pipeline...
Level 1: raw_orders, raw_customers, raw_products, raw_order_items
Level 2: stg_orders_enriched
Level 3: int_daily_metrics, mart_customer_ltv, mart_product_performance
EXECUTION RESULTS
Completed: 8 queries
Failed: 0 queries
Time: 0.06 seconds
Sample data from mart_customer_ltv:
customer_id customer_full_name total_orders lifetime_revenue customer_segment
1 First1 Last1 12 1234.56 Gold
2 First2 Last2 8 987.65 Silver
...
Key Benefits
| Traditional Approach | clpipe Portable Execution |
|---|---|
| Test against production (expensive) | Test locally with DuckDB (free) |
| Tightly coupled to one warehouse | Execute anywhere |
| Slow CI/CD feedback | Fast local tests |
| Vendor lock-in | Dialect-agnostic SQL |
| Risk testing in prod | Safe isolated environments |
Next Steps
- SQL Debugging & Lineage - Use lineage to debug and understand your pipeline
- Data Catalog & AI - Use metadata for documentation and AI applications
- API Reference - Full pipeline API documentation