# Metadata from SQL Comments

## Overview

clpipe automatically extracts metadata from inline SQL comments, making it easy to document columns right where they're defined. No separate metadata files are needed: just add structured comments to your SQL.
```sql
SELECT
    user_id, -- User identifier [pii: false]
    email,   -- Email address [pii: true, owner: data-team]
    SUM(revenue) as total /* Total revenue [tags: metric finance] */
FROM users
GROUP BY user_id, email
```
The metadata is automatically:

- Extracted during SQL parsing
- Attached to column nodes in the lineage graph
- Propagated through transformations (PII, tags, owner)
- Accessible via the Pipeline API
## Comment Format

### Basic Syntax

A comment is a free-text description, optionally followed by bracketed key-value pairs:

```sql
column_name, -- Description [key: value, key2: value2]
```

or

```sql
column_name /* Description [key: value, key2: value2] */
```

### Examples

```sql
SELECT
    user_id, -- User identifier [pii: false]
    email,   -- Email address [pii: true, owner: data-team]
    SUM(revenue) as total /* Total revenue [tags: metric finance] */
FROM users
GROUP BY user_id, email
```
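To make the format concrete, here is a minimal, illustrative parser for the `Description [key: value, ...]` shape. This is a sketch only; clpipe's real extraction happens inside its SQL parser and the function name and regex here are not part of its API:

```python
import re

# Matches "free-text description [key: value, key2: value2]"
# where the bracketed metadata section is optional.
COMMENT_RE = re.compile(r"^\s*(?P<desc>[^\[]*?)\s*(?:\[(?P<meta>[^\]]*)\])?\s*$")

def parse_comment(text: str) -> dict:
    """Split a comment into a description and key-value metadata."""
    match = COMMENT_RE.match(text)
    desc = match.group("desc").strip()
    meta = {}
    if match.group("meta"):
        for pair in match.group("meta").split(","):
            if ":" not in pair:
                continue  # skip malformed pairs, mirroring clpipe's lenient behavior
            key, _, value = pair.partition(":")
            meta[key.strip()] = value.strip()
    return {"description": desc, "metadata": meta}

print(parse_comment("Email address [pii: true, owner: data-team]"))
# {'description': 'Email address', 'metadata': {'pii': 'true', 'owner': 'data-team'}}
```

Note that malformed pairs (for example, a missing colon) are simply skipped rather than raising an error, which matches the behavior described in the FAQ below.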
## Supported Metadata Fields

### Standard Fields

| Field | Type | Description | Example |
|---|---|---|---|
| `description` | String | Natural language description | `User email address` |
| `pii` | Boolean | PII flag (propagates automatically) | `true` or `false` |
| `owner` | String | Data owner or team | `data-team` |
| `tags` | Set[String] | Space-separated tags | `metric finance critical` |
### Custom Fields

Any other key-value pairs are stored in `custom_metadata`:
```sql
SELECT
    user_id, -- User ID [quality: high, confidence: 95, source: production]
    email    -- Email [sensitivity: high, retention_days: 365]
FROM users
```
## Usage Examples

### Basic Extraction

```python
from clpipe import Pipeline

sql = """
SELECT
    user_id, -- User identifier [pii: false]
    email,   -- Email address [pii: true, owner: data-team]
    name     -- Full name [pii: true]
FROM users
"""

pipeline = Pipeline([("query", sql)], dialect="bigquery")

# Access metadata
email_col = pipeline.columns["query.email"]
print(email_col.description)  # "Email address"
print(email_col.pii)          # True
print(email_col.owner)        # "data-team"
```
### With Expressions

Comments work on any expression, not just simple columns:

```python
sql = """
SELECT
    user_id,
    UPPER(email) as email_upper, -- Uppercased email [pii: true]
    CASE
        WHEN status = 'active' THEN 1
        ELSE 0
    END as is_active, /* Active status indicator [pii: false] */
    SUM(revenue) as total_revenue -- Total revenue [owner: finance, tags: metric]
FROM users
GROUP BY user_id, email, status
"""

pipeline = Pipeline([("metrics", sql)], dialect="bigquery")

# All expressions have metadata
total_col = pipeline.columns["metrics.total_revenue"]
print(total_col.description)  # "Total revenue"
print(total_col.owner)        # "finance"
print(total_col.tags)         # {"metric"}
```
### In CTEs

Metadata works seamlessly with CTEs:

```python
sql = """
WITH base AS (
    SELECT
        user_id, -- User ID [pii: false]
        email    -- Email [pii: true, owner: data-team]
    FROM raw_users
),
metrics AS (
    SELECT
        user_id,
        COUNT(*) as event_count -- Event count [tags: metric engagement]
    FROM events
    GROUP BY user_id
)
SELECT
    b.user_id,
    b.email,
    COALESCE(m.event_count, 0) as total_events
FROM base b
LEFT JOIN metrics m ON b.user_id = m.user_id
"""

pipeline = Pipeline([("report", sql)], dialect="bigquery")

# Metadata from the CTE is preserved
base_email = next(
    col for col in pipeline.columns.values()
    if col.column_name == "email" and "base" in col.table_name
)
print(base_email.description)  # "Email"
print(base_email.pii)          # True
```
## Metadata Propagation

Certain metadata fields automatically propagate through lineage.

### PII Flag Propagation

If any source column is PII, the derived column is PII:

```python
sql = """
SELECT
    email, -- Email [pii: true]
    UPPER(email) as email_upper,
    phone, -- Phone [pii: true]
    CONCAT(email, ':', phone) as contact
FROM users
"""

pipeline = Pipeline([("query", sql)], dialect="bigquery")

# Run propagation
for col in pipeline.columns.values():
    col.propagate_metadata(pipeline)

# email_upper inherits PII from email;
# contact inherits PII from both email and phone
email_upper = pipeline.columns["query.email_upper"]
print(email_upper.pii)  # True (inherited)

contact = pipeline.columns["query.contact"]
print(contact.pii)  # True (inherited from sources)
```
### Owner Propagation

Owner propagates only if all source columns have the same owner:

```python
sql = """
SELECT
    col1, -- Column 1 [owner: team-a]
    col2, -- Column 2 [owner: team-a]
    col3, -- Column 3 [owner: team-b]
    CONCAT(col1, col2) as combined_same, -- Same owner
    CONCAT(col1, col3) as combined_diff  -- Different owners
FROM table1
"""

pipeline = Pipeline([("query", sql)], dialect="bigquery")

for col in pipeline.columns.values():
    col.propagate_metadata(pipeline)

# combined_same inherits the owner (both sources have team-a)
combined_same = pipeline.columns["query.combined_same"]
print(combined_same.owner)  # "team-a"

# combined_diff has no owner (conflicting sources)
combined_diff = pipeline.columns["query.combined_diff"]
print(combined_diff.owner)  # None
```
### Tags Propagation

Tags are unioned across all source columns:

```python
sql = """
SELECT
    col1, -- Column 1 [tags: finance metric]
    col2, -- Column 2 [tags: metric critical]
    CONCAT(col1, col2) as combined
FROM table1
"""

pipeline = Pipeline([("query", sql)], dialect="bigquery")

for col in pipeline.columns.values():
    col.propagate_metadata(pipeline)

# combined has the union of all source tags
combined = pipeline.columns["query.combined"]
print(combined.tags)  # {"finance", "metric", "critical"}
```
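Taken together, the three propagation rules can be sketched in plain Python. This is an illustration of the rules only, not clpipe's implementation (which runs via `propagate_metadata` over the lineage graph):

```python
from typing import Optional

def propagate_pii(source_pii: list) -> bool:
    """PII if any source column is PII."""
    return any(source_pii)

def propagate_owner(source_owners: list) -> Optional[str]:
    """Owner only when all sources agree on a single owner."""
    owners = {o for o in source_owners if o is not None}
    return owners.pop() if len(owners) == 1 else None

def propagate_tags(source_tags: list) -> set:
    """Union of the tags from every source column."""
    return set().union(*source_tags)

# CONCAT(email, ':', phone) with both sources marked PII:
print(propagate_pii([True, True]))            # True
# CONCAT(col1, col3) with conflicting owners:
print(propagate_owner(["team-a", "team-b"]))  # None
# CONCAT(col1, col2) unions the tags:
print(sorted(propagate_tags([{"finance", "metric"}, {"metric", "critical"}])))
# ['critical', 'finance', 'metric']
```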
## Best Practices

### 1. Be Consistent

Use a consistent format across your SQL files:

```sql
-- ✅ Good: consistent format
SELECT
    user_id, -- User ID [pii: false, owner: data-team]
    email,   -- Email [pii: true, owner: data-team]
    name     -- Name [pii: true, owner: data-team]
FROM users
```

```sql
-- ❌ Avoid: inconsistent formatting
SELECT
    user_id, -- [pii: false]
    email,   -- email address (pii=true)
    name     /* Name - PII */
FROM users
```
### 2. Document PII Early

Mark PII at the source, and it will propagate automatically:

```sql
-- ✅ Mark PII at the source
WITH raw AS (
    SELECT
        user_id, -- User ID [pii: false]
        email    -- Email [pii: true] ← marked here
    FROM source
)
SELECT
    user_id,
    UPPER(email) as email_upper -- Inherits PII automatically
FROM raw
```
### 3. Use Tags for Organization

Tags help categorize and find columns:

```sql
SELECT
    revenue,        -- Revenue [tags: metric finance]
    login_count,    -- Logins [tags: metric engagement]
    error_count,    -- Errors [tags: metric reliability critical]
    conversion_rate -- Conversion [tags: metric business kpi]
FROM analytics
```

Then query by tag:

```python
# Find all metrics
metrics = pipeline.get_columns_by_tag("metric")

# Find critical columns
critical = pipeline.get_columns_by_tag("critical")

# Find finance-owned metrics
finance_metrics = [
    col for col in pipeline.get_columns_by_tag("metric")
    if col.owner == "finance"
]
```
### 4. Keep Descriptions Concise

Aim for one or two sentences of natural language:

```sql
-- ✅ Good: clear and concise
SELECT
    user_id,       -- Unique identifier for user
    total_revenue, -- Sum of all order amounts per user
    last_login     -- Most recent login timestamp
FROM users
```

```sql
-- ❌ Avoid: too technical or verbose
SELECT
    user_id,       -- INTEGER PRIMARY KEY AUTO_INCREMENT NOT NULL
    total_revenue, -- This column contains the aggregated sum of...
    last_login     -- TIMESTAMP WITH TIME ZONE stored in UTC...
FROM users
```
## Integration with LLM Documentation

Metadata from comments serves as the foundation for LLM-powered documentation:

```python
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4", temperature=0)

# Generate descriptions for columns without metadata
for col in pipeline.columns.values():
    if col.is_computed() and not col.description:
        col.generate_description(llm, pipeline)

# Source metadata is preserved
email = pipeline.columns["query.email"]
print(email.description_source)  # DescriptionSource.SOURCE (from comment)

# Generated metadata is marked
email_upper = pipeline.columns["query.email_upper"]
print(email_upper.description_source)  # DescriptionSource.GENERATED (from LLM)
```
## Finding Columns

Use Pipeline methods to query metadata:

```python
# Find all PII columns
pii_columns = pipeline.get_pii_columns()
for col in pii_columns:
    print(f"{col.full_name}: {col.description}")

# Find columns by owner
data_team_cols = pipeline.get_columns_by_owner("data-team")

# Find columns by tag
metrics = pipeline.get_columns_by_tag("metric")
finance_cols = pipeline.get_columns_by_tag("finance")

# Manual filtering on custom metadata
high_quality = [
    col for col in pipeline.columns.values()
    if col.custom_metadata.get("quality") == "high"
]
```
## Export to Data Catalogs

Export metadata to external systems:

```python
# Export to dict
metadata_dict = {}
for full_name, col in pipeline.columns.items():
    metadata_dict[full_name] = {
        "description": col.description,
        "pii": col.pii,
        "owner": col.owner,
        "tags": list(col.tags),
        "custom": col.custom_metadata,
    }

# Export to dbt schema.yml format
dbt_schema = pipeline.to_dbt_schema()

# Export to BigQuery schema (future)
# bq_schema = pipeline.to_bigquery_schema()
```
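For catalogs without a built-in exporter, the per-column dict above can be reshaped by hand. The sketch below (a hypothetical helper, not part of clpipe's API) arranges the metadata into a dbt-style `models:`/`columns:` structure, stashing clpipe-specific fields under each column's `meta` key:

```python
# Hypothetical helper: reshape a {full_name: metadata} dict into a
# dbt-style schema.yml structure for a single model.
def to_dbt_columns(metadata_dict: dict, model: str) -> dict:
    columns = []
    for full_name, meta in sorted(metadata_dict.items()):
        node, _, column = full_name.rpartition(".")
        if node != model:
            continue  # only include columns belonging to the requested model
        columns.append({
            "name": column,
            "description": meta.get("description") or "",
            "meta": {
                "pii": meta.get("pii"),
                "owner": meta.get("owner"),
                "tags": meta.get("tags", []),
            },
        })
    return {"models": [{"name": model, "columns": columns}]}

example = {
    "query.email": {"description": "Email address", "pii": True,
                    "owner": "data-team", "tags": []},
}
print(to_dbt_columns(example, "query")["models"][0]["columns"][0]["name"])  # email
```

The resulting dict can be serialized with any YAML library to produce a `schema.yml` file.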
## FAQ

**Q: Do I need to add metadata to every column?**

A: No! Metadata is optional. Add it where it's useful (PII columns, important metrics, etc.).

**Q: Can I use both comment styles (`--` and `/* */`)?**

A: Yes, both work identically. Choose what fits your team's style.

**Q: What if I have a typo in the metadata?**

A: Invalid key-value pairs are skipped. The column will still have the description (if valid) and any valid metadata fields.

**Q: Does metadata affect query execution?**

A: No, comments are stripped during execution. Metadata is purely for documentation and lineage.

**Q: Can I override propagated metadata?**

A: Yes, explicitly set metadata takes precedence:

```python
col = pipeline.columns["query.email_upper"]
col.pii = False               # Override inherited PII
col.owner = "different-team"  # Override owner
```
## Next Steps

- Table Lineage & Orchestration - Learn about pipeline execution
- API Documentation - Full Pipeline API reference
- Examples - More hands-on examples