REL11
REL11-BP04: Rely on the data plane and not the control plane during recovery
During widespread failures, control plane APIs may become unavailable or throttled. Design recovery mechanisms that depend on data plane operations rather than control plane operations. Use pre-provisioned resources, cached configurations, and avoid making API calls during critical recovery paths.
Implementation Steps
1. Pre-Provision Recovery Resources
Deploy standby resources in advance rather than creating them during recovery.
2. Cache Configuration Data
Store critical configuration data locally to avoid dependency on external APIs.
3. Use Data Plane Operations
Design recovery logic to use data plane operations that remain available during control plane issues.
4. Implement Static Routing
Configure static routing and failover paths that don't require API calls.
5. Local Decision Making
Enable components to make recovery decisions based on local information.
Detailed Implementation
{% raw %}
View code
import boto3
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import sqlite3
import pickle
import os
class PlaneType(Enum):
    """Which AWS API "plane" an operation belongs to.

    Control plane calls manage resources (create/describe/modify) and may be
    throttled or unavailable during large incidents; data plane operations
    are the serving path and are designed to keep working.
    """

    CONTROL_PLANE = "control_plane"  # management APIs
    DATA_PLANE = "data_plane"        # request-serving operations
class RecoveryStrategy(Enum):
    """Recovery techniques that avoid control-plane dependence."""

    PRE_PROVISIONED = "pre_provisioned"  # standby resources created in advance
    CACHED_CONFIG = "cached_config"      # locally cached configuration data
    LOCAL_DECISION = "local_decision"    # decide from local information only
    STATIC_ROUTING = "static_routing"    # pre-configured routes, no API calls
class ResourceState(Enum):
    """Lifecycle state of a pre-provisioned recovery resource."""

    ACTIVE = "active"          # serving traffic
    STANDBY = "standby"        # provisioned but idle, awaiting activation
    FAILED = "failed"          # unhealthy / unusable
    RECOVERING = "recovering"  # in the process of being activated
@dataclass
class PreProvisionedResource:
    """A standby resource created ahead of time so recovery never has to
    provision anything through the control plane."""

    resource_id: str                # provider-assigned ID (e.g. an EC2 instance ID)
    resource_type: str              # dispatch key, e.g. 'ec2_instance', 'load_balancer'
    region: str                     # AWS region the resource lives in
    availability_zone: str          # AZ placement, used for AZ-failure planning
    state: ResourceState            # current lifecycle state (ACTIVE/STANDBY/...)
    configuration: Dict[str, Any]   # original provisioning parameters, reused at activation
    last_health_check: datetime     # when the resource was last verified healthy
    activation_trigger: str         # failure type that should activate this resource
@dataclass
class CachedConfiguration:
    """A locally cached copy of configuration normally fetched from an API,
    so recovery can read it even when the control plane is unreachable."""

    config_id: str          # unique cache key
    config_type: str        # e.g. 'route53_record', 'load_balancer_targets'
    data: Dict[str, Any]    # the cached payload (JSON-serializable)
    last_updated: datetime  # when this entry was refreshed from its source
    ttl: int                # intended freshness window in seconds
    source: str             # where the data came from, e.g. 'route53_api'
