Pipeline
The Pipeline class is the main entry point for clpipe. It provides methods to create pipelines from SQL, trace lineage, manage metadata, and execute queries.
Factory Methods
Create pipelines from various SQL input formats.
from_sql_list
Create a pipeline from a list of SQL strings.
Parameters:
- queries: List of SQL statements
- dialect: SQL dialect (bigquery, snowflake, postgres, etc.)
Example:
pipeline = Pipeline.from_sql_list([
    "CREATE TABLE staging AS SELECT id, name FROM raw.users",
    "CREATE TABLE output AS SELECT id, UPPER(name) as name FROM staging"
])
from_dict
Create a pipeline from a dictionary of query_id to SQL.
Example:
pipeline = Pipeline.from_dict({
    "staging_users": "CREATE TABLE staging AS SELECT * FROM raw.users",
    "final_output": "CREATE TABLE output AS SELECT * FROM staging"
})
from_tuples
Create a pipeline from a list of (query_id, sql) tuples.
Example:
pipeline = Pipeline.from_tuples([
    ("staging", "CREATE TABLE staging AS SELECT * FROM raw"),
    ("output", "CREATE TABLE output AS SELECT * FROM staging")
])
from_sql_string
Create a pipeline from a semicolon-separated SQL string.
Example:
sql = """
CREATE TABLE staging AS SELECT * FROM raw;
CREATE TABLE output AS SELECT * FROM staging;
"""
pipeline = Pipeline.from_sql_string(sql)
from_sql_files
Create a pipeline from SQL files in a directory.
Pipeline.from_sql_files(
    sql_dir: str,
    dialect: str = "bigquery",
    pattern: str = "*.sql",
    query_id_from: str = "filename"
) -> Pipeline
Parameters:
- sql_dir: Directory containing SQL files
- dialect: SQL dialect (bigquery, snowflake, postgres, etc.)
- pattern: Glob pattern for matching SQL files (default: "*.sql")
- query_id_from: How to determine query IDs:
- "filename": Use filename without extension (default)
- "comment": Extract from first line comment -- query_id: name
Pattern Parameter:
The pattern parameter accepts glob patterns to filter SQL files:
"*.sql"- All.sqlfiles in the root directory (default)"**/*.sql"- All.sqlfiles in root and all subdirectories"staging/**/*.sql"- All.sqlfiles instaging/and its subdirectories"transform_*.sql"- Files starting withtransform_in root directory"[0-9]*.sql"- Files starting with a digit in root directory
Examples:
# Load all SQL files from root directory only
pipeline = Pipeline.from_sql_files("/path/to/sql/")

# Load all SQL files recursively from subdirectories
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="**/*.sql"
)

# Load only staging queries
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="staging/**/*.sql"
)

# Load queries with specific naming pattern
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    pattern="transform_*.sql"
)

# Use query IDs from comments instead of filenames
pipeline = Pipeline.from_sql_files(
    "/path/to/sql/",
    query_id_from="comment"
)
Lineage Methods
Trace data flow through your pipeline.
trace_column_backward
Find all source columns that contribute to a given column.
Example:
# Find sources for output.total_revenue
sources = pipeline.trace_column_backward("output", "total_revenue")
for col in sources:
print(f"{col.table_name}.{col.column_name}")
trace_column_forward
Find all downstream columns impacted by a given column.
Example:
# Find what's impacted by raw.user_id
impacts = pipeline.trace_column_forward("raw", "user_id")
print(f"Changing user_id affects {len(impacts)} columns")
get_lineage_path
Find the lineage path between two columns.
pipeline.get_lineage_path(
    from_table: str,
    from_column: str,
    to_table: str,
    to_column: str
) -> List[ColumnEdge]
Example:
path = pipeline.get_lineage_path("raw", "amount", "output", "total")
for edge in path:
print(f"{edge.from_node.full_name} -> {edge.to_node.full_name}")
Metadata Methods
Manage column metadata including PII flags, ownership, and descriptions.
propagate_all_metadata
Propagate PII, owner, and tags through lineage edges.
Example:
# Mark source column as PII
pipeline.columns["raw.email"].pii = True
pipeline.columns["raw.email"].owner = "security-team"
pipeline.columns["raw.email"].tags.add("sensitive")
# Propagate to all downstream columns
pipeline.propagate_all_metadata()
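Conceptually, propagation walks the lineage graph from each flagged column to everything downstream. A simplified breadth-first sketch over a toy edge list (not clpipe's actual implementation):

```python
from collections import defaultdict, deque

# Toy lineage: raw.email feeds staging.email, which feeds output.email
edges = [("raw.email", "staging.email"), ("staging.email", "output.email")]
pii = {"raw.email"}  # columns flagged at the source

downstream = defaultdict(list)
for src, dst in edges:
    downstream[src].append(dst)

# Breadth-first propagation of the PII flag along lineage edges
queue = deque(pii)
while queue:
    col = queue.popleft()
    for child in downstream[col]:
        if child not in pii:
            pii.add(child)
            queue.append(child)

print(sorted(pii))  # ['output.email', 'raw.email', 'staging.email']
```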
generate_all_descriptions
Generate natural language descriptions using an LLM.
Requires LLM configuration:
from langchain_openai import ChatOpenAI
pipeline.llm = ChatOpenAI(model="gpt-4o-mini")
pipeline.generate_all_descriptions(verbose=True)
get_pii_columns
Get all columns marked as PII.
Example:
for col in pipeline.get_pii_columns():
    print(f"{col.table_name}.{col.column_name}")
get_columns_by_owner
Get columns owned by a specific team/person.
get_columns_by_tag
Get columns with a specific tag.
Export Methods
Export pipeline data to various formats.
to_json
Export to a JSON-serializable dictionary.
Returns: Dictionary with columns, edges, and tables keys.
Example:
import json
data = pipeline.to_json()
with open("lineage.json", "w") as f:
    json.dump(data, f, indent=2)
to_airflow_dag
Generate an Airflow DAG from the pipeline.
pipeline.to_airflow_dag(
    executor: Callable[[str], None],
    dag_id: str,
    schedule: str,
    start_date: datetime,
    default_args: Optional[dict] = None,
    **dag_kwargs
) -> DAG
Parameters:
- executor: Function that takes SQL string and executes it
Example:
from datetime import datetime
def execute_sql(sql: str):
    # Execute SQL in your data warehouse
    client.query(sql)

dag = pipeline.to_airflow_dag(
    executor=execute_sql,
    dag_id="my_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False
)
Execution Methods
Execute pipeline queries directly.
run
Execute the pipeline synchronously, running independent queries in parallel.
Parameters:
- executor: Function that takes SQL string and executes it
Example:
def execute_sql(sql: str):
    return client.query(sql)

results = pipeline.run(executor=execute_sql, verbose=True)
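Execution order is driven by table dependencies: a query runs only after the tables it reads from exist. A minimal Kahn-style topological sort over a toy dependency map illustrates the ordering (assumed behavior, not clpipe's internals):

```python
from collections import deque

# Toy dependencies: table -> tables it reads from
deps = {"staging": {"raw"}, "output": {"staging"}, "raw": set()}

indegree = {t: len(d) for t, d in deps.items()}
readers = {t: [u for u, d in deps.items() if t in d] for t in deps}

order = []
ready = deque(t for t, n in indegree.items() if n == 0)
while ready:
    t = ready.popleft()
    order.append(t)
    for u in readers[t]:
        indegree[u] -= 1
        if indegree[u] == 0:
            ready.append(u)

print(order)  # ['raw', 'staging', 'output']
```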
async_run
Execute pipeline asynchronously.
async pipeline.async_run(
    executor: Callable[[str], Awaitable[Any]],
    max_workers: int = 4,
    verbose: bool = False
) -> dict
Parameters:
- executor: Async function that takes SQL string and executes it
Example:
import asyncio
async def execute_sql(sql: str):
    return await async_client.query(sql)

results = asyncio.run(pipeline.async_run(executor=execute_sql))
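Queries with no dependencies on each other can run concurrently. A self-contained sketch of that pattern with asyncio.gather (toy executor, not clpipe's scheduler):

```python
import asyncio

async def execute_sql(sql: str) -> str:
    await asyncio.sleep(0)  # stand-in for an async warehouse call
    return f"ran: {sql}"

async def run_level(queries: list) -> list:
    # All queries in one dependency level are awaited concurrently
    return await asyncio.gather(*(execute_sql(q) for q in queries))

results = asyncio.run(run_level(["CREATE TABLE a AS SELECT 1", "CREATE TABLE b AS SELECT 2"]))
print(results)
```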
Pipeline Management
split
Split pipeline into independent subpipelines by sink tables.
Example:
# Split into two pipelines by their final tables
subpipelines = pipeline.split(
    sinks=[["realtime_metrics"], ["daily_reports"]]
)
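Splitting by sinks amounts to collecting every table reachable backward from each sink and keeping only the queries that produce those tables. A toy reverse traversal showing the idea (assumed mechanics, not clpipe's code):

```python
# Toy dependencies: table -> tables it reads from
deps = {
    "realtime_metrics": {"staging"},
    "daily_reports": {"staging"},
    "staging": {"raw"},
    "raw": set(),
}

def upstream_closure(sink: str) -> set:
    """All tables the sink depends on, including itself."""
    seen, stack = set(), [sink]
    while stack:
        t = stack.pop()
        if t not in seen:
            seen.add(t)
            stack.extend(deps.get(t, ()))
    return seen

print(sorted(upstream_closure("realtime_metrics")))
# ['raw', 'realtime_metrics', 'staging']
```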
diff
Compare this pipeline with another version.
Example:
old_pipeline = Pipeline.from_sql_files("/path/to/old/")
new_pipeline = Pipeline.from_sql_files("/path/to/new/")
diff = new_pipeline.diff(old_pipeline)
print(diff.summary())
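At the query level, a diff reduces to comparing two id-to-SQL mappings for added, removed, and changed entries. A minimal sketch of that comparison (illustrative only, not the DiffResult implementation):

```python
old = {"staging": "SELECT * FROM raw", "legacy": "SELECT 1"}
new = {"staging": "SELECT id FROM raw", "reports": "SELECT 2"}

added = sorted(new.keys() - old.keys())
removed = sorted(old.keys() - new.keys())
changed = sorted(q for q in old.keys() & new.keys() if old[q] != new[q])
print(added, removed, changed)  # ['reports'] ['legacy'] ['staging']
```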
Persistence
Save and load metadata.
save
Save all metadata to a file.
load_metadata / apply_metadata
Load and apply saved metadata.
Properties
columns
Dictionary of all columns keyed by full name.
Example:
col = pipeline.columns["raw.email"]
print(col.pii, col.owner)
edges
List of all lineage edges.
table_graph
Table-level dependency graph.
Example:
# Get execution order
for table in pipeline.table_graph.get_execution_order():
    print(table.table_name)
dialect
SQL dialect used for parsing.