Simple Example
```yaml
name: json-schema-sync
description: Sync JSON column changes to staging models

triggers:
  - type: scheduled
    cron: "0 */6 * * *"  # Every 6 hours

context:
  json_columns:
    - source: segment
      table: events
      column: properties
      staging_model: stg_segment__events

tools:
  preset: standard

prompt: |
  # Task: Monitor JSON schemas and adapt staging extractions

  You are tasked with tracking schema evolution in JSON columns and updating staging models automatically.

  ## Goal
  Keep JSON field extractions current—when new fields appear frequently in data, extract them automatically.

  ## Approach
  1. **Sample and introspect**
     Query recent records, flatten JSON, identify fields and coverage.
  2. **Compare with baseline**
     Detect new fields, type changes, removed fields.
  3. **Update extractions**
     Add high-coverage fields to staging SQL with proper naming conventions.
  4. **Validate and PR**
     Test that extractions work, then open a PR with field statistics.

  ## Output
  - Updated staging models with new extractions
  - YAML documentation for new fields
  - PR with coverage statistics and validation
```
More Robust Example
Production-ready with coverage-based extraction and validation:
```yaml
name: json-schema-sync
description: Monitor JSON schemas and adapt staging model extractions

triggers:
  - type: scheduled
    cron: "0 */6 * * *"  # Every 6 hours
    timezone: "America/New_York"

context:
  lookback_hours: 6
  min_coverage_pct: 50  # Only extract fields in >50% of records
  json_columns:
    - source: segment
      table: events
      column: properties
      staging_model: stg_segment__events
    - source: stripe
      table: charges
      column: metadata
      staging_model: stg_stripe__charges

tools:
  preset: standard
  restrictions:
    files:
      allow: ["models/staging/", ".buster/schemas/"]
    sql:
      read_only: true

git_operations:
  branch_prefix: "json/schema-sync-"

notifications:
  slack:
    channel: "#data-schema-changes"
    on_success: true

prompt: |
```
# Task: Monitor JSON schemas and adapt staging extractions
You are tasked with tracking schema evolution in JSON/VARIANT columns and updating staging models to extract new fields automatically.
## Objective
Maintain staging model alignment with evolving JSON schemas by automatically detecting and extracting new high-value fields.
The goal is to surface new data proactively, not reactively when someone asks for it.
---
## Core Principles
1. **Coverage-based extraction**
Extract fields that appear frequently (>50% of records).
Skip sparse fields (<50%)—they're likely optional or test data.
2. **Defensive handling**
JSON is messy—types change, fields disappear.
Use COALESCE, TRY_CAST, and defensive extractions.
3. **Naming conventions matter**
Apply team standards: snake_case, boolean prefixes, type suffixes.
Make extracted fields look intentional, not auto-generated.
4. **Validate before committing**
Test extractions on sample data.
Ensure fields actually exist and extract cleanly.
5. **Focus on high-value fields**
Not every field deserves extraction.
Prioritize based on coverage, type, and apparent business value.
---
## Detection Strategy
### 1. Sample and introspect JSON
For each monitored JSON column:
- Query recent records (last N hours, ~1000 records)
- Flatten/unnest JSON to discover all field paths
- For each field, calculate (see the query sketch after this list):
- **Coverage %**: In how many records it appears
- **Data type**: STRING, NUMBER, BOOLEAN, OBJECT, ARRAY
- **Sample values**: For validation and type confirmation
- **Distinct count**: To identify categoricals
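A discovery query along these lines can produce those statistics in one pass. This is a sketch assuming a Snowflake warehouse and an illustrative `raw.segment.events` source with a `_loaded_at` load timestamp:
```sql
-- Sketch: discover top-level field paths in a JSON column and compute
-- per-field statistics. Table and column names are illustrative.
WITH sampled AS (
    SELECT properties
    FROM raw.segment.events
    WHERE _loaded_at >= DATEADD('hour', -6, CURRENT_TIMESTAMP)
    LIMIT 1000
)
SELECT
    f.key                                              AS field_path,
    MODE(TYPEOF(f.value))                              AS inferred_type,    -- most common JSON type
    COUNT(*) * 100.0 / (SELECT COUNT(*) FROM sampled)  AS coverage_pct,     -- % of sampled records containing the field
    COUNT(DISTINCT TO_JSON(f.value))                   AS distinct_values,  -- rough categorical signal
    ANY_VALUE(TO_JSON(f.value))                        AS sample_value
FROM sampled,
     LATERAL FLATTEN(input => properties) f
GROUP BY f.key
ORDER BY coverage_pct DESC;
```
If deeper paths matter, FLATTEN's `recursive => true` option surfaces nested keys as well, at the cost of noisier output.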
### 2. Load baseline
- Read previous schema snapshot from `.buster/schemas/{source}_{table}_{column}.json`
- If no baseline exists: create one from current state, exit (will extract in next run)
- Baseline tracks: field paths, types, coverage, first seen date, extraction status
### 3. Detect meaningful changes
**New fields (high priority for extraction):**
- Coverage >80%: Strong signal, extract
- Coverage 50-80%: Extract if meets threshold
- Coverage <50%: Don't extract (sparse, optional, or test data)
- Type: Scalar types (STRING, NUMBER, BOOLEAN) easier than OBJECT/ARRAY
**Type changes:**
- STRING → NUMBER: Update casting (common for evolving schemas)
- NUMBER → STRING: Defensive TRY_CAST
- Type consistency issues: Handle both types gracefully
**Removed fields:**
- Not seen in >1000 recent records
- Keep extraction but add defensive COALESCE NULL
- May be conditional/seasonal, not truly gone
**Coverage changes:**
- Previously sparse, now >50%: Promote to extraction
- Previously high, now sparse: Add defensive handling
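For the new-field case, one way to apply these rules is to anti-join the discovered field stats against the baseline and classify by coverage. In this sketch the baseline is mocked with an inline `VALUES` list standing in for the `.buster/schemas` snapshot; all names are illustrative:
```sql
-- Sketch: flag fields that are new relative to the baseline and decide
-- whether they clear the coverage thresholds. The baseline CTE is a
-- stand-in for the stored schema snapshot.
WITH sampled AS (
    SELECT properties
    FROM raw.segment.events
    WHERE _loaded_at >= DATEADD('hour', -6, CURRENT_TIMESTAMP)
    LIMIT 1000
),
field_stats AS (
    SELECT
        f.key                                             AS field_path,
        COUNT(*) * 100.0 / (SELECT COUNT(*) FROM sampled) AS coverage_pct
    FROM sampled,
         LATERAL FLATTEN(input => properties) f
    GROUP BY f.key
),
baseline AS (
    SELECT column1 AS field_path
    FROM VALUES ('user_id'), ('page_url')  -- fields already tracked/extracted
)
SELECT
    s.field_path,
    s.coverage_pct,
    CASE
        WHEN s.coverage_pct >= 80 THEN 'extract (strong signal)'
        WHEN s.coverage_pct >= 50 THEN 'extract (meets threshold)'
        ELSE 'skip (sparse)'
    END AS decision
FROM field_stats s
LEFT JOIN baseline b
    ON s.field_path = b.field_path
WHERE b.field_path IS NULL  -- new fields only
ORDER BY s.coverage_pct DESC;
```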
---
## Update Staging Models
### Adding new field extractions
**Current model:**
```sql
SELECT
event_id,
properties:user_id::string as user_id,
properties:page_url::string as page_url
FROM {{ source('segment', 'events') }}
```
**Updated with new fields:**
```sql
SELECT
event_id,
properties:user_id::string as user_id,
properties:page_url::string as page_url,
properties:session_id::string as session_id, -- NEW (95% coverage)
properties:utm_source::string as utm_source, -- NEW (87% coverage)
properties:utm_campaign::string as utm_campaign -- NEW (82% coverage)
FROM {{ source('segment', 'events') }}
```
### Handling type changes
```sql
-- Type changed from STRING to NUMBER
properties:price::number as price, -- Updated from ::string
-- Defensive handling for inconsistent types
TRY_CAST(properties:amount::string AS NUMBER) as amount,
```
### Handling removed/sparse fields
```sql
-- Field rarely appears now, defensive extraction
COALESCE(properties:legacy_field::string, NULL) as legacy_field
```
### Nested objects
```sql
-- Extract nested fields with dot notation
properties:user.email::string as user_email,
properties:user.name::string as user_name
```
### Arrays (handle carefully)
```sql
-- Extract first element or count
GET(properties:tags, 0)::string as first_tag,
ARRAY_SIZE(properties:tags) as tag_count
-- Or leave as array for downstream
properties:tags::array as tags
```
### Apply naming conventions
Automatically convert (see the sketch after this list):
- `userId` → `user_id` (camelCase to snake_case)
- `qty` → `quantity` (expand abbreviations)
- `signup` → `signup_at` (add type suffix for timestamps)
- `active` → `is_active` (prefix booleans)
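Applied in a staging model, the conventions might look like this (the raw key names and the timestamp cast are illustrative):
```sql
-- Sketch: naming conventions applied to newly extracted JSON fields.
SELECT
    event_id,
    properties:userId::string        AS user_id,    -- camelCase → snake_case
    properties:qty::number           AS quantity,   -- abbreviation expanded
    properties:signup::timestamp_ntz AS signup_at,  -- timestamp suffix (assumes ISO-8601 string)
    properties:active::boolean       AS is_active   -- boolean prefix
FROM {{ source('segment', 'events') }}
```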
---
## YAML Documentation
Add documentation for new columns:
```yaml
- name: session_id
description: "Session identifier. Auto-detected 2024-11-11, present in 95% of events."
- name: utm_source
description: "Marketing source parameter. Auto-detected 2024-11-11, present in 87% of events."
```
For type changes:
```yaml
- name: price
description: "Product price. Type updated STRING → NUMBER on 2024-11-11."
```
---
## Validation Workflow
### 1. Validate syntax
```bash
dbt parse
dbt compile --select {staging_model}
```
### 2. Test extraction on sample
```bash
dbt run --select {staging_model} --vars '{"limit": 100}'
```
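The `limit` var only takes effect if the staging model reads it; a common pattern (an assumption here, not something this automation requires) is an optional row cap at the end of the model:
```sql
-- Optional row cap for test runs, driven by --vars '{"limit": N}'
{% if var('limit', none) %}
LIMIT {{ var('limit') }}
{% endif %}
```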
### 3. Verify extracted fields
Query to confirm extractions work and have expected coverage:
```sql
SELECT
COUNT(*) as total,
COUNT(session_id) as non_null,
COUNT(session_id) * 100.0 / COUNT(*) as coverage_pct
FROM {{ ref('stg_segment__events') }}
WHERE _loaded_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
```
---
## PR Creation
Create informative PR if significant changes detected:
**Title**: `fix(staging): Sync JSON schema changes for {source}.{table}`
**Description:**
```markdown
## JSON Schema Changes Detected
**Source**: `segment.events.properties`
**Analysis Window**: Last {X} hours ({Y} records sampled)
---
### 🆕 New Fields ({count})
| Field Path | Type | Coverage | Action |
|------------|------|----------|--------|
| `session_id` | STRING | 95% | ✅ Extracted |
| `utm_source` | STRING | 87% | ✅ Extracted |
| `experiment_id` | STRING | 12% | ❌ Too sparse |
### 🔄 Type Changes ({count})
| Field Path | Old → New | Action |
|------------|-----------|--------|
| `price` | STRING → NUMBER | ✅ Updated cast |
### 🗑️ Potentially Removed ({count})
| Field Path | Last Seen | Action |
|------------|-----------|--------|
| `legacy_id` | 7 days ago | ✅ Added defensive COALESCE |
---
### ✅ Changes Made
**Updated**: `models/staging/segment/stg_segment__events.sql`
- Added {X} new field extractions
- Updated {Y} type casts
- Added defensive handling for {Z} sparse fields
**Updated**: YAML documentation with detection dates
**Updated**: Baseline schema in `.buster/schemas/`
---
### 📊 Statistics
- Records analyzed: {count}
- Unique fields found: {count}
- New high-coverage fields: {count}
- Type changes: {count}
**Validation**: ✅ dbt parse, compile, and sample run successful
---
### 💡 Opportunities
{Any business insights from new fields, e.g.:}
- `session_id` enables session-level analysis
- Marketing UTM parameters now available for attribution
```
**Labels**: `json-schema`, `automated`, `staging`
---
## Decision Logic
### Create PR if:
- High-coverage fields detected (>threshold)
- Type changes that affect extractions
- Significant coverage changes
### Skip PR if:
- Only low-coverage fields (<threshold)
- No meaningful changes
In either case, update the baseline silently and continue monitoring.
### Notification only:
For minor changes, just Slack:
```
ℹ️ Minor JSON schema changes in `segment.events`
• 2 new low-coverage fields (<50%)
• No extraction changes needed
• Baseline updated, monitoring continues
```
---
## Edge Cases
- **JSON parsing fails**: Log error, skip, retry next run
- **Too many changes** (>20 fields): Flag for manual review
- **Staging model missing**: Create issue requesting creation
- **Validation fails**: Create draft PR with errors noted
- **Inconsistent types**: Use TRY_CAST for defensive extraction
- **Deeply nested objects**: Extract top-level or key paths only
---
## Success Criteria
- High-value fields are automatically extracted
- Extractions follow naming conventions
- Changes validated before committing
- PR clearly explains what changed and why
- Baseline updated for future comparisons
- Team notified of meaningful changes
**Mindset:** You're keeping the data interface fresh—surface new fields automatically so the team can use them, not discover them months later.