Skip to content
REL11

REL11-BP02 - Fail over to healthy resources

REL11-BP02: Fail over to healthy resources

Automatic failover mechanisms ensure that when failures are detected, traffic and workloads are seamlessly redirected to healthy resources. This includes both planned failover for maintenance and unplanned failover for unexpected failures, maintaining service availability while failed components are recovered.

Implementation Steps

1. Health Check Configuration

Implement comprehensive health checks that accurately determine resource health status.

2. Automatic Failover Logic

Design failover mechanisms that can make decisions without human intervention.

3. Traffic Routing

Configure intelligent traffic routing to direct requests to healthy resources.

4. State Management

Ensure application state is properly managed during failover scenarios.

5. Failback Procedures

Implement automated failback when failed resources are restored to a healthy state.

Detailed Implementation

{% raw %}

View code
import boto3
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

class FailoverType(Enum):
    """How a failover was initiated; stored in ``FailoverEvent.failover_type``."""
    # NOTE(review): in the code visible in this file only MANUAL is ever
    # assigned (by execute_manual_failover); the other members look unused.
    PLANNED = "planned"
    UNPLANNED = "unplanned"
    AUTOMATIC = "automatic"
    MANUAL = "manual"

class ResourceHealth(Enum):
    """Health classification returned by the health-check methods.

    ``monitor_health_and_failover`` caches the latest value per resource and
    triggers a failover when a resource goes from HEALTHY to UNHEALTHY or
    DEGRADED.
    """
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    DEGRADED = "degraded"
    # Returned when a check cannot be evaluated: unsupported resource type,
    # no status data, or an exception raised while checking.
    UNKNOWN = "unknown"

class FailoverStatus(Enum):
    """Lifecycle state of a ``FailoverEvent``."""
    # Initial state assigned when execute_manual_failover creates the event.
    PENDING = "pending"
    # Set immediately before the resource-specific failover handler runs.
    IN_PROGRESS = "in_progress"
    # Final states: handler reported success / handler failed or raised.
    COMPLETED = "completed"
    FAILED = "failed"
    # NOTE(review): never assigned in the code visible in this file.
    ROLLED_BACK = "rolled_back"

@dataclass
class HealthCheck:
    """Health-check settings for one target, as built by setup_load_balancer_failover."""
    name: str
    # NOTE(review): setup_load_balancer_failover stores the configured
    # health-check *path* in this field; confirm the intended meaning.
    endpoint: str
    method: str
    expected_status: int
    # Sent to the target group as HealthCheckTimeoutSeconds.
    timeout: int
    # Sent to the target group as HealthCheckIntervalSeconds.
    interval: int
    # Sent as HealthyThresholdCount / UnhealthyThresholdCount.
    healthy_threshold: int
    unhealthy_threshold: int
    # Sent to the target group as HealthCheckPath.
    path: str = "/"
    # Sent to the target group as HealthCheckPort (converted to str).
    port: int = 80

@dataclass
class FailoverTarget:
    """Description of a resource that traffic could be failed over to.

    NOTE(review): this dataclass is not instantiated anywhere in the code
    visible in this file; field meanings below are inferred from their names.
    """
    resource_id: str
    resource_type: str
    region: str
    availability_zone: str
    capacity: int
    # presumably a lower number means a more preferred target -- TODO confirm
    priority: int
    health_status: ResourceHealth
    last_health_check: datetime

@dataclass
class FailoverEvent:
    """Record of one failover attempt, created and updated by execute_manual_failover."""
    # Key under which the event is stored in AutomaticFailoverSystem.active_failovers.
    event_id: str
    source_resource: str
    target_resource: str
    failover_type: FailoverType
    status: FailoverStatus
    # Set with datetime.utcnow() when the event is created (naive UTC timestamp).
    start_time: datetime
    # None until the failover attempt finishes (successfully or not).
    end_time: Optional[datetime]
    reason: str
    # Copied verbatim from the request; not interpreted by the visible code.
    rollback_plan: Dict[str, Any]

