This example shows how to create agents that respond to specific events in your data stack—schema changes from Fivetran, dbt Cloud job failures, or custom webhooks—and automatically take action with Slack summaries, PR creation, or remediation workflows.

## Simple Example

```yaml
name: schema-change-responder
description: Respond to source schema changes

triggers:
  - type: event
    event_name: schema_change_detected
    source: fivetran

tools:
  preset: standard

prompt: |
  When a schema change is detected:
  1. Identify which staging models are affected
  2. Create an issue documenting the change
  3. Send Slack alert to #data-engineering
```

## More Robust Example

Production-ready with multiple event types and intelligent responses:

```yaml
name: data-stack-event-responder
description: Comprehensive event handling for data stack incidents and changes

triggers:
  # Schema changes from Fivetran
  - type: event
    event_name: schema_change_detected
    source: fivetran
    filters:
      schema: "raw.*"
      change_type: ["column_added", "column_removed", "column_renamed"]
  
  # dbt Cloud job failures
  - type: event
    event_name: dbt_run_failed
    source: dbt_cloud
    filters:
      environment: production
      test_severity: error
  
  # Freshness failures
  - type: event
    event_name: freshness_check_failed
    source: dbt_cloud
  
  # Custom data quality events
  - type: event
    event_name: data_quality_failure
    source: custom
    filters:
      severity: ["critical", "high"]
  
  # Scheduled backup check
  - type: scheduled
    cron: "0 */6 * * *"  # Every 6 hours
    context:
      check_type: health_check

tools:
  preset: standard

restrictions:
  files:
    allow: ["models/staging/", ".github/", ".buster/events/"]
  sql:
    read_only: true
  git_operations:
    can_create_pr: true
    can_create_issue: true

notifications:
  slack:
    channel: "#data-alerts"
    on_success: true
    on_failure: true

prompt: |
  ## Event Router
  
  Based on the trigger type and event, route to the appropriate handler:
  
  If trigger.type == "event":
    If event_name == "schema_change_detected": → Handle Schema Change
    Else if event_name == "dbt_run_failed": → Handle dbt Failure
    Else if event_name == "freshness_check_failed": → Handle Freshness Failure
    Else if event_name == "data_quality_failure": → Handle Data Quality Issue
  Else if trigger.type == "scheduled": → Run Health Check
```

---

## Handler 1: Schema Change

**Event Data Available**:
- `event.connector`: Which Fivetran connector
- `event.schema`: Schema that changed
- `event.table`: Table that changed
- `event.change_type`: Type of change
- `event.columns`: Affected columns
- `event.timestamp`: When detected

### Process:

**1. Analyze the change**

Determine severity:
- **Critical**: Column removed, type changed (breaking)
- **High**: Column renamed (requires update)
- **Medium**: Column added (non-breaking, should document)
- **Low**: Metadata only, no impact

**2. Find affected models**

Search for staging models that reference this source:
```bash
grep -r "source('{{event.schema}}', '{{event.table}}')" models/staging/
```

For each affected model:
  • Check if it explicitly selects affected columns
  • Determine if it will break or continue working
**3. Take action based on severity**

Critical (Breaking Change):
  • Create HIGH priority issue:
    ## 🚨 Breaking Schema Change Detected
    
    **Source**: `{{event.schema}}.{{event.table}}`
    **Change**: Column `{{column}}` removed
    **Detected**: {{event.timestamp}}
    
    ### Affected Models
    - `stg_salesforce__accounts` - References removed column on line 45
    - `stg_salesforce__opportunities` - May be affected
    
    ### Immediate Actions Required
    1. Update staging models to handle missing column
    2. Check downstream impacts
    3. Decide on backfill strategy
    
    ### Details
    {{full event data}}
    
  • Send urgent Slack alert:
    🚨 **BREAKING SCHEMA CHANGE**
    
    **Source**: `{{event.schema}}.{{event.table}}`
    **Change**: Column removed: `{{column}}`
    
    **Impact**: {{count}} staging models affected
    
    **Action Required**: Review and update models
    **Issue**: {{issue_link}}
    
    @data-oncall
    
  • DO NOT auto-create PR (too risky, needs human review)
High (Column Renamed):
  • Create PR to update staging model:
    -- Update column reference
    {{old_column_name}} as {{standardized_name}},  -- Old reference
    {{new_column_name}} as {{standardized_name}},  -- Updated to new column
    
  • Send Slack notification:
    📝 **Schema Change - PR Created**
    
    **Source**: `{{event.schema}}.{{event.table}}`
    **Change**: Column renamed: `{{old}}` → `{{new}}`
    
    **PR**: {{pr_link}}
    Review and merge to keep models in sync.
    
Medium (Column Added):
  • Create issue suggesting update:
    ## ➕ New Column Available
    
    **Source**: `{{event.schema}}.{{event.table}}`
    **New Column**: `{{column}}` ({{data_type}})
    
    ### Suggestion
    Consider adding this column to staging model if relevant:
    
    ```sql
    {{column}}::{{data_type}} as {{standardized_name}},
    ```

    ### Context

    • Appears in % of recent records
    • Sample values:
  • Send informational Slack:
    ℹ️  **New Column Detected**
    
    `{{event.table}}.{{column}}` added to source
    
    Issue created for review: {{issue_link}}
    
**4. Log event**

Record in `.buster/events/schema_changes.jsonl`:

```json
{"timestamp": "...", "source": "...", "change_type": "...", "severity": "...", "handled": true}
```

## Handler 2: dbt Run Failure

**Event Data Available**:
- `event.job_id`: dbt Cloud job ID
- `event.run_id`: Specific run ID
- `event.environment`: prod/staging/dev
- `event.failed_models`: List of failed models
- `event.error_messages`: Error details
- `event.run_url`: Link to dbt Cloud

### Process:

**1. Classify failure type**

Analyze the error messages (a minimal pattern-matching sketch follows this list):
  • SQL Error: Syntax, missing ref, column not found
  • Data Quality: Test failure (unique, not_null, etc.)
  • Compilation: Macro issue, config error
  • Infrastructure: Timeout, warehouse connection
  • Permission: Access denied
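
A first pass at this classification can be simple pattern matching over the error text before any deeper analysis. A minimal sketch, assuming the first error message from `event.error_messages` is available in an `ERROR_MSG` shell variable; the patterns and buckets are illustrative, not exhaustive:

```bash
# Illustrative: bucket a dbt error message into the categories listed above.
# ERROR_MSG is assumed to hold event.error_messages[0]; tune patterns to your warehouse.
classify_failure() {
  local msg="$1"
  case "$msg" in
    *"Compilation Error"*|*"depends on a node named"*)                 echo "compilation" ;;
    *"Syntax error"*|*"invalid identifier"*|*"column"*"does not exist"*) echo "sql_error" ;;
    *"Failure in test"*|*"configured to fail"*)                        echo "data_quality" ;;
    *"timeout"*|*"timed out"*|*"could not connect"*)                   echo "infrastructure" ;;
    *"permission denied"*|*"insufficient privileges"*)                 echo "permission" ;;
    *)                                                                 echo "unknown" ;;
  esac
}

classify_failure "$ERROR_MSG"
```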
**2. Determine if auto-fixable**

Some failures can be automatically addressed:
  • Missing ref: If model was renamed, update references
  • Test failure (data): May self-resolve, retry
  • Timeout: Retry with longer timeout
Others need human intervention:
  • SQL syntax error: Needs code fix
  • Data quality: Needs investigation
  • Permission error: Needs admin
**3. Take appropriate action**

Auto-fixable (a hedged retry sketch follows this list):
  • Attempt fix
  • Trigger retry
  • Report outcome
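
For the retry path, re-triggering the failed job through the dbt Cloud API is often enough. A hedged sketch, assuming a v2 service token in `DBT_CLOUD_TOKEN` and the account ID in `DBT_CLOUD_ACCOUNT_ID` (both assumed environment variables; `JOB_ID` comes from `event.job_id`):

```bash
# Re-trigger the failed dbt Cloud job via the v2 "trigger job run" endpoint.
# DBT_CLOUD_TOKEN, DBT_CLOUD_ACCOUNT_ID are assumed; JOB_ID should come from event.job_id.
curl -s -X POST \
  "https://cloud.getdbt.com/api/v2/accounts/${DBT_CLOUD_ACCOUNT_ID}/jobs/${JOB_ID}/run/" \
  -H "Authorization: Token ${DBT_CLOUD_TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{"cause": "Automatic retry after transient failure"}'
```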
Needs human:
  • Create detailed issue:
    ## ❌ dbt Job Failed - {{environment}}
    
    **Job**: [{{job_name}}]({{job_url}})
    **Run**: [#{{run_id}}]({{run_url}})
    **Time**: {{timestamp}}
    
    ### Failed Models
    
    - `fct_orders`: SQL compilation error
    - `dim_customers`: Test failure (unique test on customer_id)
      Got 3 duplicate customer_id values
    
    ### Suggested Actions
    
    1. **fct_orders**: Check SQL syntax on line 45
    2. **dim_customers**: Investigate duplicate customer records
    
    ### Run Details
    {{full event payload}}
    
  • Send Slack alert (severity based on environment):
    ❌ **dbt Job Failed in PRODUCTION**
    
    **Job**: {{job_name}}
    **Failed Models**: {{count}}
    - fct_orders (SQL error)
    - dim_customers (test failure)
    
    **Run**: {{run_url}}
    **Issue**: {{issue_link}}
    
    @data-oncall {{if environment == "production"}}
    
**4. Check for patterns**

If this model fails frequently (see the sketch after this list):
  • Flag as unstable
  • Suggest adding better error handling
  • Consider splitting into smaller models
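
How "frequently" is judged can be as simple as counting prior failure events in the local event log. A rough sketch, assuming failed runs are also appended as JSON lines to a hypothetical `.buster/events/dbt_failures.jsonl` with a `model` field:

```bash
# Count historical failures for a model from a hypothetical JSONL event log.
# The log path and the "model" field are assumptions; adapt to how you record events.
MODEL="fct_orders"
FAILURE_COUNT=$(jq -r --arg m "$MODEL" 'select(.model == $m) | .timestamp' \
  .buster/events/dbt_failures.jsonl 2>/dev/null | wc -l)

if [ "${FAILURE_COUNT:-0}" -ge 3 ]; then
  echo "$MODEL has failed $FAILURE_COUNT times recently; flag as unstable"
fi
```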

## Handler 3: Freshness Failure

**Event Data Available**:
- `event.source`: Which source
- `event.table`: Which table
- `event.max_loaded_at`: Last data timestamp
- `event.threshold`: Expected freshness
- `event.status`: warn or error

### Process:

**1. Check if expected**

Some freshness failures are known:
  • Weekend data loads
  • Scheduled maintenance windows
  • Holiday periods
Read `.buster/config/expected_delays.yml` (a date-check sketch follows the config):

```yaml
sources:
  - name: salesforce
    table: opportunities
    skip_freshness_check:
      days_of_week: [6, 7]  # Skip Sat/Sun
      dates: ["2024-12-25", "2024-01-01"]
```
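
In practice the agent reads those rules and suppresses the alert when the current date matches one. A minimal sketch of the date check, with the day-of-week and holiday values hard-coded here for illustration (they should come from `expected_delays.yml`):

```bash
# Suppress the alert when the freshness failure is expected.
# Day-of-week convention matches the config above: 1 = Monday ... 7 = Sunday.
DOW=$(date +%u)
TODAY=$(date +%F)

if [ "$DOW" -ge 6 ] || [ "$TODAY" = "2024-12-25" ] || [ "$TODAY" = "2024-01-01" ]; then
  echo "Freshness failure for salesforce.opportunities is expected today; skipping alert"
  exit 0
fi
```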
**2. Investigate cause**

Query the source:

```sql
SELECT
  MAX({{timestamp_column}}) as last_record,
  COUNT(*) as total_records,
  COUNT(*) FILTER (WHERE {{timestamp_column}} >= CURRENT_TIMESTAMP - INTERVAL '24 hours') as recent_records
FROM {{source_table}}
```
Determine:
  • Is data completely missing or just delayed?
  • Is this source-side issue or pipeline issue?
  • Are other tables from same source affected?
**3. Alert with context**

Send Slack with investigation results:
⏰ **Data Freshness Alert**

**Source**: `{{source}}.{{table}}`
**Expected**: Data within {{threshold}}
**Actual**: Last data {{hours_ago}} hours ago

**Investigation**:
• Total records: {{count}}
• Recent records (24h): {{recent_count}}
• Other tables from {{source}}: {{status}}

**Likely Cause**: {{diagnosis}}

**Action**: {{recommendation}}
**4. Create issue if not resolved**

If freshness doesn't resolve by the next check cycle:
  • Create issue for tracking
  • Escalate if critical source

## Handler 4: Data Quality Failure

**Event Data Available**:
- `event.check_name`: Which quality check
- `event.table`: Affected table
- `event.severity`: critical/high/medium/low
- `event.metric`: What failed (null_rate, duplicate_count, etc.)
- `event.threshold`: Expected value
- `event.actual`: Actual value

### Process:

**1. Run diagnostic queries**

Based on the failure type, investigate:

High null rate:

```sql
SELECT
  COUNT(*) as null_count,
  COUNT(*) * 100.0 / (SELECT COUNT(*) FROM {{table}}) as null_pct
FROM {{table}}
WHERE {{column}} IS NULL
```

Duplicate keys:

```sql
SELECT
  {{key_column}},
  COUNT(*) as duplicate_count
FROM {{table}}
GROUP BY {{key_column}}
HAVING COUNT(*) > 1
ORDER BY duplicate_count DESC
LIMIT 100
```
**2. Classify severity**

- **Critical**: Core business metric affected, blocks reporting
- **High**: Important quality issue, needs fix soon
- **Medium**: Degraded quality, investigate when possible
- **Low**: Minor anomaly, monitor
**3. Create detailed report**

Send to Slack with findings:
🔴 **Critical Data Quality Issue**

**Check**: {{check_name}}
**Table**: `{{table}}`
**Issue**: {{description}}

**Metrics**:
• Expected: {{threshold}}
• Actual: {{actual}} ({{pct_diff}}% deviation)

**Sample Records** (first 5):
{{sample_data}}

**Root Cause Hypothesis**: {{diagnosis}}

**Recommended Fix**: {{recommendation}}

**Issue**: {{issue_link}}
@data-quality-team

## Handler 5: Health Check (Scheduled)

Run proactive checks every 6 hours:

**1. Check warehouse connection**

```sql
SELECT 1
```

**2. Check recent dbt runs**

```bash
dbt list --select state:modified --state ./ --output json
```

**3. Check for stale models**

Find models not built recently:

```sql
SELECT
  table_schema,
  table_name,
  last_altered
FROM information_schema.tables
WHERE table_schema IN ('staging', 'marts')
  AND last_altered < CURRENT_TIMESTAMP - INTERVAL '3 days'
```

**4. Report status**

- If all healthy: log silently
- If issues found: send a summary to Slack (a minimal end-to-end sketch follows)
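
A minimal sketch of the reporting step, assuming a Slack incoming-webhook URL is available as `SLACK_WEBHOOK_URL` (an assumed environment variable); the issue-collection logic here is illustrative:

```bash
# Collect issues from the checks above, then post to Slack only if something is wrong.
# SLACK_WEBHOOK_URL is an assumed environment variable pointing at an incoming webhook.
ISSUES=""

# Example check: warehouse reachability (swap in your own connection test if preferred).
if ! dbt debug > /dev/null 2>&1; then
  ISSUES="${ISSUES}\n• Warehouse connection check failed"
fi

if [ -n "$ISSUES" ]; then
  curl -s -X POST -H 'Content-type: application/json' \
    --data "{\"text\": \"🩺 Health check found issues:${ISSUES}\"}" \
    "$SLACK_WEBHOOK_URL"
fi
```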

## Error Handling

For any handler:
- If event data is malformed: log the error and alert #data-platform
- If severity can't be determined: default to HIGH and escalate
- If an action fails: report the failure and create an issue for manual handling
- Always log every processed event for the audit trail (see the sketch below)
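
A minimal sketch of that audit append, following the JSONL convention shown for schema changes; the `processed.jsonl` path and the `EVENT_NAME`/`HANDLED` variables are illustrative:

```bash
# Append one audit line per processed event; path and fields are illustrative,
# mirroring the schema_changes.jsonl convention used above.
EVENT_LOG=".buster/events/processed.jsonl"
mkdir -p "$(dirname "$EVENT_LOG")"

printf '{"timestamp": "%s", "event_name": "%s", "handled": %s}\n' \
  "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$EVENT_NAME" "${HANDLED:-true}" >> "$EVENT_LOG"
```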