REL12
REL12-BP01 - Use playbooks to investigate failures
REL12-BP01: Use playbooks to investigate failures
Develop and maintain standardized playbooks that guide teams through systematic investigation of failures. These playbooks ensure consistent, thorough analysis and faster resolution of incidents by providing step-by-step procedures, decision trees, and escalation paths.
Implementation Steps
1. Create Incident Response Playbooks
Develop standardized procedures for different types of incidents and failure scenarios.
2. Implement Automated Diagnostics
Build automated tools that gather relevant information and perform initial analysis.
3. Establish Decision Trees
Create decision trees that guide responders through systematic troubleshooting.
4. Define Escalation Procedures
Establish clear escalation paths and communication protocols.
5. Maintain and Update Playbooks
Regularly review and update playbooks based on lessons learned and system changes.
Detailed Implementation
{% raw %}
View code
import boto3
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import yaml
import subprocess
import requests
class IncidentSeverity(Enum):
    """Severity levels an incident (and therefore a playbook) can be tagged with."""

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
class PlaybookStatus(Enum):
    """Lifecycle states of a single playbook execution."""

    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    ESCALATED = "escalated"
    FAILED = "failed"
class DiagnosticType(Enum):
    """Categories of automated diagnostics."""

    SYSTEM_HEALTH = "system_health"
    PERFORMANCE = "performance"
    CONNECTIVITY = "connectivity"
    RESOURCE_USAGE = "resource_usage"
    LOG_ANALYSIS = "log_analysis"
@dataclass
class PlaybookStep:
    """One step of an incident playbook."""

    step_id: str
    title: str
    description: str
    action_type: str  # manual, automated, decision
    commands: List[str]  # command names resolved by the executing system
    expected_output: str
    success_criteria: str
    failure_action: str  # values used in this file: "escalate", "continue", "stop"
    estimated_duration: int  # presumably minutes -- TODO confirm the unit
    required_permissions: List[str]  # IAM action names, e.g. "ssm:SendCommand"
@dataclass
class IncidentPlaybook:
    """A named, ordered list of steps for investigating a class of incidents."""

    playbook_id: str  # key under which the playbook is registered
    name: str
    description: str
    incident_types: List[str]
    severity_levels: List[IncidentSeverity]
    steps: List[PlaybookStep]  # executed in list order
    escalation_criteria: Dict[str, Any]  # e.g. {"failed_steps": 2}
    prerequisites: List[str]
    tools_required: List[str]
@dataclass
class PlaybookExecution:
    """Mutable record of one run of a playbook against one incident."""

    execution_id: str
    playbook_id: str
    incident_id: str
    started_by: str
    start_time: datetime
    end_time: Optional[datetime]  # None while the execution is still running
    status: PlaybookStatus
    current_step: int  # zero-based index of the step being executed
    step_results: List[Dict[str, Any]]  # one dict per executed step
    escalated: bool
    notes: List[str]
