This agent detects source schema changes and updates your dbt staging models to match. It runs when an Airflow pipeline fails because of a schema issue, or on a schedule. On each run it adds new columns to the staging models, handles removed columns, regenerates documentation, and opens a pull request for your review.
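
To make the detection step concrete, the comparison between a source table and its staging model can be pictured as a set difference over column names plus a type check on the shared columns. The following is a minimal Python sketch under that assumption, not the agent's actual implementation; `SchemaDiff`, `diff_schema`, and the sample columns are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class SchemaDiff:
    """Result of comparing a source table against its staging model."""
    added: list[str] = field(default_factory=list)
    removed: list[str] = field(default_factory=list)
    # column name -> (type in staging model, type in source)
    type_changed: dict[str, tuple[str, str]] = field(default_factory=dict)


def diff_schema(source: dict[str, str], staging: dict[str, str]) -> SchemaDiff:
    """Compare two column-name -> data-type mappings.

    `source` would come from something like information_schema.columns,
    `staging` from the columns the staging model currently selects.
    """
    diff = SchemaDiff()
    diff.added = sorted(set(source) - set(staging))
    diff.removed = sorted(set(staging) - set(source))
    for col in sorted(set(source) & set(staging)):
        # Compare types case-insensitively; anything else counts as a change.
        if source[col].lower() != staging[col].lower():
            diff.type_changed[col] = (staging[col], source[col])
    return diff


# Hypothetical example data.
source_cols = {"id": "integer", "email": "varchar", "signup_at": "timestamp", "plan": "varchar"}
staging_cols = {"id": "integer", "email": "text", "signup_at": "timestamp", "legacy_flag": "boolean"}

result = diff_schema(source_cols, staging_cols)
print(result.added)         # ['plan']
print(result.removed)       # ['legacy_flag']
print(result.type_changed)  # {'email': ('text', 'varchar')}
```

Each of the three buckets maps to one branch of the update logic: added columns are appended to the model, removed columns are checked for downstream use, and type changes may need a cast.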

Simple Example

name: schema_sync
description: Auto-update staging models when source schemas change

prompt: |
  Source schema changed.
  
  1. Find the staging model for this table
  2. Add new columns with our naming conventions
  3. Run dbt parse to validate
  4. Create PR with changes

triggers:
  - event: airflow
    type: dag_run_failed
    includes:
      - "fivetran_sync"
      - "source_refresh"
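
The prompt leaves "our naming conventions" to the agent. As a rough illustration of what applying such conventions might involve, here is a minimal Python sketch. It assumes snake_case names, an `is_`/`has_` prefix for booleans, and an `_at`/`_date` suffix for date-like columns; defaulting to `is_` for booleans, `_at` for timestamps, and `_date` for dates is my own assumption, and `to_snake_case` and `staging_alias` are hypothetical helpers.

```python
import re


def to_snake_case(name: str) -> str:
    """Convert a source column name such as 'CreatedAt' or 'Is Active' to snake_case."""
    # Underscore between a lowercase letter or digit and a following uppercase letter.
    s = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", name)
    # Underscore between an acronym and a following capitalized word (HTTPStatus).
    s = re.sub(r"([A-Z]+)([A-Z][a-z])", r"\1_\2", s)
    # Collapse spaces, dashes, and other separators into single underscores.
    s = re.sub(r"[^A-Za-z0-9]+", "_", s)
    return s.strip("_").lower()


def staging_alias(name: str, data_type: str) -> str:
    """Return the alias a column would get in the staging model (assumed rules)."""
    alias = to_snake_case(name)
    dtype = data_type.lower()
    if dtype in ("boolean", "bool"):
        # Assumption: default to is_ when neither prefix is present.
        if not alias.startswith(("is_", "has_")):
            alias = f"is_{alias}"
    elif dtype.startswith("timestamp"):
        if not alias.endswith(("_at", "_date")):
            alias = f"{alias}_at"
    elif dtype == "date":
        if not alias.endswith(("_at", "_date")):
            alias = f"{alias}_date"
    return alias


print(staging_alias("CreatedAt", "timestamp"))  # created_at
print(staging_alias("Active", "boolean"))       # is_active
print(staging_alias("hasTrial", "boolean"))     # has_trial
print(staging_alias("Signup", "date"))          # signup_date
```

In practice the conventions live in the prompt or in project documentation, so the agent applies them by reading instructions rather than by running code like this.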

Production Example

This version adds validation and downstream impact analysis:

name: schema_change_handler
description: Automatically adapt staging models to source schema changes

prompt: |
  Schema change detected. Investigate and update staging models.
  
  ## 1. Understand the change
  Check the source tables for schema differences:
  - Query information_schema.columns for current structure
  - Compare against existing staging model columns
  - Identify: added columns, removed columns, type changes
  
  ## 2. Find the staging model
  Look for: models/staging/[source]/stg_[table_name].sql
  
  If not found:
    - Create new staging model from template
    - Include all columns from source
  
  ## 3. Update the model
  
  For ADDED columns:
    - Add to SELECT with proper aliasing
    - Follow naming conventions:
      * snake_case
      * Prefix booleans: is_, has_
      * Suffix dates: _at, _date
    - Add comment explaining column
  
  For REMOVED columns:
    - Check if used downstream (search marts/)
    - If used: Comment out + add deprecation notice
    - If not used: Remove from SELECT
  
  For TYPE CHANGES:
    - Update CAST if needed
    - Add comment about type change
    - Check downstream for compatibility
  
  ## 4. Update documentation
  Update the YAML file:
    - Add descriptions for new columns
    - Mark removed columns as deprecated
    - Update data types
  
  ## 5. Validate
  Run: dbt parse
  (dbt parse validates the whole project; it does not accept a model selector)
  
  If errors:
    - Fix syntax issues
    - Retry validation
  
  ## 6. Check downstream impact
  Search for ref('stg_[table_name]') in marts/
  
  If removed columns are used downstream:
    - Add to PR description
    - Create follow-up issue
    - Alert team in Slack
  
  ## 7. Create PR
  Title: "feat(staging): Sync [table_name] with source schema"
  
  Description:
  Source schema changed for [table_name].
  
  Changes made:
  - Added columns: [list]
  - Removed columns: [list]
  - Type changes: [list]
  
  Downstream impact:
  [List any marts affected]
  
  ## 8. Notify team
  Send a Slack message to #data-pipeline:
  "📊 Schema change detected: [table_name]
   PR created: [link]
   Review needed: [impact summary]"

triggers:
  - event: airflow
    type: dag_run_failed
    includes:
      - "fivetran_sync"
      - "source_refresh"
  
  - event: scheduled
    cron: "0 6 * * *"  # Daily at 6 AM UTC
    branches: ['main']

tools:
  include:
    - slack_tool
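
The downstream impact check in step 6 of the prompt amounts to a text search over the marts directory. A minimal Python sketch of that search might look like the following. It assumes marts live under `models/marts` (the prompt only says `marts/`) and that models use the single-argument `ref('model')` form; `find_downstream_refs` and `columns_still_used` are hypothetical helpers, not part of dbt or of the agent.

```python
import re
from pathlib import Path


def find_downstream_refs(model_name: str, marts_dir: str = "models/marts") -> list[Path]:
    """Return the .sql files under marts_dir that call ref() on model_name."""
    # Matches ref('stg_x') or ref("stg_x"), allowing whitespace inside the call.
    pattern = re.compile(r"ref\(\s*['\"]" + re.escape(model_name) + r"['\"]\s*\)")
    hits = []
    for sql_file in sorted(Path(marts_dir).rglob("*.sql")):
        if pattern.search(sql_file.read_text(encoding="utf-8")):
            hits.append(sql_file)
    return hits


def columns_still_used(removed: list[str], files: list[Path]) -> dict[str, list[Path]]:
    """Map each removed column to the downstream files that mention it as a whole word."""
    usage: dict[str, list[Path]] = {}
    for col in removed:
        word = re.compile(r"\b" + re.escape(col) + r"\b")
        matches = [f for f in files if word.search(f.read_text(encoding="utf-8"))]
        if matches:
            usage[col] = matches
    return usage


if __name__ == "__main__":
    # Hypothetical model and column names.
    affected = find_downstream_refs("stg_orders")
    for column, files in columns_still_used(["legacy_flag"], affected).items():
        print(f"{column} is still used in: {[str(f) for f in files]}")
```

A plain word match can report false positives, for example a column name that appears only in a comment, so the result is best treated as a list of files for a reviewer to check rather than a definitive impact report.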