This agent automatically investigates pipeline failures and proposes fixes. When an Airflow DAG fails, the agent analyzes the error logs, identifies the root cause, opens a pull request with a suggested fix, and sends a Slack summary so your team can quickly resolve issues.

Simple Example

name: failure_triage
description: Investigate pipeline failures and propose fixes

prompt: |
  A pipeline failure was detected.
  
  1. Analyze the error logs from the trigger context
  2. Identify the root cause
  3. Open a PR with a proposed fix
  4. Send Slack summary with findings and PR link

triggers:
  - event: airflow
    type: dag_run_failed

tools:
  include:
    - slack_tool

Production Example

Full investigation with root cause analysis and automated fix proposals:
name: failure_triage
description: Investigate pipeline failures, identify root causes, and propose fixes

prompt: |
  A pipeline failure has been detected. The trigger context contains:
  - dag_id: The failed DAG identifier
  - task_id: The specific task that failed
  - execution_date: When the run was scheduled
  - error_message: The error details
  
  ## 1. Gather context
  
  Collect all relevant information:
  - Full error message and stack trace from the trigger
  - Recent changes to the affected DAG or task in the repository
  - Historical run data (did this task fail recently?)
  - Upstream dependencies and their status
  
  ## 2. Analyze the failure
  
  Categorize the error type:
  
  **Data Issues:**
  - Schema changes in source tables
  - Missing or null values in required fields
  - Data type mismatches
  - Volume anomalies (empty source, unexpected row counts)
  
  **Code Issues:**
  - Syntax errors in SQL or Python
  - Missing imports or dependencies
  - Logic errors in transformations
  - Hardcoded values that are now stale
  
  **Infrastructure Issues:**
  - Connection timeouts or failures
  - Resource exhaustion (memory, disk, CPU)
  - Permission or authentication errors
  - Service outages
  
  ## 3. Investigate root cause
  
  Based on the error category:
  
  a) **For data issues:**
     - Query the source table to confirm the problem
     - Check for recent schema changes
     - Compare current data against expectations
     - Identify when the issue first appeared
  
  b) **For code issues:**
     - Review recent commits to the affected files
     - Check if the error correlates with a deployment
     - Validate SQL syntax and logic
     - Test transformations with sample data
  
  c) **For infrastructure issues:**
     - Check connection status and credentials
     - Review resource utilization metrics
     - Verify service health and availability
  
  ## 4. Propose a fix
  
  If a code fix is possible:
  
  a) **Create the fix:**
     - Write the corrected code
     - Include defensive checks to prevent recurrence
     - Add comments explaining the fix
  
  b) **Open a pull request:**
     - Branch name: fix/{dag_id}-failure-{date}
     - Clear title describing the issue
     - PR description with:
       - Summary of the failure
       - Root cause analysis
       - Explanation of the fix
       - Testing recommendations
  
  c) **If no code fix is possible:**
     - Document the manual steps needed
     - Identify who should take action
     - Suggest preventive measures
  
  ## 5. Assess impact
  
  Determine downstream effects:
  - Which models or tables depend on this pipeline?
  - Are there dashboards or reports affected?
  - What business processes are blocked?
  - How long has the issue been present?
  
  ## 6. Send Slack notification
  
  Post to the appropriate channel based on severity:
  
  **#data-incidents** (for critical failures):
  
  🚨 **Pipeline Failure: {dag_id}**
  
  **What happened:**
  [1-2 sentence summary of the failure]
  
  **Root cause:**
  [Explanation of why it failed]
  
  **Impact:**
  • Downstream models affected: [list]
  • Data freshness impact: [hours/days delayed]
  
  **Proposed fix:**
  [Link to PR] or [Manual steps required]
  
  **Next steps:**
  • [Specific action items]
  • [Who should review/approve]
  
  ---
  
  **#data-alerts** (for non-critical failures):
  
  ⚠️ **Pipeline Alert: {dag_id}**
  
  Task `{task_id}` failed.
  
  **Cause:** [Brief explanation]
  **Fix:** [PR link] | [Manual action needed]
  **Impact:** [Low/Medium - brief description]
  
  ## 7. Track resolution
  
  Create a GitHub issue if:
  - The fix requires review or discussion
  - This is a recurring failure
  - Infrastructure changes are needed
  
  Link the issue to the PR and Slack thread for tracking.

triggers:
  - event: airflow
    type: dag_run_failed
  - event: airflow
    type: task_instance_failed

tools:
  include:
    - slack_tool
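
For reference, the trigger context the prompt reads from mirrors the payload the Airflow callback below sends. A minimal illustrative example of that context, with hypothetical values (the field names come from the prompt above and the callback configuration below):

# Illustrative trigger context only -- field names match the prompt and the
# Airflow callback below; the DAG, task, and error values are hypothetical.
trigger_context = {
    "event_type": "dag_run_failed",
    "dag_id": "daily_refresh",
    "task_id": "load_orders",
    "execution_date": "2025-01-01T06:00:00+00:00",
    "error_message": "KeyError: 'order_total'",
}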

Filtering by DAG

Use `includes` to limit the agent to specific DAGs:
triggers:
  - event: airflow
    type: dag_run_failed
    includes:
      - "critical_pipeline"
      - "daily_refresh"
      - "data_quality_*"

Airflow Configuration

Configure your Airflow instance to send failure notifications to Buster. Add a callback to your DAGs:
import os

import requests

# Read the API key from the environment (or an Airflow Variable / secrets backend).
BUSTER_API_KEY = os.environ["BUSTER_API_KEY"]

def notify_buster_on_failure(context):
    """Airflow failure callback: forward failure details to Buster's events endpoint."""
    requests.post(
        "https://api.buster.so/api/v2/public/airflow/events",
        headers={
            "Authorization": f"Bearer {BUSTER_API_KEY}",
            "Content-Type": "application/json"
        },
        json={
            "event_type": "dag_run_failed",
            "dag_id": context['dag'].dag_id,
            "task_id": context['task'].task_id,
            "execution_date": str(context['execution_date']),
            "error_message": str(context.get('exception', 'Unknown error'))
        },
        timeout=10,  # don't let the callback hang the worker
    )

# Add to DAG default_args so every task reports failures
default_args = {
    'on_failure_callback': notify_buster_on_failure,
}
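
To wire the callback in, pass `default_args` when you define the DAG. In Airflow, a callback in `default_args['on_failure_callback']` runs once per failed task instance, while a DAG-level `on_failure_callback` argument runs when the DAG run as a whole fails. A minimal sketch using the callback above (the DAG id, schedule, and task are hypothetical):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="daily_refresh",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    default_args={"on_failure_callback": notify_buster_on_failure},  # per failed task
    on_failure_callback=notify_buster_on_failure,  # when the whole DAG run fails
) as dag:
    BashOperator(task_id="load_orders", bash_command="echo 'load orders'")
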
See the Triggers & Scheduling guide for more details on configuring Airflow triggers.