Diff Classes
Track changes between pipeline versions for impact analysis and documentation updates.
PipelineDiff
Compare two pipeline versions to identify changes.
Constructor
diff = pipeline.diff(other_pipeline)
# or
diff = PipelineDiff(new_graph=new_pipeline, old_graph=old_pipeline)
Methods
has_changes
Check if there are any differences.
summary
Get a human-readable summary of changes.
Example:
old_pipeline = Pipeline.from_sql_files("/path/to/v1/")
new_pipeline = Pipeline.from_sql_files("/path/to/v2/")
diff = new_pipeline.diff(old_pipeline)
if diff.has_changes():
print(diff.summary())
Output:
Pipeline Diff Summary
====================
Columns added: 3
Columns removed: 1
Columns modified: 5
Added:
- analytics.new_metric
Removed:
- staging.old_column
Modified:
- output.total: expression changed
- output.avg_value: lineage changed
get_sql_changes
Get only changes to SQL expressions.
get_lineage_changes
Get only changes to column lineage.
get_columns_needing_update
Get columns that need description regeneration.
Example:
# Find columns that need new descriptions after SQL changes
columns_to_update = diff.get_columns_needing_update()
for col_name in columns_to_update:
col = new_pipeline.columns[col_name]
col.generate_description(llm, new_pipeline)
Properties
| Property | Type | Description |
|---|---|---|
columns_added |
List[str] |
Columns in new but not old |
columns_removed |
List[str] |
Columns in old but not new |
columns_modified |
List[ColumnDiff] |
Columns with changes |
Example:
print(f"Added: {diff.columns_added}")
print(f"Removed: {diff.columns_removed}")
for change in diff.columns_modified:
print(f"{change.full_name}: {change.field_name} changed")
ColumnDiff
Represents a single column change.
Properties
| Property | Type | Description |
|---|---|---|
column_name |
str |
Column name |
table_name |
str |
Table name |
full_name |
str |
Fully qualified name |
field_name |
str |
Field that changed (expression, lineage, etc.) |
old_value |
Any |
Previous value |
new_value |
Any |
New value |
Example
for change in diff.columns_modified:
print(f"Column: {change.full_name}")
print(f" Changed: {change.field_name}")
print(f" From: {change.old_value}")
print(f" To: {change.new_value}")
Use Cases
CI/CD Integration
def check_pipeline_changes():
"""Check for breaking changes in CI."""
old = Pipeline.from_sql_files("/main/sql/")
new = Pipeline.from_sql_files("/pr/sql/")
diff = new.diff(old)
if diff.columns_removed:
print(f"WARNING: {len(diff.columns_removed)} columns removed")
for col in diff.columns_removed:
print(f" - {col}")
return False
return True
Documentation Updates
def update_docs_after_changes():
"""Regenerate descriptions for changed columns."""
old = Pipeline.from_sql_files("/prev/")
new = Pipeline.from_sql_files("/current/")
diff = new.diff(old)
# Only regenerate for columns with actual changes
for col_name in diff.get_columns_needing_update():
col = new.columns[col_name]
col.generate_description(llm, new)
new.save("metadata.json")
Impact Analysis
def analyze_schema_change_impact():
"""Analyze impact before making schema changes."""
current = Pipeline.from_sql_files("/current/")
proposed = Pipeline.from_sql_files("/proposed/")
diff = proposed.diff(current)
# Check what's affected
sql_changes = diff.get_sql_changes()
lineage_changes = diff.get_lineage_changes()
print(f"SQL expression changes: {len(sql_changes)}")
print(f"Lineage changes: {len(lineage_changes)}")