class FailureInvestigationSystem:
    """Registers incident-response playbooks and runs them step by step.

    Playbooks are kept in memory, keyed by ``playbook_id``. Executions run
    synchronously inside :meth:`execute_playbook`; each one is tracked in
    ``active_executions`` while running and moved to ``execution_history``
    when it ends. Several diagnostics in this class return simulated data
    (they are marked as such) rather than calling AWS.
    """

    def __init__(self, region: str = 'us-east-1'):
        """Create the AWS clients and the in-memory bookkeeping.

        Args:
            region: AWS region name passed to every boto3 client.
        """
        self.region = region

        # AWS clients
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        self.logs = boto3.client('logs', region_name=region)
        self.ec2 = boto3.client('ec2', region_name=region)
        self.elbv2 = boto3.client('elbv2', region_name=region)
        self.rds = boto3.client('rds', region_name=region)
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.ssm = boto3.client('ssm', region_name=region)

        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

        # Playbook management
        self.playbooks: Dict[str, IncidentPlaybook] = {}
        self.active_executions: Dict[str, PlaybookExecution] = {}
        self.execution_history: List[PlaybookExecution] = []

        # Diagnostic tools
        self.diagnostic_tools: Dict[str, Any] = {}

        # Guards active_executions / execution_history mutations
        self.execution_lock = threading.Lock()

    def register_playbook(self, playbook: IncidentPlaybook) -> bool:
        """Register a playbook under its ``playbook_id``, replacing any previous one.

        Returns:
            True on success, False if registration raised.
        """
        try:
            self.playbooks[playbook.playbook_id] = playbook
            self.logger.info(f"Registered playbook: {playbook.name}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to register playbook {playbook.playbook_id}: {str(e)}")
            return False

    def create_standard_playbooks(self) -> List[IncidentPlaybook]:
        """Build and register the built-in playbooks.

        Returns:
            The playbooks that were built. If a builder raises, the list
            holds whatever was built before the failure.
        """
        playbooks = []
        try:
            for build in (self._build_cpu_playbook, self._build_database_playbook):
                playbook = build()
                playbooks.append(playbook)
                self.register_playbook(playbook)
            self.logger.info(f"Created {len(playbooks)} standard playbooks")
        except Exception as e:
            self.logger.error(f"Failed to create standard playbooks: {str(e)}")
        return playbooks

    def _build_cpu_playbook(self) -> IncidentPlaybook:
        """Return the High CPU Utilization playbook definition."""
        return IncidentPlaybook(
            playbook_id="cpu-high-utilization",
            name="High CPU Utilization Investigation",
            description="Systematic investigation of high CPU utilization incidents",
            incident_types=["high_cpu", "performance_degradation"],
            severity_levels=[IncidentSeverity.HIGH, IncidentSeverity.CRITICAL],
            steps=[
                PlaybookStep(
                    step_id="cpu-001",
                    title="Verify CPU Metrics",
                    description="Check current CPU utilization across all instances",
                    action_type="automated",
                    commands=["get_cpu_metrics"],
                    expected_output="CPU utilization percentages for all instances",
                    success_criteria="CPU metrics retrieved successfully",
                    failure_action="escalate",
                    estimated_duration=2,
                    required_permissions=["cloudwatch:GetMetricStatistics"]
                ),
                PlaybookStep(
                    step_id="cpu-002",
                    title="Identify Top Processes",
                    description="Identify processes consuming the most CPU",
                    action_type="automated",
                    commands=["get_top_processes"],
                    expected_output="List of top CPU-consuming processes",
                    success_criteria="Process list retrieved",
                    failure_action="continue",
                    estimated_duration=3,
                    required_permissions=["ssm:SendCommand"]
                ),
                PlaybookStep(
                    step_id="cpu-003",
                    title="Check Auto Scaling Status",
                    description="Verify if Auto Scaling is responding appropriately",
                    action_type="automated",
                    commands=["check_autoscaling_activity"],
                    expected_output="Auto Scaling group status and recent activities",
                    success_criteria="Auto Scaling status retrieved",
                    failure_action="continue",
                    estimated_duration=2,
                    required_permissions=["autoscaling:DescribeAutoScalingGroups"]
                ),
                PlaybookStep(
                    step_id="cpu-004",
                    title="Analyze Application Logs",
                    description="Review application logs for errors or unusual patterns",
                    action_type="automated",
                    commands=["analyze_application_logs"],
                    expected_output="Log analysis results with error patterns",
                    success_criteria="Log analysis completed",
                    failure_action="continue",
                    estimated_duration=5,
                    required_permissions=["logs:FilterLogEvents"]
                ),
                PlaybookStep(
                    step_id="cpu-005",
                    title="Decision Point: Scale or Investigate",
                    description="Determine if immediate scaling is needed or further investigation required",
                    action_type="decision",
                    commands=["evaluate_scaling_decision"],
                    expected_output="Scaling recommendation",
                    success_criteria="Decision made",
                    failure_action="escalate",
                    estimated_duration=3,
                    required_permissions=[]
                )
            ],
            escalation_criteria={
                "cpu_threshold": 90,
                "duration_minutes": 15,
                "failed_steps": 2
            },
            prerequisites=["CloudWatch monitoring enabled", "SSM agent installed"],
            tools_required=["AWS CLI", "CloudWatch", "Systems Manager"]
        )

    def _build_database_playbook(self) -> IncidentPlaybook:
        """Return the Database Connection Issues playbook definition."""
        return IncidentPlaybook(
            playbook_id="database-connection-issues",
            name="Database Connection Issues Investigation",
            description="Systematic investigation of database connectivity problems",
            incident_types=["database_connection", "timeout_errors"],
            severity_levels=[IncidentSeverity.CRITICAL, IncidentSeverity.HIGH],
            steps=[
                PlaybookStep(
                    step_id="db-001",
                    title="Check Database Status",
                    description="Verify database instance status and availability",
                    action_type="automated",
                    commands=["check_database_status"],
                    expected_output="Database instance status and metrics",
                    success_criteria="Database status retrieved",
                    failure_action="escalate",
                    estimated_duration=2,
                    required_permissions=["rds:DescribeDBInstances"]
                ),
                PlaybookStep(
                    step_id="db-002",
                    title="Test Database Connectivity",
                    description="Test connection from application servers to database",
                    action_type="automated",
                    commands=["test_database_connectivity"],
                    expected_output="Connection test results from each app server",
                    success_criteria="Connection tests completed",
                    failure_action="continue",
                    estimated_duration=3,
                    required_permissions=["ssm:SendCommand"]
                ),
                PlaybookStep(
                    step_id="db-003",
                    title="Check Connection Pool Status",
                    description="Analyze database connection pool metrics",
                    action_type="automated",
                    commands=["analyze_connection_pool"],
                    expected_output="Connection pool utilization and wait times",
                    success_criteria="Connection pool analysis completed",
                    failure_action="continue",
                    estimated_duration=3,
                    required_permissions=["cloudwatch:GetMetricStatistics"]
                ),
                PlaybookStep(
                    step_id="db-004",
                    title="Review Database Logs",
                    description="Examine database logs for errors and slow queries",
                    action_type="automated",
                    commands=["analyze_database_logs"],
                    expected_output="Database log analysis with error patterns",
                    success_criteria="Log analysis completed",
                    failure_action="continue",
                    estimated_duration=5,
                    required_permissions=["rds:DescribeDBLogFiles"]
                ),
                PlaybookStep(
                    step_id="db-005",
                    title="Check Network Connectivity",
                    description="Verify network path between app servers and database",
                    action_type="automated",
                    commands=["check_network_connectivity"],
                    expected_output="Network connectivity test results",
                    success_criteria="Network tests completed",
                    failure_action="continue",
                    estimated_duration=4,
                    required_permissions=["ec2:DescribeSecurityGroups"]
                )
            ],
            escalation_criteria={
                "connection_failure_rate": 50,
                "response_time_threshold": 5000,
                "failed_steps": 1
            },
            prerequisites=["Database monitoring enabled", "Network access configured"],
            tools_required=["AWS CLI", "Database client", "Network tools"]
        )

    def execute_playbook(self, playbook_id: str, incident_id: str, executed_by: str) -> str:
        """Run a registered playbook synchronously for the given incident.

        Args:
            playbook_id: Id of a previously registered playbook.
            incident_id: Identifier of the incident being investigated.
            executed_by: Name of the person or team starting the run.

        Returns:
            The execution id, or "" if the playbook is unknown or setup failed.
            All steps have already run by the time this returns.
        """
        try:
            playbook = self.playbooks.get(playbook_id)
            if not playbook:
                raise ValueError(f"Playbook {playbook_id} not found")

            base_id = f"exec-{int(time.time())}-{playbook_id}"
            with self.execution_lock:
                # The timestamp has one-second resolution, so two runs of the
                # same playbook within a second would share an id and clobber
                # each other; add a numeric suffix only when that happens.
                taken = set(self.active_executions)
                taken.update(e.execution_id for e in self.execution_history)
                execution_id = base_id
                attempt = 1
                while execution_id in taken:
                    attempt += 1
                    execution_id = f"{base_id}-{attempt}"

                execution = PlaybookExecution(
                    execution_id=execution_id,
                    playbook_id=playbook_id,
                    incident_id=incident_id,
                    started_by=executed_by,
                    start_time=datetime.utcnow(),
                    end_time=None,
                    status=PlaybookStatus.IN_PROGRESS,
                    current_step=0,
                    step_results=[],
                    escalated=False,
                    notes=[]
                )
                self.active_executions[execution_id] = execution

            # Log before running so the log order matches what happened.
            self.logger.info(f"Started playbook execution: {execution_id}")
            self._execute_playbook_steps(execution, playbook)
            return execution_id
        except Exception as e:
            self.logger.error(f"Failed to execute playbook {playbook_id}: {str(e)}")
            return ""

    def _execute_playbook_steps(self, execution: PlaybookExecution, playbook: IncidentPlaybook) -> None:
        """Run the playbook's steps in order and finalize the execution.

        Every executed step is recorded in ``execution.step_results``. A step
        that does not succeed triggers its ``failure_action``: "escalate" and
        "stop" end the run, anything else continues. The execution is always
        moved from ``active_executions`` to ``execution_history`` at the end,
        including when an unexpected error aborts the run.
        """
        try:
            for index, step in enumerate(playbook.steps):
                execution.current_step = index
                self.logger.info(f"Executing step {step.step_id}: {step.title}")

                step_result = self._run_step(step, execution)
                # Record first, so a step that ends the run is still visible.
                execution.step_results.append(step_result)

                if not step_result['success']:
                    if step.failure_action == "escalate":
                        execution.escalated = True
                        execution.status = PlaybookStatus.ESCALATED
                        break
                    if step.failure_action == "stop":
                        execution.status = PlaybookStatus.FAILED
                        break

                # Check escalation criteria
                if self._should_escalate(execution, playbook):
                    execution.escalated = True
                    execution.status = PlaybookStatus.ESCALATED
                    break

            # Complete execution if not escalated or failed
            if execution.status == PlaybookStatus.IN_PROGRESS:
                execution.status = PlaybookStatus.COMPLETED
        except Exception as e:
            execution.status = PlaybookStatus.FAILED
            self.logger.error(f"Playbook execution failed: {str(e)}")
        finally:
            execution.end_time = datetime.utcnow()
            # Move to history; pop() tolerates an id that is already gone.
            with self.execution_lock:
                self.active_executions.pop(execution.execution_id, None)
                self.execution_history.append(execution)

    def _run_step(self, step: PlaybookStep, execution: PlaybookExecution) -> Dict[str, Any]:
        """Execute one step and return its result record.

        The record always has 'step_id', 'title', 'start_time', 'end_time',
        'status', 'output', 'success' and 'duration' (seconds); 'error' is
        added when the step raised or has an unsupported action type.
        """
        step_result = {
            'step_id': step.step_id,
            'title': step.title,
            'start_time': datetime.utcnow().isoformat(),
            'status': 'in_progress',
            'output': '',
            'success': False,
            'duration': 0
        }
        started = time.time()
        try:
            if step.action_type == "automated":
                output = self._execute_automated_step(step)
                step_result['output'] = output
                step_result['success'] = self._validate_step_success(step, output)
            elif step.action_type == "decision":
                step_result['output'] = self._execute_decision_step(step, execution)
                step_result['success'] = True
            elif step.action_type == "manual":
                # Manual steps cannot be completed here; flag them for a human.
                step_result['output'] = "Manual action required"
                step_result['success'] = True
                step_result['status'] = 'pending_manual'
            else:
                step_result['error'] = f"Unsupported action type: {step.action_type}"

            # Keep 'pending_manual' instead of overwriting it with 'completed'.
            if step_result['status'] != 'pending_manual':
                step_result['status'] = 'completed' if step_result['success'] else 'failed'
        except Exception as e:
            step_result['status'] = 'failed'
            step_result['error'] = str(e)
            step_result['success'] = False
            self.logger.error(f"Step {step.step_id} failed: {str(e)}")

        step_result['duration'] = time.time() - started
        step_result['end_time'] = datetime.utcnow().isoformat()
        return step_result

    def _execute_automated_step(self, step: PlaybookStep) -> str:
        """Run each command of an automated step and join the outputs.

        Returns:
            One "<command>: <result>" line per command. An unknown command
            yields an "Error: Unknown command: ..." result so that validation
            treats the step as failed instead of silently succeeding.
        """
        handlers = {
            "get_cpu_metrics": self._get_cpu_metrics,
            "get_top_processes": self._get_top_processes,
            "check_autoscaling_activity": self._check_autoscaling_activity,
            "analyze_application_logs": self._analyze_application_logs,
            "check_database_status": self._check_database_status,
            "test_database_connectivity": self._test_database_connectivity,
            "analyze_connection_pool": self._analyze_connection_pool,
            "analyze_database_logs": self._analyze_database_logs,
            "check_network_connectivity": self._check_network_connectivity,
        }
        try:
            results = []
            for command in step.commands:
                handler = handlers.get(command)
                if handler is None:
                    result = f"Error: Unknown command: {command}"
                else:
                    result = handler()
                results.append(f"{command}: {result}")
            return "\n".join(results)
        except Exception as e:
            self.logger.error(f"Automated step execution failed: {str(e)}")
            return f"Error: {str(e)}"

    def _get_cpu_metrics(self) -> str:
        """Return the most recent AWS/EC2 CPUUtilization datapoint of the last 15 minutes."""
        try:
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(minutes=15)
            response = self.cloudwatch.get_metric_statistics(
                Namespace='AWS/EC2',
                MetricName='CPUUtilization',
                Dimensions=[],
                StartTime=start_time,
                EndTime=end_time,
                Period=300,
                Statistics=['Average', 'Maximum']
            )
            if response['Datapoints']:
                latest = max(response['Datapoints'], key=lambda x: x['Timestamp'])
                return f"Current CPU: {latest['Average']:.2f}% (Max: {latest['Maximum']:.2f}%)"
            return "No CPU metrics available"
        except Exception as e:
            return f"Failed to get CPU metrics: {str(e)}"

    def _get_top_processes(self) -> str:
        """Return top CPU-consuming processes (simulated data; no SSM call is made)."""
        return "Top processes: java (45%), nginx (12%), python (8%)"

    def _check_autoscaling_activity(self) -> str:
        """Return Auto Scaling group activity (simulated data)."""
        return "Auto Scaling: 2 instances launched in last 10 minutes"

    def _analyze_application_logs(self) -> str:
        """Return an application log summary (simulated data; CloudWatch Logs is not queried)."""
        return "Log analysis: 15 errors found, mostly timeout exceptions"

    def _check_database_status(self) -> str:
        """Return "<identifier>: <status>" for each RDS instance in the first response page."""
        try:
            response = self.rds.describe_db_instances()
            statuses = [
                f"{db['DBInstanceIdentifier']}: {db['DBInstanceStatus']}"
                for db in response['DBInstances']
            ]
            return "; ".join(statuses) if statuses else "No databases found"
        except Exception as e:
            return f"Failed to check database status: {str(e)}"

    def _test_database_connectivity(self) -> str:
        """Return database connectivity test results (simulated data)."""
        return "Connectivity test: 3/4 app servers can connect, 1 timeout"

    def _analyze_connection_pool(self) -> str:
        """Return connection pool metrics (simulated data)."""
        return "Connection pool: 85% utilization, avg wait time 2.3s"

    def _analyze_database_logs(self) -> str:
        """Return a database log summary (simulated data)."""
        return "Database logs: 8 slow queries detected, 2 connection errors"

    def _check_network_connectivity(self) -> str:
        """Return network check results (simulated data)."""
        return "Network check: All security groups allow required ports"

    @staticmethod
    def _latest_cpu_percent(step_results: List[Dict[str, Any]]) -> Optional[float]:
        """Extract the "Current CPU: <n>%" value from the first step output that has one."""
        marker = "Current CPU: "
        for result in step_results:
            output = result.get('output', '')
            start = output.find(marker)
            if start == -1:
                continue
            value, percent_sign, _ = output[start + len(marker):].partition('%')
            if not percent_sign:
                continue
            try:
                return float(value)
            except ValueError:
                continue
        return None

    def _execute_decision_step(self, step: PlaybookStep, execution: PlaybookExecution) -> str:
        """Derive a decision from earlier step results.

        For step "cpu-005" the reported CPU average is compared numerically
        with the playbook's ``cpu_threshold`` escalation criterion (90 when
        the playbook or the key is missing). Other steps always continue.
        """
        try:
            if step.step_id == "cpu-005":
                threshold = 90
                playbook = self.playbooks.get(execution.playbook_id)
                if playbook is not None:
                    threshold = playbook.escalation_criteria.get('cpu_threshold', 90)

                # Compare as a number: a substring test such as 'CPU: 9' would
                # match 9.5% and miss 100%.
                cpu_percent = self._latest_cpu_percent(execution.step_results)
                if cpu_percent is not None and cpu_percent >= threshold:
                    return "Decision: Immediate scaling required"
                return "Decision: Continue investigation"
            return "Decision: Continue with next step"
        except Exception as e:
            return f"Decision error: {str(e)}"

    def _validate_step_success(self, step: PlaybookStep, output: str) -> bool:
        """Return True when the step output indicates success.

        Output containing "Error:" or "Failed:" fails. Otherwise the step
        passes if its success criteria text appears in the output or the
        output mentions neither "Error" nor "Failed".
        """
        try:
            if "Error:" in output or "Failed:" in output:
                return False
            if step.success_criteria in output:
                return True
            # Default success if no errors
            return "Error" not in output and "Failed" not in output
        except Exception as e:
            self.logger.error(f"Step validation failed: {str(e)}")
            return False

    def _should_escalate(self, execution: PlaybookExecution, playbook: IncidentPlaybook) -> bool:
        """Return True when the playbook's escalation criteria are met.

        Two criteria keys are read, each defaulting to 999 (in practice, no
        limit): ``failed_steps`` (escalate once that many steps have failed)
        and ``max_duration_minutes`` (escalate once the run has lasted longer).
        """
        try:
            criteria = playbook.escalation_criteria

            failed_steps = sum(1 for r in execution.step_results if not r.get('success', False))
            if failed_steps >= criteria.get('failed_steps', 999):
                return True

            # NOTE(review): the standard playbooks define "duration_minutes",
            # which is not read here -- confirm whether that key is meant to
            # bound the execution time or describes the metric window.
            if execution.start_time:
                elapsed_minutes = (datetime.utcnow() - execution.start_time).total_seconds() / 60
                if elapsed_minutes > criteria.get('max_duration_minutes', 999):
                    return True
            return False
        except Exception as e:
            self.logger.error(f"Escalation check failed: {str(e)}")
            return False

    def get_playbook_execution_status(self, execution_id: str) -> Dict[str, Any]:
        """Return a JSON-friendly summary of an active or finished execution.

        Returns:
            The summary dict, or ``{'error': ...}`` when the id is unknown or
            building the summary failed.
        """
        try:
            execution = self.active_executions.get(execution_id)
            if execution is None:
                # Not running any more; look in the history.
                execution = next(
                    (e for e in self.execution_history if e.execution_id == execution_id),
                    None
                )
            if not execution:
                return {'error': 'Execution not found'}

            playbook = self.playbooks.get(execution.playbook_id)
            return {
                'execution_id': execution.execution_id,
                'playbook_id': execution.playbook_id,
                'incident_id': execution.incident_id,
                'status': execution.status.value,
                'started_by': execution.started_by,
                'start_time': execution.start_time.isoformat(),
                'end_time': execution.end_time.isoformat() if execution.end_time else None,
                'current_step': execution.current_step,
                'total_steps': len(playbook.steps) if playbook is not None else 0,
                'escalated': execution.escalated,
                'step_results': execution.step_results,
                'notes': execution.notes
            }
        except Exception as e:
            self.logger.error(f"Failed to get execution status: {str(e)}")
            return {'error': str(e)}

    def get_playbook_statistics(self) -> Dict[str, Any]:
        """Return usage statistics over all executions.

        Returns:
            Counts, success rate in percent, average duration of completed
            runs in minutes, and per-playbook run counts; ``{}`` on failure.
        """
        try:
            history = self.execution_history
            total_executions = len(history) + len(self.active_executions)
            completed = [e for e in history if e.status == PlaybookStatus.COMPLETED]
            escalated_executions = sum(1 for e in history if e.escalated)

            # Average duration covers completed runs that have an end time.
            durations = [
                (e.end_time - e.start_time).total_seconds() / 60
                for e in completed if e.end_time
            ]
            avg_duration = sum(durations) / len(durations) if durations else 0

            playbook_usage = {}
            for execution in history:
                playbook_usage[execution.playbook_id] = playbook_usage.get(execution.playbook_id, 0) + 1

            return {
                'total_playbooks': len(self.playbooks),
                'total_executions': total_executions,
                'active_executions': len(self.active_executions),
                'completed_executions': len(completed),
                'escalated_executions': escalated_executions,
                'success_rate': (len(completed) / total_executions * 100) if total_executions > 0 else 0,
                'average_duration_minutes': avg_duration,
                'playbook_usage_frequency': playbook_usage
            }
        except Exception as e:
            self.logger.error(f"Failed to get statistics: {str(e)}")
            return {}
