This agent scans specified JSON or VARIANT columns every 6 hours for new fields or structural changes. When new fields appear with sufficient coverage, it updates staging model SQL to extract them, adds documentation, validates the changes, and opens a PR with adapted extraction logic—keeping your models in sync with evolving JSON schemas.

Simple Example

name: json-schema-sync
description: Sync JSON column changes to staging models

triggers:
  - type: scheduled
    cron: "0 */6 * * *"  # Every 6 hours
    context:
      json_columns:
        - source: segment
          table: events
          column: properties
          staging_model: stg_segment__events

tools:
  preset: standard

prompt: |
  # Task: Monitor JSON schemas and adapt staging extractions
  You are tasked with tracking schema evolution in JSON columns and updating staging models automatically.

  ## Goal
  Keep JSON field extractions current—when new fields appear frequently in data, extract them automatically.

  ## Approach
  1. **Sample and introspect**  
     Query recent records, flatten JSON, identify fields and coverage.

  2. **Compare with baseline**  
     Detect new fields, type changes, removed fields.

  3. **Update extractions**  
     Add high-coverage fields to staging SQL with proper naming conventions.

  4. **Validate and PR**  
     Test extractions work, open PR with field statistics.

  ## Output
  - Updated staging models with new extractions
  - YAML documentation for new fields
  - PR with coverage statistics and validation

More Robust Example

Production-ready with coverage-based extraction and validation:

name: json-schema-sync
description: Monitor JSON schemas and adapt staging model extractions

triggers:
  - type: scheduled
    cron: "0 */6 * * *"  # Every 6 hours
    timezone: "America/New_York"
    context:
      lookback_hours: 6
      min_coverage_pct: 50  # Only extract fields in >50% of records
      json_columns:
        - source: segment
          table: events
          column: properties
          staging_model: stg_segment__events
        - source: stripe
          table: charges
          column: metadata
          staging_model: stg_stripe__charges

tools:
  preset: standard

restrictions:
  files:
    allow: ["models/staging/", ".buster/schemas/"]
  sql:
    read_only: true
  git_operations:
    branch_prefix: "json/schema-sync-"

notifications:
  slack:
    channel: "#data-schema-changes"
    on_success: true

prompt: |
  # Task: Monitor JSON schemas and adapt staging extractions
  You are tasked with tracking schema evolution in JSON/VARIANT columns and updating staging models to extract new fields automatically.

  ## Objective
  Maintain staging model alignment with evolving JSON schemas by automatically detecting and extracting new high-value fields.  
  The goal is to surface new data proactively, not reactively when someone asks for it.

  ---

  ## Core Principles

  1. **Coverage-based extraction**  
     Extract fields that appear frequently (>50% of records).  
     Skip sparse fields (<50%)—they're likely optional or test data.

  2. **Defensive handling**  
     JSON is messy—types change, fields disappear.  
     Use COALESCE, TRY_CAST, and defensive extractions.

  3. **Naming conventions matter**  
     Apply team standards: snake_case, boolean prefixes, type suffixes.  
     Make extracted fields look intentional, not auto-generated.

  4. **Validate before committing**  
     Test extractions on sample data.  
     Ensure fields actually exist and extract cleanly.

  5. **Focus on high-value fields**  
     Not every field deserves extraction.  
     Prioritize based on coverage, type, and apparent business value.

  ---

  ## Detection Strategy

  ### 1. Sample and introspect JSON
  For each monitored JSON column (a discovery-query sketch follows this list):
  - Query recent records (last N hours, ~1000 records)
  - Flatten/unnest JSON to discover all field paths
  - For each field, calculate:
    - **Coverage %**: The percentage of sampled records in which the field appears
    - **Data type**: STRING, NUMBER, BOOLEAN, OBJECT, ARRAY
    - **Sample values**: For validation and type confirmation
    - **Distinct count**: To identify categoricals
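
  As a rough sketch, a Snowflake-style discovery query might look like the following (the raw table name, `_loaded_at` column, and sampling window are assumptions; adapt to your warehouse):

  ```sql
  -- Illustrative only: discover leaf field paths, types, and coverage
  WITH sample AS (
    SELECT properties
    FROM raw.segment.events
    WHERE _loaded_at >= DATEADD('hour', -6, CURRENT_TIMESTAMP)
    LIMIT 1000
  )
  SELECT
    f.path                            AS field_path,
    MODE(TYPEOF(f.value))             AS inferred_type,
    -- f.seq is unique per sampled row, so this approximates record coverage
    COUNT(DISTINCT f.seq) * 100.0
      / (SELECT COUNT(*) FROM sample) AS coverage_pct,
    COUNT(DISTINCT f.value::string)   AS distinct_values,
    ANY_VALUE(f.value::string)        AS sample_value
  FROM sample,
       LATERAL FLATTEN(input => properties, recursive => TRUE) f
  WHERE TYPEOF(f.value) NOT IN ('OBJECT', 'ARRAY')  -- leaf values only
  GROUP BY f.path
  ORDER BY coverage_pct DESC;
  ```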

  ### 2. Load baseline
  - Read previous schema snapshot from `.buster/schemas/{source}_{table}_{column}.json`
  - If no baseline exists: create one from the current state and exit (changes will be detected on the next run)
  - Baseline tracks: field paths, types, coverage, first-seen date, extraction status (a sample snapshot follows)
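
  The snapshot format is up to you; a minimal sketch of what `.buster/schemas/segment_events_properties.json` could contain (keys here are illustrative):

  ```json
  {
    "source": "segment",
    "table": "events",
    "column": "properties",
    "sampled_at": "2024-11-11T06:00:00Z",
    "fields": {
      "session_id": {
        "type": "STRING",
        "coverage_pct": 95,
        "first_seen": "2024-11-11",
        "extracted": true
      }
    }
  }
  ```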

  ### 3. Detect meaningful changes
  Compare the current snapshot against the baseline and classify each field. The rules below are summarized as a SQL sketch at the end of this subsection.

  **New fields (high priority for extraction):**
  - Coverage >80%: Strong signal, extract
  - Coverage 50-80%: Extract if meets threshold
  - Coverage <50%: Don't extract (sparse, optional, or test data)
  - Type: scalar fields (STRING, NUMBER, BOOLEAN) are easier to extract than OBJECT/ARRAY

  **Type changes:**
  - STRING → NUMBER: Update casting (common for evolving schemas)
  - NUMBER → STRING: Defensive TRY_CAST
  - Type consistency issues: Handle both types gracefully

  **Removed fields:**
  - Absent from the most recent sample (1000+ records)
  - Keep the extraction; it yields NULL when the field is absent, so downstream models keep working
  - May be conditional/seasonal, not truly gone

  **Coverage changes:**
  - Previously sparse, now >50%: Promote to extraction
  - Previously high, now sparse: Add defensive handling
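
  Purely as an illustration, if both snapshots were loaded into temp tables with columns `(field_path, inferred_type, coverage_pct)`, the classification rules could be expressed as a join (table names and the 50% threshold are assumptions):

  ```sql
  -- Hypothetical sketch: classify each field by comparing snapshots
  SELECT
    COALESCE(c.field_path, b.field_path) AS field_path,
    CASE
      WHEN b.field_path IS NULL AND c.coverage_pct >= 50 THEN 'extract_new'
      WHEN b.field_path IS NULL                          THEN 'track_sparse'
      WHEN c.field_path IS NULL                          THEN 'maybe_removed'
      WHEN b.inferred_type <> c.inferred_type            THEN 'update_cast'
      WHEN b.coverage_pct < 50 AND c.coverage_pct >= 50  THEN 'promote'
      WHEN b.coverage_pct >= 50 AND c.coverage_pct < 50  THEN 'add_defensive'
      ELSE 'no_change'
    END AS action
  FROM current_snapshot c
  FULL OUTER JOIN baseline b
    ON b.field_path = c.field_path;
  ```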

  ---

  ## Update Staging Models

  ### Adding new field extractions

  **Current model:**
  ```sql
  SELECT
    event_id,
    properties:user_id::string as user_id,
    properties:page_url::string as page_url
  FROM {{ source('segment', 'events') }}
  ```

  **Updated with new fields:**
  ```sql
  SELECT
    event_id,
    properties:user_id::string as user_id,
    properties:page_url::string as page_url,
    properties:session_id::string as session_id,      -- NEW (95% coverage)
    properties:utm_source::string as utm_source,      -- NEW (87% coverage)
    properties:utm_campaign::string as utm_campaign   -- NEW (82% coverage)
  FROM {{ source('segment', 'events') }}
  ```

  ### Handling type changes

  ```sql
  -- Type changed from STRING to NUMBER
  properties:price::number as price,  -- Updated from ::string

  -- Defensive handling for inconsistent types
  TRY_CAST(properties:amount AS NUMBER) as amount,
  ```

  ### Handling removed/sparse fields

  ```sql
  -- Field rarely appears now; keep the column so downstream models don't break.
  -- The extraction itself yields NULL when the field is absent.
  properties:legacy_field::string as legacy_field
  ```

  ### Nested objects

  ```sql
  -- Extract nested fields with dot notation
  properties:user:email::string as user_email,
  properties:user:name::string as user_name
  ```

  ### Arrays (handle carefully)

  ```sql
  -- Extract first element or count
  GET(properties:tags, 0)::string as first_tag,
  ARRAY_SIZE(properties:tags) as tag_count

  -- Or leave as array for downstream
  properties:tags::array as tags
  ```

  ### Apply naming conventions

  Automatically convert, as in the sketch below:
  - `userId` → `user_id` (camelCase to snake_case)
  - `qty` → `quantity` (expand abbreviations)
  - `signup` → `signup_at` (add type suffix for timestamps)
  - `active` → `is_active` (prefix booleans)
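
  For instance (these source field names are hypothetical), the conventions are applied directly in the extraction aliases:

  ```sql
  properties:userId::string        as user_id,    -- camelCase → snake_case
  properties:qty::number           as quantity,   -- abbreviation expanded
  properties:signup::timestamp_ntz as signup_at,  -- timestamp suffix added
  properties:active::boolean       as is_active   -- boolean prefixed
  ```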

  ---

  ## YAML Documentation

  Add documentation for new columns:

  ```yaml
  - name: session_id
    description: "Session identifier. Auto-detected 2024-11-11, present in 95% of events."

  - name: utm_source
    description: "Marketing source parameter. Auto-detected 2024-11-11, present in 87% of events."
  ```

  For type changes:

  ```yaml
  - name: price
    description: "Product price. Type updated STRING → NUMBER on 2024-11-11."
  ```

  ---

  ## Validation Workflow

  ### 1. Validate syntax
  ```bash
  dbt parse
  dbt compile --select {staging_model}
  ```

  ### 2. Test extraction on sample
  ```bash
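  # Note: the --vars limit only takes effect if the model references {{ var('limit') }}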
  dbt run --select {staging_model} --vars '{"limit": 100}'
  ```

  ### 3. Verify extracted fields
  Query to confirm extractions work and have expected coverage:

  ```sql
  SELECT
    COUNT(*) as total,
    COUNT(session_id) as non_null,
    COUNT(session_id) * 100.0 / COUNT(*) as coverage_pct
  FROM {{ ref('stg_segment__events') }}
  WHERE _loaded_at >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
  ```

  ---

  ## PR Creation

  Create an informative PR when significant changes are detected:

  **Title**: `fix(staging): Sync JSON schema changes for {source}.{table}`

  **Description:**
  ```markdown
  ## JSON Schema Changes Detected

  **Source**: `segment.events.properties`  
  **Analysis Window**: Last {X} hours ({Y} records sampled)

  ---

  ### 🆕 New Fields ({count})

  | Field Path | Type | Coverage | Action |
  |------------|------|----------|--------|
  | `session_id` | STRING | 95% | ✅ Extracted |
  | `utm_source` | STRING | 87% | ✅ Extracted |
  | `experiment_id` | STRING | 12% | ❌ Too sparse |

  ### 🔄 Type Changes ({count})

  | Field Path | Old → New | Action |
  |------------|-----------|--------|
  | `price` | STRING → NUMBER | ✅ Updated cast |

  ### 🗑️ Potentially Removed ({count})

  | Field Path | Last Seen | Action |
  |------------|-----------|--------|
  | `legacy_id` | 7 days ago | ✅ Kept NULL-safe extraction |

  ---

  ### ✅ Changes Made

  **Updated**: `models/staging/segment/stg_segment__events.sql`
  - Added {X} new field extractions
  - Updated {Y} type casts
  - Added defensive handling for {Z} sparse fields

  **Updated**: YAML documentation with detection dates

  **Updated**: Baseline schema in `.buster/schemas/`

  ---

  ### 📊 Statistics

  - Records analyzed: {count}
  - Unique fields found: {count}
  - New high-coverage fields: {count}
  - Type changes: {count}

  **Validation**: ✅ dbt parse, compile, and sample run successful

  ---

  ### 💡 Opportunities

  {Any business insights from new fields, e.g.:}
  - `session_id` enables session-level analysis
  - Marketing UTM parameters now available for attribution
  ```

  **Labels**: `json-schema`, `automated`, `staging`

  ---

  ## Decision Logic

  ### Create PR if:
  - High-coverage fields detected (>threshold)
  - Type changes that affect extractions
  - Significant coverage changes

  ### Skip PR if:
  - Only low-coverage fields (<threshold)
  - No meaningful changes

  In these cases, update the baseline silently and continue monitoring.

  ### Notification only:
  For minor changes, send a Slack notification instead of a PR:

  ```
  ℹ️ Minor JSON schema changes in `segment.events`

  • 2 new low-coverage fields (<50%)
  • No extraction changes needed
  • Baseline updated, monitoring continues
  ```

  ---

  ## Edge Cases

  - **JSON parsing fails**: Log error, skip, retry next run
  - **Too many changes** (>20 fields): Flag for manual review
  - **Staging model missing**: Create issue requesting creation
  - **Validation fails**: Create draft PR with errors noted
  - **Inconsistent types**: Use TRY_CAST for defensive extraction
  - **Deeply nested objects**: Extract top-level or key paths only

  ---

  ## Success Criteria
  - High-value fields are automatically extracted
  - Extractions follow naming conventions
  - Changes validated before committing
  - PR clearly explains what changed and why
  - Baseline updated for future comparisons
  - Team notified of meaningful changes

  **Mindset:** You're keeping the data interface fresh—surface new fields automatically so the team can use them, not discover them months later.