class DataPlaneRecoverySystem:
    """Recovery orchestrator that prefers data plane operations.

    AWS clients are used only during *setup* (pre-provisioning standby
    resources and caching configuration); during an actual recovery the
    system reads locally persisted state (SQLite cache, JSON files) so it
    keeps working when control plane APIs are throttled or unavailable.

    NOTE(review): several private helpers referenced below
    (_provision_standby_lb, _provision_standby_db, _provision_standby_lambda,
    _cache_security_group_configs, _cache_lb_target_configs,
    _cache_application_configs, _setup_static_route,
    _setup_health_check_endpoint, _setup_failover_target,
    _setup_local_decision_rule, _store_resource_state, _load_resource_states,
    _recover_from_az_failure, _recover_from_region_failure,
    _recover_from_service_failure, _activate_instance_via_api,
    _update_dns_from_cache, _apply_lb_target_update) are not defined in this
    excerpt -- confirm they exist elsewhere before running.
    """

    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        # Initialize AWS clients (used only for setup, not recovery)
        self.ec2 = boto3.client('ec2', region_name=region)
        self.elb = boto3.client('elbv2', region_name=region)
        self.route53 = boto3.client('route53')
        self.s3 = boto3.client('s3', region_name=region)
        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        # Local storage for configurations and state.
        # NOTE(review): /tmp is ephemeral -- fine for a demo, but durable
        # local storage would be needed for real recovery readiness.
        self.config_cache_file = '/tmp/recovery_config_cache.db'
        self.resource_state_file = '/tmp/resource_state.json'
        self.init_local_storage()
        # Pre-provisioned resources tracking
        self.pre_provisioned_resources: Dict[str, PreProvisionedResource] = {}
        self.cached_configurations: Dict[str, CachedConfiguration] = {}
        # Recovery state: lock serializes recovery runs across threads;
        # control_plane_available is optimistically True until checked.
        self.recovery_lock = threading.Lock()
        self.control_plane_available = True

    def init_local_storage(self) -> None:
        """Initialize local storage for configurations.

        Creates the SQLite tables used for the configuration cache and
        resource state if they do not already exist. Failures are logged
        and swallowed (best-effort setup).
        """
        try:
            # Create SQLite database for configuration cache
            conn = sqlite3.connect(self.config_cache_file)
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS config_cache (
                    config_id TEXT PRIMARY KEY,
                    config_type TEXT,
                    data TEXT,
                    last_updated TEXT,
                    ttl INTEGER,
                    source TEXT
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS resource_state (
                    resource_id TEXT PRIMARY KEY,
                    resource_type TEXT,
                    region TEXT,
                    availability_zone TEXT,
                    state TEXT,
                    configuration TEXT,
                    last_health_check TEXT,
                    activation_trigger TEXT
                )
            ''')
            conn.commit()
            conn.close()
            self.logger.info("Local storage initialized")
        except Exception as e:
            self.logger.error(f"Local storage initialization failed: {str(e)}")

    def setup_pre_provisioned_resources(self, resource_configs: List[Dict[str, Any]]) -> List[PreProvisionedResource]:
        """Set up pre-provisioned standby resources.

        Dispatches on each config's 'resource_type'; unknown types are
        skipped silently. Each provisioned resource is tracked in memory
        and persisted via _store_resource_state. Returns the (possibly
        partial, on error) list of resources that were provisioned.
        """
        pre_provisioned = []
        try:
            for config in resource_configs:
                resource_type = config['resource_type']
                if resource_type == 'ec2_instance':
                    resource = self._provision_standby_ec2(config)
                elif resource_type == 'load_balancer':
                    resource = self._provision_standby_lb(config)
                elif resource_type == 'database':
                    resource = self._provision_standby_db(config)
                elif resource_type == 'lambda_function':
                    resource = self._provision_standby_lambda(config)
                else:
                    # Unknown resource type: skip rather than fail the batch.
                    continue
                if resource:
                    pre_provisioned.append(resource)
                    self.pre_provisioned_resources[resource.resource_id] = resource
                    self._store_resource_state(resource)
            self.logger.info(f"Pre-provisioned {len(pre_provisioned)} standby resources")
            return pre_provisioned
        except Exception as e:
            self.logger.error(f"Pre-provisioning setup failed: {str(e)}")
            return pre_provisioned

    def cache_critical_configurations(self, config_sources: List[Dict[str, Any]]) -> List[CachedConfiguration]:
        """Cache critical configurations locally.

        Dispatches on each source's 'config_type'; unknown types are
        skipped. Every fetched configuration is kept in memory and written
        to the SQLite cache so it survives a control-plane outage.
        """
        cached_configs = []
        try:
            for source in config_sources:
                config_type = source['config_type']
                if config_type == 'route53_records':
                    configs = self._cache_route53_configs(source)
                elif config_type == 'security_groups':
                    configs = self._cache_security_group_configs(source)
                elif config_type == 'load_balancer_targets':
                    configs = self._cache_lb_target_configs(source)
                elif config_type == 'application_config':
                    configs = self._cache_application_configs(source)
                else:
                    continue
                cached_configs.extend(configs)
                # Store in local cache
                for config in configs:
                    self.cached_configurations[config.config_id] = config
                    self._store_cached_config(config)
            self.logger.info(f"Cached {len(cached_configs)} critical configurations")
            return cached_configs
        except Exception as e:
            self.logger.error(f"Configuration caching failed: {str(e)}")
            return cached_configs

    def setup_data_plane_recovery(self, recovery_config: Dict[str, Any]) -> Dict[str, Any]:
        """Set up data plane recovery mechanisms.

        Builds static routes, health-check endpoints, failover targets and
        local decision rules from the given config. Returns the assembled
        setup dict, or an empty dict on failure.
        """
        try:
            recovery_setup = {
                'static_routes': [],
                'health_check_endpoints': [],
                'failover_targets': [],
                'local_decision_rules': []
            }
            # Configure static routing
            for route_config in recovery_config.get('static_routes', []):
                static_route = self._setup_static_route(route_config)
                recovery_setup['static_routes'].append(static_route)
            # Set up health check endpoints
            for hc_config in recovery_config.get('health_checks', []):
                health_endpoint = self._setup_health_check_endpoint(hc_config)
                recovery_setup['health_check_endpoints'].append(health_endpoint)
            # Configure failover targets
            for failover_config in recovery_config.get('failover_targets', []):
                failover_target = self._setup_failover_target(failover_config)
                recovery_setup['failover_targets'].append(failover_target)
            # Set up local decision rules
            for rule_config in recovery_config.get('decision_rules', []):
                decision_rule = self._setup_local_decision_rule(rule_config)
                recovery_setup['local_decision_rules'].append(decision_rule)
            self.logger.info("Data plane recovery setup completed")
            return recovery_setup
        except Exception as e:
            self.logger.error(f"Data plane recovery setup failed: {str(e)}")
            return {}

    def execute_data_plane_recovery(self, failure_scenario: Dict[str, Any]) -> Dict[str, Any]:
        """Execute recovery using only data plane operations.

        Holds the recovery lock for the whole run so only one recovery is
        in flight at a time. Loads cached configs and pre-provisioned
        resource state from local storage, then dispatches on the
        scenario's 'type'. Returns a result dict; 'errors' collects any
        failure messages.
        """
        recovery_result = {
            'success': False,
            'actions_taken': [],
            'resources_activated': [],
            'errors': []
        }
        try:
            with self.recovery_lock:
                self.logger.info(f"Executing data plane recovery for scenario: {failure_scenario['type']}")
                # Check control plane availability
                self.control_plane_available = self._check_control_plane_availability()
                if not self.control_plane_available:
                    self.logger.warning("Control plane unavailable, using data plane recovery only")
                # Load cached configurations
                cached_configs = self._load_cached_configurations()
                # Load pre-provisioned resource states
                pre_provisioned = self._load_resource_states()
                # Execute recovery based on failure type
                failure_type = failure_scenario['type']
                if failure_type == 'instance_failure':
                    success = self._recover_from_instance_failure(failure_scenario, cached_configs, pre_provisioned)
                elif failure_type == 'availability_zone_failure':
                    success = self._recover_from_az_failure(failure_scenario, cached_configs, pre_provisioned)
                elif failure_type == 'region_failure':
                    success = self._recover_from_region_failure(failure_scenario, cached_configs, pre_provisioned)
                elif failure_type == 'service_failure':
                    success = self._recover_from_service_failure(failure_scenario, cached_configs, pre_provisioned)
                else:
                    success = False
                    recovery_result['errors'].append(f"Unknown failure type: {failure_type}")
                recovery_result['success'] = success
                self.logger.info(f"Data plane recovery completed: {success}")
                return recovery_result
        except Exception as e:
            recovery_result['errors'].append(str(e))
            self.logger.error(f"Data plane recovery failed: {str(e)}")
            return recovery_result

    def _provision_standby_ec2(self, config: Dict[str, Any]) -> Optional[PreProvisionedResource]:
        """Provision standby EC2 instance.

        Launches an instance, then immediately stops it to avoid compute
        charges; recovery later starts it. Returns None on failure.
        """
        try:
            # Launch instance in standby state
            response = self.ec2.run_instances(
                ImageId=config['ami_id'],
                MinCount=1,
                MaxCount=1,
                InstanceType=config['instance_type'],
                KeyName=config.get('key_name'),
                SecurityGroupIds=config['security_groups'],
                SubnetId=config['subnet_id'],
                UserData=config.get('user_data', ''),
                TagSpecifications=[
                    {
                        'ResourceType': 'instance',
                        'Tags': [
                            {'Key': 'Purpose', 'Value': 'StandbyRecovery'},
                            {'Key': 'Environment', 'Value': config.get('environment', 'production')},
                            {'Key': 'AutoActivate', 'Value': 'true'}
                        ]
                    }
                ]
            )
            instance_id = response['Instances'][0]['InstanceId']
            # Stop instance to save costs (will be started during recovery)
            self.ec2.stop_instances(InstanceIds=[instance_id])
            resource = PreProvisionedResource(
                resource_id=instance_id,
                resource_type='ec2_instance',
                region=self.region,
                availability_zone=config['availability_zone'],
                state=ResourceState.STANDBY,
                configuration=config,
                last_health_check=datetime.utcnow(),
                activation_trigger=config.get('activation_trigger', 'instance_failure')
            )
            self.logger.info(f"Provisioned standby EC2 instance: {instance_id}")
            return resource
        except Exception as e:
            self.logger.error(f"Standby EC2 provisioning failed: {str(e)}")
            return None

    def _cache_route53_configs(self, source: Dict[str, Any]) -> List[CachedConfiguration]:
        """Cache Route 53 DNS configurations.

        Snapshots every record set in the hosted zone into
        CachedConfiguration entries keyed by zone/name/type.

        NOTE(review): list_resource_record_sets is paginated; a single call
        returns at most one page, so large zones would be cached only
        partially here -- confirm whether pagination is needed.
        """
        configs = []
        try:
            hosted_zone_id = source['hosted_zone_id']
            # Get all records in the hosted zone
            response = self.route53.list_resource_record_sets(
                HostedZoneId=hosted_zone_id
            )
            for record_set in response['ResourceRecordSets']:
                config = CachedConfiguration(
                    config_id=f"route53-{hosted_zone_id}-{record_set['Name']}-{record_set['Type']}",
                    config_type='route53_record',
                    data={
                        'hosted_zone_id': hosted_zone_id,
                        'name': record_set['Name'],
                        'type': record_set['Type'],
                        'ttl': record_set.get('TTL', 300),
                        'resource_records': record_set.get('ResourceRecords', []),
                        'alias_target': record_set.get('AliasTarget'),
                        'failover': record_set.get('Failover'),
                        'set_identifier': record_set.get('SetIdentifier'),
                        'health_check_id': record_set.get('HealthCheckId')
                    },
                    last_updated=datetime.utcnow(),
                    ttl=source.get('cache_ttl', 3600),
                    source='route53_api'
                )
                configs.append(config)
            return configs
        except Exception as e:
            self.logger.error(f"Route 53 config caching failed: {str(e)}")
            return configs

    def _recover_from_instance_failure(self, failure_scenario: Dict[str, Any],
                                       cached_configs: Dict[str, CachedConfiguration],
                                       pre_provisioned: Dict[str, PreProvisionedResource]) -> bool:
        """Recover from instance failure using data plane operations.

        Picks the first STANDBY EC2 resource whose activation trigger
        matches, activates it (via cached user-data mechanism if the
        control plane is down, via API otherwise), then rewires load
        balancer targets and DNS from the cached configurations.
        """
        try:
            failed_instance_id = failure_scenario['resource_id']
            # Find suitable standby instance
            standby_instance = None
            for resource in pre_provisioned.values():
                if (resource.resource_type == 'ec2_instance' and
                        resource.state == ResourceState.STANDBY and
                        resource.activation_trigger in ['instance_failure', 'any']):
                    standby_instance = resource
                    break
            if not standby_instance:
                self.logger.error("No suitable standby instance found")
                return False
            # Activate standby instance (data plane operation)
            if not self.control_plane_available:
                # Use pre-configured activation script
                activation_success = self._activate_instance_via_userdata(standby_instance)
            else:
                # Use EC2 API if available
                activation_success = self._activate_instance_via_api(standby_instance)
            if not activation_success:
                return False
            # Update load balancer targets using cached configuration
            lb_update_success = self._update_lb_targets_from_cache(
                failed_instance_id,
                standby_instance.resource_id,
                cached_configs
            )
            # Update DNS records using cached configuration
            dns_update_success = self._update_dns_from_cache(
                failed_instance_id,
                standby_instance.resource_id,
                cached_configs
            )
            # Update resource state
            standby_instance.state = ResourceState.ACTIVE
            self._store_resource_state(standby_instance)
            self.logger.info(f"Instance recovery completed: {standby_instance.resource_id}")
            # Overall success requires activation plus both rewiring steps.
            return activation_success and lb_update_success and dns_update_success
        except Exception as e:
            self.logger.error(f"Instance failure recovery failed: {str(e)}")
            return False

    def _activate_instance_via_userdata(self, resource: PreProvisionedResource) -> bool:
        """Activate instance using pre-configured user data script.

        Returns False when the resource has no 'activation_script'
        configured. The actual trigger is simulated here (sleep), as noted
        in the comments below.
        """
        try:
            # This would typically involve sending a signal to the instance
            # via a pre-configured mechanism (e.g., SQS queue, file system trigger)
            # For demonstration, we'll simulate the activation
            activation_script = resource.configuration.get('activation_script', '')
            if activation_script:
                # In a real implementation, this would trigger the instance
                # to start itself and begin serving traffic
                self.logger.info(f"Triggering instance activation: {resource.resource_id}")
                # Simulate activation delay
                time.sleep(5)
                # Update resource state
                resource.state = ResourceState.ACTIVE
                resource.last_health_check = datetime.utcnow()
                return True
            return False
        except Exception as e:
            self.logger.error(f"Instance activation via userdata failed: {str(e)}")
            return False

    def _update_lb_targets_from_cache(self, failed_instance: str, new_instance: str,
                                      cached_configs: Dict[str, CachedConfiguration]) -> bool:
        """Update load balancer targets using cached configuration.

        Finds the first cached 'load_balancer_targets' config containing
        the failed instance, swaps in the replacement, applies the change,
        and refreshes the cache entry. Returns False when no matching
        config is found or the apply step fails.
        """
        try:
            # Find load balancer configurations that include the failed instance
            for config_id, config in cached_configs.items():
                if config.config_type == 'load_balancer_targets':
                    targets = config.data.get('targets', [])
                    # Check if failed instance is in targets
                    if any(target.get('id') == failed_instance for target in targets):
                        # Update targets to replace failed instance with new instance
                        updated_targets = []
                        for target in targets:
                            if target.get('id') == failed_instance:
                                # Mutates the cached target dict in place.
                                target['id'] = new_instance
                            updated_targets.append(target)
                        # Apply the update (this would be a data plane operation)
                        success = self._apply_lb_target_update(config.data['target_group_arn'], updated_targets)
                        if success:
                            # Update cached configuration
                            config.data['targets'] = updated_targets
                            config.last_updated = datetime.utcnow()
                            self._store_cached_config(config)
                            self.logger.info(f"Updated load balancer targets: {failed_instance} -> {new_instance}")
                            return True
            return False
        except Exception as e:
            self.logger.error(f"Load balancer target update failed: {str(e)}")
            return False

    def _check_control_plane_availability(self) -> bool:
        """Check if AWS control plane is available.

        Uses describe_regions as a cheap probe; any exception is treated
        as "unavailable".
        """
        try:
            # Simple check - try to describe regions
            self.ec2.describe_regions()
            return True
        except Exception:
            return False

    def _load_cached_configurations(self) -> Dict[str, CachedConfiguration]:
        """Load configurations from local cache.

        Reads every row of config_cache back into CachedConfiguration
        objects keyed by config_id. Returns whatever was loaded before an
        error, or an empty dict.

        NOTE(review): SELECT * depends on the column order declared in
        init_local_storage staying unchanged.
        """
        configs = {}
        try:
            conn = sqlite3.connect(self.config_cache_file)
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM config_cache')
            rows = cursor.fetchall()
            for row in rows:
                config = CachedConfiguration(
                    config_id=row[0],
                    config_type=row[1],
                    data=json.loads(row[2]),
                    last_updated=datetime.fromisoformat(row[3]),
                    ttl=row[4],
                    source=row[5]
                )
                configs[config.config_id] = config
            conn.close()
            return configs
        except Exception as e:
            self.logger.error(f"Failed to load cached configurations: {str(e)}")
            return configs

    def _store_cached_config(self, config: CachedConfiguration) -> None:
        """Store configuration in local cache.

        Upserts one row in config_cache; data is JSON-serialized and
        timestamps stored in ISO format.
        """
        try:
            conn = sqlite3.connect(self.config_cache_file)
            cursor = conn.cursor()
            cursor.execute('''
                INSERT OR REPLACE INTO config_cache
                (config_id, config_type, data, last_updated, ttl, source)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                config.config_id,
                config.config_type,
                json.dumps(config.data),
                config.last_updated.isoformat(),
                config.ttl,
                config.source
            ))
            conn.commit()
            conn.close()
        except Exception as e:
            self.logger.error(f"Failed to store cached config: {str(e)}")

    def get_recovery_readiness_status(self) -> Dict[str, Any]:
        """Get status of data plane recovery readiness.

        Summarizes in-memory state: counts of standby resources by type
        and age statistics of cached configurations. Returns {} on error.

        NOTE(review): 'last_cache_update' is never populated, and
        'average' in cache_freshness keeps its first-sample value (only
        'oldest'/'newest' are updated) -- confirm whether that is intended.
        """
        try:
            status = {
                'pre_provisioned_resources': len(self.pre_provisioned_resources),
                'cached_configurations': len(self.cached_configurations),
                'control_plane_available': self.control_plane_available,
                'last_cache_update': None,
                'standby_resources_by_type': {},
                'cache_freshness': {}
            }
            # Analyze standby resources
            for resource in self.pre_provisioned_resources.values():
                resource_type = resource.resource_type
                if resource_type not in status['standby_resources_by_type']:
                    status['standby_resources_by_type'][resource_type] = 0
                status['standby_resources_by_type'][resource_type] += 1
            # Analyze cache freshness
            now = datetime.utcnow()
            for config in self.cached_configurations.values():
                config_type = config.config_type
                age = (now - config.last_updated).total_seconds()
                if config_type not in status['cache_freshness']:
                    status['cache_freshness'][config_type] = {'oldest': age, 'newest': age, 'average': age}
                else:
                    status['cache_freshness'][config_type]['oldest'] = max(status['cache_freshness'][config_type]['oldest'], age)
                    status['cache_freshness'][config_type]['newest'] = min(status['cache_freshness'][config_type]['newest'], age)
            return status
        except Exception as e:
            self.logger.error(f"Recovery readiness status check failed: {str(e)}")
            return {}