class AutomaticFailoverSystem:
    def __init__(self, region: str = 'us-east-1'):
        """Create the boto3 clients, logger, and in-memory failover state.

        Args:
            region: Region name passed to every boto3 client created here
                except Route 53, whose client is created without a region
                argument.
        """
        self.region = region

        # One shared keyword set for all region-bound clients.
        regional = {'region_name': region}
        self.ec2 = boto3.client('ec2', **regional)
        self.elb = boto3.client('elbv2', **regional)
        self.route53 = boto3.client('route53')
        self.rds = boto3.client('rds', **regional)
        self.asg = boto3.client('autoscaling', **regional)
        self.cloudwatch = boto3.client('cloudwatch', **regional)
        self.sns = boto3.client('sns', **regional)

        # Logging is configured on the root logger each time an instance is built.
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(level=logging.INFO, format=log_format)
        self.logger = logging.getLogger(__name__)

        # Bookkeeping: failover events by id, last health result by resource id,
        # and the lock used when registering a new failover event.
        self.active_failovers: Dict[str, FailoverEvent] = {}
        self.health_check_results: Dict[str, ResourceHealth] = {}
        self.failover_lock = threading.Lock()

    def setup_load_balancer_failover(self, lb_config: Dict[str, Any]) -> Dict[str, Any]:
        """Configure target-group health checks and alarms for a load balancer.

        For each entry in ``lb_config['target_groups']`` the target group's
        health check is updated via ``modify_target_group`` and the group is
        classified as primary (``priority`` equal to 1, the default) or
        secondary. CloudWatch alarms are then set up for the load balancer.

        Args:
            lb_config: Must contain ``load_balancer_arn`` and ``target_groups``.
                Each target group must contain ``target_group_arn`` and may
                contain ``priority``, ``health_check_path`` (default
                ``/health``), ``health_check_timeout`` (5),
                ``health_check_interval`` (30), ``healthy_threshold`` (2),
                ``unhealthy_threshold`` (2) and ``health_check_port`` (80).

        Returns:
            A dict with ``load_balancer_arn``, ``primary_targets``,
            ``secondary_targets`` and ``health_checks``; an empty dict if any
            step raised (the error is logged, not re-raised).
        """
        try:
            lb_arn = lb_config['load_balancer_arn']
            target_groups = lb_config['target_groups']

            failover_config = {
                'load_balancer_arn': lb_arn,
                'primary_targets': [],
                'secondary_targets': [],
                'health_checks': []
            }

            for tg_config in target_groups:
                tg_arn = tg_config['target_group_arn']
                health_path = tg_config.get('health_check_path', '/health')

                # Configure health check. The path has to be supplied as
                # `path` as well: that is the field sent to the target group
                # below, and leaving it unset would silently probe the
                # dataclass default "/" instead of the configured path.
                health_check = HealthCheck(
                    name=f"tg-{tg_arn.split('/')[-1]}",
                    endpoint=health_path,
                    method='GET',
                    expected_status=200,
                    timeout=tg_config.get('health_check_timeout', 5),
                    interval=tg_config.get('health_check_interval', 30),
                    healthy_threshold=tg_config.get('healthy_threshold', 2),
                    unhealthy_threshold=tg_config.get('unhealthy_threshold', 2),
                    path=health_path,
                    port=tg_config.get('health_check_port', 80)
                )

                # Update target group health check settings
                self.elb.modify_target_group(
                    TargetGroupArn=tg_arn,
                    HealthCheckProtocol='HTTP',
                    HealthCheckPath=health_check.path,
                    HealthCheckIntervalSeconds=health_check.interval,
                    HealthCheckTimeoutSeconds=health_check.timeout,
                    HealthyThresholdCount=health_check.healthy_threshold,
                    UnhealthyThresholdCount=health_check.unhealthy_threshold,
                    HealthCheckPort=str(health_check.port)
                )

                failover_config['health_checks'].append(asdict(health_check))

                # Categorize targets by priority (1 = primary, anything else = secondary)
                if tg_config.get('priority', 1) == 1:
                    failover_config['primary_targets'].append(tg_arn)
                else:
                    failover_config['secondary_targets'].append(tg_arn)

            # Set up CloudWatch alarms for automatic failover
            self._setup_failover_alarms(lb_arn, failover_config)

            self.logger.info(f"Load balancer failover configured for {lb_arn}")
            return failover_config

        except Exception as e:
            # Best-effort API: callers receive {} and inspect it with .get().
            self.logger.error(f"Load balancer failover setup failed: {str(e)}")
            return {}

    def setup_dns_failover(self, dns_config: Dict[str, Any]) -> Dict[str, Any]:
        """Set up DNS-based failover with Route 53.

        Creates one HTTPS health check per endpoint and upserts a PRIMARY and
        a SECONDARY failover record for ``record_name``, each tied to its own
        health check. Both record changes are submitted in one change batch.

        Args:
            dns_config: Must contain ``hosted_zone_id``, ``record_name``,
                ``primary_endpoint`` and ``secondary_endpoint``. May contain
                ``health_check_path`` (default ``/health``) and ``port``
                (default 443).

        Returns:
            A dict with the zone id, record name, both endpoints and both
            health-check ids; an empty dict if any step raised (the error is
            logged, not re-raised).
        """
        # Local import: ipaddress is not among this file's top-level imports.
        import ipaddress

        try:
            hosted_zone_id = dns_config['hosted_zone_id']
            record_name = dns_config['record_name']
            endpoints = {
                'primary': dns_config['primary_endpoint'],
                'secondary': dns_config['secondary_endpoint'],
            }
            health_path = dns_config.get('health_check_path', '/health')
            port = dns_config.get('port', 443)

            health_check_ids: Dict[str, str] = {}
            changes = []

            for role, endpoint in endpoints.items():
                check_config = {
                    'Type': 'HTTPS',
                    'ResourcePath': health_path,
                    'Port': port,
                    'RequestInterval': 30,
                    'FailureThreshold': 3,
                }
                # An IP literal must be sent as IPAddress; only real host
                # names belong in FullyQualifiedDomainName.
                try:
                    ipaddress.ip_address(endpoint)
                except ValueError:
                    check_config['FullyQualifiedDomainName'] = endpoint
                else:
                    check_config['IPAddress'] = endpoint

                # The API takes the settings nested under HealthCheckConfig and
                # requires a CallerReference that is unique per request.
                response = self.route53.create_health_check(
                    CallerReference=f"{role}-{time.time_ns()}",
                    HealthCheckConfig=check_config,
                )
                health_check_ids[role] = response['HealthCheck']['Id']

                # NOTE(review): records are created as type A, so endpoints are
                # expected to be IPv4 addresses -- confirm against callers.
                changes.append({
                    'Action': 'UPSERT',
                    'ResourceRecordSet': {
                        'Name': record_name,
                        'Type': 'A',
                        'SetIdentifier': role,
                        'Failover': role.upper(),
                        'TTL': 60,
                        'ResourceRecords': [{'Value': endpoint}],
                        'HealthCheckId': health_check_ids[role],
                    }
                })

            # Submit both failover records together so the pair is applied as one change.
            self.route53.change_resource_record_sets(
                HostedZoneId=hosted_zone_id,
                ChangeBatch={'Changes': changes}
            )

            failover_config = {
                'hosted_zone_id': hosted_zone_id,
                'record_name': record_name,
                'primary_endpoint': endpoints['primary'],
                'secondary_endpoint': endpoints['secondary'],
                'primary_health_check_id': health_check_ids['primary'],
                'secondary_health_check_id': health_check_ids['secondary']
            }

            self.logger.info(f"DNS failover configured for {record_name}")
            return failover_config

        except Exception as e:
            # Best-effort API: callers receive {} and inspect it with .get().
            self.logger.error(f"DNS failover setup failed: {str(e)}")
            return {}

    def setup_database_failover(self, db_config: Dict[str, Any]) -> Dict[str, Any]:
        """Set up database failover with RDS Multi-AZ and optional read replicas.

        Args:
            db_config: Must contain ``db_instance_identifier``. May contain
                ``apply_immediately`` (default False) and ``read_replicas``,
                a list of dicts each requiring ``identifier`` and optionally
                ``instance_class`` (default ``db.t3.micro``),
                ``availability_zone`` and ``multi_az`` (default False).

        Returns:
            A dict with ``primary_db``, ``multi_az_enabled``,
            ``read_replicas`` (identifiers of the replicas created) and
            ``failover_type``; an empty dict if any step raised (the error is
            logged, not re-raised).
        """
        try:
            db_instance_id = db_config['db_instance_identifier']

            # Enable Multi-AZ deployment
            self.rds.modify_db_instance(
                DBInstanceIdentifier=db_instance_id,
                MultiAZ=True,
                ApplyImmediately=db_config.get('apply_immediately', False)
            )

            # Create read replicas for additional failover options
            read_replicas = []
            for replica_config in db_config.get('read_replicas', []):
                replica_kwargs: Dict[str, Any] = {
                    'DBInstanceIdentifier': replica_config['identifier'],
                    'SourceDBInstanceIdentifier': db_instance_id,
                    'DBInstanceClass': replica_config.get('instance_class', 'db.t3.micro'),
                    'MultiAZ': replica_config.get('multi_az', False),
                    'PubliclyAccessible': False,
                    'AutoMinorVersionUpgrade': True,
                    'Tags': [
                        {'Key': 'Purpose', 'Value': 'ReadReplica'},
                        {'Key': 'SourceDB', 'Value': db_instance_id}
                    ],
                }
                # Only send AvailabilityZone when one was configured: boto3
                # rejects None for string parameters, so an absent key must
                # be omitted from the call rather than passed as None.
                availability_zone = replica_config.get('availability_zone')
                if availability_zone is not None:
                    replica_kwargs['AvailabilityZone'] = availability_zone

                replica_response = self.rds.create_db_instance_read_replica(**replica_kwargs)
                read_replicas.append(replica_response['DBInstance']['DBInstanceIdentifier'])

            # Set up CloudWatch alarms for database health.
            # NOTE(review): _setup_database_failover_alarms is not defined in
            # the part of the class visible here; if it is missing, this call
            # raises and the method returns {} even though the changes above
            # were already applied.
            self._setup_database_failover_alarms(db_instance_id)

            failover_config = {
                'primary_db': db_instance_id,
                'multi_az_enabled': True,
                'read_replicas': read_replicas,
                'failover_type': 'automatic'
            }

            self.logger.info(f"Database failover configured for {db_instance_id}")
            return failover_config

        except Exception as e:
            # Best-effort API: callers receive {} and inspect it with .get().
            self.logger.error(f"Database failover setup failed: {str(e)}")
            return {}

    def setup_auto_scaling_failover(self, asg_config: Dict[str, Any]) -> Dict[str, Any]:
        """Spread an Auto Scaling group across AZs and add a failover scale-up policy.

        Args:
            asg_config: Must contain ``auto_scaling_group_name`` and
                ``availability_zones``. May contain
                ``health_check_grace_period`` (default 300), ``cooldown``
                (default 300) and ``failover_scale_amount`` (default 2).

        Returns:
            A dict with ``auto_scaling_group``, ``availability_zones``,
            ``health_check_type`` and ``scale_up_policy_arn``; an empty dict
            if any step raised (the error is logged, not re-raised).
        """
        try:
            asg_name = asg_config['auto_scaling_group_name']
            zones = asg_config['availability_zones']

            # Make the group span the requested AZs and use ELB health checks.
            self.asg.update_auto_scaling_group(
                AutoScalingGroupName=asg_name,
                AvailabilityZones=zones,
                HealthCheckType='ELB',
                HealthCheckGracePeriod=asg_config.get('health_check_grace_period', 300),
                DefaultCooldown=asg_config.get('cooldown', 300)
            )

            # Single step: add a fixed number of instances once the alarm fires.
            scale_step = {
                'MetricIntervalLowerBound': 0,
                'ScalingAdjustment': asg_config.get('failover_scale_amount', 2)
            }
            # NOTE(review): the Auto Scaling API documents Cooldown as a
            # simple-scaling setting; confirm it is accepted together with
            # PolicyType='StepScaling'.
            policy_response = self.asg.put_scaling_policy(
                AutoScalingGroupName=asg_name,
                PolicyName=f"{asg_name}-failover-scale-up",
                PolicyType='StepScaling',
                AdjustmentType='ChangeInCapacity',
                StepAdjustments=[scale_step],
                Cooldown=60
            )
            policy_arn = policy_response['PolicyARN']

            # Wire the policy to CloudWatch alarms.
            # NOTE(review): _setup_asg_failover_alarms is not defined in the
            # part of the class visible here.
            self._setup_asg_failover_alarms(asg_name, policy_arn)

            self.logger.info(f"Auto Scaling failover configured for {asg_name}")
            return {
                'auto_scaling_group': asg_name,
                'availability_zones': zones,
                'health_check_type': 'ELB',
                'scale_up_policy_arn': policy_arn
            }

        except Exception as e:
            self.logger.error(f"Auto Scaling failover setup failed: {str(e)}")
            return {}

    def execute_manual_failover(self, failover_request: Dict[str, Any]) -> FailoverEvent:
        """Execute a failover and return the event that records it.

        The event is registered in ``self.active_failovers`` and moves
        through PENDING -> IN_PROGRESS -> COMPLETED or FAILED. Problems with
        the request (missing field, unsupported resource type) and errors
        raised by the handler are logged and reported as a FAILED event; they
        are not re-raised. A notification is sent for every finished event.

        Args:
            failover_request: Must contain ``source_resource``,
                ``target_resource`` and ``resource_type`` (one of
                ``load_balancer``, ``database``, ``dns``, ``auto_scaling``).
                May contain ``reason`` and ``rollback_plan``.

        Returns:
            The ``FailoverEvent`` with its final status and ``end_time`` set.
        """
        required_fields = ('source_resource', 'target_resource', 'resource_type')
        # Handler method names are resolved lazily with getattr so that an
        # unsupported type is reported as such before any handler lookup.
        handler_names = {
            'load_balancer': '_execute_lb_failover',
            'database': '_execute_db_failover',
            'dns': '_execute_dns_failover',
            'auto_scaling': '_execute_asg_failover',
        }

        # Build the event before the try block so the error path always has
        # an event to mark FAILED, even when the request is incomplete.
        base_id = f"failover-{int(time.time())}"
        failover_event = FailoverEvent(
            event_id=base_id,
            source_resource=failover_request.get('source_resource'),
            target_resource=failover_request.get('target_resource'),
            failover_type=FailoverType.MANUAL,
            status=FailoverStatus.PENDING,
            start_time=datetime.utcnow(),
            end_time=None,
            reason=failover_request.get('reason', 'Manual failover requested'),
            rollback_plan=failover_request.get('rollback_plan', {})
        )

        with self.failover_lock:
            # The id has one-second resolution; add a suffix instead of
            # overwriting another event registered within the same second.
            suffix = 0
            while failover_event.event_id in self.active_failovers:
                suffix += 1
                failover_event.event_id = f"{base_id}-{suffix}"
            self.active_failovers[failover_event.event_id] = failover_event
        event_id = failover_event.event_id

        try:
            missing = [field for field in required_fields if field not in failover_request]
            if missing:
                raise ValueError(f"Missing required failover request fields: {', '.join(missing)}")

            # Update status to in progress
            failover_event.status = FailoverStatus.IN_PROGRESS

            # Execute failover based on resource type
            resource_type = failover_request['resource_type']
            handler_name = handler_names.get(resource_type)
            if handler_name is None:
                raise ValueError(f"Unsupported resource type: {resource_type}")
            success = getattr(self, handler_name)(failover_request)

            failover_event.status = FailoverStatus.COMPLETED if success else FailoverStatus.FAILED
        except Exception as e:
            failover_event.status = FailoverStatus.FAILED
            self.logger.error(f"Manual failover {event_id} failed: {str(e)}")
        else:
            self.logger.info(f"Manual failover {event_id} completed with status: {failover_event.status}")

        failover_event.end_time = datetime.utcnow()

        # Notify on every outcome; _send_failover_notification logs its own errors.
        self._send_failover_notification(failover_event)
        return failover_event

    def monitor_health_and_failover(self, monitoring_config: Dict[str, Any]) -> None:
        """Poll resource health in an endless loop and trigger failover on degradation.

        Each cycle runs one health check per entry of
        ``monitoring_config['resources']`` on a thread pool, stores the result
        in ``self.health_check_results`` and calls
        ``_trigger_automatic_failover`` for a resource whose status went from
        HEALTHY to UNHEALTHY or DEGRADED. The loop then sleeps
        ``monitoring_config['check_interval']`` seconds (default 60).

        The loop ends only on KeyboardInterrupt or an exception that escapes
        the per-resource handling; both are logged and the method returns.

        Args:
            monitoring_config: Dict with a ``resources`` list of resource
                config dicts (each needing ``resource_id``) and an optional
                ``check_interval``.
        """
        failing_states = (ResourceHealth.UNHEALTHY, ResourceHealth.DEGRADED)

        try:
            while True:
                with ThreadPoolExecutor(max_workers=10) as pool:
                    # Fan out: one health-check task per configured resource.
                    pending = [
                        (pool.submit(self._perform_health_check, cfg), cfg)
                        for cfg in monitoring_config['resources']
                    ]

                    # Collect results in submission order.
                    for task, cfg in pending:
                        try:
                            current = task.result(timeout=30)
                            resource_id = cfg['resource_id']

                            # Remember the new status, keeping the old one for comparison.
                            previous = self.health_check_results.get(resource_id, ResourceHealth.UNKNOWN)
                            self.health_check_results[resource_id] = current

                            # Only a HEALTHY -> failing transition triggers failover.
                            was_healthy = previous == ResourceHealth.HEALTHY
                            if was_healthy and current in failing_states:
                                self._trigger_automatic_failover(cfg, current)

                        except Exception as e:
                            self.logger.error(f"Health check failed for {cfg['resource_id']}: {str(e)}")

                # Pause until the next cycle.
                time.sleep(monitoring_config.get('check_interval', 60))

        except KeyboardInterrupt:
            self.logger.info("Health monitoring stopped")
        except Exception as e:
            self.logger.error(f"Health monitoring error: {str(e)}")

    def _perform_health_check(self, resource_config: Dict[str, Any]) -> ResourceHealth:
        """Dispatch a health check according to ``resource_config['resource_type']``.

        Supported types are ``ec2``, ``rds``, ``load_balancer`` and
        ``endpoint``.

        Returns:
            The checker's result, or ``ResourceHealth.UNKNOWN`` for an
            unsupported type or when anything raises (the error is logged).
        """
        # (resource type, name of the method that checks it); the method is
        # looked up only when its type matches.
        checkers = (
            ('ec2', '_check_ec2_health'),
            ('rds', '_check_rds_health'),
            ('load_balancer', '_check_lb_health'),
            ('endpoint', '_check_endpoint_health'),
        )

        try:
            resource_type = resource_config['resource_type']

            for kind, method_name in checkers:
                if resource_type == kind:
                    return getattr(self, method_name)(resource_config)

            return ResourceHealth.UNKNOWN

        except Exception as e:
            self.logger.error(f"Health check error: {str(e)}")
            return ResourceHealth.UNKNOWN

    def _check_ec2_health(self, resource_config: Dict[str, Any]) -> ResourceHealth:
        """Classify an EC2 instance from its instance and system status checks.

        Args:
            resource_config: Dict whose ``resource_id`` is the instance id.

        Returns:
            HEALTHY when both checks report ``ok``; DEGRADED when either
            reports ``impaired``; UNHEALTHY for any other combination;
            UNKNOWN when no status is returned or the lookup raises (the
            error is logged).
        """
        try:
            instance_id = resource_config['resource_id']

            statuses = self.ec2.describe_instance_status(
                InstanceIds=[instance_id],
                IncludeAllInstances=True
            )['InstanceStatuses']

            if not statuses:
                return ResourceHealth.UNKNOWN

            first = statuses[0]
            checks = (first['InstanceStatus']['Status'], first['SystemStatus']['Status'])

            if all(check == 'ok' for check in checks):
                return ResourceHealth.HEALTHY
            if 'impaired' in checks:
                return ResourceHealth.DEGRADED
            return ResourceHealth.UNHEALTHY

        except Exception as e:
            self.logger.error(f"EC2 health check failed: {str(e)}")
            return ResourceHealth.UNKNOWN

    def _check_rds_health(self, resource_config: Dict[str, Any]) -> ResourceHealth:
        """Classify an RDS instance from its ``DBInstanceStatus``.

        Args:
            resource_config: Dict whose ``resource_id`` is the DB instance
                identifier.

        Returns:
            HEALTHY for ``available``; DEGRADED for ``backing-up``,
            ``modifying`` or ``upgrading``; UNHEALTHY for any other status;
            UNKNOWN when the lookup raises (the error is logged).
        """
        # Statuses treated as degraded rather than unhealthy.
        degraded_statuses = ('backing-up', 'modifying', 'upgrading')

        try:
            db_instance_id = resource_config['resource_id']

            described = self.rds.describe_db_instances(
                DBInstanceIdentifier=db_instance_id
            )
            status = described['DBInstances'][0]['DBInstanceStatus']

            if status == 'available':
                return ResourceHealth.HEALTHY
            if status in degraded_statuses:
                return ResourceHealth.DEGRADED
            return ResourceHealth.UNHEALTHY

        except Exception as e:
            self.logger.error(f"RDS health check failed: {str(e)}")
            return ResourceHealth.UNKNOWN

    def _trigger_automatic_failover(self, resource_config: Dict[str, Any], health_status: ResourceHealth) -> None:
        """Trigger automatic failover based on health status"""
        try:
            if not resource_config.get('auto_failover_enabled', False):
                return
            
            failover_request = {
                'source_resource': resource_config['resource_id'],
                'target_resource': resource_config.get('failover_target'),
                'resource_type': resource_config['resource_type'],
                'reason': f'Automatic failover triggered due to {health_status.value} status',
                'rollback_plan': resource_config.get('rollback_plan', {})
            }
            
            self.execute_manual_failover(failover_request)
            
        except Exception as e:
            self.logger.error(f"Automatic failover trigger failed: {str(e)}")

    def _setup_failover_alarms(self, lb_arn: str, config: Dict[str, Any]) -> None:
        """Create an unhealthy-host CloudWatch alarm per target group of a load balancer.

        ``UnHealthyHostCount`` in ``AWS/ApplicationELB`` is published per
        target group, so each alarm carries both the ``TargetGroup`` and the
        ``LoadBalancer`` dimension; an alarm with only the load balancer
        dimension would never receive data. Failures are logged, not raised.

        Args:
            lb_arn: Load balancer ARN; its last three ``/``-separated
                segments form the ``LoadBalancer`` dimension value.
            config: Failover config whose ``primary_targets`` and
                ``secondary_targets`` lists hold the target group ARNs.
        """
        try:
            lb_parts = lb_arn.split('/')
            lb_id = lb_parts[-1]
            # e.g. "app/<name>/<id>"
            lb_dimension = '/'.join(lb_parts[-3:])

            target_group_arns = [
                *config.get('primary_targets', []),
                *config.get('secondary_targets', []),
            ]
            if not target_group_arns:
                self.logger.warning(
                    f"No target groups configured for {lb_arn}; no unhealthy-host alarm created"
                )
                return

            topic_arn = self._get_failover_sns_topic()

            for tg_arn in target_group_arns:
                # "targetgroup/<name>/<id>" is the dimension value; name and id
                # also go into the alarm name so two groups never share one.
                tg_dimension = tg_arn.split(':')[-1]
                tg_label = tg_dimension.split('/', 1)[-1].replace('/', '-')

                self.cloudwatch.put_metric_alarm(
                    AlarmName=f"LB-UnhealthyHosts-{lb_id}-{tg_label}",
                    ComparisonOperator='GreaterThanThreshold',
                    EvaluationPeriods=2,
                    MetricName='UnHealthyHostCount',
                    Namespace='AWS/ApplicationELB',
                    Period=300,
                    Statistic='Average',
                    Threshold=0.0,
                    ActionsEnabled=True,
                    AlarmActions=[topic_arn],
                    AlarmDescription='Load balancer has unhealthy targets',
                    Dimensions=[
                        {'Name': 'TargetGroup', 'Value': tg_dimension},
                        {'Name': 'LoadBalancer', 'Value': lb_dimension},
                    ]
                )

        except Exception as e:
            self.logger.error(f"Failover alarm setup failed: {str(e)}")

    def _get_failover_sns_topic(self) -> str:
        """Get SNS topic ARN for failover notifications"""
        return f"arn:aws:sns:{self.region}:123456789012:failover-notifications"

    def _send_failover_notification(self, failover_event: FailoverEvent) -> None:
        """Publish a JSON summary of a failover event to the failover SNS topic.

        The message contains the event id, source and target resources,
        status, reason and ISO-formatted start/end times (``end_time`` is
        null while unset). Publishing errors are logged, not raised.

        Args:
            failover_event: The event to report.
        """
        try:
            finished_at = failover_event.end_time
            payload = {
                'event_id': failover_event.event_id,
                'source_resource': failover_event.source_resource,
                'target_resource': failover_event.target_resource,
                'status': failover_event.status.value,
                'reason': failover_event.reason,
                'start_time': failover_event.start_time.isoformat(),
                'end_time': finished_at.isoformat() if finished_at else None
            }

            topic_arn = self._get_failover_sns_topic()
            body = json.dumps(payload, indent=2)
            subject = f"Failover Event: {failover_event.status.value.title()}"

            self.sns.publish(TopicArn=topic_arn, Message=body, Subject=subject)

        except Exception as e:
            self.logger.error(f"Failover notification failed: {str(e)}")

