Skip to content
REL13

REL13-BP05 - Automate recovery

One-Click Remediation

Deploy CloudFormation stacks to implement this best practice with a single click.

Disaster Recovery Automation
Automate recovery orchestration for disaster recovery procedures, validation, and failover readiness
Launch Stack

Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.

REL13-BP05: Automate recovery

Implement automated disaster recovery processes to reduce recovery time, minimize human error, and ensure consistent execution. Automate both the detection of disasters and the recovery procedures, including failover, data restoration, and service resumption.

Implementation Steps

1. Implement Automated Disaster Detection

Set up automated systems to detect disaster conditions and trigger recovery processes.

2. Automate Failover Procedures

Create automated failover mechanisms that can redirect traffic and services to DR sites.

3. Automate Data Recovery

Implement automated data restoration processes that meet RPO requirements.

4. Automate Service Restoration

Create automated procedures to restore services and validate functionality.

5. Implement Recovery Orchestration

Use orchestration tools to coordinate complex recovery workflows across multiple systems.

Detailed Implementation

{% raw %}

View code
import boto3
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import uuid

class DisasterType(Enum):
    """Categories a DisasterEvent can be classified under.

    detect_disaster() builds a member from the string stored under the
    'type' key of the indicator dict, so any value produced there must match
    one of the values below or construction raises ValueError.
    """
    REGION_OUTAGE = "region_outage"  # the only value _analyze_disaster_indicators emits in the code shown here
    AZ_OUTAGE = "az_outage"
    SERVICE_OUTAGE = "service_outage"
    DATA_CORRUPTION = "data_corruption"
    SECURITY_INCIDENT = "security_incident"
    NATURAL_DISASTER = "natural_disaster"

class RecoveryStatus(Enum):
    """Lifecycle states stored in RecoveryExecution.status."""
    DETECTING = "detecting"  # not assigned in the code shown here
    INITIATING = "initiating"  # initial state given by execute_automated_recovery
    IN_PROGRESS = "in_progress"  # set once a workflow has been started or the in-process steps begin
    COMPLETED = "completed"  # set by the in-process path when validation passes
    FAILED = "failed"  # set when validation fails or a recovery step raises
    ROLLED_BACK = "rolled_back"  # not assigned in the code shown here

class RecoveryStep(Enum):
    """Phases stored in RecoveryExecution.current_step to show progress."""
    DISASTER_DETECTION = "disaster_detection"  # initial step given by execute_automated_recovery
    FAILOVER_INITIATION = "failover_initiation"  # covers both the DNS and database failover calls
    DATA_RECOVERY = "data_recovery"  # not assigned in the code shown here
    SERVICE_RESTORATION = "service_restoration"
    VALIDATION = "validation"
    NOTIFICATION = "notification"

@dataclass
class DisasterEvent:
    """Record of one detected disaster, as produced by detect_disaster()."""
    # Identifier of the form "disaster-<8 hex chars>" (derived from uuid4).
    event_id: str
    disaster_type: DisasterType
    affected_region: str
    # Service names copied from the monitoring data; ['all'] when none were supplied.
    affected_services: List[str]
    # Naive datetime taken from datetime.utcnow() at detection time.
    detection_time: datetime
    # Free-form label; the visible analysis code uses 'medium' and 'high'.
    severity: str
    estimated_impact: str
    recovery_required: bool

@dataclass
class RecoveryExecution:
    """Mutable progress record for one recovery run of a DisasterEvent."""
    # Identifier of the form "recovery-<8 hex chars>" (derived from uuid4).
    execution_id: str
    # Key of the triggering event in AutomatedRecoverySystem.disaster_events.
    disaster_event_id: str
    recovery_strategy: str
    target_region: str
    status: RecoveryStatus
    # Naive datetime taken from datetime.utcnow() when the run was created.
    start_time: datetime
    # None until the in-process recovery path finishes or fails.
    end_time: Optional[datetime]
    current_step: RecoveryStep
    # Names of the steps that succeeded, appended in execution order.
    steps_completed: List[str]
    rto_target_minutes: int
    rpo_target_minutes: int
    # Whole minutes between start_time and end_time; set only by the in-process path.
    actual_rto_minutes: Optional[int]
    # Never assigned in the code shown here; stays None.
    actual_rpo_minutes: Optional[int]

