REL09
REL09-BP04 - Perform periodic recovery of the data to verify backup integrity and processes
One-Click Remediation
Deploy CloudFormation stacks to implement this best practice with a single click.
Backup Recovery Testing
Schedule and track backup recovery testing workflows to validate restore readiness and backup integrity
Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.
REL09-BP04: Perform periodic recovery of the data to verify backup integrity and processes
Overview
Implement comprehensive backup validation and recovery testing programs to ensure backup integrity and verify that recovery procedures work correctly. Regular recovery testing validates that backups can be successfully restored, recovery processes are effective, and recovery time objectives can be met.
Implementation Steps
1. Design Recovery Testing Strategy
- Establish recovery testing schedules and frequencies
- Define test scenarios covering various failure modes
- Create recovery testing environments and procedures
- Design automated recovery validation and verification
2. Implement Backup Integrity Validation
- Configure automated backup integrity checking
- Implement checksum validation and corruption detection
- Design backup completeness verification
- Establish backup metadata validation and consistency checks
3. Configure Automated Recovery Testing
- Implement scheduled recovery test execution
- Configure test environment provisioning and cleanup
- Design recovery performance measurement and benchmarking
- Establish automated test result analysis and reporting
4. Establish Recovery Process Validation
- Test complete disaster recovery procedures
- Validate recovery time and point objectives (RTO/RPO)
- Verify cross-region recovery capabilities
- Test recovery under various failure scenarios
5. Implement Recovery Monitoring and Alerting
- Configure recovery test success and failure monitoring
- Implement recovery performance tracking and analysis
- Design recovery test result notifications and escalation
- Establish recovery readiness dashboards and reporting
6. Optimize Recovery Procedures
- Analyze recovery test results for improvement opportunities
- Optimize recovery procedures based on test findings
- Update recovery documentation and runbooks
- Establish continuous improvement processes for recovery capabilities
Implementation Examples
Example 1: Comprehensive Backup Recovery Testing System
View code
import boto3
import json
import logging
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
import hashlib
import time
class RecoveryTestType(Enum):
FULL_RESTORE = "full_restore"
PARTIAL_RESTORE = "partial_restore"
POINT_IN_TIME = "point_in_time"
CROSS_REGION = "cross_region"
DISASTER_RECOVERY = "disaster_recovery"
class TestStatus(Enum):
SCHEDULED = "scheduled"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class RecoveryTest:
test_id: str
test_name: str
test_type: RecoveryTestType
backup_arn: str
target_environment: str
test_schedule: str
expected_rto_minutes: int
expected_rpo_minutes: int
validation_criteria: List[Dict[str, Any]]
cleanup_required: bool
created_at: datetime
@dataclass
class RecoveryTestExecution:
execution_id: str
test_id: str
status: TestStatus
started_at: datetime
completed_at: Optional[datetime]
recovery_start_time: Optional[datetime]
recovery_end_time: Optional[datetime]
actual_rto_minutes: Optional[float]
actual_rpo_minutes: Optional[float]
validation_results: List[Dict[str, Any]]
error_message: Optional[str]
restored_resources: List[str]
class BackupRecoveryTestManager:
"""Comprehensive backup recovery testing and validation system"""
def __init__(self, config: Dict[str, Any]):
self.config = config
# AWS clients
self.backup = boto3.client('backup')
self.s3 = boto3.client('s3')
self.rds = boto3.client('rds')
self.ec2 = boto3.client('ec2')
self.dynamodb_client = boto3.client('dynamodb')
self.dynamodb = boto3.resource('dynamodb')
self.lambda_client = boto3.client('lambda')
self.cloudwatch = boto3.client('cloudwatch')
self.sns = boto3.client('sns')
# Storage
self.tests_table = self.dynamodb.Table(config.get('tests_table', 'recovery-tests'))
self.executions_table = self.dynamodb.Table(config.get('executions_table', 'recovery-test-executions'))
# Configuration
self.test_environment_prefix = config.get('test_environment_prefix', 'recovery-test')
self.notification_topic_arn = config.get('notification_topic_arn')
# Active tests
self.active_tests = {}
async def create_recovery_test_suite(self, test_configs: List[Dict[str, Any]]) -> List[str]:
"""Create comprehensive recovery test suite"""
try:
created_tests = []
for test_config in test_configs:
test = RecoveryTest(
test_id=f"test_{int(datetime.utcnow().timestamp())}_{test_config['test_name'].replace(' ', '_').lower()}",
test_name=test_config['test_name'],
test_type=RecoveryTestType(test_config['test_type']),
backup_arn=test_config['backup_arn'],
target_environment=test_config.get('target_environment', 'test'),
test_schedule=test_config.get('test_schedule', 'weekly'),
expected_rto_minutes=test_config.get('expected_rto_minutes', 60),
expected_rpo_minutes=test_config.get('expected_rpo_minutes', 60),
validation_criteria=test_config.get('validation_criteria', []),
cleanup_required=test_config.get('cleanup_required', True),
created_at=datetime.utcnow()
)
# Store test
await self._store_recovery_test(test)
# Schedule test execution
await self._schedule_recovery_test(test)
created_tests.append(test.test_id)
self.active_tests[test.test_id] = test
logging.info(f"Created recovery test: {test.test_name}")
return created_tests
except Exception as e:
logging.error(f"Failed to create recovery test suite: {str(e)}")
return []
async def execute_recovery_test(self, test_id: str) -> str:
"""Execute recovery test"""
try:
test = self.active_tests.get(test_id)
if not test:
raise ValueError(f"Recovery test {test_id} not found")
# Create execution record
execution_id = f"exec_{int(datetime.utcnow().timestamp())}_{test_id}"
execution = RecoveryTestExecution(
execution_id=execution_id,
test_id=test_id,
status=TestStatus.RUNNING,
started_at=datetime.utcnow(),
completed_at=None,
recovery_start_time=None,
recovery_end_time=None,
actual_rto_minutes=None,
actual_rpo_minutes=None,
validation_results=[],
error_message=None,
restored_resources=[]
)
# Store execution
await self._store_test_execution(execution)
# Start test execution
asyncio.create_task(self._execute_recovery_test_phases(test, execution))
logging.info(f"Started recovery test execution: {execution_id}")
return execution_id
except Exception as e:
logging.error(f"Failed to execute recovery test: {str(e)}")
raise
async def _execute_recovery_test_phases(self, test: RecoveryTest, execution: RecoveryTestExecution):
"""Execute all phases of recovery test"""
try:
# Phase 1: Pre-test validation
await self._validate_backup_integrity(test, execution)
# Phase 2: Environment preparation
await self._prepare_test_environment(test, execution)
# Phase 3: Recovery execution
execution.recovery_start_time = datetime.utcnow()
await self._execute_recovery_operation(test, execution)
execution.recovery_end_time = datetime.utcnow()
# Calculate actual RTO
if execution.recovery_start_time and execution.recovery_end_time:
execution.actual_rto_minutes = (execution.recovery_end_time - execution.recovery_start_time).total_seconds() / 60
# Phase 4: Validation
await self._validate_recovery_results(test, execution)
# Phase 5: Cleanup
if test.cleanup_required:
await self._cleanup_test_environment(test, execution)
# Complete execution
execution.status = TestStatus.COMPLETED
execution.completed_at = datetime.utcnow()
# Send notification
await self._send_test_notification(test, execution)
except Exception as e:
logging.error(f"Recovery test execution failed: {str(e)}")
execution.status = TestStatus.FAILED
execution.error_message = str(e)
execution.completed_at = datetime.utcnow()
# Send failure notification
await self._send_test_notification(test, execution)
finally:
# Store final execution state
await self._store_test_execution(execution)
async def _validate_backup_integrity(self, test: RecoveryTest, execution: RecoveryTestExecution):
"""Validate backup integrity before recovery"""
try:
# Get backup details
backup_details = await self._get_backup_details(test.backup_arn)
validation_result = {
'validation_type': 'backup_integrity',
'timestamp': datetime.utcnow().isoformat(),
'passed': True,
'details': {}
}
# Check backup status
if backup_details.get('Status') != 'COMPLETED':
validation_result['passed'] = False
validation_result['details']['status_issue'] = f"Backup status is {backup_details.get('Status')}"
# Check backup size
backup_size = backup_details.get('BackupSizeInBytes', 0)
if backup_size == 0:
validation_result['passed'] = False
validation_result['details']['size_issue'] = "Backup size is zero"
# Check encryption
if not backup_details.get('EncryptionKeyArn'):
validation_result['passed'] = False
validation_result['details']['encryption_issue'] = "Backup is not encrypted"
execution.validation_results.append(validation_result)
if not validation_result['passed']:
raise Exception(f"Backup integrity validation failed: {validation_result['details']}")
logging.info("Backup integrity validation passed")
except Exception as e:
logging.error(f"Backup integrity validation failed: {str(e)}")
raise
async def _prepare_test_environment(self, test: RecoveryTest, execution: RecoveryTestExecution):
"""Prepare test environment for recovery"""
try:
# Create test environment resources based on test type
if test.test_type == RecoveryTestType.FULL_RESTORE:
await self._prepare_full_restore_environment(test, execution)
elif test.test_type == RecoveryTestType.CROSS_REGION:
await self._prepare_cross_region_environment(test, execution)
elif test.test_type == RecoveryTestType.DISASTER_RECOVERY:
await self._prepare_disaster_recovery_environment(test, execution)
logging.info("Test environment prepared successfully")
except Exception as e:
logging.error(f"Failed to prepare test environment: {str(e)}")
raise
async def _execute_recovery_operation(self, test: RecoveryTest, execution: RecoveryTestExecution):
"""Execute the actual recovery operation"""
try:
# Start restore job
restore_response = self.backup.start_restore_job(
RecoveryPointArn=test.backup_arn,
Metadata=self._get_restore_metadata(test),
IamRoleArn=self.config.get('restore_service_role_arn'),
ResourceType=self._get_resource_type_from_arn(test.backup_arn)
)
restore_job_id = restore_response['RestoreJobId']
# Monitor restore job
while True:
restore_status = self.backup.describe_restore_job(RestoreJobId=restore_job_id)
status = restore_status['Status']
if status == 'COMPLETED':
execution.restored_resources.append(restore_status.get('CreatedResourceArn', ''))
break
elif status in ['FAILED', 'ABORTED']:
raise Exception(f"Restore job failed: {restore_status.get('StatusMessage', 'Unknown error')}")
# Wait before checking again
await asyncio.sleep(30)
logging.info(f"Recovery operation completed: {restore_job_id}")
except Exception as e:
logging.error(f"Recovery operation failed: {str(e)}")
raise
async def _validate_recovery_results(self, test: RecoveryTest, execution: RecoveryTestExecution):
"""Validate recovery results against criteria"""
try:
for criteria in test.validation_criteria:
validation_result = await self._execute_validation_criteria(criteria, execution)
execution.validation_results.append(validation_result)
if not validation_result['passed']:
logging.warning(f"Validation criteria failed: {criteria['name']}")
# Validate RTO/RPO objectives
rto_validation = {
'validation_type': 'rto_check',
'timestamp': datetime.utcnow().isoformat(),
'passed': execution.actual_rto_minutes <= test.expected_rto_minutes if execution.actual_rto_minutes else False,
'details': {
'expected_rto_minutes': test.expected_rto_minutes,
'actual_rto_minutes': execution.actual_rto_minutes
}
}
execution.validation_results.append(rto_validation)
logging.info("Recovery results validation completed")
except Exception as e:
logging.error(f"Recovery results validation failed: {str(e)}")
raise
async def _execute_validation_criteria(self, criteria: Dict[str, Any],
execution: RecoveryTestExecution) -> Dict[str, Any]:
"""Execute specific validation criteria"""
try:
validation_result = {
'validation_type': criteria['type'],
'name': criteria['name'],
'timestamp': datetime.utcnow().isoformat(),
'passed': False,
'details': {}
}
if criteria['type'] == 'data_integrity':
# Validate data integrity
validation_result['passed'] = await self._validate_data_integrity(
execution.restored_resources[0], criteria
)
elif criteria['type'] == 'connectivity':
# Validate connectivity
validation_result['passed'] = await self._validate_connectivity(
execution.restored_resources[0], criteria
)
elif criteria['type'] == 'performance':
# Validate performance
validation_result['passed'] = await self._validate_performance(
execution.restored_resources[0], criteria
)
return validation_result
except Exception as e:
logging.error(f"Validation criteria execution failed: {str(e)}")
return {
'validation_type': criteria['type'],
'name': criteria['name'],
'timestamp': datetime.utcnow().isoformat(),
'passed': False,
'details': {'error': str(e)}
}
async def _validate_data_integrity(self, resource_arn: str, criteria: Dict[str, Any]) -> bool:
"""Validate data integrity of restored resource"""
try:
# This would implement specific data integrity checks
# For example, checksum validation, record counts, etc.
# Simplified validation - in practice, this would be more comprehensive
return True
except Exception as e:
logging.error(f"Data integrity validation failed: {str(e)}")
return False
async def _validate_connectivity(self, resource_arn: str, criteria: Dict[str, Any]) -> bool:
"""Validate connectivity to restored resource"""
try:
# This would implement connectivity tests
# For example, database connections, API endpoints, etc.
# Simplified validation
return True
except Exception as e:
logging.error(f"Connectivity validation failed: {str(e)}")
return False
async def _validate_performance(self, resource_arn: str, criteria: Dict[str, Any]) -> bool:
"""Validate performance of restored resource"""
try:
# This would implement performance tests
# For example, query response times, throughput tests, etc.
# Simplified validation
return True
except Exception as e:
logging.error(f"Performance validation failed: {str(e)}")
return False
async def _cleanup_test_environment(self, test: RecoveryTest, execution: RecoveryTestExecution):
"""Clean up test environment resources"""
try:
for resource_arn in execution.restored_resources:
await self._delete_test_resource(resource_arn)
logging.info("Test environment cleanup completed")
except Exception as e:
logging.error(f"Test environment cleanup failed: {str(e)}")
async def _delete_test_resource(self, resource_arn: str):
"""Delete specific test resource"""
try:
# Determine resource type and delete accordingly
if 'rds' in resource_arn:
# Delete RDS instance
db_identifier = resource_arn.split(':')[-1]
self.rds.delete_db_instance(
DBInstanceIdentifier=db_identifier,
SkipFinalSnapshot=True
)
elif 'ec2' in resource_arn:
# Terminate EC2 instance
instance_id = resource_arn.split('/')[-1]
self.ec2.terminate_instances(InstanceIds=[instance_id])
logging.info(f"Deleted test resource: {resource_arn}")
except Exception as e:
logging.error(f"Failed to delete test resource {resource_arn}: {str(e)}")
def _get_restore_metadata(self, test: RecoveryTest) -> Dict[str, str]:
"""Get restore metadata based on test configuration"""
metadata = {}
# Add test-specific metadata
if test.test_type == RecoveryTestType.CROSS_REGION:
metadata['TargetRegion'] = 'us-west-2' # Example
# Add test environment prefix
metadata['Environment'] = test.target_environment
metadata['TestId'] = test.test_id
return metadata
def _get_resource_type_from_arn(self, backup_arn: str) -> str:
"""Extract resource type from backup ARN"""
# Simplified extraction - in practice, this would be more robust
if 'rds' in backup_arn:
return 'RDS'
elif 'ec2' in backup_arn:
return 'EC2'
elif 'dynamodb' in backup_arn:
return 'DynamoDB'
else:
return 'Unknown'
async def _get_backup_details(self, backup_arn: str) -> Dict[str, Any]:
"""Get backup details from AWS Backup"""
try:
# Extract vault name and recovery point ARN from backup ARN
vault_name = backup_arn.split('/')[-2]
response = self.backup.describe_recovery_point(
BackupVaultName=vault_name,
RecoveryPointArn=backup_arn
)
return response
except Exception as e:
logging.error(f"Failed to get backup details: {str(e)}")
return {}
async def generate_recovery_test_report(self) -> Dict[str, Any]:
"""Generate comprehensive recovery test report"""
try:
# Get all tests and recent executions
all_tests = list(self.active_tests.values())
recent_executions = await self._get_recent_test_executions(days=30)
# Calculate statistics
total_tests = len(all_tests)
successful_tests = len([e for e in recent_executions if e.status == TestStatus.COMPLETED])
failed_tests = len([e for e in recent_executions if e.status == TestStatus.FAILED])
# Calculate average RTO
completed_executions = [e for e in recent_executions if e.actual_rto_minutes]
avg_rto = sum([e.actual_rto_minutes for e in completed_executions]) / len(completed_executions) if completed_executions else 0
report = {
'report_timestamp': datetime.utcnow().isoformat(),
'summary': {
'total_recovery_tests': total_tests,
'successful_tests_30d': successful_tests,
'failed_tests_30d': failed_tests,
'success_rate': (successful_tests / (successful_tests + failed_tests) * 100) if (successful_tests + failed_tests) > 0 else 0,
'average_rto_minutes': avg_rto
},
'test_details': [
{
'test_id': test.test_id,
'test_name': test.test_name,
'test_type': test.test_type.value,
'expected_rto_minutes': test.expected_rto_minutes,
'last_execution': 'N/A' # Would be populated from recent executions
}
for test in all_tests
],
'recent_executions': [
{
'execution_id': exec.execution_id,
'test_name': next((t.test_name for t in all_tests if t.test_id == exec.test_id), 'Unknown'),
'status': exec.status.value,
'actual_rto_minutes': exec.actual_rto_minutes,
'validation_passed': all([v.get('passed', False) for v in exec.validation_results])
}
for exec in recent_executions
]
}
return report
except Exception as e:
logging.error(f"Failed to generate recovery test report: {str(e)}")
return {'error': str(e)}
async def _get_recent_test_executions(self, days: int = 30) -> List[RecoveryTestExecution]:
"""Get recent test executions"""
try:
# This would typically query DynamoDB with time-based filters
# For now, we'll return a placeholder
return []
except Exception as e:
logging.error(f"Failed to get recent test executions: {str(e)}")
return []
async def _send_test_notification(self, test: RecoveryTest, execution: RecoveryTestExecution):
"""Send test completion notification"""
try:
if not self.notification_topic_arn:
return
message = {
'test_id': test.test_id,
'test_name': test.test_name,
'execution_id': execution.execution_id,
'status': execution.status.value,
'actual_rto_minutes': execution.actual_rto_minutes,
'expected_rto_minutes': test.expected_rto_minutes,
'validation_results': execution.validation_results,
'error_message': execution.error_message
}
subject = f"Recovery Test {'Completed' if execution.status == TestStatus.COMPLETED else 'Failed'}: {test.test_name}"
self.sns.publish(
TopicArn=self.notification_topic_arn,
Message=json.dumps(message, indent=2),
Subject=subject
)
except Exception as e:
logging.error(f"Failed to send test notification: {str(e)}")
async def _store_recovery_test(self, test: RecoveryTest):
"""Store recovery test in DynamoDB"""
try:
test_dict = asdict(test)
test_dict['created_at'] = test.created_at.isoformat()
self.tests_table.put_item(Item=test_dict)
except Exception as e:
logging.error(f"Failed to store recovery test: {str(e)}")
async def _store_test_execution(self, execution: RecoveryTestExecution):
"""Store test execution in DynamoDB"""
try:
execution_dict = asdict(execution)
execution_dict['started_at'] = execution.started_at.isoformat()
if execution.completed_at:
execution_dict['completed_at'] = execution.completed_at.isoformat()
if execution.recovery_start_time:
execution_dict['recovery_start_time'] = execution.recovery_start_time.isoformat()
if execution.recovery_end_time:
execution_dict['recovery_end_time'] = execution.recovery_end_time.isoformat()
self.executions_table.put_item(Item=execution_dict)
except Exception as e:
logging.error(f"Failed to store test execution: {str(e)}")
# Usage example
async def main():
config = {
'tests_table': 'recovery-tests',
'executions_table': 'recovery-test-executions',
'test_environment_prefix': 'recovery-test',
'notification_topic_arn': 'arn:aws:sns:us-east-1:123456789012:recovery-test-notifications',
'restore_service_role_arn': 'arn:aws:iam::123456789012:role/AWSBackupDefaultServiceRole'
}
# Initialize recovery test manager
test_manager = BackupRecoveryTestManager(config)
# Create recovery test suite
test_configs = [
{
'test_name': 'Production Database Full Recovery Test',
'test_type': 'full_restore',
'backup_arn': 'arn:aws:backup:us-east-1:123456789012:recovery-point:12345678-1234-1234-1234-123456789012',
'target_environment': 'test',
'test_schedule': 'weekly',
'expected_rto_minutes': 60,
'expected_rpo_minutes': 15,
'validation_criteria': [
{
'type': 'data_integrity',
'name': 'Database Record Count Validation',
'expected_records': 1000000
},
{
'type': 'connectivity',
'name': 'Database Connection Test',
'timeout_seconds': 30
}
],
'cleanup_required': True
}
]
created_tests = await test_manager.create_recovery_test_suite(test_configs)
print(f"Created {len(created_tests)} recovery tests")
# Execute a recovery test
if created_tests:
execution_id = await test_manager.execute_recovery_test(created_tests[0])
print(f"Started recovery test execution: {execution_id}")
# Generate test report
report = await test_manager.generate_recovery_test_report()
print(f"Generated recovery test report with {report['summary']['total_recovery_tests']} tests")
if __name__ == "__main__":
import asyncio
asyncio.run(main())AWS Services Used
- AWS Backup: Backup restoration and recovery point management
- Amazon S3: Backup storage validation and integrity checking
- Amazon RDS: Database backup restoration and validation testing
- Amazon EC2: Instance backup restoration and environment provisioning
- Amazon DynamoDB: Test configuration and execution history storage
- AWS Lambda: Custom validation functions and test automation
- Amazon CloudWatch: Recovery performance monitoring and metrics collection
- Amazon SNS: Test result notifications and alerting
- AWS Step Functions: Complex recovery test workflow orchestration
- Amazon EventBridge: Scheduled recovery test execution and automation
- AWS Systems Manager: Test environment configuration and management
- Amazon VPC: Isolated test environment provisioning and networking
- AWS CloudFormation: Test infrastructure provisioning and cleanup
- Amazon EBS: Volume backup restoration and validation
- Amazon EFS: File system backup restoration and testing
Benefits
- Recovery Assurance: Regular testing validates that backups can be successfully restored
- RTO/RPO Validation: Testing confirms that recovery objectives can be met
- Process Verification: Regular testing ensures recovery procedures are effective and current
- Issue Detection: Early identification of backup or recovery problems before disasters
- Compliance: Regular testing supports regulatory and audit requirements
- Confidence Building: Successful tests increase confidence in disaster recovery capabilities
- Continuous Improvement: Test results drive optimization of backup and recovery processes
- Documentation: Testing validates and updates recovery documentation and procedures
- Team Training: Regular testing provides hands-on experience with recovery procedures
- Cost Optimization: Testing identifies opportunities to optimize recovery processes and costs
Related Resources
- AWS Well-Architected Reliability Pillar
- Perform Periodic Recovery Testing
- AWS Backup User Guide
- Amazon S3 User Guide
- Amazon RDS User Guide
- Amazon EC2 User Guide
- AWS Lambda Developer Guide
- Amazon CloudWatch User Guide
- AWS Step Functions Developer Guide
- Disaster Recovery Testing Best Practices
- Recovery Testing Strategies
- Backup Validation Guidelines