# Example usage
def main():
    """Example run: configure LB, DNS and database failover, then one manual failover.

    All identifiers below are placeholders; the calls go to AWS through the
    boto3 clients created by ``AutomaticFailoverSystem``.
    """
    # Health-check settings shared by both example target groups.
    tg_health_settings = {
        'health_check_path': '/health',
        'health_check_interval': 30,
        'healthy_threshold': 2,
        'unhealthy_threshold': 2
    }

    # Load balancer with one primary and one secondary target group.
    lb_config = {
        'load_balancer_arn': 'arn:aws:elasticloadbalancing:us-east-1:123456789012:loadbalancer/app/myapp-alb/1234567890123456',
        'target_groups': [
            {
                'target_group_arn': 'arn:aws:elasticloadbalancing:us-east-1:123456789012:targetgroup/myapp-tg-primary/1234567890123456',
                'priority': 1,
                **tg_health_settings
            },
            {
                'target_group_arn': 'arn:aws:elasticloadbalancing:us-east-1:123456789012:targetgroup/myapp-tg-secondary/1234567890123456',
                'priority': 2,
                **tg_health_settings
            }
        ]
    }

    # DNS failover between two endpoints.
    dns_config = {
        'hosted_zone_id': 'Z123456789012345678901',
        'record_name': 'api.myapp.com',
        'primary_endpoint': '1.2.3.4',
        'secondary_endpoint': '5.6.7.8',
        'health_check_path': '/health',
        'port': 443
    }

    # Database with a single read replica.
    replica = {
        'identifier': 'myapp-prod-db-replica-1',
        'instance_class': 'db.t3.medium',
        'availability_zone': 'us-east-1b'
    }
    db_config = {
        'db_instance_identifier': 'myapp-prod-db',
        'apply_immediately': False,
        'read_replicas': [replica]
    }

    # Manual failover request used at the end of the example.
    manual_failover_request = {
        'source_resource': 'i-1234567890abcdef0',
        'target_resource': 'i-0987654321fedcba0',
        'resource_type': 'ec2',
        'reason': 'Planned maintenance',
        'rollback_plan': {
            'auto_rollback': True,
            'rollback_delay': 3600
        }
    }

    failover_system = AutomaticFailoverSystem(region='us-east-1')

    print("Setting up failover mechanisms...")

    lb_failover = failover_system.setup_load_balancer_failover(lb_config)
    dns_failover = failover_system.setup_dns_failover(dns_config)
    db_failover = failover_system.setup_database_failover(db_config)

    # Each setup call returns {} on failure, hence the .get() defaults.
    print("Failover setup complete:")
    print(f"- Load balancer failover: {len(lb_failover.get('primary_targets', []))} primary targets")
    print(f"- DNS failover: {dns_failover.get('record_name', 'N/A')}")
    print(f"- Database failover: {db_failover.get('primary_db', 'N/A')}")

    failover_event = failover_system.execute_manual_failover(manual_failover_request)
    print(f"Manual failover executed: {failover_event.event_id} - Status: {failover_event.status.value}")

# Run the example only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

{% endraw %}

AWS Services

Primary Services

  • Elastic Load Balancing: Automatic traffic distribution and health checking
  • Amazon Route 53: DNS-based failover with health checks
  • Amazon RDS Multi-AZ: Automatic database failover
  • Amazon EC2 Auto Scaling: Instance-level failover and replacement

Supporting Services

  • AWS Global Accelerator: Global traffic management and failover
  • Amazon CloudWatch: Health monitoring and alarm-based failover triggers
  • Amazon SNS: Failover event notifications
  • AWS Lambda: Custom failover logic and automation

Benefits

  • Automatic Recovery: Seamless failover without manual intervention
  • Reduced Downtime: Faster recovery through pre-configured failover paths
  • Multi-Layer Protection: Failover at DNS, load balancer, and application levels
  • Geographic Distribution: Cross-region failover capabilities
  • State Preservation: Maintain application state during failover events