Lineage Classes

Classes for tracing and understanding data flow through SQL queries.


SQLColumnTracer

Trace column lineage for a single SQL query.

from clpipe import SQLColumnTracer

Constructor

SQLColumnTracer(sql: str, dialect: str = "bigquery")

Parameters:

- sql: SQL query to analyze
- dialect: SQL dialect of the query (default: "bigquery")

Methods

build_column_lineage_graph

Build the complete lineage graph for the query.

tracer.build_column_lineage_graph() -> ColumnLineageGraph

get_forward_lineage

Impact analysis: find all output columns affected by the given input columns.

tracer.get_forward_lineage(
    input_columns: List[str]
) -> Dict[str, Any]

Returns: Dictionary with:

- impacted_outputs: List of output column names
- impacted_ctes: List of CTE names in the path
- paths: List of path dictionaries with transformations

Example:

query = """
WITH cte AS (SELECT id, SUM(amount) as total FROM orders GROUP BY id)
SELECT id, total FROM cte
"""

tracer = SQLColumnTracer(query)
forward = tracer.get_forward_lineage(["orders.amount"])

print(forward["impacted_outputs"])  # ['total']
print(forward["impacted_ctes"])     # ['cte']
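
Conceptually, forward lineage is a reachability question over column-level edges. The sketch below is not clpipe's implementation and does not call the library; it assumes a hypothetical edge list (`EDGES`) for the example query above and a hypothetical helper (`forward_reachable`) that walks those edges breadth-first.

```python
from collections import defaultdict, deque

# Hypothetical column-level edges for the example query, written as
# (source, target) full names. These are illustrative, not clpipe output.
EDGES = [
    ("orders.id", "cte.id"),
    ("orders.amount", "cte.total"),
    ("cte.id", "output.id"),
    ("cte.total", "output.total"),
]


def forward_reachable(edges, start_columns):
    """Return every column reachable downstream of start_columns."""
    children = defaultdict(list)
    for src, dst in edges:
        children[src].append(dst)

    seen = set()
    queue = deque(start_columns)
    while queue:
        col = queue.popleft()
        for nxt in children[col]:
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


impacted = forward_reachable(EDGES, ["orders.amount"])
print(sorted(impacted))  # ['cte.total', 'output.total']
```

Here the CTE column and the final output column both show up as impacted, which mirrors the split between impacted_ctes and impacted_outputs in the returned dictionary.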

get_backward_lineage

Source tracing: find all input columns required to produce the given output columns.

tracer.get_backward_lineage(
    output_columns: List[str]
) -> Dict[str, Any]

Returns: Dictionary with:

- required_inputs: Dict mapping table names to column lists
- required_ctes: List of CTE names in the path
- paths: List of path dictionaries

Example:

backward = tracer.get_backward_lineage(["total"])

print(backward["required_inputs"])  # {'orders': ['amount']}
print(backward["required_ctes"])    # ['cte']
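
Backward lineage can be pictured as the same walk in the opposite direction, with the source columns grouped by table at the end. The following is a minimal sketch under stated assumptions, not the library's code: `EDGES`, `required_inputs`, and the `source_tables` argument are hypothetical names introduced for illustration.

```python
from collections import defaultdict

# Hypothetical column-level edges for the example query, written as
# (source, target) full names. These are illustrative, not clpipe output.
EDGES = [
    ("orders.id", "cte.id"),
    ("orders.amount", "cte.total"),
    ("cte.id", "output.id"),
    ("cte.total", "output.total"),
]


def required_inputs(edges, output_column, source_tables):
    """Walk edges upstream and group reached source columns by table."""
    parents = defaultdict(list)
    for src, dst in edges:
        parents[dst].append(src)

    inputs = defaultdict(set)
    seen = set()
    stack = [output_column]
    while stack:
        col = stack.pop()
        if col in seen:
            continue
        seen.add(col)
        # Split "table.column" on the last dot.
        table, _, name = col.rpartition(".")
        if table in source_tables:
            inputs[table].add(name)
        stack.extend(parents[col])
    return {table: sorted(cols) for table, cols in inputs.items()}


print(required_inputs(EDGES, "output.total", {"orders"}))  # {'orders': ['amount']}
```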

get_output_columns

Get list of output column names.

tracer.get_output_columns() -> List[str]

ColumnNode

Unified column representation for both single-query and multi-query lineage analysis.

from clpipe import ColumnNode

Properties

| Property | Type | Description |
| --- | --- | --- |
| column_name | str | Column name |
| table_name | str | Table name |
| full_name | str | Fully qualified name (table.column) |
| query_id | Optional[str] | Query that produces this column (for pipelines) |
| unit_id | Optional[str] | CTE/subquery unit within a query |
| node_type | str | "source", "intermediate", "output", "base_column", "star", etc. |
| layer | Optional[str] | "input", "cte", "subquery", "output" (for compatibility) |
| expression | Optional[str] | SQL expression |
| operation | Optional[str] | Operation type (SUM, CASE, JOIN, etc.) |
| description | Optional[str] | Column description |
| owner | Optional[str] | Column owner |
| pii | bool | Whether marked as PII |
| tags | Set[str] | Custom tags |
| custom_metadata | Dict[str, Any] | Custom key-value metadata |

Methods

set_source_description

Set a user-provided description.

col.set_source_description(description: str)

is_computed

Check if this is a computed/derived column.

col.is_computed() -> bool

Example

# Access column
col = pipeline.columns["output.total_revenue"]

# Set metadata
col.pii = False
col.owner = "analytics-team"
col.tags.add("financial")
col.set_source_description("Total revenue in USD")

# Check properties
print(f"Expression: {col.expression}")
print(f"Operation: {col.operation}")
print(f"Is computed: {col.is_computed()}")

Metadata Functions

For metadata propagation and description generation, use the utility functions:

from clpipe.column import propagate_metadata, generate_description

# Propagate metadata from source columns
propagate_metadata(col, pipeline)

# Generate description using LLM (requires LLM configuration)
generate_description(col, llm, pipeline)
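
What "propagating metadata from source columns" means in practice depends on the library's rules, which this page does not spell out. The sketch below shows one plausible rule only, as an assumption: a derived column is treated as PII if any of its sources is PII, and it inherits the union of their tags. `ColumnStub` and `propagate_from_sources` are hypothetical stand-ins, not clpipe classes or functions.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class ColumnStub:
    """Simplified stand-in for ColumnNode with only the fields used here."""
    full_name: str
    pii: bool = False
    tags: Set[str] = field(default_factory=set)


def propagate_from_sources(target: ColumnStub, sources: List[ColumnStub]) -> ColumnStub:
    """Assumed rule: PII is sticky, and tags accumulate from all sources."""
    target.pii = target.pii or any(s.pii for s in sources)
    for s in sources:
        target.tags |= s.tags
    return target


email = ColumnStub("users.email", pii=True, tags={"contact"})
signup = ColumnStub("users.signup_date", tags={"lifecycle"})
derived = propagate_from_sources(ColumnStub("output.user_key"), [email, signup])

print(derived.pii)           # True
print(sorted(derived.tags))  # ['contact', 'lifecycle']
```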

ColumnEdge

Unified edge representing lineage relationships between columns.

from clpipe import ColumnEdge

Properties

| Property | Type | Description |
| --- | --- | --- |
| from_node | ColumnNode | Source column |
| to_node | ColumnNode | Target column |
| edge_type | str | Type: "direct", "transform", "aggregate", "join", "star_passthrough", "cross_query" |
| context | Optional[str] | Context: "SELECT", "CTE", "main_query", "cross_query" |
| transformation | Optional[str] | Description of transformation |
| query_id | Optional[str] | Query where edge exists |
| expression | Optional[str] | SQL expression |

Example

for edge in pipeline.edges:
    print(f"{edge.from_node.full_name} -> {edge.to_node.full_name}")
    print(f"  Type: {edge.edge_type}")
    print(f"  Transformation: {edge.transformation}")

TableDependencyGraph

Table-level dependency graph for a pipeline.

graph = pipeline.table_graph

Methods

get_execution_order

Get tables in topological order for execution.

graph.get_execution_order() -> List[TableNode]

Example:

for table in graph.get_execution_order():
    print(f"Execute: {table.table_name}")
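
For intuition about what a topological execution order guarantees, the sketch below uses Python's standard `graphlib` module (Python 3.9+) on a hypothetical dependency mapping. It does not use clpipe, and whether the library relies on `graphlib` internally is not stated here; `DEPENDS_ON` and its table names are illustrative assumptions.

```python
from graphlib import TopologicalSorter

# Hypothetical table dependencies: table -> set of upstream tables it reads.
DEPENDS_ON = {
    "staging.orders": {"raw.orders"},
    "analytics.user_metrics": {"staging.orders", "raw.users"},
}

# static_order() yields every table after all of its upstream tables.
order = list(TopologicalSorter(DEPENDS_ON).static_order())

for name in order:
    print(f"Execute: {name}")
```

The exact order among independent tables (for example `raw.orders` and `raw.users`) is not fixed; the only guarantee is that each table appears after everything it depends on.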

get_source_tables

Get external source tables (not created by any query).

graph.get_source_tables() -> List[TableNode]

get_final_tables

Get final tables (not read by any query).

graph.get_final_tables() -> List[TableNode]
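
The definitions of source and final tables above map directly onto two TableNode fields: a source table has no creating query, and a final table has no readers. The sketch below illustrates that reading with a hypothetical stand-in class (`TableStub`) and made-up query IDs; it is an assumption about how the two sets relate to the fields, not the library's implementation.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TableStub:
    """Simplified stand-in for TableNode with only the fields used here."""
    table_name: str
    created_by: Optional[str] = None
    read_by: List[str] = field(default_factory=list)


# Hypothetical three-table pipeline: raw -> staging -> analytics.
tables = [
    TableStub("raw.orders", read_by=["q1"]),
    TableStub("staging.orders", created_by="q1", read_by=["q2"]),
    TableStub("analytics.user_metrics", created_by="q2"),
]

# Source tables: not created by any query.
sources = [t.table_name for t in tables if t.created_by is None]
# Final tables: not read by any query.
finals = [t.table_name for t in tables if not t.read_by]

print(sources)  # ['raw.orders']
print(finals)   # ['analytics.user_metrics']
```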

get_dependencies

Get upstream tables that a table depends on.

graph.get_dependencies(table_name: str) -> List[TableNode]

Example:

deps = graph.get_dependencies("analytics.user_metrics")
print(f"Depends on: {[t.table_name for t in deps]}")

get_downstream

Get downstream tables that depend on this table.

graph.get_downstream(table_name: str) -> List[TableNode]

Example:

downstream = graph.get_downstream("staging.orders")
print(f"Feeds into: {[t.table_name for t in downstream]}")
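
Upstream and downstream lookups are two views of the same read/create relationships. The sketch below derives both from a hypothetical query map and returns direct neighbors only; whether `get_dependencies` and `get_downstream` return direct or transitive tables is not specified on this page, so treat that choice as an assumption. `QUERIES`, `direct_dependencies`, and `direct_downstream` are illustrative names.

```python
# Hypothetical queries: query ID -> (tables read, table created).
QUERIES = {
    "q1": ({"raw.orders"}, "staging.orders"),
    "q2": ({"staging.orders", "raw.users"}, "analytics.user_metrics"),
}


def direct_dependencies(table_name):
    """Tables read by any query that creates table_name."""
    deps = set()
    for reads, creates in QUERIES.values():
        if creates == table_name:
            deps |= reads
    return sorted(deps)


def direct_downstream(table_name):
    """Tables created by any query that reads table_name."""
    return sorted(
        creates for reads, creates in QUERIES.values() if table_name in reads
    )


print(direct_dependencies("analytics.user_metrics"))  # ['raw.users', 'staging.orders']
print(direct_downstream("staging.orders"))            # ['analytics.user_metrics']
```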

topological_sort

Get query IDs in execution order.

graph.topological_sort() -> List[str]

Properties

| Property | Type | Description |
| --- | --- | --- |
| tables | Dict[str, TableNode] | All tables by name |
| queries | Dict[str, ParsedQuery] | All queries by ID |

TableNode

Represents a table in the dependency graph.

Properties

| Property | Type | Description |
| --- | --- | --- |
| table_name | str | Fully qualified table name |
| is_source | bool | Whether external source |
| created_by | Optional[str] | Query ID that creates this table |
| modified_by | List[str] | Query IDs that modify this table |
| read_by | List[str] | Query IDs that read this table |
| columns | Set[str] | Column names in table |
| description | Optional[str] | Table description |

Methods

get_columns

Get all columns for this table.

table.get_columns(pipeline) -> List[ColumnNode]

Example

table = graph.tables["analytics.user_metrics"]

print(f"Table: {table.table_name}")
print(f"Is source: {table.is_source}")
print(f"Created by: {table.created_by}")
print(f"Read by: {table.read_by}")