REL11-BP03 - Automate healing on all layers
One-Click Remediation
Deploy CloudFormation stacks to implement this best practice with a single click.
Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.
REL11-BP03: Automate healing on all layers
Automated healing mechanisms operate at every layer of your architecture to detect and recover from failures without human intervention. This includes infrastructure-level healing (instance replacement), platform-level healing (service restart), and application-level healing (circuit breakers, retry logic).
Implementation Steps
1. Infrastructure Layer Healing
Implement automated instance replacement, scaling, and resource provisioning.
2. Platform Layer Healing
Configure service-level healing including container restarts and service recovery.
3. Application Layer Healing
Build application-level resilience with circuit breakers, retries, and graceful degradation.
4. Data Layer Healing
Implement automated backup restoration and data consistency checks.
5. Network Layer Healing
Configure automatic network path recovery and traffic rerouting.
Detailed Implementation
{% raw %}
View code
import boto3
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import threading
from concurrent.futures import ThreadPoolExecutor
import requests
import subprocess
class HealingLayer(Enum):
INFRASTRUCTURE = "infrastructure"
PLATFORM = "platform"
APPLICATION = "application"
DATA = "data"
NETWORK = "network"
class HealingAction(Enum):
RESTART = "restart"
REPLACE = "replace"
SCALE = "scale"
ROLLBACK = "rollback"
REPAIR = "repair"
FAILOVER = "failover"
class HealingStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class HealingRule:
name: str
layer: HealingLayer
trigger_condition: str
action: HealingAction
parameters: Dict[str, Any]
cooldown_period: int
max_attempts: int
enabled: bool
@dataclass
class HealingEvent:
event_id: str
rule_name: str
layer: HealingLayer
action: HealingAction
resource_id: str
status: HealingStatus
start_time: datetime
end_time: Optional[datetime]
attempt_count: int
error_message: Optional[str]
class AutomatedHealingSystem:
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.ec2 = boto3.client('ec2', region_name=region)
self.asg = boto3.client('autoscaling', region_name=region)
self.ecs = boto3.client('ecs', region_name=region)
self.lambda_client = boto3.client('lambda', region_name=region)
self.rds = boto3.client('rds', region_name=region)
self.cloudwatch = boto3.client('cloudwatch', region_name=region)
self.sns = boto3.client('sns', region_name=region)
self.ssm = boto3.client('ssm', region_name=region)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
# Healing state management
self.healing_rules: Dict[str, HealingRule] = {}
self.active_healing_events: Dict[str, HealingEvent] = {}
self.healing_history: List[HealingEvent] = []
self.healing_lock = threading.Lock()
def register_healing_rule(self, rule: HealingRule) -> bool:
"""Register a new healing rule"""
try:
self.healing_rules[rule.name] = rule
self.logger.info(f"Registered healing rule: {rule.name}")
return True
except Exception as e:
self.logger.error(f"Failed to register healing rule {rule.name}: {str(e)}")
return False
def setup_infrastructure_healing(self) -> List[HealingRule]:
"""Set up infrastructure layer healing rules"""
rules = []
try:
# EC2 Instance Healing
ec2_healing_rule = HealingRule(
name="ec2-instance-healing",
layer=HealingLayer.INFRASTRUCTURE,
trigger_condition="instance_status_check_failed",
action=HealingAction.REPLACE,
parameters={
'health_check_grace_period': 300,
'replacement_strategy': 'immediate',
'preserve_eip': True
},
cooldown_period=600,
max_attempts=3,
enabled=True
)
rules.append(ec2_healing_rule)
self.register_healing_rule(ec2_healing_rule)
# Auto Scaling Group Healing
asg_healing_rule = HealingRule(
name="asg-capacity-healing",
layer=HealingLayer.INFRASTRUCTURE,
trigger_condition="unhealthy_instance_threshold",
action=HealingAction.SCALE,
parameters={
'scale_out_amount': 2,
'health_check_type': 'ELB',
'terminate_unhealthy': True
},
cooldown_period=300,
max_attempts=5,
enabled=True
)
rules.append(asg_healing_rule)
self.register_healing_rule(asg_healing_rule)
# EBS Volume Healing
ebs_healing_rule = HealingRule(
name="ebs-volume-healing",
layer=HealingLayer.INFRASTRUCTURE,
trigger_condition="volume_io_performance_degraded",
action=HealingAction.REPAIR,
parameters={
'create_snapshot': True,
'force_detach': False,
'replacement_volume_type': 'gp3'
},
cooldown_period=1800,
max_attempts=2,
enabled=True
)
rules.append(ebs_healing_rule)
self.register_healing_rule(ebs_healing_rule)
self.logger.info(f"Set up {len(rules)} infrastructure healing rules")
return rules
except Exception as e:
self.logger.error(f"Infrastructure healing setup failed: {str(e)}")
return rules
def setup_platform_healing(self) -> List[HealingRule]:
"""Set up platform layer healing rules"""
rules = []
try:
# ECS Service Healing
ecs_service_rule = HealingRule(
name="ecs-service-healing",
layer=HealingLayer.PLATFORM,
trigger_condition="service_unhealthy_tasks",
action=HealingAction.RESTART,
parameters={
'force_new_deployment': True,
'desired_count_adjustment': 1,
'stop_unhealthy_tasks': True
},
cooldown_period=300,
max_attempts=3,
enabled=True
)
rules.append(ecs_service_rule)
self.register_healing_rule(ecs_service_rule)
# Lambda Function Healing
lambda_healing_rule = HealingRule(
name="lambda-function-healing",
layer=HealingLayer.PLATFORM,
trigger_condition="high_error_rate",
action=HealingAction.ROLLBACK,
parameters={
'rollback_to_previous_version': True,
'update_alias': True,
'notification_required': True
},
cooldown_period=600,
max_attempts=2,
enabled=True
)
rules.append(lambda_healing_rule)
self.register_healing_rule(lambda_healing_rule)
# RDS Instance Healing
rds_healing_rule = HealingRule(
name="rds-instance-healing",
layer=HealingLayer.PLATFORM,
trigger_condition="database_connection_failures",
action=HealingAction.RESTART,
parameters={
'force_failover': False,
'apply_pending_maintenance': False,
'backup_before_restart': True
},
cooldown_period=1800,
max_attempts=2,
enabled=True
)
rules.append(rds_healing_rule)
self.register_healing_rule(rds_healing_rule)
self.logger.info(f"Set up {len(rules)} platform healing rules")
return rules
except Exception as e:
self.logger.error(f"Platform healing setup failed: {str(e)}")
return rules
def setup_application_healing(self) -> List[HealingRule]:
"""Set up application layer healing rules"""
rules = []
try:
# Circuit Breaker Healing
circuit_breaker_rule = HealingRule(
name="circuit-breaker-healing",
layer=HealingLayer.APPLICATION,
trigger_condition="circuit_breaker_open",
action=HealingAction.REPAIR,
parameters={
'reset_circuit_breaker': True,
'gradual_recovery': True,
'test_requests_percentage': 10
},
cooldown_period=300,
max_attempts=5,
enabled=True
)
rules.append(circuit_breaker_rule)
self.register_healing_rule(circuit_breaker_rule)
# Memory Leak Healing
memory_healing_rule = HealingRule(
name="memory-leak-healing",
layer=HealingLayer.APPLICATION,
trigger_condition="high_memory_usage",
action=HealingAction.RESTART,
parameters={
'memory_threshold': 85,
'graceful_shutdown': True,
'heap_dump_before_restart': True
},
cooldown_period=600,
max_attempts=3,
enabled=True
)
rules.append(memory_healing_rule)
self.register_healing_rule(memory_healing_rule)
# Connection Pool Healing
connection_pool_rule = HealingRule(
name="connection-pool-healing",
layer=HealingLayer.APPLICATION,
trigger_condition="connection_pool_exhausted",
action=HealingAction.REPAIR,
parameters={
'reset_connections': True,
'increase_pool_size': True,
'connection_timeout_adjustment': 30
},
cooldown_period=180,
max_attempts=4,
enabled=True
)
rules.append(connection_pool_rule)
self.register_healing_rule(connection_pool_rule)
self.logger.info(f"Set up {len(rules)} application healing rules")
return rules
except Exception as e:
self.logger.error(f"Application healing setup failed: {str(e)}")
return rules
def setup_data_healing(self) -> List[HealingRule]:
"""Set up data layer healing rules"""
rules = []
try:
# Database Corruption Healing
db_corruption_rule = HealingRule(
name="database-corruption-healing",
layer=HealingLayer.DATA,
trigger_condition="data_corruption_detected",
action=HealingAction.ROLLBACK,
parameters={
'restore_from_backup': True,
'point_in_time_recovery': True,
'verify_data_integrity': True
},
cooldown_period=3600,
max_attempts=2,
enabled=True
)
rules.append(db_corruption_rule)
self.register_healing_rule(db_corruption_rule)
# Cache Invalidation Healing
cache_healing_rule = HealingRule(
name="cache-invalidation-healing",
layer=HealingLayer.DATA,
trigger_condition="cache_hit_rate_low",
action=HealingAction.REPAIR,
parameters={
'warm_cache': True,
'invalidate_stale_entries': True,
'adjust_ttl': True
},
cooldown_period=300,
max_attempts=3,
enabled=True
)
rules.append(cache_healing_rule)
self.register_healing_rule(cache_healing_rule)
# Backup Verification Healing
backup_healing_rule = HealingRule(
name="backup-verification-healing",
layer=HealingLayer.DATA,
trigger_condition="backup_verification_failed",
action=HealingAction.REPAIR,
parameters={
'create_new_backup': True,
'test_restore_process': True,
'update_backup_schedule': True
},
cooldown_period=7200,
max_attempts=2,
enabled=True
)
rules.append(backup_healing_rule)
self.register_healing_rule(backup_healing_rule)
self.logger.info(f"Set up {len(rules)} data healing rules")
return rules
except Exception as e:
self.logger.error(f"Data healing setup failed: {str(e)}")
return rules
def execute_healing_action(self, rule_name: str, resource_id: str, trigger_data: Dict[str, Any]) -> HealingEvent:
"""Execute a healing action based on a rule"""
event_id = f"healing-{int(time.time())}-{rule_name}"
try:
rule = self.healing_rules.get(rule_name)
if not rule or not rule.enabled:
raise ValueError(f"Healing rule {rule_name} not found or disabled")
# Check cooldown period
if self._is_in_cooldown(rule_name, resource_id):
self.logger.info(f"Healing action {rule_name} for {resource_id} skipped due to cooldown")
return self._create_skipped_event(event_id, rule, resource_id)
# Create healing event
with self.healing_lock:
healing_event = HealingEvent(
event_id=event_id,
rule_name=rule_name,
layer=rule.layer,
action=rule.action,
resource_id=resource_id,
status=HealingStatus.PENDING,
start_time=datetime.utcnow(),
end_time=None,
attempt_count=1,
error_message=None
)
self.active_healing_events[event_id] = healing_event
# Update status to in progress
healing_event.status = HealingStatus.IN_PROGRESS
# Execute healing action based on layer and action
success = self._execute_layer_healing(rule, resource_id, trigger_data)
# Update final status
healing_event.status = HealingStatus.COMPLETED if success else HealingStatus.FAILED
healing_event.end_time = datetime.utcnow()
# Move to history
with self.healing_lock:
del self.active_healing_events[event_id]
self.healing_history.append(healing_event)
# Send notification
self._send_healing_notification(healing_event)
self.logger.info(f"Healing action {event_id} completed with status: {healing_event.status}")
return healing_event
except Exception as e:
healing_event.status = HealingStatus.FAILED
healing_event.error_message = str(e)
healing_event.end_time = datetime.utcnow()
self.logger.error(f"Healing action {event_id} failed: {str(e)}")
return healing_event
def _execute_layer_healing(self, rule: HealingRule, resource_id: str, trigger_data: Dict[str, Any]) -> bool:
"""Execute healing action based on layer"""
try:
if rule.layer == HealingLayer.INFRASTRUCTURE:
return self._execute_infrastructure_healing(rule, resource_id, trigger_data)
elif rule.layer == HealingLayer.PLATFORM:
return self._execute_platform_healing(rule, resource_id, trigger_data)
elif rule.layer == HealingLayer.APPLICATION:
return self._execute_application_healing(rule, resource_id, trigger_data)
elif rule.layer == HealingLayer.DATA:
return self._execute_data_healing(rule, resource_id, trigger_data)
elif rule.layer == HealingLayer.NETWORK:
return self._execute_network_healing(rule, resource_id, trigger_data)
else:
return False
except Exception as e:
self.logger.error(f"Layer healing execution failed: {str(e)}")
return False
def _execute_infrastructure_healing(self, rule: HealingRule, resource_id: str, trigger_data: Dict[str, Any]) -> bool:
"""Execute infrastructure layer healing"""
try:
if rule.action == HealingAction.REPLACE and resource_id.startswith('i-'):
# Replace EC2 instance
return self._replace_ec2_instance(resource_id, rule.parameters)
elif rule.action == HealingAction.SCALE and resource_id.startswith('asg-'):
# Scale Auto Scaling Group
return self._scale_auto_scaling_group(resource_id, rule.parameters)
elif rule.action == HealingAction.REPAIR and resource_id.startswith('vol-'):
# Repair EBS volume
return self._repair_ebs_volume(resource_id, rule.parameters)
else:
return False
except Exception as e:
self.logger.error(f"Infrastructure healing failed: {str(e)}")
return False
def _execute_platform_healing(self, rule: HealingRule, resource_id: str, trigger_data: Dict[str, Any]) -> bool:
"""Execute platform layer healing"""
try:
if rule.action == HealingAction.RESTART and 'ecs' in resource_id:
# Restart ECS service
return self._restart_ecs_service(resource_id, rule.parameters)
elif rule.action == HealingAction.ROLLBACK and resource_id.startswith('lambda'):
# Rollback Lambda function
return self._rollback_lambda_function(resource_id, rule.parameters)
elif rule.action == HealingAction.RESTART and resource_id.startswith('db-'):
# Restart RDS instance
return self._restart_rds_instance(resource_id, rule.parameters)
else:
return False
except Exception as e:
self.logger.error(f"Platform healing failed: {str(e)}")
return False
def _execute_application_healing(self, rule: HealingRule, resource_id: str, trigger_data: Dict[str, Any]) -> bool:
"""Execute application layer healing"""
try:
if rule.action == HealingAction.REPAIR and 'circuit-breaker' in rule.name:
# Reset circuit breaker
return self._reset_circuit_breaker(resource_id, rule.parameters)
elif rule.action == HealingAction.RESTART and 'memory' in rule.name:
# Restart application due to memory issues
return self._restart_application_for_memory(resource_id, rule.parameters)
elif rule.action == HealingAction.REPAIR and 'connection-pool' in rule.name:
# Repair connection pool
return self._repair_connection_pool(resource_id, rule.parameters)
else:
return False
except Exception as e:
self.logger.error(f"Application healing failed: {str(e)}")
return False
def _replace_ec2_instance(self, instance_id: str, parameters: Dict[str, Any]) -> bool:
"""Replace unhealthy EC2 instance"""
try:
# Get instance details
response = self.ec2.describe_instances(InstanceIds=[instance_id])
instance = response['Reservations'][0]['Instances'][0]
# Terminate the unhealthy instance
self.ec2.terminate_instances(InstanceIds=[instance_id])
# If part of ASG, let ASG handle replacement
# Otherwise, launch new instance with same configuration
if not self._is_instance_in_asg(instance_id):
launch_template = {
'ImageId': instance['ImageId'],
'InstanceType': instance['InstanceType'],
'KeyName': instance.get('KeyName'),
'SecurityGroupIds': [sg['GroupId'] for sg in instance['SecurityGroups']],
'SubnetId': instance['SubnetId']
}
new_instance = self.ec2.run_instances(
MinCount=1,
MaxCount=1,
**launch_template
)
self.logger.info(f"Launched replacement instance: {new_instance['Instances'][0]['InstanceId']}")
return True
except Exception as e:
self.logger.error(f"EC2 instance replacement failed: {str(e)}")
return False
def _restart_ecs_service(self, service_arn: str, parameters: Dict[str, Any]) -> bool:
"""Restart ECS service"""
try:
cluster_name = service_arn.split('/')[1]
service_name = service_arn.split('/')[-1]
# Force new deployment
self.ecs.update_service(
cluster=cluster_name,
service=service_name,
forceNewDeployment=parameters.get('force_new_deployment', True)
)
# Stop unhealthy tasks if requested
if parameters.get('stop_unhealthy_tasks', False):
tasks = self.ecs.list_tasks(
cluster=cluster_name,
serviceName=service_name
)
for task_arn in tasks['taskArns']:
self.ecs.stop_task(
cluster=cluster_name,
task=task_arn,
reason='Automated healing - unhealthy task'
)
return True
except Exception as e:
self.logger.error(f"ECS service restart failed: {str(e)}")
return False
def _is_in_cooldown(self, rule_name: str, resource_id: str) -> bool:
"""Check if healing action is in cooldown period"""
try:
rule = self.healing_rules.get(rule_name)
if not rule:
return False
# Check recent healing events for this rule and resource
cutoff_time = datetime.utcnow() - timedelta(seconds=rule.cooldown_period)
for event in self.healing_history:
if (event.rule_name == rule_name and
event.resource_id == resource_id and
event.start_time > cutoff_time):
return True
return False
except Exception as e:
self.logger.error(f"Cooldown check failed: {str(e)}")
return False
def _send_healing_notification(self, healing_event: HealingEvent) -> None:
"""Send notification about healing event"""
try:
message = {
'event_id': healing_event.event_id,
'rule_name': healing_event.rule_name,
'layer': healing_event.layer.value,
'action': healing_event.action.value,
'resource_id': healing_event.resource_id,
'status': healing_event.status.value,
'start_time': healing_event.start_time.isoformat(),
'end_time': healing_event.end_time.isoformat() if healing_event.end_time else None,
'attempt_count': healing_event.attempt_count,
'error_message': healing_event.error_message
}
self.sns.publish(
TopicArn=f"arn:aws:sns:{self.region}:123456789012:healing-notifications",
Message=json.dumps(message, indent=2),
Subject=f"Healing Event: {healing_event.status.value.title()}"
)
except Exception as e:
self.logger.error(f"Healing notification failed: {str(e)}")
def start_healing_monitor(self, monitoring_config: Dict[str, Any]) -> None:
"""Start continuous healing monitoring"""
try:
self.logger.info("Starting automated healing monitor...")
while True:
# Check for healing triggers
for rule_name, rule in self.healing_rules.items():
if not rule.enabled:
continue
# Check trigger conditions
triggered_resources = self._check_healing_triggers(rule, monitoring_config)
for resource_id, trigger_data in triggered_resources.items():
# Execute healing action
self.execute_healing_action(rule_name, resource_id, trigger_data)
# Wait before next check
time.sleep(monitoring_config.get('check_interval', 60))
except KeyboardInterrupt:
self.logger.info("Healing monitor stopped")
except Exception as e:
self.logger.error(f"Healing monitor error: {str(e)}")
def get_healing_statistics(self) -> Dict[str, Any]:
"""Get healing system statistics"""
try:
total_events = len(self.healing_history)
successful_events = len([e for e in self.healing_history if e.status == HealingStatus.COMPLETED])
failed_events = len([e for e in self.healing_history if e.status == HealingStatus.FAILED])
layer_stats = {}
for layer in HealingLayer:
layer_events = [e for e in self.healing_history if e.layer == layer]
layer_stats[layer.value] = {
'total': len(layer_events),
'successful': len([e for e in layer_events if e.status == HealingStatus.COMPLETED]),
'failed': len([e for e in layer_events if e.status == HealingStatus.FAILED])
}
return {
'total_healing_events': total_events,
'successful_healing_events': successful_events,
'failed_healing_events': failed_events,
'success_rate': (successful_events / total_events * 100) if total_events > 0 else 0,
'active_healing_events': len(self.active_healing_events),
'registered_rules': len(self.healing_rules),
'enabled_rules': len([r for r in self.healing_rules.values() if r.enabled]),
'layer_statistics': layer_stats
}
except Exception as e:
self.logger.error(f"Statistics calculation failed: {str(e)}")
return {}
# Example usage
def main():
# Initialize healing system
healing_system = AutomatedHealingSystem(region='us-east-1')
# Set up healing rules for all layers
print("Setting up automated healing system...")
infra_rules = healing_system.setup_infrastructure_healing()
platform_rules = healing_system.setup_platform_healing()
app_rules = healing_system.setup_application_healing()
data_rules = healing_system.setup_data_healing()
print("Healing system setup complete:")
print(f"- Infrastructure rules: {len(infra_rules)}")
print(f"- Platform rules: {len(platform_rules)}")
print(f"- Application rules: {len(app_rules)}")
print(f"- Data rules: {len(data_rules)}")
# Example healing action execution
healing_event = healing_system.execute_healing_action(
rule_name="ec2-instance-healing",
resource_id="i-1234567890abcdef0",
trigger_data={'status_check': 'failed', 'timestamp': datetime.utcnow().isoformat()}
)
print(f"Healing action executed: {healing_event.event_id} - Status: {healing_event.status.value}")
# Get system statistics
stats = healing_system.get_healing_statistics()
print(f"Healing system statistics: {json.dumps(stats, indent=2)}")
if __name__ == "__main__":
main(){% endraw %}
AWS Services
Primary Services
- Amazon EC2 Auto Scaling: Automatic instance replacement and scaling
- Amazon ECS: Container-level healing and service management
- AWS Lambda: Serverless function healing and rollback
- Amazon RDS: Database healing and automated backups
Supporting Services
- AWS Systems Manager: Automated patching and maintenance
- Amazon CloudWatch: Monitoring and alarm-based healing triggers
- AWS Auto Scaling: Unified scaling across multiple services
- Amazon SNS: Healing event notifications
Benefits
- Self-Healing Infrastructure: Automatic recovery without human intervention
- Multi-Layer Protection: Healing at every architectural layer
- Reduced MTTR: Faster recovery through automated actions
- Proactive Maintenance: Prevention of issues before they impact users
- Operational Efficiency: Reduced manual intervention and operational overhead