Skip to content
REL12

REL12-BP05 - Test resiliency using chaos engineering

REL12-BP05: Test resiliency using chaos engineering

Proactively inject failures into your system to identify weaknesses and validate recovery mechanisms. Use chaos engineering principles to build confidence in system resilience by testing failure scenarios in controlled environments.

Implementation Steps

1. Start with Hypothesis-Driven Experiments

Define clear hypotheses about system behavior during failures before conducting experiments.

2. Begin in Non-Production Environments

Start chaos experiments in development and staging environments before production.

3. Implement Gradual Failure Injection

Start with small, controlled failures and gradually increase complexity and scope.

4. Monitor System Behavior

Collect comprehensive metrics during experiments to understand system response.

5. Automate Chaos Engineering

Build automated chaos engineering into your regular testing and deployment processes.

Detailed Implementation

{% raw %}

View code
import boto3
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import random
import uuid

class ChaosExperimentType(Enum):
    """Fault categories a chaos experiment can inject.

    The string values are what ``create_chaos_experiment`` accepts under the
    ``'type'`` key of an experiment configuration.
    """
    # execute_chaos_experiment has dedicated injectors for
    # INSTANCE_TERMINATION, NETWORK_LATENCY, CPU_STRESS and
    # SERVICE_UNAVAILABLE; every other member goes to the generic injector.
    INSTANCE_TERMINATION = "instance_termination"
    NETWORK_LATENCY = "network_latency"
    DISK_FILL = "disk_fill"
    CPU_STRESS = "cpu_stress"
    MEMORY_STRESS = "memory_stress"
    SERVICE_UNAVAILABLE = "service_unavailable"
    DATABASE_FAILURE = "database_failure"
    DEPENDENCY_TIMEOUT = "dependency_timeout"

class ExperimentStatus(Enum):
    """Lifecycle states recorded on an ``ExperimentExecution``."""
    PLANNED = "planned"
    RUNNING = "running"  # assigned when an execution record is created
    COMPLETED = "completed"  # assigned once rollback and analysis have run
    FAILED = "failed"  # assigned when completing the experiment raises
    ABORTED = "aborted"

class BlastRadius(Enum):
    """How much infrastructure an experiment is allowed to affect.

    The string values are what ``create_chaos_experiment`` accepts under the
    ``'blast_radius'`` key of an experiment configuration.
    """
    # NOTE(review): members look ordered from narrowest to widest scope;
    # that ordering is inferred from the names only - confirm before relying
    # on it.
    SINGLE_INSTANCE = "single_instance"  # termination injector picks one random target
    SINGLE_AZ = "single_az"
    MULTIPLE_AZ = "multiple_az"
    SINGLE_REGION = "single_region"
    MULTIPLE_REGION = "multiple_region"

@dataclass
class ChaosExperiment:
    """Definition of a chaos experiment, as registered by
    ``ChaosEngineeringSystem.create_chaos_experiment``.
    """
    # Generated identifier of the form "chaos-<8 hex chars>".
    experiment_id: str
    # Human-readable name; used in log messages and experiment results.
    name: str
    description: str
    # Selects which injector execute_chaos_experiment runs.
    experiment_type: ChaosExperimentType
    # Statement of expected behaviour that the success criteria should confirm.
    hypothesis: str
    blast_radius: BlastRadius
    # Resource identifiers; ids starting with "i-" are checked as EC2 instances.
    target_resources: List[str]
    # How long execute_chaos_experiment waits after injecting the fault.
    duration_minutes: int
    # Must be non-empty for the pre-run safety checks to pass.
    rollback_plan: str
    # Free-text criteria evaluated after the run to validate the hypothesis.
    success_criteria: List[str]
    # Stored with the experiment; not evaluated by the code visible here.
    abort_conditions: List[str]
    # Compared against the literal "production" by the safety checks.
    environment: str

