Pipeline

The Pipeline class is the main entry point for clpipe. It provides methods to create pipelines from SQL, trace lineage, manage metadata, and execute queries.

from clpipe import Pipeline

Factory Methods

Create pipelines from various SQL input formats.

from_sql_list

Create a pipeline from a list of SQL strings.

Pipeline.from_sql_list(
    queries: List[str],
    dialect: str = "bigquery"
) -> Pipeline

Parameters:

  • queries: List of SQL statements
  • dialect: SQL dialect (bigquery, snowflake, postgres, etc.)

Example:

pipeline = Pipeline.from_sql_list([
    "CREATE TABLE staging AS SELECT id, name FROM raw.users",
    "CREATE TABLE output AS SELECT id, UPPER(name) as name FROM staging"
])

from_dict

Create a pipeline from a dictionary mapping query IDs to SQL strings.

Pipeline.from_dict(
    queries: Dict[str, str],
    dialect: str = "bigquery"
) -> Pipeline

Example:

pipeline = Pipeline.from_dict({
    "staging_users": "CREATE TABLE staging AS SELECT * FROM raw.users",
    "final_output": "CREATE TABLE output AS SELECT * FROM staging"
})

from_tuples

Create a pipeline from a list of (query_id, sql) tuples.

Pipeline.from_tuples(
    queries: List[Tuple[str, str]],
    dialect: str = "bigquery"
) -> Pipeline

Example:

pipeline = Pipeline.from_tuples([
    ("staging", "CREATE TABLE staging AS SELECT * FROM raw"),
    ("output", "CREATE TABLE output AS SELECT * FROM staging")
])

from_sql_string

Create a pipeline from a semicolon-separated SQL string.

Pipeline.from_sql_string(
    sql: str,
    dialect: str = "bigquery"
) -> Pipeline

Example:

sql = """
CREATE TABLE staging AS SELECT * FROM raw;
CREATE TABLE output AS SELECT * FROM staging;
"""
pipeline = Pipeline.from_sql_string(sql)
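
How clpipe splits the string internally is not described here. As a rough mental model, assuming statements end at semicolons that sit outside single-quoted string literals, the split could be sketched as below. `split_statements` is a hypothetical helper for illustration, not part of clpipe.

```python
from typing import List


def split_statements(sql: str) -> List[str]:
    """Split SQL on semicolons that are outside single-quoted literals.

    Hypothetical helper: it ignores comments and dialect-specific quoting,
    which a real SQL parser would handle.
    """
    statements: List[str] = []
    current: List[str] = []
    in_string = False
    for ch in sql:
        if ch == "'":
            in_string = not in_string  # toggle on every single quote
        if ch == ";" and not in_string:
            stmt = "".join(current).strip()
            if stmt:
                statements.append(stmt)
            current = []
        else:
            current.append(ch)
    tail = "".join(current).strip()
    if tail:  # the last statement may lack a trailing semicolon
        statements.append(tail)
    return statements


sql = """
CREATE TABLE staging AS SELECT * FROM raw;
CREATE TABLE output AS SELECT * FROM staging;
"""
statements = split_statements(sql)
for stmt in statements:
    print(stmt)
```

A semicolon inside a literal such as 'a;b' is kept as part of the statement, which is why a plain `sql.split(";")` is not enough in general.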

from_sql_files

Create a pipeline from SQL files in a directory.

Pipeline.from_sql_files(
    sql_dir: str,
    dialect: str = "bigquery",
    pattern: str = "*.sql",
    query_id_from: str = "filename"
) -> Pipeline

Parameters:

  • sql_dir: Directory containing SQL files
  • dialect: SQL dialect (bigquery, snowflake, postgres, etc.)
  • pattern: Glob pattern for matching SQL files (default: "*.sql")
  • query_id_from: How to determine query IDs:
      • "filename": Use the filename without its extension (default)
      • "comment": Extract from a first-line comment of the form -- query_id: name
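
The two query_id_from modes can be pictured with the sketch below. Both functions are hypothetical stand-ins, not clpipe's code, and the regular expression is an assumption about how strictly the comment is parsed. What clpipe does when the comment is missing is not specified here, so the sketch simply returns None.

```python
import re
from pathlib import Path
from typing import Optional

# Matches a leading comment of the form "-- query_id: name" (assumed format)
QUERY_ID_COMMENT = re.compile(r"^\s*--\s*query_id:\s*(\S+)")


def query_id_from_comment(sql_text: str) -> Optional[str]:
    """Return the ID from a first-line `-- query_id: name` comment, if any."""
    first_line = sql_text.lstrip().splitlines()[0] if sql_text.strip() else ""
    match = QUERY_ID_COMMENT.match(first_line)
    return match.group(1) if match else None


def query_id_from_filename(path: str) -> str:
    """Return the filename without its extension."""
    return Path(path).stem


print(query_id_from_comment("-- query_id: staging_users\nSELECT 1"))
print(query_id_from_filename("/path/to/sql/01_staging.sql"))
```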

Pattern Parameter:

The pattern parameter accepts glob patterns to filter SQL files:

  • "*.sql" - All .sql files in the root directory (default)
  • "**/*.sql" - All .sql files in root and all subdirectories
  • "staging/**/*.sql" - All .sql files in staging/ and its subdirectories
  • "transform_*.sql" - Files starting with transform_ in root directory
  • "[0-9]*.sql" - Files starting with a digit in root directory
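
To check what a pattern will select before loading, the same patterns can be tried with the standard library. This sketch assumes clpipe resolves patterns the way `pathlib.Path.glob` does, which is not confirmed here; the directory layout is made up for the demonstration.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # Hypothetical layout used only for this demonstration
    for rel in [
        "01_load.sql",
        "transform_users.sql",
        "staging/users.sql",
        "staging/deep/orders.sql",
        "notes.txt",
    ]:
        path = root / rel
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text("SELECT 1")

    def matches(pattern: str) -> list:
        """Return sorted paths (relative to root) that match the pattern."""
        return sorted(p.relative_to(root).as_posix() for p in root.glob(pattern))

    root_only = matches("*.sql")
    recursive = matches("**/*.sql")
    staging = matches("staging/**/*.sql")
    transforms = matches("transform_*.sql")
    numbered = matches("[0-9]*.sql")

print(root_only)
print(recursive)
print(staging)
```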

Examples:

# Load all SQL files from root directory only
pipeline = Pipeline.from_sql_files("/path/to/sql/")

# Load all SQL files recursively from subdirectories
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="**/*.sql"
)

# Load only staging queries
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="staging/**/*.sql"
)

# Load queries with specific naming pattern
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="transform_*.sql"
)

# Use query IDs from comments instead of filenames
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    query_id_from="comment"
)

Lineage Methods

Trace data flow through your pipeline.

trace_column_backward

Find all source columns that contribute to a given column.

pipeline.trace_column_backward(
    table_name: str,
    column_name: str
) -> List[ColumnNode]

Example:

# Find sources for output.total_revenue
sources = pipeline.trace_column_backward("output", "total_revenue")
for col in sources:
    print(f"{col.table_name}.{col.column_name}")
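
Conceptually, a backward trace walks lineage edges upstream from the target column. The sketch below is not clpipe's implementation: it uses plain string tuples instead of ColumnNode and ColumnEdge objects, and it assumes "source columns" means columns with no upstream edge of their own. The edge list is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple

# Hypothetical column-level edges: (source column, derived column)
edges: List[Tuple[str, str]] = [
    ("raw.amount", "staging.amount"),
    ("raw.quantity", "staging.quantity"),
    ("staging.amount", "output.total_revenue"),
    ("staging.quantity", "output.total_revenue"),
    ("raw.user_id", "staging.user_id"),
]


def trace_backward(edges: List[Tuple[str, str]], target: str) -> Set[str]:
    """Return root columns (no upstream edge) that feed into `target`."""
    parents: Dict[str, List[str]] = defaultdict(list)
    for src, dst in edges:
        parents[dst].append(src)

    sources: Set[str] = set()
    seen: Set[str] = set()
    stack = [target]
    while stack:
        col = stack.pop()
        if col in seen:
            continue
        seen.add(col)
        upstream = parents.get(col, [])
        if not upstream and col != target:
            sources.add(col)  # nothing feeds this column, so it is a root
        stack.extend(upstream)
    return sources


sources = trace_backward(edges, "output.total_revenue")
print(sorted(sources))
```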

trace_column_forward

Find all downstream columns impacted by a given column.

pipeline.trace_column_forward(
    table_name: str,
    column_name: str
) -> List[ColumnNode]

Example:

# Find what's impacted by raw.user_id
impacts = pipeline.trace_column_forward("raw", "user_id")
print(f"Changing user_id affects {len(impacts)} columns")

get_lineage_path

Find the lineage path between two columns.

pipeline.get_lineage_path(
    from_table: str,
    from_column: str,
    to_table: str,
    to_column: str
) -> List[ColumnEdge]

Example:

path = pipeline.get_lineage_path("raw", "amount", "output", "total")
for edge in path:
    print(f"{edge.from_node.full_name} -> {edge.to_node.full_name}")


Metadata Methods

Manage column metadata including PII flags, ownership, and descriptions.

propagate_all_metadata

Propagate PII, owner, and tags through lineage edges.

pipeline.propagate_all_metadata(verbose: bool = False)

Example:

# Mark source column as PII
pipeline.columns["raw.email"].pii = True
pipeline.columns["raw.email"].owner = "security-team"
pipeline.columns["raw.email"].tags.add("sensitive")

# Propagate to all downstream columns
pipeline.propagate_all_metadata()
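
The propagation idea can be sketched as a breadth-first walk over lineage edges: any column derived from a flagged column inherits the flag. This is an illustration with hypothetical edges and plain strings, not clpipe's code; how clpipe merges owners or tags when several parents disagree is not covered here.

```python
from collections import defaultdict, deque
from typing import Dict, List, Set, Tuple

# Hypothetical column-level edges: (source column, derived column)
edges: List[Tuple[str, str]] = [
    ("raw.email", "staging.email"),
    ("staging.email", "output.contact"),
    ("raw.id", "staging.id"),
    ("staging.id", "output.id"),
]


def propagate_flag(edges: List[Tuple[str, str]], flagged: Set[str]) -> Set[str]:
    """Return the flagged columns plus every column downstream of them."""
    children: Dict[str, List[str]] = defaultdict(list)
    for src, dst in edges:
        children[src].append(dst)

    result = set(flagged)
    queue = deque(flagged)
    while queue:
        col = queue.popleft()
        for child in children.get(col, []):
            if child not in result:  # visit each column once
                result.add(child)
                queue.append(child)
    return result


pii = propagate_flag(edges, {"raw.email"})
print(sorted(pii))
```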

generate_all_descriptions

Generate natural language descriptions using an LLM.

pipeline.generate_all_descriptions(
    batch_size: int = 10,
    verbose: bool = False
)

Requires LLM configuration:

from langchain_openai import ChatOpenAI

pipeline.llm = ChatOpenAI(model="gpt-4o-mini")
pipeline.generate_all_descriptions(verbose=True)

get_pii_columns

Get all columns marked as PII.

pipeline.get_pii_columns() -> List[ColumnNode]

Example:

pii_columns = pipeline.get_pii_columns()
print(f"Found {len(pii_columns)} PII columns")

get_columns_by_owner

Get columns owned by a specific team/person.

pipeline.get_columns_by_owner(owner: str) -> List[ColumnNode]

get_columns_by_tag

Get columns with a specific tag.

pipeline.get_columns_by_tag(tag: str) -> List[ColumnNode]

Export Methods

Export pipeline data to various formats.

to_json

Export to a JSON-serializable dictionary.

pipeline.to_json(include_metadata: bool = True) -> Dict[str, Any]

Returns: Dictionary with columns, edges, and tables keys.

Example:

import json

data = pipeline.to_json()
with open("lineage.json", "w") as f:
    json.dump(data, f, indent=2)

to_airflow_dag

Generate an Airflow DAG from the pipeline.

pipeline.to_airflow_dag(
    executor: Callable[[str], None],
    dag_id: str,
    schedule: str,
    start_date: datetime,
    default_args: Optional[dict] = None,
    **dag_kwargs
) -> DAG

Parameters:

  • executor: Function that takes a SQL string and executes it

Example:

from datetime import datetime

def execute_sql(sql: str):
    # Execute SQL in your data warehouse; `client` is a placeholder for
    # your own warehouse client object
    client.query(sql)

dag = pipeline.to_airflow_dag(
    executor=execute_sql,
    dag_id="my_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False
)


Execution Methods

Execute pipeline queries directly.

run

Execute the pipeline synchronously, running queries in parallel on up to max_workers workers.

pipeline.run(
    executor: Callable[[str], Any],
    max_workers: int = 4,
    verbose: bool = False
) -> dict

Parameters:

  • executor: Function that takes a SQL string and executes it

Example:

def execute_sql(sql: str):
    # `client` is a placeholder for your own warehouse client object
    return client.query(sql)

results = pipeline.run(executor=execute_sql, verbose=True)
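
One way to picture parallel execution that still respects table dependencies is sketched below with the standard library. It is not clpipe's scheduler: `run_in_dependency_order`, the `depends_on` mapping, and the shape of the returned dict (query ID to executor result) are all assumptions made for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List, Set


def run_in_dependency_order(
    queries: Dict[str, str],
    depends_on: Dict[str, Set[str]],
    executor: Callable[[str], Any],
    max_workers: int = 4,
) -> Dict[str, Any]:
    """Run each query after its dependencies, batch by batch."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()
    results: Dict[str, Any] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # Everything returned here has all of its dependencies finished
            ready = sorter.get_ready()
            futures = {qid: pool.submit(executor, queries[qid]) for qid in ready}
            for qid, future in futures.items():
                results[qid] = future.result()  # re-raises executor errors
                sorter.done(qid)
    return results


# Hypothetical queries and dependencies
queries = {
    "staging": "CREATE TABLE staging AS SELECT * FROM raw",
    "output": "CREATE TABLE output AS SELECT * FROM staging",
    "metrics": "CREATE TABLE metrics AS SELECT COUNT(*) AS n FROM staging",
}
depends_on = {"staging": set(), "output": {"staging"}, "metrics": {"staging"}}

executed: List[str] = []


def fake_executor(sql: str) -> str:
    executed.append(sql)  # stand-in for a real warehouse call
    return f"ok: {sql}"


results = run_in_dependency_order(queries, depends_on, fake_executor)
```

Here staging always runs first, while output and metrics are submitted together because neither depends on the other.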

async_run

Execute pipeline asynchronously.

async pipeline.async_run(
    executor: Callable[[str], Awaitable[Any]],
    max_workers: int = 4,
    verbose: bool = False
) -> dict

Parameters:

  • executor: Async function that takes a SQL string and executes it

Example:

import asyncio

async def execute_sql(sql: str):
    # `async_client` is a placeholder for your own async warehouse client
    return await async_client.query(sql)

results = asyncio.run(pipeline.async_run(executor=execute_sql))
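
With an async executor, a concurrency cap such as max_workers is commonly enforced with a semaphore. The sketch below shows that pattern for a batch of independent statements. `run_level` and `fake_executor` are hypothetical, and whether clpipe limits concurrency this way is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def run_level(
    sqls: List[str],
    executor: Callable[[str], Awaitable[Any]],
    max_workers: int = 4,
) -> List[Any]:
    """Run independent statements concurrently, at most max_workers at a time."""
    semaphore = asyncio.Semaphore(max_workers)

    async def guarded(sql: str) -> Any:
        async with semaphore:  # wait here if max_workers are already running
            return await executor(sql)

    # gather returns results in the same order as the input statements
    return await asyncio.gather(*(guarded(sql) for sql in sqls))


async def fake_executor(sql: str) -> str:
    await asyncio.sleep(0)  # stand-in for a network round trip
    return sql.upper()


level_results = asyncio.run(
    run_level(["select 1", "select 2"], fake_executor, max_workers=1)
)
print(level_results)
```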


Pipeline Management

split

Split the pipeline into independent subpipelines, one per group of sink (final) tables.

pipeline.split(sinks: List[List[str]]) -> List[Pipeline]

Example:

# Split into two pipelines by their final tables
subpipelines = pipeline.split(
    sinks=[["realtime_metrics"], ["daily_reports"]]
)
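
A plausible reading of splitting by sinks is that each subpipeline keeps the sink tables in its group plus everything upstream of them. The sketch below computes that upstream closure over a hypothetical table dependency map. Apart from realtime_metrics and daily_reports, the table names are invented, and how clpipe treats tables shared between groups is not specified here.

```python
from typing import Dict, List, Set

# Hypothetical table dependencies: table -> tables it reads from
depends_on: Dict[str, Set[str]] = {
    "realtime_metrics": {"events_clean"},
    "events_clean": {"raw_events"},
    "daily_reports": {"orders_clean"},
    "orders_clean": {"raw_orders"},
}


def upstream_closure(depends_on: Dict[str, Set[str]], sinks: List[str]) -> Set[str]:
    """Return the sinks plus every table they depend on, directly or not."""
    needed: Set[str] = set()
    stack = list(sinks)
    while stack:
        table = stack.pop()
        if table in needed:
            continue
        needed.add(table)
        stack.extend(depends_on.get(table, ()))
    return needed


groups = [
    upstream_closure(depends_on, sinks)
    for sinks in [["realtime_metrics"], ["daily_reports"]]
]
for group in groups:
    print(sorted(group))
```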

diff

Compare this pipeline with another version.

pipeline.diff(other: Pipeline) -> PipelineDiff

Example:

old_pipeline = Pipeline.from_sql_files("/path/to/old/")
new_pipeline = Pipeline.from_sql_files("/path/to/new/")

diff = new_pipeline.diff(old_pipeline)
print(diff.summary())


Persistence

Save and load metadata.

save

Save all metadata to a file.

pipeline.save(file_path: str)

load_metadata / apply_metadata

Load and apply saved metadata.

metadata = Pipeline.load_metadata("metadata.json")
pipeline.apply_metadata(metadata)

Properties

columns

Dictionary of all columns keyed by full name.

pipeline.columns: Dict[str, ColumnNode]

Example:

for name, col in pipeline.columns.items():
    print(f"{name}: {col.expression}")

edges

List of all lineage edges.

pipeline.edges: List[ColumnEdge]

table_graph

Table-level dependency graph.

pipeline.table_graph: TableDependencyGraph

Example:

# Get execution order
for table in pipeline.table_graph.get_execution_order():
    print(table.table_name)

dialect

SQL dialect used for parsing.

pipeline.dialect: str