SEC10-BP07: Run simulations
SEC10-BP07: Run simulations
Overview
As organizations grow and evolve over time, so does the threat landscape, making it important to continually review your incident response capabilities. Running simulations (also known as game days) is one method that can be used to perform this assessment. Simulations use real-world security event scenarios designed to mimic a threat actor’s tactics, techniques, and procedures (TTPs) and allow an organization to exercise and evaluate their incident response capabilities by responding to these mock cyber events as they might occur in reality.
Benefits of establishing this best practice:
Simulations have a variety of benefits:
- Validating cyber readiness and developing the confidence of your incident responders
- Testing the accuracy and efficiency of tools and workflows
- Refining communication and escalation methods aligned with your incident response plan
- Providing an opportunity to respond to less common vectors
Implementation Guidance
There are three main types of simulations:
Tabletop exercises
The tabletop approach to simulations is a discussion-based session involving the various incident response stakeholders to practice roles and responsibilities and use established communication tools and playbooks. Exercise facilitation can typically be accomplished in a full day in a virtual venue, physical venue, or a combination. Because it is discussion-based, the tabletop exercise focuses on processes, people, and collaboration. Technology is an integral part of the discussion, but the actual use of incident response tools or scripts is generally not a part of the tabletop exercise.
Purple team exercises
Purple team exercises increase the level of collaboration between the incident responders (blue team) and simulated threat actors (red team). The blue team is comprised of members of the security operations center (SOC), but can also include other stakeholders that would be involved during an actual cyber event. The red team is comprised of a penetration testing team or key stakeholders that are trained in offensive security. The red team works collaboratively with the exercise facilitators when designing a scenario so that the scenario is accurate and feasible. During purple team exercises, the primary focus is on the detection mechanisms, the tools, and the standard operating procedures (SOPs) supporting the incident response efforts.
Red team exercises
During a red team exercise, the offense (red team) conducts a simulation to achieve a certain objective or set of objectives from a predetermined scope. The defenders (blue team) will not necessarily have knowledge of the scope and duration of the exercise, which provides a more realistic assessment of how they would respond to an actual incident. Because red team exercises can be invasive tests, be cautious and implement controls to verify that the exercise does not cause actual harm to your environment.
Consider facilitating cyber simulations at a regular interval. Each exercise type can provide unique benefits to the participants and the organization as a whole, so you might choose to start with less complex simulation types (such as tabletop exercises) and progress to more complex simulation types (red team exercises). You should select a simulation type based on your security maturity, resources, and your desired outcomes. Some customers might not choose to perform red team exercises due to complexity and cost.
Implementation Steps
Regardless of the type of simulation you choose, simulations generally follow these implementation steps:
-
Define core exercise elements: Define the simulation scenario and the objectives of the simulation. Both of these should have leadership acceptance.
-
Identify key stakeholders: At a minimum, an exercise needs exercise facilitators and participants. Depending on the scenario, additional stakeholders such as legal, communications, or executive leadership might be involved.
-
Build and test the scenario: The scenario might need to be redefined as it is being built if specific elements aren’t feasible. A finalized scenario is expected as the output of this stage.
-
Facilitate the simulation: The type of simulation determines the facilitation used (a paper-based scenario compared to a highly technical, simulated scenario). The facilitators should align their facilitation tactics to the exercise objects and they should engage all exercise participants wherever possible to provide the most benefit.
-
Develop the after-action report (AAR): Identify areas that went well, those that can use improvement, and potential gaps. The AAR should measure the effectiveness of the simulation as well as the team’s response to the simulated event so that progress can be tracked over time with future simulations.
Implementation Examples
Example 1: Comprehensive Simulation Management Framework
View code
# simulation_management_framework.py
import boto3
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import logging
import uuid
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SimulationType(Enum):
TABLETOP = "tabletop"
PURPLE_TEAM = "purple_team"
RED_TEAM = "red_team"
class SimulationPhase(Enum):
PLANNING = "planning"
PREPARATION = "preparation"
EXECUTION = "execution"
EVALUATION = "evaluation"
IMPROVEMENT = "improvement"
@dataclass
class SimulationScenario:
scenario_id: str
scenario_name: str
scenario_type: SimulationType
threat_actor_profile: str
attack_vectors: List[str]
target_systems: List[str]
business_impact: str
complexity_level: str
duration_hours: int
prerequisites: List[str]
learning_objectives: List[str]
success_criteria: List[str]
created_date: str
@dataclass
class SimulationExercise:
exercise_id: str
scenario_id: str
exercise_name: str
simulation_type: SimulationType
scheduled_date: str
duration_hours: int
facilitators: List[str]
participants: List[str]
observers: List[str]
objectives: List[str]
scope: Dict[str, Any]
constraints: List[str]
success_metrics: List[str]
status: str
created_date: str
@dataclass
class SimulationResults:
exercise_id: str
execution_date: str
participants_count: int
objectives_met: List[str]
objectives_missed: List[str]
response_times: Dict[str, int]
tools_effectiveness: Dict[str, str]
communication_effectiveness: str
lessons_learned: List[str]
improvement_recommendations: List[str]
overall_score: float
next_exercise_recommendations: List[str]
class SimulationManagementFramework:
"""
Comprehensive framework for managing security incident response simulations
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.dynamodb = boto3.resource('dynamodb', region_name=region)
self.s3_client = boto3.client('s3', region_name=region)
self.sns_client = boto3.client('sns', region_name=region)
self.lambda_client = boto3.client('lambda', region_name=region)
self.stepfunctions_client = boto3.client('stepfunctions', region_name=region)
# DynamoDB tables for simulation management
self.scenarios_table = self.dynamodb.Table('simulation-scenarios')
self.exercises_table = self.dynamodb.Table('simulation-exercises')
self.results_table = self.dynamodb.Table('simulation-results')
self.participants_table = self.dynamodb.Table('simulation-participants')
# Initialize scenario catalog
self.scenario_catalog = self._create_scenario_catalog()
def _create_scenario_catalog(self) -> Dict[str, SimulationScenario]:
"""
Create comprehensive catalog of simulation scenarios
"""
scenarios = {}
# Ransomware Attack Scenario
scenarios['ransomware_attack'] = SimulationScenario(
scenario_id='ransomware_attack_v1',
scenario_name='Advanced Ransomware Attack Simulation',
scenario_type=SimulationType.PURPLE_TEAM,
threat_actor_profile='Sophisticated cybercriminal group with advanced persistent threat capabilities',
attack_vectors=[
'Spear phishing email with malicious attachment',
'Exploitation of unpatched vulnerability',
'Lateral movement through network',
'Privilege escalation',
'Data encryption and ransom demand'
],
target_systems=[
'Email servers',
'File servers',
'Database servers',
'Backup systems',
'Domain controllers'
],
business_impact='Critical - Complete business operations disruption',
complexity_level='High',
duration_hours=8,
prerequisites=[
'Incident response team trained',
'Backup systems verified',
'Communication channels established',
'Forensic tools available'
],
learning_objectives=[
'Test ransomware detection capabilities',
'Evaluate containment procedures',
'Assess backup and recovery processes',
'Validate communication protocols',
'Practice decision-making under pressure'
],
success_criteria=[
'Ransomware detected within 15 minutes',
'Affected systems isolated within 30 minutes',
'Incident response team assembled within 45 minutes',
'Backup systems protected and verified',
'Recovery plan initiated within 2 hours'
],
created_date=datetime.utcnow().isoformat()
)
# Data Breach Scenario
scenarios['data_breach'] = SimulationScenario(
scenario_id='data_breach_v1',
scenario_name='Customer Data Breach Simulation',
scenario_type=SimulationType.TABLETOP,
threat_actor_profile='External attacker seeking to steal customer personal information',
attack_vectors=[
'SQL injection attack on web application',
'Privilege escalation in database',
'Data exfiltration through encrypted channels',
'Evidence cleanup and persistence'
],
target_systems=[
'Web application servers',
'Database servers',
'Customer data repositories',
'Logging systems'
],
business_impact='High - Customer data exposure and regulatory implications',
complexity_level='Medium',
duration_hours=4,
prerequisites=[
'Legal team availability',
'Customer notification procedures',
'Regulatory reporting requirements understood',
'Forensic capabilities ready'
],
learning_objectives=[
'Practice data breach response procedures',
'Test legal and regulatory notification processes',
'Evaluate customer communication strategies',
'Assess forensic investigation capabilities',
'Review compliance requirements'
],
success_criteria=[
'Breach detected and confirmed within 1 hour',
'Legal team notified within 2 hours',
'Regulatory notifications initiated within 24 hours',
'Customer communication plan activated',
'Forensic investigation commenced'
],
created_date=datetime.utcnow().isoformat()
)
# Insider Threat Scenario
scenarios['insider_threat'] = SimulationScenario(
scenario_id='insider_threat_v1',
scenario_name='Malicious Insider Threat Simulation',
scenario_type=SimulationType.RED_TEAM,
threat_actor_profile='Disgruntled employee with legitimate system access',
attack_vectors=[
'Abuse of legitimate access privileges',
'Data collection and staging',
'Covert data exfiltration',
'Evidence destruction attempts'
],
target_systems=[
'Internal file shares',
'Customer databases',
'Intellectual property repositories',
'Email systems'
],
business_impact='High - Intellectual property theft and competitive disadvantage',
complexity_level='High',
duration_hours=12,
prerequisites=[
'HR team involvement',
'User behavior analytics tools',
'Data loss prevention systems',
'Employee monitoring capabilities'
],
learning_objectives=[
'Test insider threat detection capabilities',
'Evaluate user behavior monitoring',
'Practice HR and legal coordination',
'Assess data loss prevention effectiveness',
'Review employee investigation procedures'
],
success_criteria=[
'Suspicious behavior detected within 4 hours',
'Investigation initiated within 6 hours',
'HR and legal teams engaged appropriately',
'Data exfiltration prevented or minimized',
'Evidence preserved for potential prosecution'
],
created_date=datetime.utcnow().isoformat()
)
return scenarios
def create_simulation_exercise(self,
scenario_id: str,
exercise_name: str,
scheduled_date: str,
facilitators: List[str],
participants: List[str],
custom_objectives: List[str] = None) -> Dict[str, Any]:
"""
Create a new simulation exercise based on a scenario
"""
try:
# Validate scenario exists
if scenario_id not in self.scenario_catalog:
return {
'status': 'error',
'message': f'Scenario {scenario_id} not found in catalog'
}
scenario = self.scenario_catalog[scenario_id]
exercise_id = str(uuid.uuid4())
# Create simulation exercise
exercise = SimulationExercise(
exercise_id=exercise_id,
scenario_id=scenario_id,
exercise_name=exercise_name,
simulation_type=scenario.scenario_type,
scheduled_date=scheduled_date,
duration_hours=scenario.duration_hours,
facilitators=facilitators,
participants=participants,
observers=[],
objectives=custom_objectives or scenario.learning_objectives,
scope={
'target_systems': scenario.target_systems,
'attack_vectors': scenario.attack_vectors,
'business_impact': scenario.business_impact
},
constraints=[
'No actual system damage',
'No real data exposure',
'Limited to test environment where applicable',
'All participants must be briefed on exercise nature'
],
success_metrics=scenario.success_criteria,
status='planned',
created_date=datetime.utcnow().isoformat()
)
# Store exercise
self.exercises_table.put_item(Item=asdict(exercise))
# Store scenario if not already stored
self.scenarios_table.put_item(Item=asdict(scenario))
# Send notifications to participants
self._notify_exercise_participants(exercise, scenario)
logger.info(f"Created simulation exercise: {exercise_id}")
return {
'status': 'success',
'exercise_id': exercise_id,
'exercise_name': exercise_name,
'scenario_name': scenario.scenario_name,
'scheduled_date': scheduled_date,
'participants_count': len(participants),
'message': 'Simulation exercise created successfully'
}
except Exception as e:
logger.error(f"Error creating simulation exercise: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def execute_tabletop_exercise(self,
exercise_id: str,
facilitator_notes: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute tabletop exercise with guided discussion
"""
try:
# Retrieve exercise details
exercise_response = self.exercises_table.get_item(Key={'exercise_id': exercise_id})
if 'Item' not in exercise_response:
return {
'status': 'error',
'message': f'Exercise {exercise_id} not found'
}
exercise = exercise_response['Item']
scenario = self.scenario_catalog[exercise['scenario_id']]
# Create tabletop exercise structure
tabletop_structure = {
'exercise_id': exercise_id,
'start_time': datetime.utcnow().isoformat(),
'phases': {
'opening': {
'duration_minutes': 30,
'activities': [
'Welcome and introductions',
'Exercise objectives review',
'Scenario briefing',
'Ground rules establishment'
],
'facilitator_notes': facilitator_notes.get('opening', {})
},
'scenario_injection': {
'duration_minutes': 60,
'activities': [
'Initial incident notification',
'Situation assessment',
'Initial response decisions',
'Resource allocation discussions'
],
'discussion_points': [
'Who would be notified first?',
'What immediate actions would be taken?',
'What information is needed for decision-making?',
'What resources would be required?'
],
'facilitator_notes': facilitator_notes.get('scenario_injection', {})
},
'escalation_phase': {
'duration_minutes': 90,
'activities': [
'Incident escalation decisions',
'Stakeholder communication',
'Technical response coordination',
'Business impact assessment'
],
'discussion_points': [
'When and how would you escalate?',
'What would you communicate to executives?',
'How would you coordinate with external parties?',
'What business decisions need to be made?'
],
'facilitator_notes': facilitator_notes.get('escalation_phase', {})
},
'resolution_phase': {
'duration_minutes': 60,
'activities': [
'Recovery planning',
'Lessons learned discussion',
'Process improvement identification',
'Next steps planning'
],
'discussion_points': [
'How would you recover from this incident?',
'What worked well in your response?',
'What could be improved?',
'What additional preparations are needed?'
],
'facilitator_notes': facilitator_notes.get('resolution_phase', {})
},
'debrief': {
'duration_minutes': 30,
'activities': [
'Exercise summary',
'Key takeaways',
'Action items assignment',
'Next exercise planning'
],
'facilitator_notes': facilitator_notes.get('debrief', {})
}
},
'participants': exercise['participants'],
'facilitators': exercise['facilitators'],
'scenario_details': asdict(scenario)
}
# Update exercise status
self.exercises_table.update_item(
Key={'exercise_id': exercise_id},
UpdateExpression='SET #status = :status, execution_date = :exec_date',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={
':status': 'in_progress',
':exec_date': datetime.utcnow().isoformat()
}
)
return {
'status': 'success',
'exercise_id': exercise_id,
'tabletop_structure': tabletop_structure,
'total_duration_minutes': sum(phase['duration_minutes'] for phase in tabletop_structure['phases'].values()),
'message': 'Tabletop exercise structure created and ready for execution'
}
except Exception as e:
logger.error(f"Error executing tabletop exercise: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def conduct_purple_team_exercise(self,
exercise_id: str,
red_team_actions: List[Dict[str, Any]],
blue_team_responses: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Conduct purple team exercise with coordinated red and blue team activities
"""
try:
# Retrieve exercise details
exercise_response = self.exercises_table.get_item(Key={'exercise_id': exercise_id})
if 'Item' not in exercise_response:
return {
'status': 'error',
'message': f'Exercise {exercise_id} not found'
}
exercise = exercise_response['Item']
scenario = self.scenario_catalog[exercise['scenario_id']]
# Create purple team exercise timeline
exercise_timeline = []
# Combine red team actions and blue team responses
for i, (red_action, blue_response) in enumerate(zip(red_team_actions, blue_team_responses)):
timeline_entry = {
'sequence': i + 1,
'timestamp': (datetime.utcnow() + timedelta(minutes=i*30)).isoformat(),
'red_team_action': {
'action_type': red_action.get('action_type', ''),
'description': red_action.get('description', ''),
'target_system': red_action.get('target_system', ''),
'expected_detection': red_action.get('expected_detection', ''),
'success_criteria': red_action.get('success_criteria', '')
},
'blue_team_response': {
'detection_method': blue_response.get('detection_method', ''),
'response_time_target': blue_response.get('response_time_target', ''),
'response_actions': blue_response.get('response_actions', []),
'tools_used': blue_response.get('tools_used', []),
'escalation_triggers': blue_response.get('escalation_triggers', [])
},
'collaboration_points': [
'Red team explains attack technique',
'Blue team demonstrates detection capability',
'Discussion of detection gaps',
'Improvement recommendations'
],
'metrics_to_capture': [
'Time to detection',
'Accuracy of detection',
'Response effectiveness',
'Tool performance'
]
}
exercise_timeline.append(timeline_entry)
# Create exercise execution plan
execution_plan = {
'exercise_id': exercise_id,
'exercise_type': 'purple_team',
'start_time': datetime.utcnow().isoformat(),
'scenario_overview': {
'name': scenario.scenario_name,
'threat_actor': scenario.threat_actor_profile,
'attack_vectors': scenario.attack_vectors,
'target_systems': scenario.target_systems
},
'team_composition': {
'red_team': [p for p in exercise['participants'] if 'red' in p.lower()],
'blue_team': [p for p in exercise['participants'] if 'blue' in p.lower()],
'facilitators': exercise['facilitators'],
'observers': exercise.get('observers', [])
},
'exercise_timeline': exercise_timeline,
'success_metrics': scenario.success_criteria,
'safety_controls': [
'All actions performed in isolated test environment',
'No production systems affected',
'Continuous monitoring of exercise boundaries',
'Immediate stop capability if issues arise'
],
'collaboration_framework': {
'communication_channels': ['Slack', 'Video conference', 'Shared documentation'],
'knowledge_sharing_points': ['After each attack phase', 'During detection discussions', 'At exercise conclusion'],
'documentation_requirements': ['All actions logged', 'Detection results recorded', 'Lessons learned captured']
}
}
# Update exercise status
self.exercises_table.update_item(
Key={'exercise_id': exercise_id},
UpdateExpression='SET #status = :status, execution_date = :exec_date, execution_plan = :plan',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={
':status': 'in_progress',
':exec_date': datetime.utcnow().isoformat(),
':plan': execution_plan
}
)
return {
'status': 'success',
'exercise_id': exercise_id,
'execution_plan': execution_plan,
'timeline_entries': len(exercise_timeline),
'estimated_duration_hours': len(exercise_timeline) * 0.5,
'message': 'Purple team exercise plan created and ready for execution'
}
except Exception as e:
logger.error(f"Error conducting purple team exercise: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def evaluate_simulation_results(self,
exercise_id: str,
performance_data: Dict[str, Any],
participant_feedback: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Evaluate simulation results and generate comprehensive assessment
"""
try:
# Retrieve exercise details
exercise_response = self.exercises_table.get_item(Key={'exercise_id': exercise_id})
if 'Item' not in exercise_response:
return {
'status': 'error',
'message': f'Exercise {exercise_id} not found'
}
exercise = exercise_response['Item']
scenario = self.scenario_catalog[exercise['scenario_id']]
# Analyze performance against success criteria
objectives_analysis = self._analyze_objectives_performance(
scenario.success_criteria, performance_data
)
# Evaluate response times
response_times_analysis = self._analyze_response_times(
performance_data.get('response_times', {})
)
# Assess tool effectiveness
tools_analysis = self._analyze_tools_effectiveness(
performance_data.get('tools_used', {}),
performance_data.get('tools_performance', {})
)
# Evaluate communication effectiveness
communication_analysis = self._analyze_communication_effectiveness(
performance_data.get('communication_logs', []),
participant_feedback
)
# Generate lessons learned
lessons_learned = self._extract_lessons_learned(
participant_feedback,
objectives_analysis,
response_times_analysis,
tools_analysis,
communication_analysis
)
# Calculate overall score
overall_score = self._calculate_overall_score(
objectives_analysis,
response_times_analysis,
tools_analysis,
communication_analysis
)
# Generate improvement recommendations
improvement_recommendations = self._generate_improvement_recommendations(
objectives_analysis,
response_times_analysis,
tools_analysis,
communication_analysis,
lessons_learned
)
# Create simulation results
results = SimulationResults(
exercise_id=exercise_id,
execution_date=datetime.utcnow().isoformat(),
participants_count=len(exercise['participants']),
objectives_met=[obj['objective'] for obj in objectives_analysis if obj['met']],
objectives_missed=[obj['objective'] for obj in objectives_analysis if not obj['met']],
response_times=performance_data.get('response_times', {}),
tools_effectiveness=performance_data.get('tools_performance', {}),
communication_effectiveness=communication_analysis['overall_rating'],
lessons_learned=lessons_learned,
improvement_recommendations=improvement_recommendations,
overall_score=overall_score,
next_exercise_recommendations=self._recommend_next_exercises(overall_score, lessons_learned)
)
# Store results
self.results_table.put_item(Item=asdict(results))
# Update exercise status
self.exercises_table.update_item(
Key={'exercise_id': exercise_id},
UpdateExpression='SET #status = :status, completion_date = :comp_date, overall_score = :score',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={
':status': 'completed',
':comp_date': datetime.utcnow().isoformat(),
':score': overall_score
}
)
# Generate after-action report
aar_report = self._generate_after_action_report(exercise, scenario, results)
logger.info(f"Evaluated simulation results for exercise: {exercise_id}")
return {
'status': 'success',
'exercise_id': exercise_id,
'overall_score': overall_score,
'objectives_met': len(results.objectives_met),
'objectives_missed': len(results.objectives_missed),
'lessons_learned_count': len(lessons_learned),
'improvement_recommendations_count': len(improvement_recommendations),
'after_action_report': aar_report,
'results': asdict(results)
}
except Exception as e:
logger.error(f"Error evaluating simulation results: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
# Example usage
if __name__ == "__main__":
# Initialize simulation management framework
sim_framework = SimulationManagementFramework()
# Create a simulation exercise
exercise_result = sim_framework.create_simulation_exercise(
scenario_id='ransomware_attack',
exercise_name='Q1 2024 Ransomware Response Exercise',
scheduled_date='2024-03-15T09:00:00Z',
facilitators=['security.manager@company.com', 'incident.lead@company.com'],
participants=[
'security.analyst1@company.com',
'security.analyst2@company.com',
'it.operations@company.com',
'legal.counsel@company.com',
'communications.lead@company.com'
]
)
print(f"Exercise creation: {json.dumps(exercise_result, indent=2)}")
# Execute tabletop exercise
if exercise_result['status'] == 'success':
tabletop_result = sim_framework.execute_tabletop_exercise(
exercise_id=exercise_result['exercise_id'],
facilitator_notes={
'opening': {'focus_areas': ['Team roles', 'Communication channels']},
'scenario_injection': {'emphasis': 'Initial detection and response'},
'escalation_phase': {'key_decisions': ['Business continuity', 'External notifications']},
'resolution_phase': {'recovery_priorities': ['Critical systems first']},
'debrief': {'action_items': ['Update playbooks', 'Additional training']}
}
)
print(f"Tabletop execution: {json.dumps(tabletop_result, indent=2, default=str)}")Example 2: Automated Red Team Exercise Platform
View code
# red_team_exercise_platform.py
import boto3
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging
import random
import time
logger = logging.getLogger(__name__)
class RedTeamExercisePlatform:
"""
Automated platform for conducting red team exercises with safety controls
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.ec2_client = boto3.client('ec2', region_name=region)
self.lambda_client = boto3.client('lambda', region_name=region)
self.stepfunctions_client = boto3.client('stepfunctions', region_name=region)
self.guardduty_client = boto3.client('guardduty', region_name=region)
self.cloudtrail_client = boto3.client('cloudtrail', region_name=region)
self.sns_client = boto3.client('sns', region_name=region)
self.dynamodb = boto3.resource('dynamodb', region_name=region)
# DynamoDB tables for exercise management
self.exercises_table = self.dynamodb.Table('red-team-exercises')
self.attacks_table = self.dynamodb.Table('simulated-attacks')
self.detections_table = self.dynamodb.Table('detection-results')
# Define attack simulation catalog
self.attack_simulations = self._define_attack_simulations()
self.safety_controls = self._define_safety_controls()
def _define_attack_simulations(self) -> Dict[str, Dict[str, Any]]:
"""
Define catalog of safe attack simulations for red team exercises
"""
return {
'reconnaissance': {
'description': 'Simulated reconnaissance activities',
'techniques': [
'Port scanning simulation',
'DNS enumeration',
'Service discovery',
'Network mapping'
],
'detection_points': [
'Unusual network scanning patterns',
'Multiple failed connection attempts',
'DNS query anomalies',
'Network reconnaissance signatures'
],
'safety_level': 'low_risk',
'duration_minutes': 30,
'lambda_function': 'simulate-reconnaissance',
'expected_detections': ['GuardDuty Recon findings', 'VPC Flow Log anomalies']
},
'credential_access': {
'description': 'Simulated credential access attempts',
'techniques': [
'Brute force login simulation',
'Password spray simulation',
'Credential stuffing simulation',
'Token theft simulation'
],
'detection_points': [
'Multiple failed authentication attempts',
'Unusual login patterns',
'Credential access anomalies',
'Token usage irregularities'
],
'safety_level': 'medium_risk',
'duration_minutes': 45,
'lambda_function': 'simulate-credential-access',
'expected_detections': ['CloudTrail authentication anomalies', 'GuardDuty credential access findings']
},
'persistence': {
'description': 'Simulated persistence establishment',
'techniques': [
'Scheduled task creation simulation',
'Service installation simulation',
'Registry modification simulation',
'Startup folder modification simulation'
],
'detection_points': [
'Unauthorized scheduled tasks',
'New service installations',
'Registry modifications',
'Startup configuration changes'
],
'safety_level': 'medium_risk',
'duration_minutes': 60,
'lambda_function': 'simulate-persistence',
'expected_detections': ['Config rule violations', 'Systems Manager compliance findings']
},
'lateral_movement': {
'description': 'Simulated lateral movement activities',
'techniques': [
'Network share enumeration',
'Remote service exploitation simulation',
'Credential reuse simulation',
'Internal network scanning'
],
'detection_points': [
'Unusual network connections',
'Cross-system authentication patterns',
'Internal scanning activities',
'Service exploitation attempts'
],
'safety_level': 'medium_risk',
'duration_minutes': 90,
'lambda_function': 'simulate-lateral-movement',
'expected_detections': ['VPC Flow Log anomalies', 'GuardDuty lateral movement findings']
},
'data_exfiltration': {
'description': 'Simulated data exfiltration attempts',
'techniques': [
'Large data transfer simulation',
'Encrypted channel communication',
'DNS tunneling simulation',
'Cloud storage upload simulation'
],
'detection_points': [
'Unusual data transfer volumes',
'Encrypted communication channels',
'DNS tunneling patterns',
'Unauthorized cloud uploads'
],
'safety_level': 'high_risk',
'duration_minutes': 60,
'lambda_function': 'simulate-data-exfiltration',
'expected_detections': ['Macie data movement alerts', 'GuardDuty exfiltration findings']
}
}
def _define_safety_controls(self) -> Dict[str, Any]:
"""
Define safety controls for red team exercises
"""
return {
'environment_isolation': {
'description': 'Ensure exercises run in isolated environments',
'controls': [
'Dedicated test VPC',
'Isolated subnets',
'No production data access',
'Network segmentation'
]
},
'data_protection': {
'description': 'Protect sensitive data during exercises',
'controls': [
'No real customer data',
'Synthetic data only',
'Data masking where required',
'Encryption in transit and at rest'
]
},
'system_protection': {
'description': 'Prevent actual system damage',
'controls': [
'Read-only operations where possible',
'Automated rollback capabilities',
'System state snapshots',
'Resource limits and quotas'
]
},
'monitoring_and_control': {
'description': 'Continuous monitoring and control mechanisms',
'controls': [
'Real-time exercise monitoring',
'Emergency stop capabilities',
'Automated safety checks',
'Continuous logging and auditing'
]
}
}
def create_red_team_exercise(self,
exercise_name: str,
attack_scenarios: List[str],
target_environment: str,
duration_hours: int,
blue_team_notification: bool = False) -> Dict[str, Any]:
"""
Create and configure red team exercise
"""
try:
exercise_id = f"redteam_{int(datetime.utcnow().timestamp())}"
# Validate attack scenarios
invalid_scenarios = [s for s in attack_scenarios if s not in self.attack_simulations]
if invalid_scenarios:
return {
'status': 'error',
'message': f'Invalid attack scenarios: {invalid_scenarios}'
}
# Create exercise configuration
exercise_config = {
'exercise_id': exercise_id,
'exercise_name': exercise_name,
'attack_scenarios': attack_scenarios,
'target_environment': target_environment,
'duration_hours': duration_hours,
'blue_team_notification': blue_team_notification,
'created_date': datetime.utcnow().isoformat(),
'status': 'configured',
'safety_controls_enabled': True,
'attack_timeline': self._generate_attack_timeline(attack_scenarios, duration_hours),
'expected_detections': self._compile_expected_detections(attack_scenarios),
'safety_checkpoints': self._define_safety_checkpoints(duration_hours)
}
# Store exercise configuration
self.exercises_table.put_item(Item=exercise_config)
# Deploy safety controls
safety_deployment = self._deploy_safety_controls(exercise_id, target_environment)
# Create monitoring dashboard
monitoring_setup = self._setup_exercise_monitoring(exercise_id, attack_scenarios)
logger.info(f"Created red team exercise: {exercise_id}")
return {
'status': 'success',
'exercise_id': exercise_id,
'exercise_name': exercise_name,
'attack_scenarios_count': len(attack_scenarios),
'duration_hours': duration_hours,
'safety_controls': safety_deployment,
'monitoring_setup': monitoring_setup,
'message': 'Red team exercise configured and ready for execution'
}
except Exception as e:
logger.error(f"Error creating red team exercise: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def execute_red_team_exercise(self, exercise_id: str) -> Dict[str, Any]:
"""
Execute red team exercise with automated attack simulations
"""
try:
# Retrieve exercise configuration
exercise_response = self.exercises_table.get_item(Key={'exercise_id': exercise_id})
if 'Item' not in exercise_response:
return {
'status': 'error',
'message': f'Exercise {exercise_id} not found'
}
exercise_config = exercise_response['Item']
# Pre-execution safety checks
safety_check = self._perform_safety_checks(exercise_id, exercise_config)
if not safety_check['passed']:
return {
'status': 'error',
'message': f'Safety checks failed: {safety_check["issues"]}'
}
# Start exercise execution
execution_results = {
'exercise_id': exercise_id,
'start_time': datetime.utcnow().isoformat(),
'attack_executions': [],
'detection_results': [],
'safety_events': [],
'status': 'in_progress'
}
# Execute attack timeline
for timeline_entry in exercise_config['attack_timeline']:
attack_result = self._execute_attack_simulation(
exercise_id,
timeline_entry,
exercise_config['target_environment']
)
execution_results['attack_executions'].append(attack_result)
# Check for detections
detection_result = self._check_for_detections(
exercise_id,
timeline_entry['attack_type'],
attack_result
)
execution_results['detection_results'].append(detection_result)
# Perform safety checkpoint
safety_checkpoint = self._perform_safety_checkpoint(exercise_id, timeline_entry)
if not safety_checkpoint['passed']:
execution_results['status'] = 'stopped_for_safety'
execution_results['safety_events'].append(safety_checkpoint)
break
# Wait for next attack phase
time.sleep(timeline_entry.get('delay_minutes', 5) * 60)
# Complete exercise
execution_results['end_time'] = datetime.utcnow().isoformat()
execution_results['status'] = 'completed' if execution_results['status'] != 'stopped_for_safety' else execution_results['status']
# Update exercise status
self.exercises_table.update_item(
Key={'exercise_id': exercise_id},
UpdateExpression='SET #status = :status, execution_results = :results, execution_date = :exec_date',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={
':status': execution_results['status'],
':results': execution_results,
':exec_date': datetime.utcnow().isoformat()
}
)
# Generate exercise report
exercise_report = self._generate_exercise_report(exercise_id, exercise_config, execution_results)
return {
'status': 'success',
'exercise_id': exercise_id,
'execution_status': execution_results['status'],
'attacks_executed': len(execution_results['attack_executions']),
'detections_triggered': len([d for d in execution_results['detection_results'] if d['detected']]),
'safety_events': len(execution_results['safety_events']),
'exercise_report': exercise_report,
'execution_results': execution_results
}
except Exception as e:
logger.error(f"Error executing red team exercise: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def _generate_attack_timeline(self, attack_scenarios: List[str], duration_hours: int) -> List[Dict[str, Any]]:
"""
Generate realistic attack timeline for exercise
"""
timeline = []
current_time = 0
for i, scenario in enumerate(attack_scenarios):
attack_config = self.attack_simulations[scenario]
timeline_entry = {
'sequence': i + 1,
'attack_type': scenario,
'start_time_minutes': current_time,
'duration_minutes': attack_config['duration_minutes'],
'techniques': attack_config['techniques'],
'expected_detections': attack_config['expected_detections'],
'safety_level': attack_config['safety_level'],
'lambda_function': attack_config['lambda_function'],
'delay_minutes': 5 if i < len(attack_scenarios) - 1 else 0
}
timeline.append(timeline_entry)
current_time += attack_config['duration_minutes'] + 5
return timeline
def _execute_attack_simulation(self,
exercise_id: str,
timeline_entry: Dict[str, Any],
target_environment: str) -> Dict[str, Any]:
"""
Execute individual attack simulation
"""
try:
attack_type = timeline_entry['attack_type']
lambda_function = timeline_entry['lambda_function']
# Prepare attack simulation payload
simulation_payload = {
'exercise_id': exercise_id,
'attack_type': attack_type,
'target_environment': target_environment,
'techniques': timeline_entry['techniques'],
'safety_level': timeline_entry['safety_level'],
'duration_minutes': timeline_entry['duration_minutes']
}
# Execute attack simulation via Lambda
response = self.lambda_client.invoke(
FunctionName=lambda_function,
InvocationType='RequestResponse',
Payload=json.dumps(simulation_payload)
)
# Parse response
response_payload = json.loads(response['Payload'].read())
attack_result = {
'attack_type': attack_type,
'execution_time': datetime.utcnow().isoformat(),
'status': response_payload.get('statusCode', 200) == 200,
'techniques_executed': response_payload.get('techniques_executed', []),
'artifacts_created': response_payload.get('artifacts_created', []),
'detection_triggers': response_payload.get('detection_triggers', []),
'safety_status': response_payload.get('safety_status', 'safe'),
'execution_details': response_payload.get('body', {})
}
# Store attack execution record
self.attacks_table.put_item(
Item={
'exercise_id': exercise_id,
'attack_sequence': timeline_entry['sequence'],
'attack_result': attack_result,
'timestamp': datetime.utcnow().isoformat()
}
)
return attack_result
except Exception as e:
logger.error(f"Error executing attack simulation {attack_type}: {str(e)}")
return {
'attack_type': attack_type,
'execution_time': datetime.utcnow().isoformat(),
'status': False,
'error': str(e),
'safety_status': 'error'
}
def _check_for_detections(self,
exercise_id: str,
attack_type: str,
attack_result: Dict[str, Any]) -> Dict[str, Any]:
"""
Check if attack simulation triggered expected detections
"""
try:
detection_result = {
'exercise_id': exercise_id,
'attack_type': attack_type,
'check_time': datetime.utcnow().isoformat(),
'detected': False,
'detection_sources': [],
'detection_delay_minutes': 0,
'false_positives': 0,
'missed_detections': []
}
# Check GuardDuty findings
guardduty_detections = self._check_guardduty_detections(exercise_id, attack_type)
if guardduty_detections['found']:
detection_result['detected'] = True
detection_result['detection_sources'].append('GuardDuty')
detection_result['detection_delay_minutes'] = guardduty_detections['delay_minutes']
# Check CloudTrail anomalies
cloudtrail_detections = self._check_cloudtrail_anomalies(exercise_id, attack_type)
if cloudtrail_detections['found']:
detection_result['detected'] = True
detection_result['detection_sources'].append('CloudTrail')
# Check VPC Flow Log anomalies
vpc_detections = self._check_vpc_flow_anomalies(exercise_id, attack_type)
if vpc_detections['found']:
detection_result['detected'] = True
detection_result['detection_sources'].append('VPC Flow Logs')
# Store detection results
self.detections_table.put_item(
Item={
'exercise_id': exercise_id,
'attack_type': attack_type,
'detection_result': detection_result,
'timestamp': datetime.utcnow().isoformat()
}
)
return detection_result
except Exception as e:
logger.error(f"Error checking detections for {attack_type}: {str(e)}")
return {
'exercise_id': exercise_id,
'attack_type': attack_type,
'detected': False,
'error': str(e)
}
def _check_guardduty_detections(self, exercise_id: str, attack_type: str) -> Dict[str, Any]:
"""
Check GuardDuty for exercise-related findings
"""
try:
# Get GuardDuty detector
detectors = self.guardduty_client.list_detectors()
if not detectors['DetectorIds']:
return {'found': False, 'reason': 'No GuardDuty detector found'}
detector_id = detectors['DetectorIds'][0]
# Get recent findings
findings_response = self.guardduty_client.list_findings(
DetectorId=detector_id,
FindingCriteria={
'Criterion': {
'updatedAt': {
'Gte': int((datetime.utcnow() - timedelta(minutes=30)).timestamp() * 1000)
}
}
}
)
# Check if any findings match our exercise
exercise_findings = []
for finding_id in findings_response['FindingIds']:
finding_details = self.guardduty_client.get_findings(
DetectorId=detector_id,
FindingIds=[finding_id]
)
for finding in finding_details['Findings']:
# Check if finding relates to our exercise (simplified check)
if exercise_id in finding.get('Description', '') or attack_type in finding.get('Type', ''):
exercise_findings.append(finding)
return {
'found': len(exercise_findings) > 0,
'findings_count': len(exercise_findings),
'delay_minutes': 5, # Simplified - would calculate actual delay
'findings': exercise_findings
}
except Exception as e:
logger.error(f"Error checking GuardDuty detections: {str(e)}")
return {'found': False, 'error': str(e)}
def generate_exercise_metrics(self, exercise_id: str) -> Dict[str, Any]:
"""
Generate comprehensive metrics for completed exercise
"""
try:
# Retrieve exercise data
exercise_response = self.exercises_table.get_item(Key={'exercise_id': exercise_id})
if 'Item' not in exercise_response:
return {
'status': 'error',
'message': f'Exercise {exercise_id} not found'
}
exercise_data = exercise_response['Item']
execution_results = exercise_data.get('execution_results', {})
# Calculate detection metrics
detection_metrics = self._calculate_detection_metrics(exercise_id, execution_results)
# Calculate response time metrics
response_time_metrics = self._calculate_response_time_metrics(exercise_id, execution_results)
# Calculate coverage metrics
coverage_metrics = self._calculate_coverage_metrics(exercise_id, execution_results)
# Generate improvement recommendations
improvement_recommendations = self._generate_improvement_recommendations_redteam(
detection_metrics, response_time_metrics, coverage_metrics
)
exercise_metrics = {
'exercise_id': exercise_id,
'exercise_name': exercise_data['exercise_name'],
'execution_date': exercise_data.get('execution_date', ''),
'duration_actual_minutes': self._calculate_actual_duration(execution_results),
'detection_metrics': detection_metrics,
'response_time_metrics': response_time_metrics,
'coverage_metrics': coverage_metrics,
'overall_effectiveness_score': self._calculate_effectiveness_score(
detection_metrics, response_time_metrics, coverage_metrics
),
'improvement_recommendations': improvement_recommendations,
'next_exercise_suggestions': self._suggest_next_exercises(
detection_metrics, coverage_metrics
)
}
return {
'status': 'success',
'exercise_metrics': exercise_metrics
}
except Exception as e:
logger.error(f"Error generating exercise metrics: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
# Example usage and demonstration
def demonstrate_red_team_platform():
"""
Demonstrate red team exercise platform capabilities
"""
platform = RedTeamExercisePlatform()
# Create red team exercise
exercise_result = platform.create_red_team_exercise(
exercise_name='Q1 2024 Advanced Persistent Threat Simulation',
attack_scenarios=['reconnaissance', 'credential_access', 'lateral_movement', 'data_exfiltration'],
target_environment='test-vpc-12345',
duration_hours=4,
blue_team_notification=False
)
print(f"Red team exercise creation: {json.dumps(exercise_result, indent=2, default=str)}")
# Execute exercise (in real scenario)
if exercise_result['status'] == 'success':
execution_result = platform.execute_red_team_exercise(exercise_result['exercise_id'])
print(f"Exercise execution: {json.dumps(execution_result, indent=2, default=str)}")
# Generate metrics
metrics_result = platform.generate_exercise_metrics(exercise_result['exercise_id'])
print(f"Exercise metrics: {json.dumps(metrics_result, indent=2, default=str)}")
if __name__ == "__main__":
demonstrate_red_team_platform()Resources
Related Documents
Related Videos
Simulation Types and Characteristics
| Simulation Type | Complexity | Duration | Participants | Focus Area | Cost |
|---|---|---|---|---|---|
| Tabletop Exercise | Low | 4-8 hours | 5-15 people | Process & Communication | Low |
| Purple Team Exercise | Medium | 1-2 days | 10-20 people | Detection & Response | Medium |
| Red Team Exercise | High | 1-4 weeks | 15-30 people | Full Attack Simulation | High |
Tabletop Exercise Framework
Pre-Exercise Planning:
- Define scenario and objectives
- Identify key stakeholders and participants
- Prepare discussion materials and injects
- Schedule appropriate venue and duration
- Brief facilitators on objectives and flow
Exercise Structure:
-
Opening (30 minutes)
- Welcome and introductions
- Exercise objectives and ground rules
- Scenario briefing and context setting
-
Scenario Injection (60-90 minutes)
- Initial incident notification
- Situation assessment discussions
- Initial response decision points
- Resource allocation discussions
-
Escalation Phase (90-120 minutes)
- Incident escalation scenarios
- Stakeholder communication challenges
- Technical response coordination
- Business impact assessment
-
Resolution Phase (60-90 minutes)
- Recovery planning discussions
- Lessons learned identification
- Process improvement opportunities
- Communication strategy refinement
-
Debrief (30-45 minutes)
- Exercise summary and key takeaways
- Action items and improvement plans
- Next exercise planning
- Participant feedback collection
Purple Team Exercise Framework
Collaborative Planning:
- Joint red and blue team scenario development
- Agreed-upon rules of engagement
- Defined success criteria and metrics
- Safety controls and boundaries
- Communication protocols during exercise
Exercise Phases:
-
Preparation Phase
- Environment setup and isolation
- Tool deployment and configuration
- Team briefings and role assignments
- Safety control implementation
-
Execution Phase
- Coordinated attack and defense activities
- Real-time collaboration and knowledge sharing
- Continuous monitoring and adjustment
- Documentation of actions and results
-
Analysis Phase
- Joint review of attack techniques and detection
- Gap analysis and improvement identification
- Tool effectiveness evaluation
- Process refinement recommendations
Red Team Exercise Framework
Exercise Planning:
- Objective definition and scope boundaries
- Rules of engagement and safety controls
- Timeline and milestone planning
- Success criteria and metrics definition
- Legal and compliance considerations
Safety Controls:
- Environment isolation and protection
- Data protection and privacy measures
- System damage prevention controls
- Continuous monitoring and oversight
- Emergency stop procedures
Execution Phases:
-
Reconnaissance Phase
- Target identification and analysis
- Vulnerability assessment
- Attack vector identification
- Intelligence gathering
-
Initial Access Phase
- Exploitation of identified vulnerabilities
- Foothold establishment
- Persistence mechanism deployment
- Detection evasion techniques
-
Lateral Movement Phase
- Network exploration and mapping
- Privilege escalation attempts
- Additional system compromise
- Credential harvesting
-
Objective Achievement Phase
- Target data identification and access
- Simulated data exfiltration
- Impact demonstration
- Persistence validation
Simulation Scenario Library
Ransomware Attack Scenarios:
- Advanced persistent threat with ransomware deployment
- Insider threat leading to ransomware infection
- Supply chain compromise resulting in ransomware
- Cloud infrastructure ransomware attack
Data Breach Scenarios:
- Customer database compromise and exfiltration
- Intellectual property theft by insider
- Third-party vendor data breach impact
- Cloud storage misconfiguration exposure
Insider Threat Scenarios:
- Malicious employee data theft
- Compromised privileged user account
- Contractor access abuse
- Social engineering of employees
Cloud-Specific Scenarios:
- AWS account compromise and resource abuse
- Container escape and lateral movement
- Serverless function exploitation
- Cloud storage bucket compromise
Metrics and Evaluation Criteria
Detection Metrics:
- Time to detection (TTD)
- Detection accuracy and false positive rates
- Coverage of attack techniques
- Tool effectiveness ratings
Response Metrics:
- Time to containment (TTC)
- Time to eradication (TTE)
- Time to recovery (TTR)
- Communication effectiveness
Process Metrics:
- Playbook adherence rates
- Decision-making speed and accuracy
- Stakeholder engagement effectiveness
- Documentation completeness
Learning Metrics:
- Knowledge gaps identified
- Skills improvement areas
- Process enhancement opportunities
- Tool and technology needs
After-Action Report Template
Executive Summary:
- Exercise overview and objectives
- Key findings and recommendations
- Overall performance assessment
- Next steps and action items
Exercise Details:
- Scenario description and timeline
- Participants and roles
- Tools and technologies used
- Metrics and measurements
Performance Analysis:
- Objectives achievement assessment
- Response time analysis
- Detection effectiveness evaluation
- Communication assessment
Lessons Learned:
- What worked well
- Areas for improvement
- Gaps and vulnerabilities identified
- Process enhancement opportunities
Recommendations:
- Immediate action items
- Long-term improvement initiatives
- Training and development needs
- Tool and technology recommendations
Next Steps:
- Action item assignments and timelines
- Follow-up exercise planning
- Process improvement implementation
- Progress tracking mechanisms
Best Practices for Simulation Success
Planning Best Practices:
- Start with clear, measurable objectives
- Ensure leadership support and participation
- Select realistic and relevant scenarios
- Plan for appropriate complexity level
- Include diverse stakeholder perspectives
Execution Best Practices:
- Maintain realistic scenario progression
- Encourage active participation from all attendees
- Document all decisions and actions
- Adapt scenarios based on participant responses
- Focus on learning rather than blame
Evaluation Best Practices:
- Use objective metrics where possible
- Gather feedback from all participants
- Identify specific, actionable improvements
- Track progress over time
- Share lessons learned across organization
Follow-up Best Practices:
- Assign clear ownership for action items
- Set realistic timelines for improvements
- Track implementation progress
- Plan follow-up exercises to validate improvements
- Integrate lessons learned into standard procedures
Simulation Frequency Recommendations
Tabletop Exercises:
- Quarterly for core incident response team
- Semi-annually for extended stakeholders
- Annually for executive leadership
- After major process or personnel changes
Purple Team Exercises:
- Semi-annually for technical teams
- Annually for comprehensive scenarios
- After major tool or technology deployments
- Following significant threat landscape changes
Red Team Exercises:
- Annually for mature organizations
- Bi-annually for high-risk environments
- After major infrastructure changes
- As part of compliance requirements
Integration with Incident Response Program
Continuous Improvement Cycle:
- Plan simulation based on current threats and gaps
- Execute simulation with appropriate stakeholders
- Evaluate results and identify improvements
- Implement improvements in procedures and training
- Validate improvements in subsequent simulations
Documentation Integration:
- Update incident response playbooks based on lessons learned
- Revise communication procedures and contact lists
- Enhance training materials and programs
- Improve tool configurations and automation
Training Integration:
- Use simulation results to identify training needs
- Develop targeted training programs
- Include simulation participation in role requirements
- Track individual and team skill development