# Example usage
def main():
    """Demonstrate setup and execution of the data plane recovery system."""
    # Build the recovery orchestrator for the primary region.
    system = DataPlaneRecoverySystem(region='us-east-1')

    # Standby resources to provision ahead of any failure.
    standby_specs = [
        {
            'resource_type': 'ec2_instance',
            'ami_id': 'ami-12345678',
            'instance_type': 't3.medium',
            'security_groups': ['sg-12345678'],
            'subnet_id': 'subnet-12345678',
            'availability_zone': 'us-east-1b',
            'activation_trigger': 'instance_failure',
            'environment': 'production'
        }
    ]

    # External configuration sources whose data should be cached locally.
    sources = [
        {
            'config_type': 'route53_records',
            'hosted_zone_id': 'Z123456789012345678901',
            'cache_ttl': 3600
        }
    ]

    print("Setting up data plane recovery system...")
    standby_resources = system.setup_pre_provisioned_resources(standby_specs)
    local_configs = system.cache_critical_configurations(sources)

    print("Data plane recovery setup complete:")
    print(f"- Pre-provisioned resources: {len(standby_resources)}")
    print(f"- Cached configurations: {len(local_configs)}")

    # Report how ready we are to recover without the control plane.
    readiness = system.get_recovery_readiness_status()
    print(f"Recovery readiness status: {json.dumps(readiness, indent=2, default=str)}")

    # Simulate an instance failure and drive a recovery from it.
    scenario = {
        'type': 'instance_failure',
        'resource_id': 'i-1234567890abcdef0',
        'timestamp': datetime.utcnow().isoformat(),
        'severity': 'high'
    }
    outcome = system.execute_data_plane_recovery(scenario)
    print(f"Recovery execution result: {json.dumps(outcome, indent=2)}")
# Run the demonstration when executed as a script.
if __name__ == "__main__":
main(){% endraw %}
AWS Services
Primary Services
- Amazon S3: Store configuration backups and recovery scripts
- Amazon Route 53: DNS failover with health checks (data plane operations)
- Elastic Load Balancing: Traffic routing without API dependencies
- Amazon EC2: Pre-provisioned standby instances
Supporting Services
- AWS Systems Manager: Parameter Store for configuration caching
- Amazon CloudWatch: Metrics and alarms (data plane operations)
- Amazon SQS: Asynchronous communication for recovery triggers
- AWS Lambda: Event-driven recovery logic
Benefits
- Control Plane Independence: Recovery works even when APIs are unavailable
- Faster Recovery: Pre-provisioned resources eliminate provisioning delays
- Reduced API Throttling: Avoid control plane rate limits during incidents
- Higher Reliability: Less dependency on external services during recovery
- Cost Optimization: Use stopped instances and cached data to reduce costs