@dataclass
class ExperimentExecution:
    """Record of one run of a ``ChaosExperiment``.

    The injector, rollback and analysis steps mutate this record in place as
    the run progresses.
    """
    # Generated identifier of the form "exec-<8 hex chars>".
    execution_id: str
    # Id of the ChaosExperiment this run belongs to.
    experiment_id: str
    status: ExperimentStatus
    # Naive UTC timestamp taken when the record is created.
    start_time: datetime
    # None until the experiment is completed.
    end_time: Optional[datetime]
    # None until result analysis has run; then True or False.
    hypothesis_validated: Optional[bool]
    # Chronological notes appended by each step of the run.
    observations: List[str]
    # Metric snapshots stored under the keys 'baseline' and 'final'.
    metrics_collected: Dict[str, Any]
    # Errors and unmet success criteria found during the run.
    issues_discovered: List[str]
    # Follow-up recommendations produced by result analysis.
    improvements_identified: List[str]

class ChaosEngineeringSystem:
    def __init__(self, region: str = 'us-east-1'):
        """Set up AWS clients, logging and in-memory experiment state.

        Args:
            region: AWS region that every boto3 client is bound to.
        """
        self.region = region

        # One boto3 client per service, stored under the attribute named on
        # the left. Creation order matches the attribute order below.
        for attribute, service in (
            ('fis', 'fis'),
            ('ec2', 'ec2'),
            ('cloudwatch', 'cloudwatch'),
            ('ssm', 'ssm'),
            ('lambda_client', 'lambda'),
        ):
            setattr(self, attribute, boto3.client(service, region_name=region))

        # Root logging setup, then a module-named logger for this instance.
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(level=logging.INFO, format=log_format)
        self.logger = logging.getLogger(__name__)

        # In-memory registries: experiment definitions by id, and every
        # execution record in creation order.
        self.experiments: Dict[str, ChaosExperiment] = {}
        self.executions: List[ExperimentExecution] = []
        self.safety_checks: List[str] = []

        # Guards appends to self.executions.
        self.chaos_lock = threading.Lock()

    def create_chaos_experiment(self, experiment_config: Dict[str, Any]) -> str:
        """Create a new chaos engineering experiment"""
        try:
            experiment_id = f"chaos-{uuid.uuid4().hex[:8]}"
            
            experiment = ChaosExperiment(
                experiment_id=experiment_id,
                name=experiment_config['name'],
                description=experiment_config['description'],
                experiment_type=ChaosExperimentType(experiment_config['type']),
                hypothesis=experiment_config['hypothesis'],
                blast_radius=BlastRadius(experiment_config['blast_radius']),
                target_resources=experiment_config['target_resources'],
                duration_minutes=experiment_config['duration_minutes'],
                rollback_plan=experiment_config['rollback_plan'],
                success_criteria=experiment_config['success_criteria'],
                abort_conditions=experiment_config['abort_conditions'],
                environment=experiment_config['environment']
            )
            
            self.experiments[experiment_id] = experiment
            
            self.logger.info(f"Created chaos experiment: {experiment.name}")
            return experiment_id
            
        except Exception as e:
            self.logger.error(f"Failed to create chaos experiment: {str(e)}")
            return ""

    def execute_chaos_experiment(self, experiment_id: str) -> str:
        """Execute a chaos engineering experiment"""
        try:
            experiment = self.experiments.get(experiment_id)
            if not experiment:
                raise ValueError(f"Experiment {experiment_id} not found")
            
            # Perform safety checks
            if not self._perform_safety_checks(experiment):
                raise ValueError("Safety checks failed - experiment aborted")
            
            execution_id = f"exec-{uuid.uuid4().hex[:8]}"
            
            execution = ExperimentExecution(
                execution_id=execution_id,
                experiment_id=experiment_id,
                status=ExperimentStatus.RUNNING,
                start_time=datetime.utcnow(),
                end_time=None,
                hypothesis_validated=None,
                observations=[],
                metrics_collected={},
                issues_discovered=[],
                improvements_identified=[]
            )
            
            with self.chaos_lock:
                self.executions.append(execution)
            
            # Start monitoring
            self._start_experiment_monitoring(execution, experiment)
            
            # Execute the chaos experiment
            if experiment.experiment_type == ChaosExperimentType.INSTANCE_TERMINATION:
                self._execute_instance_termination(execution, experiment)
            elif experiment.experiment_type == ChaosExperimentType.NETWORK_LATENCY:
                self._execute_network_latency(execution, experiment)
            elif experiment.experiment_type == ChaosExperimentType.CPU_STRESS:
                self._execute_cpu_stress(execution, experiment)
            elif experiment.experiment_type == ChaosExperimentType.SERVICE_UNAVAILABLE:
                self._execute_service_unavailable(execution, experiment)
            else:
                self._execute_generic_chaos(execution, experiment)
            
            # Wait for experiment duration
            time.sleep(experiment.duration_minutes * 60)
            
            # Complete experiment
            self._complete_experiment(execution, experiment)
            
            self.logger.info(f"Chaos experiment completed: {execution_id}")
            return execution_id
            
        except Exception as e:
            self.logger.error(f"Chaos experiment execution failed: {str(e)}")
            return ""

    def _perform_safety_checks(self, experiment: ChaosExperiment) -> bool:
        """Perform safety checks before experiment execution"""
        try:
            # Check environment restrictions
            if experiment.environment == "production" and experiment.blast_radius in [BlastRadius.MULTIPLE_AZ, BlastRadius.MULTIPLE_REGION]:
                self.logger.warning("Large blast radius in production - requires additional approval")
                return False
            
            # Check business hours (avoid peak times)
            current_hour = datetime.utcnow().hour
            if experiment.environment == "production" and 9 <= current_hour <= 17:
                self.logger.warning("Production experiment during business hours - not recommended")
                return False
            
            # Verify rollback plan exists
            if not experiment.rollback_plan:
                self.logger.error("No rollback plan defined - experiment aborted")
                return False
            
            # Check target resources exist
            for resource in experiment.target_resources:
                if not self._verify_resource_exists(resource):
                    self.logger.error(f"Target resource {resource} not found")
                    return False
            
            return True
            
        except Exception as e:
            self.logger.error(f"Safety check failed: {str(e)}")
            return False

    def _execute_instance_termination(self, execution: ExperimentExecution, experiment: ChaosExperiment) -> None:
        """Simulate terminating target instances and record what was done.

        With a SINGLE_INSTANCE blast radius one target is picked at random;
        otherwise at most the first two targets are used. Nothing is actually
        terminated: each "termination" is only written to the execution's
        observations and the log. Failures are appended to
        ``execution.issues_discovered`` instead of being raised.
        """
        try:
            candidates = experiment.target_resources
            single_target = experiment.blast_radius == BlastRadius.SINGLE_INSTANCE

            # Anything wider than one instance is still capped at two.
            victims = [random.choice(candidates)] if single_target else candidates[:2]

            execution.observations.append(f"Targeting instances: {victims}")

            for victim in victims:
                try:
                    # A real implementation would start an AWS FIS experiment
                    # here; this demo only records the intent.
                    execution.observations.append(f"Simulated termination of {victim}")
                    self.logger.info(f"Simulated instance termination: {victim}")
                except Exception as inner_err:
                    execution.issues_discovered.append(f"Failed to terminate {victim}: {str(inner_err)}")

        except Exception as err:
            execution.issues_discovered.append(f"Instance termination experiment failed: {str(err)}")

    def _execute_network_latency(self, execution: ExperimentExecution, experiment: ChaosExperiment) -> None:
        """Execute network latency chaos experiment"""
        try:
            latency_ms = 500  # Add 500ms latency
            
            execution.observations.append(f"Injecting {latency_ms}ms network latency")
            
            # Use SSM to inject network latency
            for resource in experiment.target_resources:
                command = f"tc qdisc add dev eth0 root netem delay {latency_ms}ms"
                
                # In real implementation, execute via SSM
                # response = self.ssm.send_command(...)
                
                execution.observations.append(f"Applied network latency to {resource}")
            
        except Exception as e:
            execution.issues_discovered.append(f"Network latency experiment failed: {str(e)}")

    def _execute_cpu_stress(self, execution: ExperimentExecution, experiment: ChaosExperiment) -> None:
        """Execute CPU stress chaos experiment"""
        try:
            cpu_percentage = 80  # Stress CPU to 80%
            
            execution.observations.append(f"Applying {cpu_percentage}% CPU stress")
            
            # Use stress-ng or similar tool via SSM
            for resource in experiment.target_resources:
                command = f"stress-ng --cpu 0 --cpu-load {cpu_percentage} --timeout {experiment.duration_minutes}m"
                
                # In real implementation, execute via SSM
                execution.observations.append(f"Applied CPU stress to {resource}")
            
        except Exception as e:
            execution.issues_discovered.append(f"CPU stress experiment failed: {str(e)}")

    def _execute_service_unavailable(self, execution: ExperimentExecution, experiment: ChaosExperiment) -> None:
        """Execute service unavailable chaos experiment"""
        try:
            execution.observations.append("Making service unavailable")
            
            # Simulate service unavailability (e.g., stop service, block ports)
            for resource in experiment.target_resources:
                # In real implementation, stop service or block traffic
                execution.observations.append(f"Made service unavailable on {resource}")
            
        except Exception as e:
            execution.issues_discovered.append(f"Service unavailable experiment failed: {str(e)}")

    def _execute_generic_chaos(self, execution: ExperimentExecution, experiment: ChaosExperiment) -> None:
        """Execute generic chaos experiment"""
        try:
            execution.observations.append(f"Executing {experiment.experiment_type.value} experiment")
            
            # Generic chaos implementation
            for resource in experiment.target_resources:
                execution.observations.append(f"Applied chaos to {resource}")
            
        except Exception as e:
            execution.issues_discovered.append(f"Generic chaos experiment failed: {str(e)}")

    def _start_experiment_monitoring(self, execution: ExperimentExecution, experiment: ChaosExperiment) -> None:
        """Start monitoring during experiment"""
        try:
            # Collect baseline metrics
            baseline_metrics = self._collect_system_metrics(experiment.target_resources)
            execution.metrics_collected['baseline'] = baseline_metrics
            
            execution.observations.append("Started experiment monitoring")
            
        except Exception as e:
            execution.issues_discovered.append(f"Monitoring setup failed: {str(e)}")

    def _complete_experiment(self, execution: ExperimentExecution, experiment: ChaosExperiment) -> None:
        """Finish a run: final metrics, rollback, analysis, terminal status.

        On success the execution is marked COMPLETED and given an end time.
        If any step raises, the execution is marked FAILED and the error is
        appended to ``execution.issues_discovered``.
        """
        try:
            # Snapshot while the fault is still in place, so it can be
            # compared with the baseline.
            execution.metrics_collected['final'] = self._collect_system_metrics(
                experiment.target_resources
            )

            # Undo the fault first, then judge the outcome.
            for step in (self._execute_rollback, self._analyze_experiment_results):
                step(execution, experiment)

            execution.status = ExperimentStatus.COMPLETED
            execution.end_time = datetime.utcnow()
            execution.observations.append("Experiment completed successfully")

        except Exception as err:
            execution.status = ExperimentStatus.FAILED
            execution.issues_discovered.append(f"Experiment completion failed: {str(err)}")

    def _execute_rollback(self, execution: ExperimentExecution, experiment: ChaosExperiment) -> None:
        """Record the rollback of whatever the experiment injected.

        Only NETWORK_LATENCY and CPU_STRESS have per-target rollback notes;
        other types just get the start/finish markers. No command is sent
        anywhere. Any error is appended to ``execution.issues_discovered``
        instead of being raised.
        """
        try:
            notes = execution.observations
            notes.append("Executing rollback plan")

            kind = experiment.experiment_type
            if kind == ChaosExperimentType.NETWORK_LATENCY:
                # Real run: `tc qdisc del dev eth0 root` on each target.
                notes.extend(
                    f"Removed network latency from {target}"
                    for target in experiment.target_resources
                )
            elif kind == ChaosExperimentType.CPU_STRESS:
                # Real run: `pkill stress-ng` on each target.
                notes.extend(
                    f"Stopped CPU stress on {target}"
                    for target in experiment.target_resources
                )

            notes.append("Rollback completed")

        except Exception as err:
            execution.issues_discovered.append(f"Rollback failed: {str(err)}")

    def _analyze_experiment_results(self, execution: ExperimentExecution, experiment: ChaosExperiment) -> None:
        """Analyze experiment results and validate hypothesis"""
        try:
            # Compare baseline and final metrics
            baseline = execution.metrics_collected.get('baseline', {})
            final = execution.metrics_collected.get('final', {})
            
            # Check success criteria
            success_count = 0
            for criteria in experiment.success_criteria:
                if self._evaluate_success_criteria(criteria, baseline, final):
                    success_count += 1
                else:
                    execution.issues_discovered.append(f"Success criteria not met: {criteria}")
            
            # Validate hypothesis
            hypothesis_met = success_count >= len(experiment.success_criteria) * 0.8  # 80% threshold
            execution.hypothesis_validated = hypothesis_met
            
            if hypothesis_met:
                execution.observations.append("Hypothesis validated - system behaved as expected")
            else:
                execution.observations.append("Hypothesis not validated - unexpected system behavior")
                execution.improvements_identified.append("System resilience needs improvement")
            
            # Generate recommendations
            recommendations = self._generate_recommendations(execution, experiment)
            execution.improvements_identified.extend(recommendations)
            
        except Exception as e:
            execution.issues_discovered.append(f"Result analysis failed: {str(e)}")

    def _collect_system_metrics(self, resources: List[str]) -> Dict[str, Any]:
        """Collect system metrics"""
        try:
            metrics = {}
            
            # Collect CloudWatch metrics
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(minutes=5)
            
            # CPU Utilization
            cpu_response = self.cloudwatch.get_metric_statistics(
                Namespace='AWS/EC2',
                MetricName='CPUUtilization',
                StartTime=start_time,
                EndTime=end_time,
                Period=300,
                Statistics=['Average']
            )
            
            if cpu_response['Datapoints']:
                metrics['cpu_utilization'] = cpu_response['Datapoints'][-1]['Average']
            
            # Add more metrics as needed
            metrics['timestamp'] = datetime.utcnow().isoformat()
            
            return metrics
            
        except Exception as e:
            self.logger.error(f"Metrics collection failed: {str(e)}")
            return {}

    def _evaluate_success_criteria(self, criteria: str, baseline: Dict, final: Dict) -> bool:
        """Evaluate success criteria"""
        try:
            # Simple criteria evaluation
            if "response_time" in criteria.lower():
                # Check if response time remained acceptable
                return True  # Simplified for demo
            elif "availability" in criteria.lower():
                # Check if system remained available
                return True  # Simplified for demo
            elif "auto_scaling" in criteria.lower():
                # Check if auto-scaling responded
                return True  # Simplified for demo
            
            return True  # Default to success for demo
            
        except Exception as e:
            self.logger.error(f"Criteria evaluation failed: {str(e)}")
            return False

    def _generate_recommendations(self, execution: ExperimentExecution, experiment: ChaosExperiment) -> List[str]:
        """Build follow-up recommendations for a single execution.

        Recommendations are added, in this order, when issues were recorded,
        when the hypothesis was not validated, and when the experiment was an
        instance termination.

        Returns:
            The list of recommendation strings, or an empty list when
            generation raises; the error is logged.
        """
        suggestions = []

        try:
            if execution.issues_discovered:
                suggestions += [
                    "Implement additional monitoring and alerting",
                    "Review and improve incident response procedures",
                ]

            # A verdict of None (not analysed yet) counts as not validated.
            if not execution.hypothesis_validated:
                suggestions += [
                    "Strengthen system resilience mechanisms",
                    "Consider additional redundancy",
                ]

            if experiment.experiment_type == ChaosExperimentType.INSTANCE_TERMINATION:
                suggestions += [
                    "Verify auto-scaling configuration",
                    "Test application graceful shutdown",
                ]

            return suggestions

        except Exception as err:
            self.logger.error(f"Recommendation generation failed: {str(err)}")
            return []

    def _verify_resource_exists(self, resource_id: str) -> bool:
        """Verify that a target resource exists"""
        try:
            if resource_id.startswith('i-'):
                # EC2 instance
                response = self.ec2.describe_instances(InstanceIds=[resource_id])
                return len(response['Reservations']) > 0
            
            # Add other resource type checks as needed
            return True  # Default to exists for demo
            
        except Exception as e:
            self.logger.error(f"Resource verification failed: {str(e)}")
            return False

    def get_experiment_results(self, execution_id: str) -> Dict[str, Any]:
        """Get results of a chaos experiment execution"""
        try:
            execution = next((e for e in self.executions if e.execution_id == execution_id), None)
            if not execution:
                return {'error': 'Execution not found'}
            
            experiment = self.experiments.get(execution.experiment_id)
            
            results = {
                'execution_id': execution_id,
                'experiment_name': experiment.name if experiment else 'Unknown',
                'status': execution.status.value,
                'start_time': execution.start_time.isoformat(),
                'end_time': execution.end_time.isoformat() if execution.end_time else None,
                'hypothesis_validated': execution.hypothesis_validated,
                'observations': execution.observations,
                'issues_discovered': execution.issues_discovered,
                'improvements_identified': execution.improvements_identified,
                'metrics_collected': execution.metrics_collected
            }
            
            return results
            
        except Exception as e:
            self.logger.error(f"Failed to get experiment results: {str(e)}")
            return {'error': str(e)}

    def generate_chaos_report(self, time_period_days: int = 30) -> Dict[str, Any]:
        """Generate chaos engineering report"""
        try:
            cutoff_date = datetime.utcnow() - timedelta(days=time_period_days)
            recent_executions = [
                e for e in self.executions 
                if e.start_time > cutoff_date
            ]
            
            if not recent_executions:
                return {'message': 'No chaos experiments in the specified time period'}
            
            # Calculate statistics
            total_experiments = len(recent_executions)
            successful_experiments = len([e for e in recent_executions if e.status == ExperimentStatus.COMPLETED])
            hypothesis_validated = len([e for e in recent_executions if e.hypothesis_validated])
            
            # Experiment type distribution
            type_distribution = {}
            for execution in recent_executions:
                experiment = self.experiments.get(execution.experiment_id)
                if experiment:
                    exp_type = experiment.experiment_type.value
                    type_distribution[exp_type] = type_distribution.get(exp_type, 0) + 1
            
            # Issues discovered
            all_issues = []
            for execution in recent_executions:
                all_issues.extend(execution.issues_discovered)
            
            report = {
                'report_period_days': time_period_days,
                'total_experiments': total_experiments,
                'successful_experiments': successful_experiments,
                'success_rate': (successful_experiments / total_experiments * 100) if total_experiments > 0 else 0,
                'hypothesis_validation_rate': (hypothesis_validated / total_experiments * 100) if total_experiments > 0 else 0,
                'experiment_type_distribution': type_distribution,
                'total_issues_discovered': len(all_issues),
                'common_issues': self._analyze_common_issues(all_issues),
                'recommendations': self._generate_chaos_recommendations(recent_executions)
            }
            
            return report
            
        except Exception as e:
            self.logger.error(f"Chaos report generation failed: {str(e)}")
            return {}

    def _analyze_common_issues(self, issues: List[str]) -> List[str]:
        """Analyze common issues from chaos experiments"""
        # Simple analysis - in real implementation, use NLP or pattern matching
        issue_keywords = {}
        for issue in issues:
            words = issue.lower().split()
            for word in words:
                if len(word) > 4:  # Filter short words
                    issue_keywords[word] = issue_keywords.get(word, 0) + 1
        
        # Return top issues
        sorted_issues = sorted(issue_keywords.items(), key=lambda x: x[1], reverse=True)
        return [f"{word} ({count} occurrences)" for word, count in sorted_issues[:5]]

    def _generate_chaos_recommendations(self, executions: List[ExperimentExecution]) -> List[str]:
        """Build programme-level recommendations from a set of executions.

        Conditional advice is added when more than 20% of the runs failed,
        when more than 30% have no validated hypothesis, and when there are
        fewer than four runs. Two general recommendations are always
        appended.

        Returns:
            The list of recommendation strings, or an empty list when
            generation raises; the error is logged.
        """
        advice = []

        try:
            failed_count = sum(1 for run in executions if run.status == ExperimentStatus.FAILED)
            total = len(executions)
            if failed_count > total * 0.2:  # More than 20% failed
                advice.append("Review experiment safety checks and rollback procedures")

            # Runs with no verdict yet (None) count as unvalidated too.
            unvalidated_count = sum(1 for run in executions if not run.hypothesis_validated)
            if unvalidated_count > total * 0.3:  # More than 30% unvalidated
                advice.append("Strengthen system resilience and recovery mechanisms")

            if total < 4:  # Less than 4 experiments per month
                advice.append("Increase frequency of chaos engineering experiments")

            advice += [
                "Expand chaos experiments to cover more failure scenarios",
                "Integrate chaos engineering into CI/CD pipeline",
            ]

            return advice

        except Exception as err:
            self.logger.error(f"Chaos recommendations failed: {str(err)}")
            return []

