REL12
REL12-BP02 - Perform post-incident analysis
REL12-BP02: Perform post-incident analysis
Conduct thorough post-incident reviews to understand root causes, identify systemic issues, and implement preventive measures. Focus on learning and improvement rather than blame, creating a culture of continuous improvement and organizational learning.
Implementation Steps
1. Establish Post-Incident Review Process
Create a standardized process for conducting blameless post-incident reviews.
2. Collect Comprehensive Data
Gather all relevant information including timelines, metrics, logs, and human factors.
3. Perform Root Cause Analysis
Use systematic methods to identify underlying causes and contributing factors.
4. Generate Actionable Recommendations
Develop specific, measurable action items to prevent recurrence.
5. Track Implementation and Effectiveness
Monitor the implementation of recommendations and measure their effectiveness.
Detailed Implementation
{% raw %}
View code
import boto3
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import uuid
from collections import defaultdict
class IncidentSeverity(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
class RootCauseCategory(Enum):
HUMAN_ERROR = "human_error"
PROCESS_FAILURE = "process_failure"
TECHNOLOGY_FAILURE = "technology_failure"
EXTERNAL_DEPENDENCY = "external_dependency"
DESIGN_FLAW = "design_flaw"
CONFIGURATION_ERROR = "configuration_error"
class ActionItemStatus(Enum):
OPEN = "open"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
CANCELLED = "cancelled"
@dataclass
class IncidentTimeline:
timestamp: datetime
event: str
source: str
details: str
impact: str
@dataclass
class RootCause:
cause_id: str
category: RootCauseCategory
description: str
contributing_factors: List[str]
evidence: List[str]
likelihood: str # high, medium, low
impact: str # high, medium, low
@dataclass
class ActionItem:
action_id: str
title: str
description: str
owner: str
priority: str
due_date: datetime
status: ActionItemStatus
estimated_effort: str
success_criteria: str
related_root_causes: List[str]
@dataclass
class PostIncidentReport:
report_id: str
incident_id: str
incident_title: str
severity: IncidentSeverity
start_time: datetime
end_time: datetime
duration_minutes: int
impact_description: str
services_affected: List[str]
timeline: List[IncidentTimeline]
root_causes: List[RootCause]
action_items: List[ActionItem]
lessons_learned: List[str]
attendees: List[str]
review_date: datetime
follow_up_date: datetime
class PostIncidentAnalysisSystem:
def __init__(self, region: str = 'us-east-1'):
self.region = region
# AWS clients
self.cloudwatch = boto3.client('cloudwatch', region_name=region)
self.logs = boto3.client('logs', region_name=region)
self.s3 = boto3.client('s3', region_name=region)
self.sns = boto3.client('sns', region_name=region)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
# Analysis data
self.incident_reports: Dict[str, PostIncidentReport] = {}
self.action_items: Dict[str, ActionItem] = {}
self.analysis_templates: Dict[str, Any] = {}
# Thread safety
self.analysis_lock = threading.Lock()
def create_post_incident_report(self, incident_data: Dict[str, Any]) -> str:
"""Create a new post-incident report"""
try:
report_id = f"pir-{uuid.uuid4().hex[:8]}"
# Create timeline from incident data
timeline = self._build_incident_timeline(incident_data)
# Initialize report
report = PostIncidentReport(
report_id=report_id,
incident_id=incident_data['incident_id'],
incident_title=incident_data['title'],
severity=IncidentSeverity(incident_data['severity']),
start_time=datetime.fromisoformat(incident_data['start_time']),
end_time=datetime.fromisoformat(incident_data['end_time']),
duration_minutes=incident_data['duration_minutes'],
impact_description=incident_data['impact_description'],
services_affected=incident_data['services_affected'],
timeline=timeline,
root_causes=[],
action_items=[],
lessons_learned=[],
attendees=[],
review_date=datetime.utcnow(),
follow_up_date=datetime.utcnow() + timedelta(days=30)
)
with self.analysis_lock:
self.incident_reports[report_id] = report
self.logger.info(f"Created post-incident report: {report_id}")
return report_id
except Exception as e:
self.logger.error(f"Failed to create post-incident report: {str(e)}")
return ""
def perform_root_cause_analysis(self, report_id: str, analysis_data: Dict[str, Any]) -> List[RootCause]:
"""Perform systematic root cause analysis"""
try:
report = self.incident_reports.get(report_id)
if not report:
raise ValueError(f"Report {report_id} not found")
root_causes = []
# Use 5 Whys technique
five_whys_causes = self._perform_five_whys_analysis(analysis_data)
root_causes.extend(five_whys_causes)
# Use Fishbone diagram analysis
fishbone_causes = self._perform_fishbone_analysis(analysis_data)
root_causes.extend(fishbone_causes)
# Use Fault Tree Analysis
fault_tree_causes = self._perform_fault_tree_analysis(analysis_data)
root_causes.extend(fault_tree_causes)
# Update report
report.root_causes = root_causes
self.logger.info(f"Identified {len(root_causes)} root causes for {report_id}")
return root_causes
except Exception as e:
self.logger.error(f"Root cause analysis failed: {str(e)}")
return []
def _perform_five_whys_analysis(self, analysis_data: Dict[str, Any]) -> List[RootCause]:
"""Perform 5 Whys root cause analysis"""
try:
root_causes = []
# Example 5 Whys analysis
problem = analysis_data.get('initial_problem', '')
whys = analysis_data.get('five_whys', [])
if len(whys) >= 5:
# The final "why" typically reveals the root cause
final_why = whys[-1]
# Categorize the root cause
category = self._categorize_root_cause(final_why)
root_cause = RootCause(
cause_id=f"5w-{uuid.uuid4().hex[:8]}",
category=category,
description=final_why,
contributing_factors=whys[:-1],
evidence=[problem] + whys,
likelihood="high",
impact=analysis_data.get('impact_level', 'medium')
)
root_causes.append(root_cause)
return root_causes
except Exception as e:
self.logger.error(f"5 Whys analysis failed: {str(e)}")
return []
def _perform_fishbone_analysis(self, analysis_data: Dict[str, Any]) -> List[RootCause]:
"""Perform Fishbone (Ishikawa) diagram analysis"""
try:
root_causes = []
# Fishbone categories: People, Process, Technology, Environment
fishbone_data = analysis_data.get('fishbone', {})
for category, causes in fishbone_data.items():
for cause_desc in causes:
category_enum = self._map_fishbone_category(category)
root_cause = RootCause(
cause_id=f"fb-{uuid.uuid4().hex[:8]}",
category=category_enum,
description=cause_desc,
contributing_factors=[],
evidence=[f"Identified in {category} category"],
likelihood="medium",
impact=analysis_data.get('impact_level', 'medium')
)
root_causes.append(root_cause)
return root_causes
except Exception as e:
self.logger.error(f"Fishbone analysis failed: {str(e)}")
return []
def _perform_fault_tree_analysis(self, analysis_data: Dict[str, Any]) -> List[RootCause]:
"""Perform Fault Tree Analysis"""
try:
root_causes = []
# Fault tree analysis looks at combinations of events
fault_tree = analysis_data.get('fault_tree', {})
for fault_event, conditions in fault_tree.items():
if isinstance(conditions, list):
for condition in conditions:
category = self._categorize_root_cause(condition)
root_cause = RootCause(
cause_id=f"ft-{uuid.uuid4().hex[:8]}",
category=category,
description=condition,
contributing_factors=[fault_event],
evidence=[f"Fault tree analysis: {fault_event}"],
likelihood="medium",
impact=analysis_data.get('impact_level', 'medium')
)
root_causes.append(root_cause)
return root_causes
except Exception as e:
self.logger.error(f"Fault tree analysis failed: {str(e)}")
return []
def generate_action_items(self, report_id: str, recommendations: List[Dict[str, Any]]) -> List[ActionItem]:
"""Generate actionable items from analysis"""
try:
report = self.incident_reports.get(report_id)
if not report:
raise ValueError(f"Report {report_id} not found")
action_items = []
for rec in recommendations:
action_item = ActionItem(
action_id=f"ai-{uuid.uuid4().hex[:8]}",
title=rec['title'],
description=rec['description'],
owner=rec.get('owner', 'TBD'),
priority=rec.get('priority', 'medium'),
due_date=datetime.utcnow() + timedelta(days=rec.get('due_days', 30)),
status=ActionItemStatus.OPEN,
estimated_effort=rec.get('effort', 'TBD'),
success_criteria=rec.get('success_criteria', ''),
related_root_causes=rec.get('related_causes', [])
)
action_items.append(action_item)
self.action_items[action_item.action_id] = action_item
# Update report
report.action_items = action_items
self.logger.info(f"Generated {len(action_items)} action items for {report_id}")
return action_items
except Exception as e:
self.logger.error(f"Action item generation failed: {str(e)}")
return []
def conduct_blameless_review(self, report_id: str, review_data: Dict[str, Any]) -> Dict[str, Any]:
"""Conduct a blameless post-incident review"""
try:
report = self.incident_reports.get(report_id)
if not report:
raise ValueError(f"Report {report_id} not found")
# Update report with review data
report.attendees = review_data.get('attendees', [])
report.lessons_learned = review_data.get('lessons_learned', [])
# Generate review summary
review_summary = {
'report_id': report_id,
'incident_title': report.incident_title,
'review_date': report.review_date.isoformat(),
'attendees': report.attendees,
'duration_minutes': review_data.get('review_duration', 60),
'key_findings': {
'root_causes_identified': len(report.root_causes),
'action_items_created': len(report.action_items),
'lessons_learned': len(report.lessons_learned)
},
'follow_up_required': len([ai for ai in report.action_items if ai.status == ActionItemStatus.OPEN]) > 0,
'next_review_date': report.follow_up_date.isoformat()
}
# Send review summary
self._send_review_summary(review_summary)
self.logger.info(f"Completed blameless review for {report_id}")
return review_summary
except Exception as e:
self.logger.error(f"Blameless review failed: {str(e)}")
return {}
def track_action_item_progress(self, action_id: str, status_update: Dict[str, Any]) -> bool:
"""Track progress of action items"""
try:
action_item = self.action_items.get(action_id)
if not action_item:
raise ValueError(f"Action item {action_id} not found")
# Update status
if 'status' in status_update:
action_item.status = ActionItemStatus(status_update['status'])
# Update other fields
for field, value in status_update.items():
if hasattr(action_item, field) and field != 'action_id':
setattr(action_item, field, value)
self.logger.info(f"Updated action item {action_id}: {action_item.status.value}")
return True
except Exception as e:
self.logger.error(f"Action item update failed: {str(e)}")
return False
def generate_trend_analysis(self, time_period_days: int = 90) -> Dict[str, Any]:
"""Generate trend analysis from multiple incidents"""
try:
cutoff_date = datetime.utcnow() - timedelta(days=time_period_days)
recent_reports = [
r for r in self.incident_reports.values()
if r.review_date > cutoff_date
]
if not recent_reports:
return {'message': 'No incidents in the specified time period'}
# Analyze trends
severity_trends = defaultdict(int)
service_trends = defaultdict(int)
root_cause_trends = defaultdict(int)
duration_trends = []
for report in recent_reports:
severity_trends[report.severity.value] += 1
duration_trends.append(report.duration_minutes)
for service in report.services_affected:
service_trends[service] += 1
for root_cause in report.root_causes:
root_cause_trends[root_cause.category.value] += 1
# Calculate statistics
avg_duration = sum(duration_trends) / len(duration_trends) if duration_trends else 0
total_incidents = len(recent_reports)
trend_analysis = {
'analysis_period_days': time_period_days,
'total_incidents': total_incidents,
'average_duration_minutes': avg_duration,
'severity_distribution': dict(severity_trends),
'most_affected_services': dict(sorted(service_trends.items(), key=lambda x: x[1], reverse=True)[:5]),
'root_cause_distribution': dict(root_cause_trends),
'recommendations': self._generate_trend_recommendations(
severity_trends, service_trends, root_cause_trends, avg_duration
)
}
return trend_analysis
except Exception as e:
self.logger.error(f"Trend analysis failed: {str(e)}")
return {}
def _build_incident_timeline(self, incident_data: Dict[str, Any]) -> List[IncidentTimeline]:
"""Build detailed incident timeline"""
try:
timeline = []
# Add key events from incident data
events = incident_data.get('timeline_events', [])
for event in events:
timeline_entry = IncidentTimeline(
timestamp=datetime.fromisoformat(event['timestamp']),
event=event['event'],
source=event.get('source', 'manual'),
details=event.get('details', ''),
impact=event.get('impact', '')
)
timeline.append(timeline_entry)
# Sort by timestamp
timeline.sort(key=lambda x: x.timestamp)
return timeline
except Exception as e:
self.logger.error(f"Timeline building failed: {str(e)}")
return []
def _categorize_root_cause(self, cause_description: str) -> RootCauseCategory:
"""Categorize root cause based on description"""
try:
cause_lower = cause_description.lower()
if any(word in cause_lower for word in ['human', 'operator', 'manual', 'mistake']):
return RootCauseCategory.HUMAN_ERROR
elif any(word in cause_lower for word in ['process', 'procedure', 'workflow']):
return RootCauseCategory.PROCESS_FAILURE
elif any(word in cause_lower for word in ['hardware', 'software', 'system', 'server']):
return RootCauseCategory.TECHNOLOGY_FAILURE
elif any(word in cause_lower for word in ['external', 'third-party', 'vendor']):
return RootCauseCategory.EXTERNAL_DEPENDENCY
elif any(word in cause_lower for word in ['design', 'architecture', 'implementation']):
return RootCauseCategory.DESIGN_FLAW
elif any(word in cause_lower for word in ['configuration', 'setting', 'parameter']):
return RootCauseCategory.CONFIGURATION_ERROR
else:
return RootCauseCategory.TECHNOLOGY_FAILURE # Default
except Exception as e:
self.logger.error(f"Root cause categorization failed: {str(e)}")
return RootCauseCategory.TECHNOLOGY_FAILURE
def _map_fishbone_category(self, category: str) -> RootCauseCategory:
"""Map fishbone category to root cause category"""
mapping = {
'people': RootCauseCategory.HUMAN_ERROR,
'process': RootCauseCategory.PROCESS_FAILURE,
'technology': RootCauseCategory.TECHNOLOGY_FAILURE,
'environment': RootCauseCategory.EXTERNAL_DEPENDENCY
}
return mapping.get(category.lower(), RootCauseCategory.TECHNOLOGY_FAILURE)
def _generate_trend_recommendations(self, severity_trends: Dict, service_trends: Dict,
root_cause_trends: Dict, avg_duration: float) -> List[str]:
"""Generate recommendations based on trend analysis"""
recommendations = []
try:
# High severity incidents
if severity_trends.get('critical', 0) > 2:
recommendations.append("Consider implementing additional monitoring and alerting for critical systems")
# Frequently affected services
top_service = max(service_trends.items(), key=lambda x: x[1]) if service_trends else None
if top_service and top_service[1] > 3:
recommendations.append(f"Focus reliability improvements on {top_service[0]} service")
# Common root causes
top_cause = max(root_cause_trends.items(), key=lambda x: x[1]) if root_cause_trends else None
if top_cause:
if top_cause[0] == 'human_error':
recommendations.append("Implement additional automation to reduce human error")
elif top_cause[0] == 'configuration_error':
recommendations.append("Improve configuration management and validation processes")
# Long duration incidents
if avg_duration > 60:
recommendations.append("Focus on reducing mean time to resolution (MTTR)")
return recommendations
except Exception as e:
self.logger.error(f"Trend recommendations failed: {str(e)}")
return []
def _send_review_summary(self, summary: Dict[str, Any]) -> None:
"""Send review summary to stakeholders"""
try:
message = f"""
Post-Incident Review Summary
Incident: {summary['incident_title']}
Review Date: {summary['review_date']}
Attendees: {', '.join(summary['attendees'])}
Key Findings:
- Root Causes Identified: {summary['key_findings']['root_causes_identified']}
- Action Items Created: {summary['key_findings']['action_items_created']}
- Lessons Learned: {summary['key_findings']['lessons_learned']}
Follow-up Required: {'Yes' if summary['follow_up_required'] else 'No'}
Next Review: {summary['next_review_date']}
"""
# Send via SNS (if configured)
try:
self.sns.publish(
TopicArn=f'arn:aws:sns:{self.region}:123456789012:post-incident-reviews',
Message=message,
Subject=f"Post-Incident Review: {summary['incident_title']}"
)
except Exception as e:
self.logger.warning(f"Failed to send SNS notification: {str(e)}")
except Exception as e:
self.logger.error(f"Review summary sending failed: {str(e)}")
def export_report(self, report_id: str, format_type: str = 'json') -> str:
"""Export post-incident report"""
try:
report = self.incident_reports.get(report_id)
if not report:
raise ValueError(f"Report {report_id} not found")
if format_type == 'json':
return json.dumps(asdict(report), indent=2, default=str)
elif format_type == 'markdown':
return self._generate_markdown_report(report)
else:
raise ValueError(f"Unsupported format: {format_type}")
except Exception as e:
self.logger.error(f"Report export failed: {str(e)}")
return ""
def _generate_markdown_report(self, report: PostIncidentReport) -> str:
"""Generate markdown format report"""
try:
markdown = f"""# Post-Incident Report: {report.incident_title}
## Incident Summary
- **Incident ID**: {report.incident_id}
- **Severity**: {report.severity.value}
- **Start Time**: {report.start_time}
- **End Time**: {report.end_time}
- **Duration**: {report.duration_minutes} minutes
- **Services Affected**: {', '.join(report.services_affected)}
## Impact Description
{report.impact_description}
## Timeline
"""
for event in report.timeline:
markdown += f"- **{event.timestamp}**: {event.event}\n"
markdown += "\n## Root Causes\n"
for i, cause in enumerate(report.root_causes, 1):
markdown += f"{i}. **{cause.category.value}**: {cause.description}\n"
markdown += "\n## Action Items\n"
for i, item in enumerate(report.action_items, 1):
markdown += f"{i}. **{item.title}** (Owner: {item.owner}, Due: {item.due_date.date()})\n"
markdown += f" - {item.description}\n"
markdown += "\n## Lessons Learned\n"
for i, lesson in enumerate(report.lessons_learned, 1):
markdown += f"{i}. {lesson}\n"
return markdown
except Exception as e:
self.logger.error(f"Markdown generation failed: {str(e)}")
return ""
# Example usage
def main():
# Initialize post-incident analysis system
analysis_system = PostIncidentAnalysisSystem(region='us-east-1')
# Create a post-incident report
incident_data = {
'incident_id': 'incident-2024-001',
'title': 'Database Connection Pool Exhaustion',
'severity': 'high',
'start_time': '2024-01-15T14:30:00Z',
'end_time': '2024-01-15T16:45:00Z',
'duration_minutes': 135,
'impact_description': 'Users experienced login failures and slow response times',
'services_affected': ['user-service', 'auth-service', 'api-gateway'],
'timeline_events': [
{
'timestamp': '2024-01-15T14:30:00Z',
'event': 'High error rate detected',
'source': 'monitoring',
'details': 'CloudWatch alarm triggered',
'impact': 'Users experiencing errors'
},
{
'timestamp': '2024-01-15T14:35:00Z',
'event': 'Incident declared',
'source': 'ops-team',
'details': 'Severity set to HIGH',
'impact': 'Response team activated'
}
]
}
print("Creating post-incident report...")
report_id = analysis_system.create_post_incident_report(incident_data)
if report_id:
print(f"Created report: {report_id}")
# Perform root cause analysis
analysis_data = {
'five_whys': [
'Why did users experience login failures?',
'Because the database connection pool was exhausted',
'Why was the connection pool exhausted?',
'Because connections were not being released properly',
'Why were connections not being released?',
'Because the application had a connection leak in the user service',
'Why was there a connection leak?',
'Because exception handling was not properly closing connections',
'Why was exception handling inadequate?',
'Because code review process did not catch the resource leak pattern'
],
'fishbone': {
'people': ['Insufficient code review', 'Lack of connection pool monitoring'],
'process': ['Inadequate testing procedures', 'Missing resource leak detection'],
'technology': ['Connection pool configuration', 'Application code defect'],
'environment': ['High user load', 'Database performance']
},
'impact_level': 'high'
}
root_causes = analysis_system.perform_root_cause_analysis(report_id, analysis_data)
print(f"Identified {len(root_causes)} root causes")
# Generate action items
recommendations = [
{
'title': 'Fix connection leak in user service',
'description': 'Update exception handling to ensure connections are properly closed',
'owner': 'dev-team',
'priority': 'high',
'due_days': 7,
'effort': '2 days',
'success_criteria': 'Connection leak eliminated, monitoring confirms stable pool usage'
},
{
'title': 'Implement connection pool monitoring',
'description': 'Add CloudWatch metrics for connection pool utilization',
'owner': 'ops-team',
'priority': 'medium',
'due_days': 14,
'effort': '1 day',
'success_criteria': 'Connection pool metrics available in dashboard'
},
{
'title': 'Enhance code review checklist',
'description': 'Add resource management patterns to code review checklist',
'owner': 'tech-lead',
'priority': 'medium',
'due_days': 21,
'effort': '0.5 days',
'success_criteria': 'Updated checklist in use by all reviewers'
}
]
action_items = analysis_system.generate_action_items(report_id, recommendations)
print(f"Generated {len(action_items)} action items")
# Conduct blameless review
review_data = {
'attendees': ['ops-team', 'dev-team', 'tech-lead', 'product-manager'],
'review_duration': 90,
'lessons_learned': [
'Connection pool monitoring is critical for early detection',
'Code review process needs enhancement for resource management',
'Load testing should include connection pool stress testing'
]
}
review_summary = analysis_system.conduct_blameless_review(report_id, review_data)
print(f"Completed blameless review: {json.dumps(review_summary, indent=2)}")
# Export report
markdown_report = analysis_system.export_report(report_id, 'markdown')
print(f"Generated markdown report ({len(markdown_report)} characters)")
if __name__ == "__main__":
main(){% endraw %}
AWS Services
Primary Services
- Amazon S3: Storage for incident reports, documentation, and analysis data
- Amazon CloudWatch: Historical metrics and logs for incident analysis
- Amazon CloudWatch Logs: Log analysis for root cause investigation
- Amazon SNS: Notifications for review summaries and action item updates
Supporting Services
- AWS Lambda: Automated report generation and analysis workflows
- Amazon QuickSight: Visualization and dashboards for trend analysis
- Amazon EventBridge: Event-driven workflows for post-incident processes
- AWS Step Functions: Complex analysis workflow orchestration
Benefits
- Systematic Learning: Structured approach to understanding and preventing incidents
- Blameless Culture: Focus on improvement rather than blame
- Actionable Insights: Generate specific, measurable improvement actions
- Trend Analysis: Identify patterns and systemic issues across incidents
- Knowledge Retention: Capture and share lessons learned across the organization