Diff Classes

Track changes between pipeline versions for impact analysis and documentation updates.


PipelineDiff

Compare two pipeline versions to identify changes.

Constructor

diff = new_pipeline.diff(old_pipeline)
# or
diff = PipelineDiff(new_graph=new_pipeline, old_graph=old_pipeline)

Methods

has_changes

Check if there are any differences.

diff.has_changes() -> bool

summary

Get a human-readable summary of changes.

diff.summary() -> str

Example:

old_pipeline = Pipeline.from_sql_files("/path/to/v1/")
new_pipeline = Pipeline.from_sql_files("/path/to/v2/")

diff = new_pipeline.diff(old_pipeline)

if diff.has_changes():
    print(diff.summary())

Output:

Pipeline Diff Summary
====================
Columns added: 3
Columns removed: 1
Columns modified: 5

Added:
  - analytics.new_metric

Removed:
  - staging.old_column

Modified:
  - output.total: expression changed
  - output.avg_value: lineage changed

get_sql_changes

Get only changes to SQL expressions.

diff.get_sql_changes() -> List[ColumnDiff]

get_lineage_changes

Get only changes to column lineage.

diff.get_lineage_changes() -> List[ColumnDiff]

get_columns_needing_update

Get the names of columns whose descriptions need to be regenerated.

diff.get_columns_needing_update() -> List[str]

Example:

# Find columns that need new descriptions after SQL changes
# (llm is assumed to be an LLM client configured elsewhere)
columns_to_update = diff.get_columns_needing_update()

for col_name in columns_to_update:
    col = new_pipeline.columns[col_name]
    col.generate_description(llm, new_pipeline)

Properties

Property          Type              Description
columns_added     List[str]         Columns in new but not old
columns_removed   List[str]         Columns in old but not new
columns_modified  List[ColumnDiff]  Columns with changes

Example:

print(f"Added: {diff.columns_added}")
print(f"Removed: {diff.columns_removed}")

for change in diff.columns_modified:
    print(f"{change.full_name}: {change.field_name} changed")
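
The definitions of columns_added and columns_removed amount to set differences over fully qualified column names. The following is a minimal sketch of that idea, assuming each pipeline version can be reduced to a collection of names. The helper added_and_removed is hypothetical and not part of the library.

```python
def added_and_removed(old_names, new_names):
    """Return (added, removed) as sorted lists of fully qualified column names.

    Hypothetical helper for illustration; not part of the library's API.
    """
    old_set, new_set = set(old_names), set(new_names)
    added = sorted(new_set - old_set)    # in new but not old
    removed = sorted(old_set - new_set)  # in old but not new
    return added, removed


# Example names only; real pipelines would supply their own column names.
old_cols = ["staging.old_column", "output.total"]
new_cols = ["analytics.new_metric", "output.total"]

added, removed = added_and_removed(old_cols, new_cols)
print(f"Added: {added}")
print(f"Removed: {removed}")
```

Columns present in both versions appear in neither list; whether they count as modified depends on a separate field-by-field comparison.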


ColumnDiff

Represents a single column change.

Properties

Property     Type  Description
column_name  str   Column name
table_name   str   Table name
full_name    str   Fully qualified name
field_name   str   Field that changed (expression, lineage, etc.)
old_value    Any   Previous value
new_value    Any   New value

Example:

for change in diff.columns_modified:
    print(f"Column: {change.full_name}")
    print(f"  Changed: {change.field_name}")
    print(f"  From: {change.old_value}")
    print(f"  To: {change.new_value}")
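
When many columns change, grouping the changes by field_name can make the report easier to scan. The sketch below uses a stand-in dataclass, ColumnChange, that mirrors the properties documented above so that the example runs without the library. Both ColumnChange and group_by_field are hypothetical, and the "table.column" form of full_name is an assumption.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any


@dataclass
class ColumnChange:
    """Stand-in mirroring the documented ColumnDiff properties (hypothetical)."""
    table_name: str
    column_name: str
    field_name: str
    old_value: Any
    new_value: Any

    @property
    def full_name(self) -> str:
        # Assumes the fully qualified name is "<table>.<column>".
        return f"{self.table_name}.{self.column_name}"


def group_by_field(changes):
    """Map each changed field to the full names of the columns it affects."""
    grouped = defaultdict(list)
    for change in changes:
        grouped[change.field_name].append(change.full_name)
    return dict(grouped)


# Illustrative data only.
changes = [
    ColumnChange("output", "total", "expression", "SUM(a)", "SUM(a) + SUM(b)"),
    ColumnChange("output", "avg_value", "lineage", ["raw.a"], ["raw.a", "raw.b"]),
    ColumnChange("output", "count", "expression", "COUNT(*)", "COUNT(id)"),
]

for field, names in group_by_field(changes).items():
    print(f"{field}: {', '.join(names)}")
```

With real data, the same helper could be applied to diff.columns_modified, since it only reads field_name and full_name.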

Use Cases

CI/CD Integration

def check_pipeline_changes():
    """Check for breaking changes in CI."""
    old = Pipeline.from_sql_files("/main/sql/")
    new = Pipeline.from_sql_files("/pr/sql/")

    diff = new.diff(old)

    if diff.columns_removed:
        print(f"WARNING: {len(diff.columns_removed)} columns removed")
        for col in diff.columns_removed:
            print(f"  - {col}")
        return False

    return True
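
A CI job usually fails on a non-zero exit status, so the boolean result has to be turned into an exit code somewhere. The following is a minimal sketch of that step, assuming the check is passed in as a zero-argument callable. The helper exit_code_for is hypothetical, and the lambdas are placeholders for check_pipeline_changes.

```python
def exit_code_for(check) -> int:
    """Run a boolean check and map its result to a process exit status.

    `check` is any zero-argument callable that returns True on success.
    Hypothetical helper for illustration.
    """
    return 0 if check() else 1


# Placeholder checks so the sketch runs on its own.
print(exit_code_for(lambda: True))
print(exit_code_for(lambda: False))

# In a CI entry point this would typically be:
#     import sys
#     sys.exit(exit_code_for(check_pipeline_changes))
```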

Documentation Updates

def update_docs_after_changes(llm):
    """Regenerate descriptions for changed columns using the given LLM client."""
    old = Pipeline.from_sql_files("/prev/")
    new = Pipeline.from_sql_files("/current/")

    diff = new.diff(old)

    # Only regenerate for columns with actual changes
    for col_name in diff.get_columns_needing_update():
        col = new.columns[col_name]
        col.generate_description(llm, new)

    new.save("metadata.json")

Impact Analysis

def analyze_schema_change_impact():
    """Analyze impact before making schema changes."""
    current = Pipeline.from_sql_files("/current/")
    proposed = Pipeline.from_sql_files("/proposed/")

    diff = proposed.diff(current)

    # Check what's affected
    sql_changes = diff.get_sql_changes()
    lineage_changes = diff.get_lineage_changes()

    print(f"SQL expression changes: {len(sql_changes)}")
    print(f"Lineage changes: {len(lineage_changes)}")
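
To see where the impact concentrates, the changes can be tallied per table using the table_name property of each ColumnDiff. The sketch below uses SimpleNamespace objects as hypothetical stand-ins for ColumnDiff so that it runs without the library. The helper changes_per_table is illustrative only.

```python
from collections import Counter
from types import SimpleNamespace


def changes_per_table(changes):
    """Count changes per table, most affected table first."""
    return Counter(change.table_name for change in changes).most_common()


# Hypothetical stand-ins for the results of get_sql_changes() and
# get_lineage_changes().
sql_changes = [
    SimpleNamespace(table_name="output", column_name="total"),
    SimpleNamespace(table_name="output", column_name="count"),
]
lineage_changes = [
    SimpleNamespace(table_name="analytics", column_name="avg_value"),
]

for table, count in changes_per_table(sql_changes + lineage_changes):
    print(f"{table}: {count} change(s)")
```

A column that appears in both lists is counted once per change, so the tally reflects the number of changes rather than the number of distinct columns.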