
Metadata from SQL Comments

Overview

clpipe automatically extracts metadata from inline SQL comments, making it easy to document columns right where they're defined. No separate metadata files needed—just add structured comments to your SQL.

SELECT
  user_id,  -- User identifier [pii: false]
  email,    -- Email address [pii: true, owner: data-team]
  SUM(revenue) as total  /* Total revenue [tags: metric finance] */
FROM users
GROUP BY user_id, email

The metadata is automatically:

- Extracted during SQL parsing
- Attached to column nodes in the lineage graph
- Propagated through transformations (PII, tags, owner)
- Accessible via the Pipeline API


Comment Format

Basic Syntax

-- <description> [key: value, key2: value2, ...]

or

/* <description> [key: value, key2: value2] */

Examples

SELECT
  user_id,  -- User identifier
  email     -- Email address
FROM users

SELECT
  user_id,  -- User identifier [pii: false]
  email     -- Email address [pii: true]
FROM users

SELECT
  user_id,  -- User ID [pii: false, owner: data-team]
  email,    -- Email [pii: true, owner: data-team, tags: contact auth]
  revenue   /* Revenue [pii: false, owner: finance-team, tags: metric] */
FROM users
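To make the grammar concrete, here is a hypothetical sketch of how a comment in this format could be split into a description and raw key-value pairs with a regex. This is illustrative only; clpipe's actual parser may work differently:

```python
import re

# Matches "<description> [key: value, key2: value2]", brackets optional.
COMMENT = re.compile(
    r"^\s*(?P<description>[^\[]*?)\s*"   # free-text description
    r"(?:\[(?P<pairs>[^\]]*)\])?\s*$"    # optional [key: value, ...] block
)

def parse_comment(text: str) -> tuple[str, dict[str, str]]:
    """Split a comment into its description and raw key-value pairs."""
    match = COMMENT.match(text.strip())
    description = match.group("description").strip()
    pairs = {}
    if match.group("pairs"):
        for item in match.group("pairs").split(","):
            key, _, value = item.partition(":")
            if value:  # entries without a colon are skipped, not fatal
                pairs[key.strip()] = value.strip()
    return description, pairs
```

Note that malformed entries (e.g. a missing colon) are simply dropped, matching the forgiving behavior described in the FAQ.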

Supported Metadata Fields

Standard Fields

| Field | Type | Description | Example |
| ----- | ---- | ----------- | ------- |
| description | String | Natural language description | User email address |
| pii | Boolean | PII flag (propagates automatically) | true or false |
| owner | String | Data owner or team | data-team |
| tags | Set[String] | Space-separated tags | metric finance critical |

Custom Fields

Any other key-value pairs are stored in custom_metadata:

SELECT
  user_id,  -- User ID [quality: high, confidence: 95, source: production]
  email     -- Email [sensitivity: high, retention_days: 365]
FROM users
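A hypothetical sketch of how raw string values might be coerced into the typed standard fields above, with everything else falling through to custom_metadata (the real clpipe internals may differ):

```python
def coerce_metadata(pairs: dict[str, str]) -> dict:
    """Map raw key-value strings to typed standard fields; any other
    key lands in custom_metadata, mirroring the field table above."""
    meta = {"pii": None, "owner": None, "tags": set(), "custom_metadata": {}}
    for key, value in pairs.items():
        if key == "pii":
            meta["pii"] = value.lower() == "true"  # Boolean
        elif key == "owner":
            meta["owner"] = value                  # String
        elif key == "tags":
            meta["tags"] = set(value.split())      # space-separated Set[String]
        else:
            meta["custom_metadata"][key] = value   # custom field, kept as string
    return meta
```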

Usage Examples

Basic Extraction

from clpipe import Pipeline

sql = """
SELECT
  user_id,  -- User identifier [pii: false]
  email,    -- Email address [pii: true, owner: data-team]
  name      -- Full name [pii: true]
FROM users
"""

pipeline = Pipeline([("query", sql)], dialect="bigquery")

# Access metadata
email_col = pipeline.columns["query.email"]
print(email_col.description)  # "Email address"
print(email_col.pii)          # True
print(email_col.owner)        # "data-team"

With Expressions

Comments work on any expression, not just simple columns:

sql = """
SELECT
  user_id,
  UPPER(email) as email_upper,  -- Uppercased email [pii: true]

  CASE
    WHEN status = 'active' THEN 1
    ELSE 0
  END as is_active,  /* Active status indicator [pii: false] */

  SUM(revenue) as total_revenue  -- Total revenue [owner: finance, tags: metric]
FROM users
GROUP BY user_id, email, status
"""

pipeline = Pipeline([("metrics", sql)], dialect="bigquery")

# All expressions have metadata
total_col = pipeline.columns["metrics.total_revenue"]
print(total_col.description)  # "Total revenue"
print(total_col.owner)        # "finance"
print(total_col.tags)         # {"metric"}

In CTEs

Metadata works seamlessly with CTEs:

sql = """
WITH base AS (
  SELECT
    user_id,  -- User ID [pii: false]
    email     -- Email [pii: true, owner: data-team]
  FROM raw_users
),

metrics AS (
  SELECT
    user_id,
    COUNT(*) as event_count  -- Event count [tags: metric engagement]
  FROM events
  GROUP BY user_id
)

SELECT
  b.user_id,
  b.email,
  COALESCE(m.event_count, 0) as total_events
FROM base b
LEFT JOIN metrics m ON b.user_id = m.user_id
"""

pipeline = Pipeline([("report", sql)], dialect="bigquery")

# Metadata from CTE is preserved
base_email = next(
    col for col in pipeline.columns.values()
    if col.column_name == "email" and "base" in col.table_name
)
print(base_email.description)  # "Email"
print(base_email.pii)          # True

Metadata Propagation

Certain metadata fields automatically propagate through lineage:

PII Flag Propagation

If any source column is PII, the derived column is PII:

sql = """
SELECT
  email,  -- Email [pii: true]
  UPPER(email) as email_upper,
  phone,  -- Phone [pii: true]
  CONCAT(email, ':', phone) as contact
FROM users
"""

pipeline = Pipeline([("query", sql)], dialect="bigquery")

# Run propagation
for col in pipeline.columns.values():
    col.propagate_metadata(pipeline)

# email_upper inherits PII from email
# contact inherits PII from both email and phone
email_upper = pipeline.columns["query.email_upper"]
print(email_upper.pii)  # True (inherited)

contact = pipeline.columns["query.contact"]
print(contact.pii)  # True (inherited from sources)

Owner Propagation

Owner propagates only if all source columns have the same owner:

sql = """
SELECT
  col1,  -- Column 1 [owner: team-a]
  col2,  -- Column 2 [owner: team-a]
  col3,  -- Column 3 [owner: team-b]

  CONCAT(col1, col2) as combined_same,  -- Same owner
  CONCAT(col1, col3) as combined_diff   -- Different owners
FROM table1
"""

pipeline = Pipeline([("query", sql)], dialect="bigquery")

for col in pipeline.columns.values():
    col.propagate_metadata(pipeline)

# combined_same inherits owner (both sources have team-a)
combined_same = pipeline.columns["query.combined_same"]
print(combined_same.owner)  # "team-a"

# combined_diff has no owner (conflicting sources)
combined_diff = pipeline.columns["query.combined_diff"]
print(combined_diff.owner)  # None

Tags Propagation

Tags are unioned across all source columns:

sql = """
SELECT
  col1,  -- Column 1 [tags: finance metric]
  col2,  -- Column 2 [tags: metric critical]
  CONCAT(col1, col2) as combined
FROM table1
"""

pipeline = Pipeline([("query", sql)], dialect="bigquery")

for col in pipeline.columns.values():
    col.propagate_metadata(pipeline)

# combined has union of all source tags
combined = pipeline.columns["query.combined"]
print(combined.tags)  # {"finance", "metric", "critical"}

Best Practices

1. Be Consistent

Use a consistent format across your SQL files:

-- ✅ Good: Consistent format
SELECT
  user_id,  -- User ID [pii: false, owner: data-team]
  email,    -- Email [pii: true, owner: data-team]
  name      -- Name [pii: true, owner: data-team]
FROM users

-- ❌ Avoid: Inconsistent formatting
SELECT
  user_id,  -- [pii: false]
  email,    -- email address (pii=true)
  name      /* Name - PII */
FROM users

2. Document PII Early

Mark PII at the source, and it will propagate automatically:

-- ✅ Mark PII at source
WITH raw AS (
  SELECT
    user_id,  -- User ID [pii: false]
    email     -- Email [pii: true]  ← Marked here
  FROM source
)

SELECT
  user_id,
  UPPER(email) as email_upper  -- Inherits PII automatically
FROM raw

3. Use Tags for Organization

Tags help categorize and find columns:

SELECT
  revenue,        -- Revenue [tags: metric finance]
  login_count,    -- Logins [tags: metric engagement]
  error_count,    -- Errors [tags: metric reliability critical]
  conversion_rate -- Conversion [tags: metric business kpi]
FROM analytics

Then query by tag:

# Find all metrics
metrics = pipeline.get_columns_by_tag("metric")

# Find critical columns
critical = pipeline.get_columns_by_tag("critical")

# Find finance-owned metrics
finance_metrics = [
    col for col in pipeline.get_columns_by_tag("metric")
    if col.owner == "finance"
]

4. Keep Descriptions Concise

Aim for 1-2 sentences, natural language:

-- ✅ Good: Clear and concise
SELECT
  user_id,        -- Unique identifier for user
  total_revenue,  -- Sum of all order amounts per user
  last_login      -- Most recent login timestamp
FROM users

-- ❌ Avoid: Too technical or verbose
SELECT
  user_id,        -- INTEGER PRIMARY KEY AUTO_INCREMENT NOT NULL
  total_revenue,  -- This column contains the aggregated sum of...
  last_login      -- TIMESTAMP WITH TIME ZONE stored in UTC...
FROM users

Integration with LLM Documentation

Metadata from comments serves as the foundation for LLM-powered documentation:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4", temperature=0)

# Generate descriptions for columns without metadata
for col in pipeline.columns.values():
    if col.is_computed() and not col.description:
        col.generate_description(llm, pipeline)

# Source metadata is preserved
email = pipeline.columns["query.email"]
print(email.description_source)  # DescriptionSource.SOURCE (from comment)

# Generated metadata is marked
email_upper = pipeline.columns["query.email_upper"]
print(email_upper.description_source)  # DescriptionSource.GENERATED (from LLM)

Finding Columns

Use Pipeline methods to query metadata:

# Find all PII columns
pii_columns = pipeline.get_pii_columns()
for col in pii_columns:
    print(f"{col.full_name}: {col.description}")

# Find columns by owner
data_team_cols = pipeline.get_columns_by_owner("data-team")

# Find columns by tag
metrics = pipeline.get_columns_by_tag("metric")
finance_cols = pipeline.get_columns_by_tag("finance")

# Manual filtering
high_quality = [
    col for col in pipeline.columns.values()
    if col.custom_metadata.get("quality") == "high"
]

Export to Data Catalogs

Export metadata to external systems:

# Export to dict
metadata_dict = {}
for full_name, col in pipeline.columns.items():
    metadata_dict[full_name] = {
        "description": col.description,
        "pii": col.pii,
        "owner": col.owner,
        "tags": list(col.tags),
        "custom": col.custom_metadata
    }

# Export to dbt schema.yml format
dbt_schema = pipeline.to_dbt_schema()

# Export to BigQuery schema (future)
# bq_schema = pipeline.to_bigquery_schema()
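If `to_dbt_schema()` doesn't produce exactly the shape you need, the exported dict can be reshaped by hand. A rough sketch, using dbt's schema.yml column conventions (`name`, `description`, `meta`, `tags`); the model name and field mapping here are illustrative:

```python
def to_schema_yml(model_name: str, metadata_dict: dict) -> dict:
    """Shape per-column metadata like a dbt schema.yml model entry."""
    columns = []
    for full_name, meta in metadata_dict.items():
        columns.append({
            "name": full_name.split(".")[-1],       # strip the query prefix
            "description": meta["description"] or "",
            "meta": {"pii": meta["pii"], "owner": meta["owner"], **meta["custom"]},
            "tags": sorted(meta["tags"]),
        })
    return {"version": 2, "models": [{"name": model_name, "columns": columns}]}
```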

FAQ

Q: Do I need to add metadata to every column?

A: No! Metadata is optional. Add it where it's useful (PII columns, important metrics, etc.).

Q: Can I use both comment styles (-- and /* */)?

A: Yes, both work identically. Choose what fits your team's style.

Q: What if I have a typo in the metadata?

A: Invalid key-value pairs are skipped. The column will still have the description (if valid) and any valid metadata fields.

Q: Does metadata affect query execution?

A: No, comments are stripped during execution. Metadata is purely for documentation and lineage.

Q: Can I override propagated metadata?

A: Yes, explicitly set metadata takes precedence:

col = pipeline.columns["query.email_upper"]
col.pii = False  # Override inherited PII
col.owner = "different-team"  # Override owner

Next Steps