Failure Triage

This agent automatically investigates pipeline failures and proposes fixes. When an Airflow DAG fails, the agent analyzes the error logs, identifies the root cause, opens a pull request with a suggested fix, and sends a Slack summary so your team can resolve the issue quickly.

Simple Example

name: failure_triage
description: Investigate pipeline failures and propose fixes

prompt: |
  A pipeline failure was detected.
  
  1. Analyze the error logs from the trigger context
  2. Identify the root cause
  3. Open a PR with a proposed fix
  4. Send Slack summary with findings and PR link

triggers:
  - event: airflow
    type: dag_run_failed

tools:
  include:
    - slack_tool

Production Example

Full investigation with root cause analysis and automated fix proposals:
name: failure_triage
description: Investigate pipeline failures, identify root causes, and propose fixes

prompt: |
  A pipeline failure has been detected. The trigger context contains:
  - dag_id: The failed DAG identifier
  - task_id: The specific task that failed
  - execution_date: When the run was scheduled
  - error_message: The error details
  
  ## 1. Gather context
  
  Collect all relevant information:
  - Full error message and stack trace from the trigger
  - Recent changes to the affected DAG or task in the repository
  - Historical run data (did this task fail recently?)
  - Upstream dependencies and their status
  
  ## 2. Analyze the failure
  
  Categorize the error type:
  
  **Data Issues:**
  - Schema changes in source tables
  - Missing or null values in required fields
  - Data type mismatches
  - Volume anomalies (empty source, unexpected row counts)
  
  **Code Issues:**
  - Syntax errors in SQL or Python
  - Missing imports or dependencies
  - Logic errors in transformations
  - Hardcoded values that are now stale
  
  **Infrastructure Issues:**
  - Connection timeouts or failures
  - Resource exhaustion (memory, disk, CPU)
  - Permission or authentication errors
  - Service outages
  
  ## 3. Investigate root cause
  
  Based on the error category:
  
  a) **For data issues:**
     - Query the source table to confirm the problem
     - Check for recent schema changes
     - Compare current data against expectations
     - Identify when the issue first appeared
  
  b) **For code issues:**
     - Review recent commits to the affected files
     - Check if the error correlates with a deployment
     - Validate SQL syntax and logic
     - Test transformations with sample data
  
  c) **For infrastructure issues:**
     - Check connection status and credentials
     - Review resource utilization metrics
     - Verify service health and availability
  
  ## 4. Propose a fix
  
  If a code fix is possible:
  
  a) **Create the fix:**
     - Write the corrected code
     - Include defensive checks to prevent recurrence
     - Add comments explaining the fix
  
  b) **Open a pull request:**
     - Branch name: fix/{dag_id}-failure-{date}
     - Clear title describing the issue
     - PR description with:
       - Summary of the failure
       - Root cause analysis
       - Explanation of the fix
       - Testing recommendations
  
  c) **Otherwise (no code fix is possible):**
     - Document the manual steps needed
     - Identify who should take action
     - Suggest preventive measures
  
  ## 5. Assess impact
  
  Determine downstream effects:
  - Which models or tables depend on this pipeline?
  - Are there dashboards or reports affected?
  - What business processes are blocked?
  - How long has the issue been present?
  
  ## 6. Send Slack notification
  
  Post to the appropriate channel based on severity:
  
  **#data-incidents** (for critical failures):
  
  🚨 **Pipeline Failure: {dag_id}**
  
  **What happened:**
  [1-2 sentence summary of the failure]
  
  **Root cause:**
  [Explanation of why it failed]
  
  **Impact:**
  • Downstream models affected: [list]
  • Data freshness impact: [hours/days delayed]
  
  **Proposed fix:**
  [Link to PR] or [Manual steps required]
  
  **Next steps:**
  • [Specific action items]
  • [Who should review/approve]
  
  ---
  
  **#data-alerts** (for non-critical failures):
  
  ⚠️ **Pipeline Alert: {dag_id}**
  
  Task `{task_id}` failed.
  
  **Cause:** [Brief explanation]
  **Fix:** [PR link] | [Manual action needed]
  **Impact:** [Low/Medium - brief description]
  
  ## 7. Track resolution
  
  Create a GitHub issue if:
  - The fix requires review or discussion
  - This is a recurring failure
  - Infrastructure changes are needed
  
  Link the issue to the PR and Slack thread for tracking.

triggers:
  - event: airflow
    type: dag_run_failed
  - event: airflow
    type: task_instance_failed

tools:
  include:
    - slack_tool

Filtering by DAG

Use the `includes` field to limit the agent to specific DAGs, by exact name or glob pattern:
triggers:
  - event: airflow
    type: dag_run_failed
    includes:
      - "critical_pipeline"
      - "daily_refresh"
      - "data_quality_*"
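The `data_quality_*` entry implies glob-style pattern matching. As an illustration only (not a statement about Buster's internals), `fnmatch`-style semantics would behave like this:

```python
from fnmatch import fnmatch

# Patterns from the trigger config above
includes = ["critical_pipeline", "daily_refresh", "data_quality_*"]

def dag_matches(dag_id, patterns):
    """Return True if dag_id matches any include pattern (glob-style)."""
    return any(fnmatch(dag_id, pattern) for pattern in patterns)

print(dag_matches("data_quality_orders", includes))  # True
print(dag_matches("adhoc_backfill", includes))       # False
```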

Airflow Configuration

Configure your Airflow instance to send failure notifications to Buster by adding an `on_failure_callback` to your DAGs:
import logging
import os

import requests

# Load the API key from the environment (or an Airflow Variable/Connection)
BUSTER_API_KEY = os.environ["BUSTER_API_KEY"]

def notify_buster_on_failure(context):
    """Send failure details to Buster so the triage agent can investigate."""
    try:
        requests.post(
            "https://api.buster.so/api/v2/public/airflow/events",
            headers={
                "Authorization": f"Bearer {BUSTER_API_KEY}",
                "Content-Type": "application/json"
            },
            json={
                "event_type": "dag_run_failed",
                "dag_id": context['dag'].dag_id,
                "task_id": context['task'].task_id,
                "execution_date": str(context['execution_date']),
                "error_message": str(context.get('exception', 'Unknown error'))
            },
            timeout=10
        )
    except requests.RequestException:
        # Never let the notification itself break the callback chain
        logging.exception("Failed to notify Buster of DAG failure")

# Add to DAG default_args
default_args = {
    'on_failure_callback': notify_buster_on_failure,
}
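The production example also triggers on `task_instance_failed`. If you want per-task reporting, a similar callback can send that event type. The sketch below assumes the events endpoint accepts the same payload shape with a different `event_type`, and factors the payload into a helper for readability:

```python
import os

import requests

BUSTER_EVENTS_URL = "https://api.buster.so/api/v2/public/airflow/events"

def build_buster_event(context, event_type):
    """Build the JSON payload for a Buster Airflow event from callback context."""
    return {
        "event_type": event_type,
        "dag_id": context["dag"].dag_id,
        "task_id": context["task"].task_id,
        "execution_date": str(context["execution_date"]),
        "error_message": str(context.get("exception", "Unknown error")),
    }

def notify_buster_task_failure(context):
    """Per-task on_failure_callback reporting task_instance_failed events."""
    requests.post(
        BUSTER_EVENTS_URL,
        headers={
            "Authorization": f"Bearer {os.environ['BUSTER_API_KEY']}",
            "Content-Type": "application/json",
        },
        json=build_buster_event(context, "task_instance_failed"),
        timeout=10,
    )
```

Attach it through an individual task's `on_failure_callback` argument when you only want task-level events for selected operators.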
See the Triggers & Scheduling guide for more details on configuring Airflow triggers.