# Example usage
def _run_demo_experiment(chaos_system, experiment_config):
    """Create and execute one experiment, printing progress and its results."""
    print("Creating chaos experiment...")
    experiment_id = chaos_system.create_chaos_experiment(experiment_config)
    if not experiment_id:
        return

    print(f"Created experiment: {experiment_id}")

    print("Executing chaos experiment...")
    execution_id = chaos_system.execute_chaos_experiment(experiment_id)
    if not execution_id:
        return

    print(f"Experiment execution: {execution_id}")

    results = chaos_system.get_experiment_results(execution_id)
    print(f"Experiment results: {json.dumps(results, indent=2, default=str)}")


def main():
    """Run a sample instance-termination experiment and print a report.

    The experiment blocks for its configured duration (10 minutes) while it
    runs. The 30-day report is printed whether or not the experiment was
    created or executed successfully.
    """
    chaos_system = ChaosEngineeringSystem(region='us-east-1')

    demo_config = {
        'name': 'EC2 Instance Termination Test',
        'description': 'Test system resilience when EC2 instances are terminated',
        'type': 'instance_termination',
        'hypothesis': 'System will maintain availability when 1 instance is terminated due to auto-scaling',
        'blast_radius': 'single_instance',
        'target_resources': ['i-1234567890abcdef0', 'i-0987654321fedcba0'],
        'duration_minutes': 10,
        'rollback_plan': 'Auto Scaling will launch replacement instances',
        'success_criteria': [
            'System availability > 99%',
            'Response time < 2 seconds',
            'Auto Scaling launches replacement instance',
        ],
        'abort_conditions': [
            'System availability < 95%',
            'Response time > 5 seconds',
        ],
        'environment': 'staging',
    }

    _run_demo_experiment(chaos_system, demo_config)

    report = chaos_system.generate_chaos_report(30)
    print(f"Chaos engineering report: {json.dumps(report, indent=2)}")


if __name__ == "__main__":
    main()

{% endraw %}

AWS Services

Primary Services

  • AWS Fault Injection Service (AWS FIS, formerly AWS Fault Injection Simulator): Managed chaos engineering service
  • Amazon EC2: Instance termination and resource stress testing
  • Amazon CloudWatch: Monitoring and metrics during experiments
  • AWS Systems Manager: Command execution for chaos injection

Supporting Services

  • AWS Lambda: Event-driven chaos experiment automation
  • Amazon SNS: Notifications for experiment status and results
  • AWS Step Functions: Complex chaos experiment workflows
  • Amazon S3: Storage for experiment results and analysis

Benefits

  • Proactive Resilience Testing: Identify weaknesses before they cause outages
  • Confidence Building: Validate that recovery mechanisms work as expected
  • Improved Incident Response: Practice responding to failures in controlled environments
  • System Understanding: Gain deeper insights into system behavior under stress
  • Continuous Improvement: Regular chaos experiments drive ongoing resilience improvements