Simple Example
name: failure_triage
description: Investigate pipeline failures and propose fixes
prompt: |
  A pipeline failure was detected.
  1. Analyze the error logs from the trigger context
  2. Identify the root cause
  3. Open a PR with a proposed fix
  4. Send a Slack summary with findings and the PR link
triggers:
  - event: airflow
    type: dag_run_failed
tools:
  include:
    - slack_tool
Production Example
Full investigation with root cause analysis and automated fix proposals:
name: failure_triage
description: Investigate pipeline failures, identify root causes, and propose fixes
prompt: |
  A pipeline failure has been detected. The trigger context contains:

  - dag_id: The failed DAG identifier
  - task_id: The specific task that failed
  - execution_date: When the run was scheduled
  - error_message: The error details

  ## 1. Gather context

  Collect all relevant information:
  - Full error message and stack trace from the trigger
  - Recent changes to the affected DAG or task in the repository
  - Historical run data (did this task fail recently?)
  - Upstream dependencies and their status

  ## 2. Analyze the failure

  Categorize the error type:

  **Data Issues:**
  - Schema changes in source tables
  - Missing or null values in required fields
  - Data type mismatches
  - Volume anomalies (empty source, unexpected row counts)

  **Code Issues:**
  - Syntax errors in SQL or Python
  - Missing imports or dependencies
  - Logic errors in transformations
  - Hardcoded values that are now stale

  **Infrastructure Issues:**
  - Connection timeouts or failures
  - Resource exhaustion (memory, disk, CPU)
  - Permission or authentication errors
  - Service outages

  ## 3. Investigate root cause

  Based on the error category:

  a) **For data issues:**
     - Query the source table to confirm the problem
     - Check for recent schema changes
     - Compare current data against expectations
     - Identify when the issue first appeared

  b) **For code issues:**
     - Review recent commits to the affected files
     - Check if the error correlates with a deployment
     - Validate SQL syntax and logic
     - Test transformations with sample data

  c) **For infrastructure issues:**
     - Check connection status and credentials
     - Review resource utilization metrics
     - Verify service health and availability

  ## 4. Propose a fix

  If a code fix is possible:

  a) **Create the fix:**
     - Write the corrected code
     - Include defensive checks to prevent recurrence
     - Add comments explaining the fix

  b) **Open a pull request:**
     - Branch name: fix/{dag_id}-failure-{date}
     - Clear title describing the issue
     - PR description with:
       - Summary of the failure
       - Root cause analysis
       - Explanation of the fix
       - Testing recommendations

  c) **If no code fix is possible:**
     - Document the manual steps needed
     - Identify who should take action
     - Suggest preventive measures

  ## 5. Assess impact

  Determine downstream effects:
  - Which models or tables depend on this pipeline?
  - Are there dashboards or reports affected?
  - What business processes are blocked?
  - How long has the issue been present?

  ## 6. Send Slack notification

  Post to the appropriate channel based on severity:

  **#data-incidents** (for critical failures):

  🚨 **Pipeline Failure: {dag_id}**

  **What happened:**
  [1-2 sentence summary of the failure]

  **Root cause:**
  [Explanation of why it failed]

  **Impact:**
  • Downstream models affected: [list]
  • Data freshness impact: [hours/days delayed]

  **Proposed fix:**
  [Link to PR] or [Manual steps required]

  **Next steps:**
  • [Specific action items]
  • [Who should review/approve]

  ---

  **#data-alerts** (for non-critical failures):

  ⚠️ **Pipeline Alert: {dag_id}**

  Task `{task_id}` failed.
  **Cause:** [Brief explanation]
  **Fix:** [PR link] | [Manual action needed]
  **Impact:** [Low/Medium - brief description]

  ## 7. Track resolution

  Create a GitHub issue if:
  - The fix requires review or discussion
  - This is a recurring failure
  - Infrastructure changes are needed

  Link the issue to the PR and Slack thread for tracking.
triggers:
  - event: airflow
    type: dag_run_failed
  - event: airflow
    type: task_instance_failed
tools:
  include:
    - slack_tool
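The channel routing in step 6 can be sketched as a small helper. This is illustrative only: the severity labels and function names below are assumptions, not part of Buster's API.

```python
def slack_channel(severity: str) -> str:
    # Route per the prompt above: critical failures go to #data-incidents,
    # everything else goes to #data-alerts.
    return "#data-incidents" if severity == "critical" else "#data-alerts"

def alert_header(severity: str, dag_id: str) -> str:
    # Header line matching the two message templates in the prompt.
    emoji, label = (
        ("🚨", "Pipeline Failure") if severity == "critical"
        else ("⚠️", "Pipeline Alert")
    )
    return f"{emoji} **{label}: {dag_id}**"

print(slack_channel("critical"))             # #data-incidents
print(alert_header("low", "daily_refresh"))  # ⚠️ **Pipeline Alert: daily_refresh**
```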
Filtering by DAG
Use `includes` to limit the agent to specific DAGs:
triggers:
  - event: airflow
    type: dag_run_failed
    includes:
      - "critical_pipeline"
      - "daily_refresh"
      - "data_quality_*"
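The trailing `data_quality_*` entry suggests wildcard support. Assuming shell-style glob matching (an assumption, not confirmed by the source), the filter behaves roughly like this sketch; `dag_is_included` is a hypothetical helper:

```python
import fnmatch

INCLUDES = ["critical_pipeline", "daily_refresh", "data_quality_*"]

def dag_is_included(dag_id: str) -> bool:
    # A DAG triggers the agent only if its id matches one of the
    # includes entries: exact names or glob-style wildcard patterns.
    return any(fnmatch.fnmatch(dag_id, pattern) for pattern in INCLUDES)

print(dag_is_included("data_quality_orders"))  # True
print(dag_is_included("ad_hoc_backfill"))      # False
```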
Airflow Configuration
Configure your Airflow instance to send failure notifications to Buster. Add a callback to your DAGs:
import os

import requests

# Read the Buster API key from the environment (set this in your deployment).
BUSTER_API_KEY = os.environ["BUSTER_API_KEY"]

def notify_buster_on_failure(context):
    """Airflow failure callback that forwards run details to Buster."""
    requests.post(
        "https://api.buster.so/api/v2/public/airflow/events",
        headers={
            "Authorization": f"Bearer {BUSTER_API_KEY}",
            "Content-Type": "application/json",
        },
        json={
            "event_type": "dag_run_failed",
            "dag_id": context['dag'].dag_id,
            "task_id": context['task'].task_id,
            "execution_date": str(context['execution_date']),
            "error_message": str(context.get('exception', 'Unknown error')),
        },
        timeout=10,  # don't block the worker if the endpoint is slow
    )

# Add to DAG default_args
default_args = {
    'on_failure_callback': notify_buster_on_failure,
}
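To sanity-check the event body locally before deploying, the payload construction can be factored out of the callback and exercised with a stand-in context. This refactoring is illustrative; the fields match what the callback above posts.

```python
from types import SimpleNamespace

def build_failure_event(context) -> dict:
    # Same JSON body the failure callback posts, split out so it can be
    # checked without Airflow or a network call.
    return {
        "event_type": "dag_run_failed",
        "dag_id": context["dag"].dag_id,
        "task_id": context["task"].task_id,
        "execution_date": str(context["execution_date"]),
        "error_message": str(context.get("exception", "Unknown error")),
    }

# Minimal stand-in for the Airflow context dict.
ctx = {
    "dag": SimpleNamespace(dag_id="daily_refresh"),
    "task": SimpleNamespace(task_id="load_orders"),
    "execution_date": "2024-05-01T00:00:00+00:00",
    "exception": ValueError("schema mismatch"),
}
print(build_failure_event(ctx)["error_message"])  # schema mismatch
```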
See the Triggers & Scheduling guide for more details on configuring Airflow triggers.