# Example usage
def main():
    """Register the standard playbooks, run the CPU playbook once, and print the results."""
    # Initialize failure investigation system
    investigation_system = FailureInvestigationSystem(region='us-east-1')

    # Create standard playbooks
    print("Creating standard incident response playbooks...")
    playbooks = investigation_system.create_standard_playbooks()
    print(f"Created {len(playbooks)} playbooks:")
    for playbook in playbooks:
        print(f"- {playbook.name} ({len(playbook.steps)} steps)")

    # Execute a playbook
    print("\nExecuting CPU high utilization playbook...")
    execution_id = investigation_system.execute_playbook(
        playbook_id="cpu-high-utilization",
        incident_id="incident-2024-001",
        executed_by="ops-team"
    )
    if execution_id:
        print(f"Playbook execution started: {execution_id}")
        # execute_playbook runs every step before it returns, so the status
        # can be read right away; no waiting is needed.
        status = investigation_system.get_playbook_execution_status(execution_id)
        print(f"Execution status: {json.dumps(status, indent=2, default=str)}")

    # Get system statistics
    stats = investigation_system.get_playbook_statistics()
    print(f"\nPlaybook system statistics: {json.dumps(stats, indent=2)}")
if __name__ == "__main__":
    main()
{% endraw %}
AWS Services
Primary Services
- AWS Systems Manager: Automated command execution and operational procedures
- Amazon CloudWatch: Metrics collection and analysis for diagnostics
- Amazon CloudWatch Logs: Log aggregation and analysis
- AWS Lambda: Event-driven automation for playbook execution
Supporting Services
- Amazon S3: Storage for playbook documentation and execution results
- Amazon SNS: Notifications for playbook execution status
- AWS Step Functions: Complex playbook workflow orchestration
- Amazon EventBridge: Event-driven playbook triggering
Benefits
- Consistent Investigation: Standardized procedures ensure thorough analysis
- Faster Resolution: Automated diagnostics reduce mean time to resolution
- Knowledge Retention: Playbooks capture institutional knowledge
- Reduced Human Error: Systematic approach minimizes mistakes
- Continuous Improvement: Playbooks evolve based on lessons learned