Lineage Classes
Classes for tracing and understanding data flow through SQL queries.
SQLColumnTracer
Trace column lineage for a single SQL query.
Constructor
Parameters:
- sql: SQL query to analyze
- dialect: SQL dialect used to parse the query (optional; the examples below omit it)
Methods
build_column_lineage_graph
Build the complete lineage graph for the query.
get_forward_lineage
Impact analysis: find all output columns affected by the given input columns.
Returns: a dictionary with the following keys:
- impacted_outputs: List of output column names
- impacted_ctes: List of CTE names in the path
- paths: List of path dictionaries with transformations
Example:
```python
# Assumes SQLColumnTracer has already been imported from clpipe.
query = """
WITH cte AS (SELECT id, SUM(amount) AS total FROM orders GROUP BY id)
SELECT id, total FROM cte
"""
tracer = SQLColumnTracer(query)
forward = tracer.get_forward_lineage(["orders.amount"])
print(forward["impacted_outputs"])  # ['total']
print(forward["impacted_ctes"])  # ['cte']
```
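Conceptually, forward lineage is a reachability search that follows lineage edges downstream from the given input columns. The following is a minimal, self-contained sketch of that idea, not clpipe's implementation: it hard-codes the example query's edges as (source, target) pairs and assumes a hypothetical "table.column" naming scheme in which CTE columns use the CTE name and final outputs use the prefix "output". It omits the paths key.

```python
from collections import deque

# Hypothetical edges for the example query, as (source, target) pairs.
# Naming convention (an assumption for this sketch): "<table|cte|output>.<column>".
EDGES = [
    ("orders.id", "cte.id"),
    ("orders.amount", "cte.total"),  # SUM(amount) AS total
    ("cte.id", "output.id"),
    ("cte.total", "output.total"),
]
CTE_NAMES = {"cte"}


def forward_lineage(edges, inputs, cte_names):
    """Breadth-first search downstream from the given input columns."""
    downstream = {}
    for src, dst in edges:
        downstream.setdefault(src, []).append(dst)

    seen = set(inputs)
    queue = deque(inputs)
    while queue:
        node = queue.popleft()
        for nxt in downstream.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)

    # Keep only reached output columns and reached CTEs.
    outputs = sorted(n.split(".", 1)[1] for n in seen if n.startswith("output."))
    ctes = sorted({n.split(".", 1)[0] for n in seen} & cte_names)
    return {"impacted_outputs": outputs, "impacted_ctes": ctes}


result = forward_lineage(EDGES, ["orders.amount"], CTE_NAMES)
print(result["impacted_outputs"])  # ['total']
print(result["impacted_ctes"])  # ['cte']
```

Each edge is examined at most once, so the search is linear in the size of the lineage graph.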
get_backward_lineage
Source tracing: find all input columns required to compute the given output columns.
Returns: a dictionary with the following keys:
- required_inputs: Dict mapping table names to column lists
- required_ctes: List of CTE names in the path
- paths: List of path dictionaries
Example:
```python
# Reuses `tracer` from the get_forward_lineage example.
backward = tracer.get_backward_lineage(["total"])
print(backward["required_inputs"])  # {'orders': ['amount']}
print(backward["required_ctes"])  # ['cte']
```
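Backward lineage is the same search run in reverse: start from the requested output columns, walk edges upstream, and collect the base-table columns reached (those with no incoming edge). This is a hypothetical, self-contained sketch over the example query's edges, not clpipe's code; the "output" prefix and the grouping of columns by table are assumptions chosen to mirror the documented return shape, and paths is omitted.

```python
from collections import deque

# Hypothetical edges for the example query, as (source, target) pairs.
EDGES = [
    ("orders.id", "cte.id"),
    ("orders.amount", "cte.total"),
    ("cte.id", "output.id"),
    ("cte.total", "output.total"),
]
CTE_NAMES = {"cte"}


def backward_lineage(edges, outputs, cte_names):
    """Breadth-first search upstream from the given output columns."""
    upstream = {}
    for src, dst in edges:
        upstream.setdefault(dst, []).append(src)

    start = [f"output.{name}" for name in outputs]
    seen = set(start)
    queue = deque(start)
    while queue:
        node = queue.popleft()
        for prev in upstream.get(node, []):
            if prev not in seen:
                seen.add(prev)
                queue.append(prev)

    required_inputs = {}
    required_ctes = set()
    for node in seen:
        table, column = node.split(".", 1)
        if table in cte_names:
            required_ctes.add(table)
        elif table != "output" and node not in upstream:
            # A column with no incoming edge belongs to a base table.
            required_inputs.setdefault(table, []).append(column)
    for columns in required_inputs.values():
        columns.sort()
    return {"required_inputs": required_inputs, "required_ctes": sorted(required_ctes)}


result = backward_lineage(EDGES, ["total"], CTE_NAMES)
print(result["required_inputs"])  # {'orders': ['amount']}
print(result["required_ctes"])  # ['cte']
```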
get_output_columns
Get the list of output column names for the query.
ColumnNode
Unified column representation for both single-query and multi-query lineage analysis.
Properties
| Property | Type | Description |
|---|---|---|
| column_name | str | Column name |
| table_name | str | Table name |
| full_name | str | Fully qualified name (table.column) |
| query_id | Optional[str] | Query that produces this column (for pipelines) |
| unit_id | Optional[str] | CTE/subquery unit within a query |
| node_type | str | "source", "intermediate", "output", "base_column", "star", etc. |
| layer | Optional[str] | "input", "cte", "subquery", "output" (for compatibility) |
| expression | Optional[str] | SQL expression |
| operation | Optional[str] | Operation type (SUM, CASE, JOIN, etc.) |
| description | Optional[str] | Column description |
| owner | Optional[str] | Column owner |
| pii | bool | Whether marked as PII |
| tags | Set[str] | Custom tags |
| custom_metadata | Dict[str, Any] | Custom key-value metadata |
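The property table maps naturally onto a Python dataclass. The sketch below is a hypothetical stand-in named ColumnNodeSketch, not the real ColumnNode class: the field defaults are assumptions, layer is omitted, and full_name is derived from table_name and column_name in the "table.column" form the table describes.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Set


@dataclass
class ColumnNodeSketch:
    """Hypothetical stand-in mirroring a subset of the ColumnNode properties."""

    column_name: str
    table_name: str
    query_id: Optional[str] = None
    unit_id: Optional[str] = None
    node_type: str = "source"  # assumed default for this sketch
    expression: Optional[str] = None
    operation: Optional[str] = None
    description: Optional[str] = None
    owner: Optional[str] = None
    pii: bool = False
    # Mutable containers need default_factory so instances do not share them.
    tags: Set[str] = field(default_factory=set)
    custom_metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def full_name(self) -> str:
        # Fully qualified name in "table.column" form.
        return f"{self.table_name}.{self.column_name}"


col = ColumnNodeSketch(column_name="total_revenue", table_name="output",
                       expression="SUM(amount)", operation="SUM")
col.tags.add("financial")
print(col.full_name)  # output.total_revenue
```

Computing full_name as a property keeps it consistent with table_name and column_name instead of storing a third copy that could drift.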
Methods
set_source_description
Set a user-provided description.
is_computed
Check if this is a computed/derived column.
Example
```python
# Assumes `pipeline` is an existing pipeline object with a `columns` mapping.
# Access a column by its fully qualified name
col = pipeline.columns["output.total_revenue"]

# Set metadata
col.pii = False
col.owner = "analytics-team"
col.tags.add("financial")
col.set_source_description("Total revenue in USD")

# Check properties
print(f"Expression: {col.expression}")
print(f"Operation: {col.operation}")
print(f"Is computed: {col.is_computed()}")
```
Metadata Functions
For metadata propagation and description generation, use the utility functions in clpipe.column:
```python
from clpipe.column import propagate_metadata, generate_description

# `col` and `pipeline` as in the ColumnNode example; `llm` is a configured LLM client.
# Propagate metadata from source columns
propagate_metadata(col, pipeline)

# Generate a description using an LLM (requires LLM configuration)
generate_description(col, llm, pipeline)
```
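The rules propagate_metadata applies are not specified here. As an illustration only, the sketch below assumes one common policy: a column is PII if any upstream column is PII, and tags are unioned along lineage. Col and propagate are hypothetical names, and the recursion assumes the lineage graph is acyclic.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass
class Col:
    """Hypothetical minimal column record for this sketch."""

    full_name: str
    pii: bool = False
    tags: Set[str] = field(default_factory=set)
    sources: List["Col"] = field(default_factory=list)  # upstream columns


def propagate(col: Col) -> None:
    """Assumed policy: any PII input makes the output PII; tags are unioned."""
    for src in col.sources:
        propagate(src)  # resolve upstream columns first
        col.pii = col.pii or src.pii
        col.tags |= src.tags


email = Col("users.email", pii=True, tags={"contact"})
user_id = Col("users.id", tags={"key"})
staged = Col("staging.users.email_hash", sources=[email])
final = Col("output.user_summary", sources=[staged, user_id])

propagate(final)
print(final.pii, sorted(final.tags))  # True ['contact', 'key']
```

Making PII "sticky" errs on the side of over-flagging: a hashed or aggregated column stays marked until someone clears it explicitly.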
ColumnEdge
Unified edge representing lineage relationships between columns.
Properties
| Property | Type | Description |
|---|---|---|
| from_node | ColumnNode | Source column |
| to_node | ColumnNode | Target column |
| edge_type | str | Type: "direct", "transform", "aggregate", "join", "star_passthrough", "cross_query" |
| context | Optional[str] | Context: "SELECT", "CTE", "main_query", "cross_query" |
| transformation | Optional[str] | Description of transformation |
| query_id | Optional[str] | Query where edge exists |
| expression | Optional[str] | SQL expression |
Example
```python
for edge in pipeline.edges:
    print(f"{edge.from_node.full_name} -> {edge.to_node.full_name}")
    print(f"  Type: {edge.edge_type}")
    print(f"  Transformation: {edge.transformation}")
```
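Because edge_type takes a fixed set of values, one pattern is to validate it on construction and then summarize a graph by type. EdgeSketch below is a hypothetical stand-in that stores column full names instead of ColumnNode objects; it is not the library's ColumnEdge, and whether ColumnEdge validates edge_type is not stated here.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional

# Edge types listed in the ColumnEdge property table.
EDGE_TYPES = {"direct", "transform", "aggregate", "join", "star_passthrough", "cross_query"}


@dataclass(frozen=True)
class EdgeSketch:
    """Hypothetical stand-in for ColumnEdge using plain full names."""

    from_name: str
    to_name: str
    edge_type: str
    query_id: Optional[str] = None

    def __post_init__(self):
        # Reject edge types outside the documented set.
        if self.edge_type not in EDGE_TYPES:
            raise ValueError(f"unknown edge_type: {self.edge_type!r}")


edges = [
    EdgeSketch("orders.amount", "cte.total", "aggregate"),
    EdgeSketch("cte.total", "output.total", "direct"),
    EdgeSketch("cte.id", "output.id", "direct"),
]
print(Counter(e.edge_type for e in edges).most_common())  # [('direct', 2), ('aggregate', 1)]
```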
TableDependencyGraph
Table-level dependency graph for a pipeline.
Methods
get_execution_order
Get tables in topological order for execution.
get_source_tables
Get external source tables (not created by any query).
get_final_tables
Get final tables (not read by any query).
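Both definitions follow from the TableNode fields created_by and read_by. The sketch below assumes a hypothetical TableSketch record carrying only those two fields and a small three-table pipeline; it is not the library's code and ignores modified_by.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class TableSketch:
    """Hypothetical minimal record using two TableNode fields."""

    table_name: str
    created_by: Optional[str] = None
    read_by: List[str] = field(default_factory=list)


tables: Dict[str, TableSketch] = {
    "raw.orders": TableSketch("raw.orders", read_by=["q1"]),
    "staging.orders": TableSketch("staging.orders", created_by="q1", read_by=["q2"]),
    "analytics.revenue": TableSketch("analytics.revenue", created_by="q2"),
}

# Source tables: no query creates them.
sources = sorted(name for name, t in tables.items() if t.created_by is None)
# Final tables: no query reads them.
finals = sorted(name for name, t in tables.items() if not t.read_by)
print(sources)  # ['raw.orders']
print(finals)  # ['analytics.revenue']
```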
get_dependencies
Get upstream tables that a table depends on.
Example:
```python
# `graph` is a TableDependencyGraph instance.
deps = graph.get_dependencies("analytics.user_metrics")
print(f"Depends on: {[t.table_name for t in deps]}")
```
get_downstream
Get downstream tables that depend on this table.
Example:
```python
# `graph` is a TableDependencyGraph instance.
downstream = graph.get_downstream("staging.orders")
print(f"Feeds into: {[t.table_name for t in downstream]}")
```
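Direct neighbors can likewise be derived from created_by and read_by: a table's upstream tables are those read by the query that creates it, and its downstream tables are those created by queries that read it. This is a hypothetical sketch that returns names rather than TableNode objects; whether the library's methods return only direct neighbors or the full transitive set is not stated here.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class TableSketch:
    """Hypothetical minimal record using two TableNode fields."""

    table_name: str
    created_by: Optional[str] = None
    read_by: List[str] = field(default_factory=list)


def dependencies(tables: Dict[str, TableSketch], name: str) -> List[str]:
    """Direct upstream tables: those read by the query that creates `name`."""
    creator = tables[name].created_by
    if creator is None:
        return []  # external source table
    return sorted(t.table_name for t in tables.values() if creator in t.read_by)


def downstream(tables: Dict[str, TableSketch], name: str) -> List[str]:
    """Direct downstream tables: those created by a query that reads `name`."""
    readers = set(tables[name].read_by)
    return sorted(t.table_name for t in tables.values() if t.created_by in readers)


tables = {
    "staging.orders": TableSketch("staging.orders", read_by=["q_metrics"]),
    "staging.users": TableSketch("staging.users", read_by=["q_metrics"]),
    "analytics.user_metrics": TableSketch("analytics.user_metrics", created_by="q_metrics"),
}
print(dependencies(tables, "analytics.user_metrics"))  # ['staging.orders', 'staging.users']
print(downstream(tables, "staging.orders"))  # ['analytics.user_metrics']
```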
topological_sort
Get query IDs in execution order.
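Both get_execution_order and topological_sort rest on the same idea: a query must run after every query that creates a table it reads. The sketch below implements that with Kahn's algorithm over a hypothetical mapping from query ID to (tables read, table created); it is an illustration, not the implementation behind either method, and it ignores modified_by.

```python
from collections import deque
from typing import Dict, List, Set, Tuple

# Hypothetical pipeline: query ID -> (tables read, table created).
QUERIES: Dict[str, Tuple[Set[str], str]] = {
    "q_report": ({"staging.orders", "staging.users"}, "analytics.report"),
    "q_orders": ({"raw.orders"}, "staging.orders"),
    "q_users": ({"raw.users"}, "staging.users"),
}


def topological_order(queries: Dict[str, Tuple[Set[str], str]]) -> List[str]:
    """Kahn's algorithm: a query runs only after the queries that create its inputs."""
    creator = {created: qid for qid, (_, created) in queries.items()}
    # Build producer -> consumer edges and count each query's unmet dependencies.
    consumers: Dict[str, List[str]] = {qid: [] for qid in queries}
    indegree = {qid: 0 for qid in queries}
    for qid, (reads, _) in queries.items():
        for table in reads:
            producer = creator.get(table)
            if producer is not None and producer != qid:
                consumers[producer].append(qid)
                indegree[qid] += 1

    # Sorting the ready set and consumers keeps the output deterministic.
    ready = deque(sorted(q for q, d in indegree.items() if d == 0))
    order: List[str] = []
    while ready:
        qid = ready.popleft()
        order.append(qid)
        for nxt in sorted(consumers[qid]):
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(queries):
        raise ValueError("cycle detected between queries")
    return order


print(topological_order(QUERIES))  # ['q_orders', 'q_users', 'q_report']
```

If some queries never reach zero unmet dependencies, the inputs contain a cycle, so the function raises ValueError instead of returning a partial order.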
Properties
| Property | Type | Description |
|---|---|---|
| tables | Dict[str, TableNode] | All tables by name |
| queries | Dict[str, ParsedQuery] | All queries by ID |
TableNode
Represents a table in the dependency graph.
Properties
| Property | Type | Description |
|---|---|---|
| table_name | str | Fully qualified table name |
| is_source | bool | Whether the table is an external source |
| created_by | Optional[str] | ID of the query that creates this table |
| modified_by | List[str] | IDs of the queries that modify this table |
| read_by | List[str] | IDs of the queries that read this table |
| columns | Set[str] | Column names in the table |
| description | Optional[str] | Table description |
Methods
get_columns
Get all columns for this table.