REL13-BP05 - Automate recovery
One-Click Remediation
Deploy CloudFormation stacks to implement this best practice with a single click.
Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.
REL13-BP05: Automate recovery
Implement automated disaster recovery processes to reduce recovery time, minimize human error, and ensure consistent execution. Automate both the detection of disasters and the recovery procedures, including failover, data restoration, and service resumption.
Implementation Steps
1. Implement Automated Disaster Detection
Set up automated systems to detect disaster conditions and trigger recovery processes.
2. Automate Failover Procedures
Create automated failover mechanisms that can redirect traffic and services to DR sites.
3. Automate Data Recovery
Implement automated data restoration processes that meet RPO requirements.
4. Automate Service Restoration
Create automated procedures to restore services and validate functionality.
5. Implement Recovery Orchestration
Use orchestration tools to coordinate complex recovery workflows across multiple systems.
Detailed Implementation
{% raw %}
View code
import boto3
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import uuid
class DisasterType(Enum):
REGION_OUTAGE = "region_outage"
AZ_OUTAGE = "az_outage"
SERVICE_OUTAGE = "service_outage"
DATA_CORRUPTION = "data_corruption"
SECURITY_INCIDENT = "security_incident"
NATURAL_DISASTER = "natural_disaster"
class RecoveryStatus(Enum):
DETECTING = "detecting"
INITIATING = "initiating"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
ROLLED_BACK = "rolled_back"
class RecoveryStep(Enum):
DISASTER_DETECTION = "disaster_detection"
FAILOVER_INITIATION = "failover_initiation"
DATA_RECOVERY = "data_recovery"
SERVICE_RESTORATION = "service_restoration"
VALIDATION = "validation"
NOTIFICATION = "notification"
@dataclass
class DisasterEvent:
event_id: str
disaster_type: DisasterType
affected_region: str
affected_services: List[str]
detection_time: datetime
severity: str
estimated_impact: str
recovery_required: bool
@dataclass
class RecoveryExecution:
execution_id: str
disaster_event_id: str
recovery_strategy: str
target_region: str
status: RecoveryStatus
start_time: datetime
end_time: Optional[datetime]
current_step: RecoveryStep
steps_completed: List[str]
rto_target_minutes: int
rpo_target_minutes: int
actual_rto_minutes: Optional[int]
actual_rpo_minutes: Optional[int]
class AutomatedRecoverySystem:
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.dr_region = 'us-west-2' # Default DR region
# AWS clients for primary region
self.cloudwatch = boto3.client('cloudwatch', region_name=region)
self.route53 = boto3.client('route53')
self.lambda_client = boto3.client('lambda', region_name=region)
self.sns = boto3.client('sns', region_name=region)
self.stepfunctions = boto3.client('stepfunctions', region_name=region)
self.rds = boto3.client('rds', region_name=region)
self.s3 = boto3.client('s3', region_name=region)
# AWS clients for DR region
self.dr_cloudwatch = boto3.client('cloudwatch', region_name=self.dr_region)
self.dr_lambda = boto3.client('lambda', region_name=self.dr_region)
self.dr_rds = boto3.client('rds', region_name=self.dr_region)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
# Recovery management
self.disaster_events: Dict[str, DisasterEvent] = {}
self.recovery_executions: Dict[str, RecoveryExecution] = {}
self.recovery_workflows: Dict[str, str] = {} # Workflow ARNs
# Thread safety
self.recovery_lock = threading.Lock()
def setup_disaster_detection(self, detection_config: Dict[str, Any]) -> bool:
"""Set up automated disaster detection"""
try:
# Create CloudWatch alarms for disaster detection
for alarm_config in detection_config.get('alarms', []):
self._create_disaster_detection_alarm(alarm_config)
# Set up health checks
for health_check in detection_config.get('health_checks', []):
self._create_health_check(health_check)
# Create disaster detection Lambda function
self._deploy_disaster_detection_function()
self.logger.info("Disaster detection setup completed")
return True
except Exception as e:
self.logger.error(f"Disaster detection setup failed: {str(e)}")
return False
def create_recovery_workflow(self, workflow_config: Dict[str, Any]) -> str:
"""Create automated recovery workflow using Step Functions"""
try:
workflow_name = workflow_config['name']
# Define Step Functions state machine
state_machine_definition = {
"Comment": f"Automated disaster recovery workflow for {workflow_name}",
"StartAt": "DetectDisaster",
"States": {
"DetectDisaster": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:123456789012:function:disaster-detector",
"Next": "EvaluateRecoveryNeeded"
},
"EvaluateRecoveryNeeded": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.recoveryRequired",
"BooleanEquals": True,
"Next": "InitiateFailover"
}
],
"Default": "NoRecoveryNeeded"
},
"InitiateFailover": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "DNSFailover",
"States": {
"DNSFailover": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:123456789012:function:dns-failover",
"End": True
}
}
},
{
"StartAt": "DatabaseFailover",
"States": {
"DatabaseFailover": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:123456789012:function:database-failover",
"End": True
}
}
}
],
"Next": "RestoreServices"
},
"RestoreServices": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:123456789012:function:service-restoration",
"Next": "ValidateRecovery"
},
"ValidateRecovery": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:123456789012:function:recovery-validator",
"Next": "SendNotification"
},
"SendNotification": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:123456789012:function:recovery-notifier",
"End": True
},
"NoRecoveryNeeded": {
"Type": "Pass",
"Result": "No recovery action required",
"End": True
}
}
}
# Create Step Functions state machine
response = self.stepfunctions.create_state_machine(
name=workflow_name,
definition=json.dumps(state_machine_definition),
roleArn=f"arn:aws:iam::123456789012:role/StepFunctionsExecutionRole",
type='STANDARD'
)
workflow_arn = response['stateMachineArn']
self.recovery_workflows[workflow_name] = workflow_arn
self.logger.info(f"Created recovery workflow: {workflow_name}")
return workflow_arn
except Exception as e:
self.logger.error(f"Recovery workflow creation failed: {str(e)}")
return ""
def detect_disaster(self, monitoring_data: Dict[str, Any]) -> Optional[DisasterEvent]:
"""Detect disaster conditions from monitoring data"""
try:
# Analyze monitoring data for disaster indicators
disaster_indicators = self._analyze_disaster_indicators(monitoring_data)
if disaster_indicators['disaster_detected']:
event_id = f"disaster-{uuid.uuid4().hex[:8]}"
disaster_event = DisasterEvent(
event_id=event_id,
disaster_type=DisasterType(disaster_indicators['type']),
affected_region=disaster_indicators['affected_region'],
affected_services=disaster_indicators['affected_services'],
detection_time=datetime.utcnow(),
severity=disaster_indicators['severity'],
estimated_impact=disaster_indicators['estimated_impact'],
recovery_required=disaster_indicators['recovery_required']
)
with self.recovery_lock:
self.disaster_events[event_id] = disaster_event
self.logger.warning(f"Disaster detected: {event_id} - {disaster_event.disaster_type.value}")
return disaster_event
return None
except Exception as e:
self.logger.error(f"Disaster detection failed: {str(e)}")
return None
def execute_automated_recovery(self, disaster_event_id: str, recovery_config: Dict[str, Any]) -> str:
"""Execute automated disaster recovery"""
try:
disaster_event = self.disaster_events.get(disaster_event_id)
if not disaster_event:
raise ValueError(f"Disaster event {disaster_event_id} not found")
execution_id = f"recovery-{uuid.uuid4().hex[:8]}"
recovery_execution = RecoveryExecution(
execution_id=execution_id,
disaster_event_id=disaster_event_id,
recovery_strategy=recovery_config['strategy'],
target_region=recovery_config.get('target_region', self.dr_region),
status=RecoveryStatus.INITIATING,
start_time=datetime.utcnow(),
end_time=None,
current_step=RecoveryStep.DISASTER_DETECTION,
steps_completed=[],
rto_target_minutes=recovery_config['rto_target_minutes'],
rpo_target_minutes=recovery_config['rpo_target_minutes'],
actual_rto_minutes=None,
actual_rpo_minutes=None
)
with self.recovery_lock:
self.recovery_executions[execution_id] = recovery_execution
# Execute recovery workflow
workflow_arn = self.recovery_workflows.get(recovery_config['workflow_name'])
if workflow_arn:
self._execute_step_functions_workflow(workflow_arn, recovery_execution, disaster_event)
else:
self._execute_manual_recovery_steps(recovery_execution, disaster_event, recovery_config)
self.logger.info(f"Started automated recovery: {execution_id}")
return execution_id
except Exception as e:
self.logger.error(f"Automated recovery execution failed: {str(e)}")
return ""
def _execute_step_functions_workflow(self, workflow_arn: str, execution: RecoveryExecution,
disaster_event: DisasterEvent) -> None:
"""Execute recovery using Step Functions workflow"""
try:
# Prepare input for Step Functions
workflow_input = {
'executionId': execution.execution_id,
'disasterEventId': disaster_event.event_id,
'disasterType': disaster_event.disaster_type.value,
'affectedRegion': disaster_event.affected_region,
'affectedServices': disaster_event.affected_services,
'targetRegion': execution.target_region,
'recoveryStrategy': execution.recovery_strategy,
'rtoTargetMinutes': execution.rto_target_minutes,
'rpoTargetMinutes': execution.rpo_target_minutes
}
# Start Step Functions execution
response = self.stepfunctions.start_execution(
stateMachineArn=workflow_arn,
name=f"recovery-{execution.execution_id}",
input=json.dumps(workflow_input)
)
execution.status = RecoveryStatus.IN_PROGRESS
self.logger.info(f"Started Step Functions workflow: {response['executionArn']}")
except Exception as e:
execution.status = RecoveryStatus.FAILED
self.logger.error(f"Step Functions workflow execution failed: {str(e)}")
def _execute_manual_recovery_steps(self, execution: RecoveryExecution, disaster_event: DisasterEvent,
recovery_config: Dict[str, Any]) -> None:
"""Execute recovery steps manually when no workflow is available"""
try:
execution.status = RecoveryStatus.IN_PROGRESS
# Step 1: DNS Failover
execution.current_step = RecoveryStep.FAILOVER_INITIATION
if self._execute_dns_failover(disaster_event, execution.target_region):
execution.steps_completed.append('dns_failover')
# Step 2: Database Failover
if self._execute_database_failover(disaster_event, execution.target_region):
execution.steps_completed.append('database_failover')
# Step 3: Service Restoration
execution.current_step = RecoveryStep.SERVICE_RESTORATION
if self._execute_service_restoration(disaster_event, execution.target_region):
execution.steps_completed.append('service_restoration')
# Step 4: Validation
execution.current_step = RecoveryStep.VALIDATION
if self._validate_recovery(execution):
execution.steps_completed.append('validation')
execution.status = RecoveryStatus.COMPLETED
else:
execution.status = RecoveryStatus.FAILED
# Step 5: Notification
execution.current_step = RecoveryStep.NOTIFICATION
self._send_recovery_notification(execution, disaster_event)
execution.steps_completed.append('notification')
# Calculate actual RTO
execution.end_time = datetime.utcnow()
execution.actual_rto_minutes = int((execution.end_time - execution.start_time).total_seconds() / 60)
self.logger.info(f"Manual recovery steps completed: {execution.execution_id}")
except Exception as e:
execution.status = RecoveryStatus.FAILED
execution.end_time = datetime.utcnow()
self.logger.error(f"Manual recovery steps failed: {str(e)}")
def _execute_dns_failover(self, disaster_event: DisasterEvent, target_region: str) -> bool:
"""Execute DNS failover to DR region"""
try:
# Get hosted zones that need failover
hosted_zones = self._get_affected_hosted_zones(disaster_event.affected_services)
for zone_id in hosted_zones:
# Update Route 53 records to point to DR region
self._update_route53_records(zone_id, target_region)
self.logger.info(f"DNS failover completed to {target_region}")
return True
except Exception as e:
self.logger.error(f"DNS failover failed: {str(e)}")
return False
def _execute_database_failover(self, disaster_event: DisasterEvent, target_region: str) -> bool:
"""Execute database failover to DR region"""
try:
# Get affected databases
affected_databases = self._get_affected_databases(disaster_event.affected_services)
for db_identifier in affected_databases:
# Promote read replica or restore from backup
self._promote_database_replica(db_identifier, target_region)
self.logger.info(f"Database failover completed to {target_region}")
return True
except Exception as e:
self.logger.error(f"Database failover failed: {str(e)}")
return False
def _execute_service_restoration(self, disaster_event: DisasterEvent, target_region: str) -> bool:
"""Execute service restoration in DR region"""
try:
# Start services in DR region
for service in disaster_event.affected_services:
self._start_service_in_dr_region(service, target_region)
self.logger.info(f"Service restoration completed in {target_region}")
return True
except Exception as e:
self.logger.error(f"Service restoration failed: {str(e)}")
return False
def _validate_recovery(self, execution: RecoveryExecution) -> bool:
"""Validate that recovery was successful"""
try:
# Perform health checks on restored services
validation_results = []
# Check DNS resolution
dns_valid = self._validate_dns_resolution()
validation_results.append(('dns_resolution', dns_valid))
# Check database connectivity
db_valid = self._validate_database_connectivity(execution.target_region)
validation_results.append(('database_connectivity', db_valid))
# Check service health
services_valid = self._validate_service_health(execution.target_region)
validation_results.append(('service_health', services_valid))
# All validations must pass
all_valid = all(result[1] for result in validation_results)
self.logger.info(f"Recovery validation results: {validation_results}")
return all_valid
except Exception as e:
self.logger.error(f"Recovery validation failed: {str(e)}")
return False
def _analyze_disaster_indicators(self, monitoring_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze monitoring data for disaster indicators"""
try:
indicators = {
'disaster_detected': False,
'type': 'region_outage',
'affected_region': self.region,
'affected_services': [],
'severity': 'medium',
'estimated_impact': 'moderate',
'recovery_required': False
}
# Analyze metrics for disaster patterns
error_rate = monitoring_data.get('error_rate', 0)
availability = monitoring_data.get('availability', 100)
response_time = monitoring_data.get('response_time', 0)
# Disaster detection logic
if error_rate > 50 or availability < 50:
indicators['disaster_detected'] = True
indicators['recovery_required'] = True
indicators['severity'] = 'high'
indicators['affected_services'] = monitoring_data.get('affected_services', ['all'])
return indicators
except Exception as e:
self.logger.error(f"Disaster indicator analysis failed: {str(e)}")
return {'disaster_detected': False}
def get_recovery_status(self, execution_id: str) -> Dict[str, Any]:
"""Get status of recovery execution"""
try:
execution = self.recovery_executions.get(execution_id)
if not execution:
return {'error': 'Recovery execution not found'}
disaster_event = self.disaster_events.get(execution.disaster_event_id)
status = {
'execution_id': execution_id,
'disaster_event_id': execution.disaster_event_id,
'disaster_type': disaster_event.disaster_type.value if disaster_event else 'unknown',
'recovery_strategy': execution.recovery_strategy,
'status': execution.status.value,
'current_step': execution.current_step.value,
'steps_completed': execution.steps_completed,
'start_time': execution.start_time.isoformat(),
'end_time': execution.end_time.isoformat() if execution.end_time else None,
'rto_target_minutes': execution.rto_target_minutes,
'rpo_target_minutes': execution.rpo_target_minutes,
'actual_rto_minutes': execution.actual_rto_minutes,
'actual_rpo_minutes': execution.actual_rpo_minutes,
'target_region': execution.target_region
}
return status
except Exception as e:
self.logger.error(f"Recovery status retrieval failed: {str(e)}")
return {'error': str(e)}
# Example usage
def main():
# Initialize automated recovery system
recovery_system = AutomatedRecoverySystem(region='us-east-1')
# Set up disaster detection
detection_config = {
'alarms': [
{
'name': 'HighErrorRate',
'metric': 'ErrorRate',
'threshold': 50,
'comparison': 'GreaterThanThreshold'
}
],
'health_checks': [
{
'name': 'WebsiteHealth',
'endpoint': 'https://example.com/health'
}
]
}
print("Setting up disaster detection...")
detection_setup = recovery_system.setup_disaster_detection(detection_config)
# Create recovery workflow
workflow_config = {
'name': 'WebApplicationRecovery',
'description': 'Automated recovery for web application'
}
print("Creating recovery workflow...")
workflow_arn = recovery_system.create_recovery_workflow(workflow_config)
# Simulate disaster detection
monitoring_data = {
'error_rate': 75,
'availability': 25,
'response_time': 5000,
'affected_services': ['web-app', 'database', 'api-gateway']
}
print("Detecting disaster...")
disaster_event = recovery_system.detect_disaster(monitoring_data)
if disaster_event:
print(f"Disaster detected: {disaster_event.event_id}")
# Execute automated recovery
recovery_config = {
'strategy': 'warm_standby',
'target_region': 'us-west-2',
'workflow_name': 'WebApplicationRecovery',
'rto_target_minutes': 30,
'rpo_target_minutes': 15
}
print("Executing automated recovery...")
execution_id = recovery_system.execute_automated_recovery(disaster_event.event_id, recovery_config)
if execution_id:
print(f"Recovery execution started: {execution_id}")
# Get recovery status
status = recovery_system.get_recovery_status(execution_id)
print(f"Recovery status: {json.dumps(status, indent=2, default=str)}")
if __name__ == "__main__":
main(){% endraw %}
AWS Services
Primary Services
- AWS Step Functions: Orchestration of complex recovery workflows
- AWS Lambda: Event-driven automation for recovery processes
- Amazon Route 53: Automated DNS failover and health checking
- AWS Site Recovery: Automated disaster recovery orchestration
Supporting Services
- Amazon CloudWatch: Monitoring and automated disaster detection
- Amazon EventBridge: Event-driven recovery triggering
- AWS Systems Manager: Automated configuration and command execution
- Amazon SNS: Automated notifications for recovery events
Benefits
- Reduced RTO: Automated processes significantly reduce recovery time
- Minimized Human Error: Automation eliminates manual mistakes during high-stress situations
- Consistent Execution: Automated procedures ensure consistent recovery processes
- 24/7 Availability: Automated systems can respond to disasters at any time
- Scalable Recovery: Automation can handle multiple simultaneous recovery scenarios