class AutomatedRecoverySystem:
    def __init__(self, region: str = 'us-east-1', dr_region: str = 'us-west-2'):
        """Create the AWS clients and in-memory state used for recovery.

        Args:
            region: Primary region; the CloudWatch, Lambda, SNS, Step
                Functions, RDS and S3 clients are bound to it.
            dr_region: Region that recovery fails over to. Defaults to
                'us-west-2', the value that used to be hard-coded, so
                existing callers are unaffected.
        """
        self.region = region
        self.dr_region = dr_region

        # AWS clients for primary region
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        # No region_name is passed for Route 53, so boto3's default resolution applies.
        self.route53 = boto3.client('route53')
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.sns = boto3.client('sns', region_name=region)
        self.stepfunctions = boto3.client('stepfunctions', region_name=region)
        self.rds = boto3.client('rds', region_name=region)
        self.s3 = boto3.client('s3', region_name=region)

        # AWS clients for DR region
        self.dr_cloudwatch = boto3.client('cloudwatch', region_name=self.dr_region)
        self.dr_lambda = boto3.client('lambda', region_name=self.dr_region)
        self.dr_rds = boto3.client('rds', region_name=self.dr_region)

        # Configure logging. basicConfig is a no-op when the root logger
        # already has handlers, so an application-level setup wins.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

        # Failing over to the region that is down cannot help; surface the
        # misconfiguration early instead of during an incident.
        if self.dr_region == self.region:
            self.logger.warning(
                f"DR region equals primary region ({self.region}); failover will not leave the affected region"
            )

        # Recovery management
        self.disaster_events: Dict[str, DisasterEvent] = {}
        self.recovery_executions: Dict[str, RecoveryExecution] = {}
        self.recovery_workflows: Dict[str, str] = {}  # Workflow name -> state machine ARN

        # Guards writes to the dictionaries above
        self.recovery_lock = threading.Lock()

    def setup_disaster_detection(self, detection_config: Dict[str, Any]) -> bool:
        """Set up automated disaster detection"""
        try:
            # Create CloudWatch alarms for disaster detection
            for alarm_config in detection_config.get('alarms', []):
                self._create_disaster_detection_alarm(alarm_config)
            
            # Set up health checks
            for health_check in detection_config.get('health_checks', []):
                self._create_health_check(health_check)
            
            # Create disaster detection Lambda function
            self._deploy_disaster_detection_function()
            
            self.logger.info("Disaster detection setup completed")
            return True
            
        except Exception as e:
            self.logger.error(f"Disaster detection setup failed: {str(e)}")
            return False

    def create_recovery_workflow(self, workflow_config: Dict[str, Any]) -> str:
        """Create automated recovery workflow using Step Functions"""
        try:
            workflow_name = workflow_config['name']
            
            # Define Step Functions state machine
            state_machine_definition = {
                "Comment": f"Automated disaster recovery workflow for {workflow_name}",
                "StartAt": "DetectDisaster",
                "States": {
                    "DetectDisaster": {
                        "Type": "Task",
                        "Resource": f"arn:aws:lambda:{self.region}:123456789012:function:disaster-detector",
                        "Next": "EvaluateRecoveryNeeded"
                    },
                    "EvaluateRecoveryNeeded": {
                        "Type": "Choice",
                        "Choices": [
                            {
                                "Variable": "$.recoveryRequired",
                                "BooleanEquals": True,
                                "Next": "InitiateFailover"
                            }
                        ],
                        "Default": "NoRecoveryNeeded"
                    },
                    "InitiateFailover": {
                        "Type": "Parallel",
                        "Branches": [
                            {
                                "StartAt": "DNSFailover",
                                "States": {
                                    "DNSFailover": {
                                        "Type": "Task",
                                        "Resource": f"arn:aws:lambda:{self.region}:123456789012:function:dns-failover",
                                        "End": True
                                    }
                                }
                            },
                            {
                                "StartAt": "DatabaseFailover",
                                "States": {
                                    "DatabaseFailover": {
                                        "Type": "Task",
                                        "Resource": f"arn:aws:lambda:{self.region}:123456789012:function:database-failover",
                                        "End": True
                                    }
                                }
                            }
                        ],
                        "Next": "RestoreServices"
                    },
                    "RestoreServices": {
                        "Type": "Task",
                        "Resource": f"arn:aws:lambda:{self.region}:123456789012:function:service-restoration",
                        "Next": "ValidateRecovery"
                    },
                    "ValidateRecovery": {
                        "Type": "Task",
                        "Resource": f"arn:aws:lambda:{self.region}:123456789012:function:recovery-validator",
                        "Next": "SendNotification"
                    },
                    "SendNotification": {
                        "Type": "Task",
                        "Resource": f"arn:aws:lambda:{self.region}:123456789012:function:recovery-notifier",
                        "End": True
                    },
                    "NoRecoveryNeeded": {
                        "Type": "Pass",
                        "Result": "No recovery action required",
                        "End": True
                    }
                }
            }
            
            # Create Step Functions state machine
            response = self.stepfunctions.create_state_machine(
                name=workflow_name,
                definition=json.dumps(state_machine_definition),
                roleArn=f"arn:aws:iam::123456789012:role/StepFunctionsExecutionRole",
                type='STANDARD'
            )
            
            workflow_arn = response['stateMachineArn']
            self.recovery_workflows[workflow_name] = workflow_arn
            
            self.logger.info(f"Created recovery workflow: {workflow_name}")
            return workflow_arn
            
        except Exception as e:
            self.logger.error(f"Recovery workflow creation failed: {str(e)}")
            return ""

    def detect_disaster(self, monitoring_data: Dict[str, Any]) -> Optional[DisasterEvent]:
        """Detect disaster conditions from monitoring data"""
        try:
            # Analyze monitoring data for disaster indicators
            disaster_indicators = self._analyze_disaster_indicators(monitoring_data)
            
            if disaster_indicators['disaster_detected']:
                event_id = f"disaster-{uuid.uuid4().hex[:8]}"
                
                disaster_event = DisasterEvent(
                    event_id=event_id,
                    disaster_type=DisasterType(disaster_indicators['type']),
                    affected_region=disaster_indicators['affected_region'],
                    affected_services=disaster_indicators['affected_services'],
                    detection_time=datetime.utcnow(),
                    severity=disaster_indicators['severity'],
                    estimated_impact=disaster_indicators['estimated_impact'],
                    recovery_required=disaster_indicators['recovery_required']
                )
                
                with self.recovery_lock:
                    self.disaster_events[event_id] = disaster_event
                
                self.logger.warning(f"Disaster detected: {event_id} - {disaster_event.disaster_type.value}")
                return disaster_event
            
            return None
            
        except Exception as e:
            self.logger.error(f"Disaster detection failed: {str(e)}")
            return None

    def execute_automated_recovery(self, disaster_event_id: str, recovery_config: Dict[str, Any]) -> str:
        """Execute automated disaster recovery"""
        try:
            disaster_event = self.disaster_events.get(disaster_event_id)
            if not disaster_event:
                raise ValueError(f"Disaster event {disaster_event_id} not found")
            
            execution_id = f"recovery-{uuid.uuid4().hex[:8]}"
            
            recovery_execution = RecoveryExecution(
                execution_id=execution_id,
                disaster_event_id=disaster_event_id,
                recovery_strategy=recovery_config['strategy'],
                target_region=recovery_config.get('target_region', self.dr_region),
                status=RecoveryStatus.INITIATING,
                start_time=datetime.utcnow(),
                end_time=None,
                current_step=RecoveryStep.DISASTER_DETECTION,
                steps_completed=[],
                rto_target_minutes=recovery_config['rto_target_minutes'],
                rpo_target_minutes=recovery_config['rpo_target_minutes'],
                actual_rto_minutes=None,
                actual_rpo_minutes=None
            )
            
            with self.recovery_lock:
                self.recovery_executions[execution_id] = recovery_execution
            
            # Execute recovery workflow
            workflow_arn = self.recovery_workflows.get(recovery_config['workflow_name'])
            if workflow_arn:
                self._execute_step_functions_workflow(workflow_arn, recovery_execution, disaster_event)
            else:
                self._execute_manual_recovery_steps(recovery_execution, disaster_event, recovery_config)
            
            self.logger.info(f"Started automated recovery: {execution_id}")
            return execution_id
            
        except Exception as e:
            self.logger.error(f"Automated recovery execution failed: {str(e)}")
            return ""

    def _execute_step_functions_workflow(
        self,
        workflow_arn: str,
        execution: RecoveryExecution,
        disaster_event: DisasterEvent,
    ) -> None:
        """Start the given state machine for this recovery run.

        The execution and event details are serialised to JSON as the
        workflow input. ``execution.status`` becomes IN_PROGRESS once the
        start call returns, or FAILED if anything raises; errors are logged
        and never propagated.
        """
        try:
            # Input document handed to the state machine's first state.
            payload = dict(
                executionId=execution.execution_id,
                disasterEventId=disaster_event.event_id,
                disasterType=disaster_event.disaster_type.value,
                affectedRegion=disaster_event.affected_region,
                affectedServices=disaster_event.affected_services,
                targetRegion=execution.target_region,
                recoveryStrategy=execution.recovery_strategy,
                rtoTargetMinutes=execution.rto_target_minutes,
                rpoTargetMinutes=execution.rpo_target_minutes,
            )

            response = self.stepfunctions.start_execution(
                stateMachineArn=workflow_arn,
                name=f"recovery-{execution.execution_id}",
                input=json.dumps(payload),
            )
        except Exception as e:
            execution.status = RecoveryStatus.FAILED
            self.logger.error(f"Step Functions workflow execution failed: {str(e)}")
            return

        # Mirrors the original flow: a failure while recording success is
        # handled exactly like a failure to start.
        try:
            execution.status = RecoveryStatus.IN_PROGRESS
            self.logger.info(f"Started Step Functions workflow: {response['executionArn']}")
        except Exception as e:
            execution.status = RecoveryStatus.FAILED
            self.logger.error(f"Step Functions workflow execution failed: {str(e)}")

    def _execute_manual_recovery_steps(self, execution: RecoveryExecution, disaster_event: DisasterEvent,
                                     recovery_config: Dict[str, Any]) -> None:
        """Run the recovery steps in-process when no workflow is available.

        Steps run in order: DNS failover, database failover, service
        restoration, validation, notification. Each successful step is
        appended to ``execution.steps_completed``. The final status is decided
        by validation alone: COMPLETED when it passes, FAILED otherwise. A
        failed failover or restoration step does not stop later steps.

        A failure while sending the notification is logged but does not
        change the status decided by validation, and the RTO is still
        recorded.

        Args:
            execution: Record updated in place with status, step and timings.
            disaster_event: Event whose affected services are recovered.
            recovery_config: Accepted for interface symmetry; not read here.
        """
        try:
            execution.status = RecoveryStatus.IN_PROGRESS

            # Step 1: DNS Failover
            execution.current_step = RecoveryStep.FAILOVER_INITIATION
            if self._execute_dns_failover(disaster_event, execution.target_region):
                execution.steps_completed.append('dns_failover')

            # Step 2: Database Failover
            if self._execute_database_failover(disaster_event, execution.target_region):
                execution.steps_completed.append('database_failover')

            # Step 3: Service Restoration
            execution.current_step = RecoveryStep.SERVICE_RESTORATION
            if self._execute_service_restoration(disaster_event, execution.target_region):
                execution.steps_completed.append('service_restoration')

            # Step 4: Validation
            execution.current_step = RecoveryStep.VALIDATION
            if self._validate_recovery(execution):
                execution.steps_completed.append('validation')
                execution.status = RecoveryStatus.COMPLETED
            else:
                execution.status = RecoveryStatus.FAILED

            # Step 5: Notification
            execution.current_step = RecoveryStep.NOTIFICATION
            try:
                self._send_recovery_notification(execution, disaster_event)
                execution.steps_completed.append('notification')
            except Exception as e:
                # Reporting is best-effort: a delivery problem must not turn a
                # validated recovery into a FAILED one or skip the RTO below.
                self.logger.error(f"Recovery notification failed: {str(e)}")

            # Calculate actual RTO (whole minutes, truncated)
            execution.end_time = datetime.utcnow()
            execution.actual_rto_minutes = int((execution.end_time - execution.start_time).total_seconds() / 60)

            self.logger.info(f"Manual recovery steps completed: {execution.execution_id}")

        except Exception as e:
            execution.status = RecoveryStatus.FAILED
            execution.end_time = datetime.utcnow()
            self.logger.error(f"Manual recovery steps failed: {str(e)}")

    def _execute_dns_failover(self, disaster_event: DisasterEvent, target_region: str) -> bool:
        """Execute DNS failover to DR region"""
        try:
            # Get hosted zones that need failover
            hosted_zones = self._get_affected_hosted_zones(disaster_event.affected_services)
            
            for zone_id in hosted_zones:
                # Update Route 53 records to point to DR region
                self._update_route53_records(zone_id, target_region)
            
            self.logger.info(f"DNS failover completed to {target_region}")
            return True
            
        except Exception as e:
            self.logger.error(f"DNS failover failed: {str(e)}")
            return False

    def _execute_database_failover(self, disaster_event: DisasterEvent, target_region: str) -> bool:
        """Execute database failover to DR region"""
        try:
            # Get affected databases
            affected_databases = self._get_affected_databases(disaster_event.affected_services)
            
            for db_identifier in affected_databases:
                # Promote read replica or restore from backup
                self._promote_database_replica(db_identifier, target_region)
            
            self.logger.info(f"Database failover completed to {target_region}")
            return True
            
        except Exception as e:
            self.logger.error(f"Database failover failed: {str(e)}")
            return False

    def _execute_service_restoration(self, disaster_event: DisasterEvent, target_region: str) -> bool:
        """Execute service restoration in DR region"""
        try:
            # Start services in DR region
            for service in disaster_event.affected_services:
                self._start_service_in_dr_region(service, target_region)
            
            self.logger.info(f"Service restoration completed in {target_region}")
            return True
            
        except Exception as e:
            self.logger.error(f"Service restoration failed: {str(e)}")
            return False

    def _validate_recovery(self, execution: RecoveryExecution) -> bool:
        """Validate that recovery was successful"""
        try:
            # Perform health checks on restored services
            validation_results = []
            
            # Check DNS resolution
            dns_valid = self._validate_dns_resolution()
            validation_results.append(('dns_resolution', dns_valid))
            
            # Check database connectivity
            db_valid = self._validate_database_connectivity(execution.target_region)
            validation_results.append(('database_connectivity', db_valid))
            
            # Check service health
            services_valid = self._validate_service_health(execution.target_region)
            validation_results.append(('service_health', services_valid))
            
            # All validations must pass
            all_valid = all(result[1] for result in validation_results)
            
            self.logger.info(f"Recovery validation results: {validation_results}")
            return all_valid
            
        except Exception as e:
            self.logger.error(f"Recovery validation failed: {str(e)}")
            return False

    def _analyze_disaster_indicators(self, monitoring_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze monitoring data for disaster indicators"""
        try:
            indicators = {
                'disaster_detected': False,
                'type': 'region_outage',
                'affected_region': self.region,
                'affected_services': [],
                'severity': 'medium',
                'estimated_impact': 'moderate',
                'recovery_required': False
            }
            
            # Analyze metrics for disaster patterns
            error_rate = monitoring_data.get('error_rate', 0)
            availability = monitoring_data.get('availability', 100)
            response_time = monitoring_data.get('response_time', 0)
            
            # Disaster detection logic
            if error_rate > 50 or availability < 50:
                indicators['disaster_detected'] = True
                indicators['recovery_required'] = True
                indicators['severity'] = 'high'
                indicators['affected_services'] = monitoring_data.get('affected_services', ['all'])
            
            return indicators
            
        except Exception as e:
            self.logger.error(f"Disaster indicator analysis failed: {str(e)}")
            return {'disaster_detected': False}

    def get_recovery_status(self, execution_id: str) -> Dict[str, Any]:
        """Get status of recovery execution"""
        try:
            execution = self.recovery_executions.get(execution_id)
            if not execution:
                return {'error': 'Recovery execution not found'}
            
            disaster_event = self.disaster_events.get(execution.disaster_event_id)
            
            status = {
                'execution_id': execution_id,
                'disaster_event_id': execution.disaster_event_id,
                'disaster_type': disaster_event.disaster_type.value if disaster_event else 'unknown',
                'recovery_strategy': execution.recovery_strategy,
                'status': execution.status.value,
                'current_step': execution.current_step.value,
                'steps_completed': execution.steps_completed,
                'start_time': execution.start_time.isoformat(),
                'end_time': execution.end_time.isoformat() if execution.end_time else None,
                'rto_target_minutes': execution.rto_target_minutes,
                'rpo_target_minutes': execution.rpo_target_minutes,
                'actual_rto_minutes': execution.actual_rto_minutes,
                'actual_rpo_minutes': execution.actual_rpo_minutes,
                'target_region': execution.target_region
            }
            
            return status
            
        except Exception as e:
            self.logger.error(f"Recovery status retrieval failed: {str(e)}")
            return {'error': str(e)}

# Example usage
def main():
    """Walk through the full flow: set up detection, create a workflow,
    detect a simulated disaster, start recovery and print its status."""
    recovery_system = AutomatedRecoverySystem(region='us-east-1')

    # --- Disaster detection -------------------------------------------------
    high_error_rate_alarm = {
        'name': 'HighErrorRate',
        'metric': 'ErrorRate',
        'threshold': 50,
        'comparison': 'GreaterThanThreshold'
    }
    website_health_check = {
        'name': 'WebsiteHealth',
        'endpoint': 'https://example.com/health'
    }
    print("Setting up disaster detection...")
    recovery_system.setup_disaster_detection({
        'alarms': [high_error_rate_alarm],
        'health_checks': [website_health_check]
    })

    # --- Recovery workflow --------------------------------------------------
    print("Creating recovery workflow...")
    recovery_system.create_recovery_workflow({
        'name': 'WebApplicationRecovery',
        'description': 'Automated recovery for web application'
    })

    # --- Simulated outage: metrics well past the detection thresholds -------
    print("Detecting disaster...")
    disaster_event = recovery_system.detect_disaster({
        'error_rate': 75,
        'availability': 25,
        'response_time': 5000,
        'affected_services': ['web-app', 'database', 'api-gateway']
    })
    if not disaster_event:
        return

    print(f"Disaster detected: {disaster_event.event_id}")

    # --- Recovery -----------------------------------------------------------
    recovery_config = {
        'strategy': 'warm_standby',
        'target_region': 'us-west-2',
        'workflow_name': 'WebApplicationRecovery',
        'rto_target_minutes': 30,
        'rpo_target_minutes': 15
    }
    print("Executing automated recovery...")
    execution_id = recovery_system.execute_automated_recovery(disaster_event.event_id, recovery_config)
    if not execution_id:
        return

    print(f"Recovery execution started: {execution_id}")

    # Report where the recovery currently stands.
    status = recovery_system.get_recovery_status(execution_id)
    print(f"Recovery status: {json.dumps(status, indent=2, default=str)}")


if __name__ == "__main__":
    main()

{% endraw %}

AWS Services

Primary Services

  • AWS Step Functions: Orchestration of complex recovery workflows
  • AWS Lambda: Event-driven automation for recovery processes
  • Amazon Route 53: Automated DNS failover and health checking
  • AWS Elastic Disaster Recovery (AWS DRS): Automated disaster recovery orchestration

Supporting Services

  • Amazon CloudWatch: Monitoring and automated disaster detection
  • Amazon EventBridge: Event-driven recovery triggering
  • AWS Systems Manager: Automated configuration and command execution
  • Amazon SNS: Automated notifications for recovery events

Benefits

  • Reduced RTO: Automated processes significantly reduce recovery time
  • Minimized Human Error: Automation eliminates manual mistakes during high-stress situations
  • Consistent Execution: Automated procedures ensure consistent recovery processes
  • 24/7 Availability: Automated systems can respond to disasters at any time
  • Scalable Recovery: Automation can handle multiple simultaneous recovery scenarios