REL10
REL10-BP03 - Automate recovery for components constrained to a single location
One-Click Remediation
Deploy CloudFormation stacks to implement this best practice with a single click.
Single-Region Recovery Automation
Automate recovery actions for components constrained to a single location with failover orchestration hooks
Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.
REL10-BP03: Automate recovery for components constrained to a single location
Overview
Implement automated recovery mechanisms for workload components that cannot be distributed across multiple locations due to technical, regulatory, or cost constraints. These single-location components represent potential single points of failure and require robust automated recovery strategies to maintain overall system reliability.
Implementation Steps
1. Identify Single-Location Components
- Catalog components constrained to single locations
- Analyze constraints preventing multi-location deployment
- Assess impact and criticality of single-location components
- Document dependencies and recovery requirements
2. Design Automated Recovery Strategies
- Implement automated backup and restore procedures
- Configure rapid provisioning and deployment automation
- Design failover mechanisms within the same location
- Establish automated health monitoring and failure detection
3. Implement Recovery Automation
- Configure automated instance replacement and scaling
- Implement database failover and point-in-time recovery
- Design automated application deployment and configuration
- Establish automated network and load balancer reconfiguration
4. Set Up Monitoring and Alerting
- Configure comprehensive health checks and monitoring
- Implement automated failure detection and classification
- Design escalation procedures and notification systems
- Establish recovery progress tracking and reporting
5. Configure Recovery Testing and Validation
- Implement automated recovery testing procedures
- Configure recovery time objective (RTO) validation
- Design recovery point objective (RPO) verification
- Establish continuous recovery capability assessment
6. Optimize Recovery Performance
- Monitor and analyze recovery times and success rates
- Implement continuous improvement based on recovery metrics
- Optimize recovery procedures and automation
- Establish recovery capacity planning and resource allocation
Implementation Examples
Example 1: Comprehensive Single-Location Recovery System
View code
import boto3
import json
import logging
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
import time
class ComponentType(Enum):
DATABASE = "database"
APPLICATION_SERVER = "application_server"
CACHE = "cache"
MESSAGE_QUEUE = "message_queue"
FILE_SYSTEM = "file_system"
LOAD_BALANCER = "load_balancer"
class RecoveryStrategy(Enum):
REPLACE_INSTANCE = "replace_instance"
RESTORE_FROM_BACKUP = "restore_from_backup"
FAILOVER_TO_STANDBY = "failover_to_standby"
SCALE_OUT = "scale_out"
RESTART_SERVICE = "restart_service"
class RecoveryStatus(Enum):
MONITORING = "monitoring"
FAILURE_DETECTED = "failure_detected"
RECOVERY_INITIATED = "recovery_initiated"
RECOVERY_IN_PROGRESS = "recovery_in_progress"
RECOVERY_COMPLETED = "recovery_completed"
RECOVERY_FAILED = "recovery_failed"
@dataclass
class SingleLocationComponent:
component_id: str
component_name: str
component_type: ComponentType
location: str # AZ or region
constraint_reason: str
criticality: str # critical, important, standard
recovery_strategies: List[RecoveryStrategy]
rto_minutes: int
rpo_minutes: int
health_check_config: Dict[str, Any]
backup_config: Dict[str, Any]
dependencies: List[str]
created_at: datetime
@dataclass
class RecoveryExecution:
execution_id: str
component_id: str
failure_detected_at: datetime
recovery_initiated_at: Optional[datetime]
recovery_completed_at: Optional[datetime]
status: RecoveryStatus
recovery_strategy_used: Optional[RecoveryStrategy]
actual_rto_minutes: Optional[float]
actual_rpo_minutes: Optional[float]
recovery_steps: List[Dict[str, Any]]
error_message: Optional[str]
class SingleLocationRecoveryManager:
"""Automated recovery system for single-location components"""
def __init__(self, config: Dict[str, Any]):
self.config = config
# AWS clients
self.ec2 = boto3.client('ec2')
self.rds = boto3.client('rds')
self.elasticache = boto3.client('elasticache')
self.elbv2 = boto3.client('elbv2')
self.autoscaling = boto3.client('autoscaling')
self.backup = boto3.client('backup')
self.lambda_client = boto3.client('lambda')
self.cloudwatch = boto3.client('cloudwatch')
self.sns = boto3.client('sns')
self.dynamodb = boto3.resource('dynamodb')
# Storage
self.components_table = self.dynamodb.Table(config.get('components_table', 'single-location-components'))
self.executions_table = self.dynamodb.Table(config.get('executions_table', 'recovery-executions'))
# Configuration
self.notification_topic_arn = config.get('notification_topic_arn')
self.recovery_timeout_minutes = config.get('recovery_timeout_minutes', 60)
# Active components and recoveries
self.monitored_components = {}
self.active_recoveries = {}
async def register_single_location_component(self, component_config: Dict[str, Any]) -> str:
"""Register a single-location component for monitoring and recovery"""
try:
component_id = f"comp_{int(datetime.utcnow().timestamp())}_{component_config['component_name'].replace(' ', '_').lower()}"
component = SingleLocationComponent(
component_id=component_id,
component_name=component_config['component_name'],
component_type=ComponentType(component_config['component_type']),
location=component_config['location'],
constraint_reason=component_config['constraint_reason'],
criticality=component_config.get('criticality', 'important'),
recovery_strategies=[RecoveryStrategy(s) for s in component_config.get('recovery_strategies', ['replace_instance'])],
rto_minutes=component_config.get('rto_minutes', 30),
rpo_minutes=component_config.get('rpo_minutes', 15),
health_check_config=component_config.get('health_check_config', {}),
backup_config=component_config.get('backup_config', {}),
dependencies=component_config.get('dependencies', []),
created_at=datetime.utcnow()
)
# Store component
await self._store_component(component)
# Start monitoring
self.monitored_components[component_id] = component
asyncio.create_task(self._monitor_component_health(component))
# Set up automated backups if configured
if component.backup_config:
await self._setup_automated_backups(component)
logging.info(f"Registered single-location component: {component_id}")
return component_id
except Exception as e:
logging.error(f"Failed to register component: {str(e)}")
raise
async def _monitor_component_health(self, component: SingleLocationComponent):
"""Monitor health of single-location component"""
try:
while component.component_id in self.monitored_components:
# Perform health check
health_status = await self._perform_health_check(component)
if not health_status['healthy']:
# Component failure detected
logging.warning(f"Component failure detected: {component.component_id}")
# Check if recovery is already in progress
if component.component_id not in self.active_recoveries:
# Initiate recovery
await self._initiate_component_recovery(component, health_status)
# Wait before next health check
check_interval = component.health_check_config.get('interval_seconds', 60)
await asyncio.sleep(check_interval)
except Exception as e:
logging.error(f"Health monitoring failed for {component.component_id}: {str(e)}")
async def _perform_health_check(self, component: SingleLocationComponent) -> Dict[str, Any]:
"""Perform health check on component"""
try:
health_config = component.health_check_config
if component.component_type == ComponentType.DATABASE:
return await self._check_database_health(component, health_config)
elif component.component_type == ComponentType.APPLICATION_SERVER:
return await self._check_application_server_health(component, health_config)
elif component.component_type == ComponentType.CACHE:
return await self._check_cache_health(component, health_config)
elif component.component_type == ComponentType.LOAD_BALANCER:
return await self._check_load_balancer_health(component, health_config)
else:
return await self._check_generic_health(component, health_config)
except Exception as e:
logging.error(f"Health check failed for {component.component_id}: {str(e)}")
return {'healthy': False, 'error': str(e)}
async def _check_database_health(self, component: SingleLocationComponent,
health_config: Dict[str, Any]) -> Dict[str, Any]:
"""Check database health"""
try:
db_identifier = health_config.get('db_identifier')
if not db_identifier:
return {'healthy': False, 'error': 'No database identifier configured'}
# Check RDS instance status
response = self.rds.describe_db_instances(DBInstanceIdentifier=db_identifier)
if response['DBInstances']:
db_instance = response['DBInstances'][0]
status = db_instance['DBInstanceStatus']
if status == 'available':
# Additional connectivity check
connectivity_check = await self._test_database_connectivity(health_config)
return {
'healthy': connectivity_check,
'status': status,
'connectivity': connectivity_check
}
else:
return {
'healthy': False,
'status': status,
'error': f'Database status is {status}'
}
else:
return {'healthy': False, 'error': 'Database instance not found'}
except Exception as e:
logging.error(f"Database health check failed: {str(e)}")
return {'healthy': False, 'error': str(e)}
async def _test_database_connectivity(self, health_config: Dict[str, Any]) -> bool:
"""Test database connectivity"""
try:
# This would implement actual database connectivity test
# For now, we'll simulate the check
return True
except Exception as e:
logging.error(f"Database connectivity test failed: {str(e)}")
return False
async def _check_application_server_health(self, component: SingleLocationComponent,
health_config: Dict[str, Any]) -> Dict[str, Any]:
"""Check application server health"""
try:
instance_id = health_config.get('instance_id')
if not instance_id:
return {'healthy': False, 'error': 'No instance ID configured'}
# Check EC2 instance status
response = self.ec2.describe_instance_status(InstanceIds=[instance_id])
if response['InstanceStatuses']:
status = response['InstanceStatuses'][0]
instance_status = status['InstanceStatus']['Status']
system_status = status['SystemStatus']['Status']
if instance_status == 'ok' and system_status == 'ok':
# Additional application health check
app_health = await self._test_application_health(health_config)
return {
'healthy': app_health,
'instance_status': instance_status,
'system_status': system_status,
'application_health': app_health
}
else:
return {
'healthy': False,
'instance_status': instance_status,
'system_status': system_status,
'error': 'Instance or system status check failed'
}
else:
return {'healthy': False, 'error': 'Instance status not available'}
except Exception as e:
logging.error(f"Application server health check failed: {str(e)}")
return {'healthy': False, 'error': str(e)}
async def _test_application_health(self, health_config: Dict[str, Any]) -> bool:
"""Test application health endpoint"""
try:
health_endpoint = health_config.get('health_endpoint')
if not health_endpoint:
return True # No endpoint configured, assume healthy
# This would implement actual HTTP health check
# For now, we'll simulate the check
import random
return random.random() > 0.1 # 90% success rate
except Exception as e:
logging.error(f"Application health test failed: {str(e)}")
return False
async def _initiate_component_recovery(self, component: SingleLocationComponent,
health_status: Dict[str, Any]):
"""Initiate recovery for failed component"""
try:
execution_id = f"recovery_{int(datetime.utcnow().timestamp())}_{component.component_id}"
# Create recovery execution record
execution = RecoveryExecution(
execution_id=execution_id,
component_id=component.component_id,
failure_detected_at=datetime.utcnow(),
recovery_initiated_at=None,
recovery_completed_at=None,
status=RecoveryStatus.FAILURE_DETECTED,
recovery_strategy_used=None,
actual_rto_minutes=None,
actual_rpo_minutes=None,
recovery_steps=[],
error_message=None
)
# Store execution record
await self._store_recovery_execution(execution)
# Add to active recoveries
self.active_recoveries[component.component_id] = execution
# Send failure notification
await self._send_failure_notification(component, health_status)
# Start recovery process
asyncio.create_task(self._execute_component_recovery(component, execution))
logging.info(f"Initiated recovery for component: {component.component_id}")
except Exception as e:
logging.error(f"Failed to initiate recovery: {str(e)}")
async def _execute_component_recovery(self, component: SingleLocationComponent,
execution: RecoveryExecution):
"""Execute recovery for component"""
try:
execution.status = RecoveryStatus.RECOVERY_INITIATED
execution.recovery_initiated_at = datetime.utcnow()
await self._store_recovery_execution(execution)
# Try recovery strategies in order of preference
recovery_successful = False
for strategy in component.recovery_strategies:
try:
execution.recovery_strategy_used = strategy
execution.status = RecoveryStatus.RECOVERY_IN_PROGRESS
await self._store_recovery_execution(execution)
logging.info(f"Attempting recovery strategy {strategy.value} for {component.component_id}")
# Execute recovery strategy
if strategy == RecoveryStrategy.REPLACE_INSTANCE:
success = await self._replace_instance_recovery(component, execution)
elif strategy == RecoveryStrategy.RESTORE_FROM_BACKUP:
success = await self._restore_from_backup_recovery(component, execution)
elif strategy == RecoveryStrategy.FAILOVER_TO_STANDBY:
success = await self._failover_to_standby_recovery(component, execution)
elif strategy == RecoveryStrategy.RESTART_SERVICE:
success = await self._restart_service_recovery(component, execution)
else:
success = False
if success:
recovery_successful = True
break
except Exception as strategy_error:
logging.error(f"Recovery strategy {strategy.value} failed: {str(strategy_error)}")
execution.recovery_steps.append({
'strategy': strategy.value,
'status': 'failed',
'error': str(strategy_error),
'timestamp': datetime.utcnow().isoformat()
})
continue
# Complete recovery
execution.recovery_completed_at = datetime.utcnow()
if recovery_successful:
execution.status = RecoveryStatus.RECOVERY_COMPLETED
# Calculate actual RTO
if execution.recovery_initiated_at and execution.recovery_completed_at:
execution.actual_rto_minutes = (execution.recovery_completed_at - execution.recovery_initiated_at).total_seconds() / 60
# Send success notification
await self._send_recovery_success_notification(component, execution)
logging.info(f"Recovery completed successfully for {component.component_id}")
else:
execution.status = RecoveryStatus.RECOVERY_FAILED
execution.error_message = "All recovery strategies failed"
# Send failure notification
await self._send_recovery_failure_notification(component, execution)
logging.error(f"Recovery failed for {component.component_id}")
# Store final execution state
await self._store_recovery_execution(execution)
# Remove from active recoveries
if component.component_id in self.active_recoveries:
del self.active_recoveries[component.component_id]
except Exception as e:
logging.error(f"Recovery execution failed: {str(e)}")
execution.status = RecoveryStatus.RECOVERY_FAILED
execution.error_message = str(e)
execution.recovery_completed_at = datetime.utcnow()
await self._store_recovery_execution(execution)
async def _replace_instance_recovery(self, component: SingleLocationComponent,
execution: RecoveryExecution) -> bool:
"""Replace failed instance"""
try:
if component.component_type != ComponentType.APPLICATION_SERVER:
return False
instance_id = component.health_check_config.get('instance_id')
if not instance_id:
return False
# Get instance details
response = self.ec2.describe_instances(InstanceIds=[instance_id])
if not response['Reservations']:
return False
instance = response['Reservations'][0]['Instances'][0]
# Launch replacement instance
new_instance_response = self.ec2.run_instances(
ImageId=instance['ImageId'],
MinCount=1,
MaxCount=1,
InstanceType=instance['InstanceType'],
KeyName=instance.get('KeyName'),
SecurityGroupIds=[sg['GroupId'] for sg in instance['SecurityGroups']],
SubnetId=instance['SubnetId'],
TagSpecifications=[
{
'ResourceType': 'instance',
'Tags': [
{'Key': 'Name', 'Value': f"{component.component_name}-replacement"},
{'Key': 'ReplacedInstance', 'Value': instance_id}
]
}
]
)
new_instance_id = new_instance_response['Instances'][0]['InstanceId']
# Wait for new instance to be running
waiter = self.ec2.get_waiter('instance_running')
waiter.wait(InstanceIds=[new_instance_id])
# Update component configuration
component.health_check_config['instance_id'] = new_instance_id
await self._store_component(component)
# Terminate old instance
self.ec2.terminate_instances(InstanceIds=[instance_id])
execution.recovery_steps.append({
'strategy': 'replace_instance',
'status': 'success',
'old_instance_id': instance_id,
'new_instance_id': new_instance_id,
'timestamp': datetime.utcnow().isoformat()
})
return True
except Exception as e:
logging.error(f"Instance replacement failed: {str(e)}")
return False
async def _restore_from_backup_recovery(self, component: SingleLocationComponent,
execution: RecoveryExecution) -> bool:
"""Restore component from backup"""
try:
backup_config = component.backup_config
if component.component_type == ComponentType.DATABASE:
return await self._restore_database_from_backup(component, execution, backup_config)
else:
return False
except Exception as e:
logging.error(f"Backup restoration failed: {str(e)}")
return False
async def _restore_database_from_backup(self, component: SingleLocationComponent,
execution: RecoveryExecution,
backup_config: Dict[str, Any]) -> bool:
"""Restore database from backup"""
try:
db_identifier = backup_config.get('db_identifier')
if not db_identifier:
return False
# Get latest automated backup
response = self.rds.describe_db_instances(DBInstanceIdentifier=db_identifier)
if not response['DBInstances']:
return False
db_instance = response['DBInstances'][0]
# Restore from point-in-time
restore_time = datetime.utcnow() - timedelta(minutes=component.rpo_minutes)
restored_db_identifier = f"{db_identifier}-restored-{int(datetime.utcnow().timestamp())}"
self.rds.restore_db_instance_to_point_in_time(
SourceDBInstanceIdentifier=db_identifier,
TargetDBInstanceIdentifier=restored_db_identifier,
RestoreTime=restore_time,
DBSubnetGroupName=db_instance.get('DBSubnetGroup', {}).get('DBSubnetGroupName'),
VpcSecurityGroupIds=[sg['VpcSecurityGroupId'] for sg in db_instance.get('VpcSecurityGroups', [])]
)
# Wait for restore to complete
waiter = self.rds.get_waiter('db_instance_available')
waiter.wait(DBInstanceIdentifier=restored_db_identifier)
# Update component configuration
component.health_check_config['db_identifier'] = restored_db_identifier
await self._store_component(component)
execution.recovery_steps.append({
'strategy': 'restore_from_backup',
'status': 'success',
'original_db': db_identifier,
'restored_db': restored_db_identifier,
'restore_time': restore_time.isoformat(),
'timestamp': datetime.utcnow().isoformat()
})
return True
except Exception as e:
logging.error(f"Database restoration failed: {str(e)}")
return False
async def _send_failure_notification(self, component: SingleLocationComponent,
health_status: Dict[str, Any]):
"""Send component failure notification"""
try:
if not self.notification_topic_arn:
return
message = {
'event_type': 'component_failure',
'component_id': component.component_id,
'component_name': component.component_name,
'component_type': component.component_type.value,
'location': component.location,
'criticality': component.criticality,
'health_status': health_status,
'timestamp': datetime.utcnow().isoformat()
}
self.sns.publish(
TopicArn=self.notification_topic_arn,
Message=json.dumps(message, indent=2),
Subject=f"Component Failure: {component.component_name}"
)
except Exception as e:
logging.error(f"Failed to send failure notification: {str(e)}")
async def _send_recovery_success_notification(self, component: SingleLocationComponent,
execution: RecoveryExecution):
"""Send recovery success notification"""
try:
if not self.notification_topic_arn:
return
message = {
'event_type': 'recovery_success',
'component_id': component.component_id,
'component_name': component.component_name,
'execution_id': execution.execution_id,
'recovery_strategy': execution.recovery_strategy_used.value if execution.recovery_strategy_used else None,
'actual_rto_minutes': execution.actual_rto_minutes,
'expected_rto_minutes': component.rto_minutes,
'timestamp': datetime.utcnow().isoformat()
}
self.sns.publish(
TopicArn=self.notification_topic_arn,
Message=json.dumps(message, indent=2),
Subject=f"Recovery Success: {component.component_name}"
)
except Exception as e:
logging.error(f"Failed to send recovery success notification: {str(e)}")
async def _store_component(self, component: SingleLocationComponent):
"""Store component in DynamoDB"""
try:
component_dict = asdict(component)
component_dict['created_at'] = component.created_at.isoformat()
self.components_table.put_item(Item=component_dict)
except Exception as e:
logging.error(f"Failed to store component: {str(e)}")
async def _store_recovery_execution(self, execution: RecoveryExecution):
"""Store recovery execution in DynamoDB"""
try:
execution_dict = asdict(execution)
execution_dict['failure_detected_at'] = execution.failure_detected_at.isoformat()
if execution.recovery_initiated_at:
execution_dict['recovery_initiated_at'] = execution.recovery_initiated_at.isoformat()
if execution.recovery_completed_at:
execution_dict['recovery_completed_at'] = execution.recovery_completed_at.isoformat()
self.executions_table.put_item(Item=execution_dict)
except Exception as e:
logging.error(f"Failed to store recovery execution: {str(e)}")
# Usage example
async def main():
config = {
'components_table': 'single-location-components',
'executions_table': 'recovery-executions',
'notification_topic_arn': 'arn:aws:sns:us-east-1:123456789012:recovery-notifications',
'recovery_timeout_minutes': 60
}
# Initialize recovery manager
recovery_manager = SingleLocationRecoveryManager(config)
# Register single-location components
database_component = {
'component_name': 'Primary Database',
'component_type': 'database',
'location': 'us-east-1a',
'constraint_reason': 'Legacy application requires single database instance',
'criticality': 'critical',
'recovery_strategies': ['restore_from_backup', 'replace_instance'],
'rto_minutes': 30,
'rpo_minutes': 15,
'health_check_config': {
'db_identifier': 'production-db',
'interval_seconds': 60
},
'backup_config': {
'db_identifier': 'production-db',
'backup_retention_days': 7
}
}
component_id = await recovery_manager.register_single_location_component(database_component)
print(f"Registered component: {component_id}")
# The system will now automatically monitor and recover the component
if __name__ == "__main__":
import asyncio
asyncio.run(main())AWS Services Used
- Amazon EC2: Instance replacement and automated recovery for compute resources
- Amazon RDS: Database backup, restore, and point-in-time recovery automation
- Amazon ElastiCache: Cache cluster recovery and failover automation
- Elastic Load Balancing: Load balancer health checks and target management
- AWS Auto Scaling: Automated instance replacement and capacity management
- AWS Backup: Centralized backup and restore automation across services
- AWS Lambda: Custom recovery logic and automation functions
- Amazon CloudWatch: Health monitoring, metrics, and automated alerting
- Amazon SNS: Recovery notifications and incident communication
- Amazon DynamoDB: Recovery execution tracking and component registry
- AWS Systems Manager: Automated patching, configuration, and remediation
- Amazon EventBridge: Event-driven recovery triggers and automation
- AWS Step Functions: Complex recovery workflow orchestration
- Amazon Route 53: Health checks and DNS failover for single-location services
- AWS CloudFormation: Infrastructure recovery and automated provisioning
Benefits
- Automated Recovery: Eliminates manual intervention for component failures
- Reduced Downtime: Fast automated recovery minimizes service interruptions
- Consistent Procedures: Standardized recovery processes ensure reliable outcomes
- 24/7 Monitoring: Continuous health monitoring provides immediate failure detection
- RTO/RPO Compliance: Automated recovery meets defined recovery objectives
- Cost Efficiency: Automated processes reduce operational overhead and manual effort
- Scalable Operations: Recovery automation scales with infrastructure growth
- Audit Trail: Complete logging of recovery actions for compliance and analysis
- Continuous Improvement: Recovery metrics enable optimization of procedures
- Risk Mitigation: Reduces impact of single points of failure through automation
Related Resources
- AWS Well-Architected Reliability Pillar
- Automate Recovery for Single Location Components
- Amazon EC2 User Guide
- Amazon RDS User Guide
- AWS Backup User Guide
- AWS Auto Scaling User Guide
- Amazon CloudWatch User Guide
- AWS Lambda Developer Guide
- AWS Step Functions Developer Guide
- AWS Systems Manager User Guide
- Automated Recovery Best Practices
- Disaster Recovery Strategies