SEC10-BP06: Pre-deploy tools
One-Click Remediation
Deploy CloudFormation stacks to implement this best practice with a single click.
Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.
SEC10-BP06: Pre-deploy tools
Overview
Verify that security personnel have the right tools pre-deployed to reduce the time for investigation through to recovery.
Implementation Guidance
To automate security response and operations functions, you can use a comprehensive set of APIs and tools from AWS. You can fully automate identity management, network security, data protection, and monitoring capabilities and deliver them using popular software development methods that you already have in place.
When you build security automation, your system can monitor, review, and initiate a response, rather than having people monitor your security position and manually react to events. If your incident response teams continue to respond to alerts in the same way, they risk alert fatigue. Over time, the team can become desensitized to alerts and can either make mistakes handling ordinary situations or miss unusual alerts.
Automation helps avoid alert fatigue by using functions that process the repetitive and ordinary alerts, leaving humans to handle the sensitive and unique incidents. Integrating anomaly detection systems, such as Amazon GuardDuty, AWS CloudTrail Insights, and Amazon CloudWatch Anomaly Detection, can reduce the burden of common threshold-based alerts.
You can improve manual processes by programmatically automating steps in the process. After you define the remediation pattern to an event, you can decompose that pattern into actionable logic, and write the code to perform that logic. Responders can then run that code to remediate the issue. Over time, you can automate more and more steps, and ultimately automatically handle whole classes of common incidents.
During a security investigation, you need to be able to review relevant logs to record and understand the full scope and timeline of the incident. Logs are also required for alert generation, indicating certain actions of interest have happened. It is critical to select, enable, store, and set up querying and retrieval mechanisms, and set up alerting. Additionally, an effective way to provide tools to search log data is Amazon Detective.
AWS offers over 200 cloud services and thousands of features. We recommend that you review the services that can support and simplify your incident response strategy. In addition to logging, you should develop and implement a tagging strategy. Tagging can help provide context around the purpose of an AWS resource. Tagging can also be used for automation.
Implementation Steps
Select and set up logs for analysis and alerting
See the following documentation on configuring logging for incident response:
- Logging strategies for security incident response
- SEC04-BP01 Configure service and application logging
Enable security services to support detection and response
AWS provides native detective, preventative, and responsive capabilities, and other services can be used to architect custom security solutions. For a list of the most relevant services for security incident response, see Cloud capability definitions.
Develop and implement a tagging strategy
Obtaining contextual information on the business use case and relevant internal stakeholders surrounding an AWS resource can be difficult. One way to do this is in the form of tags, which assign metadata to your AWS resources and consist of a user-defined key and value. You can create tags to categorize resources by purpose, owner, environment, type of data processed, and other criteria of your choice.
Having a consistent tagging strategy can speed up response times and minimize time spent on organizational context by allowing you to quickly identify and discern contextual information about an AWS resource. Tags can also serve as a mechanism to initiate response automations.
For more detail on what to tag, see Tagging your AWS resources. You’ll want to first define the tags you want to implement across your organization. After that, you’ll implement and enforce this tagging strategy. For more detail on implementation and enforcement, see Implement AWS resource tagging strategy using AWS Tag Policies and Service Control Policies (SCPs).
Implementation Examples
Example 1: Comprehensive Security Tools Deployment Framework
View code
# security_tools_deployment.py
import boto3
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class SecurityTool:
tool_name: str
service_name: str
deployment_status: str
configuration: Dict[str, Any]
regions: List[str]
dependencies: List[str]
automation_enabled: bool
monitoring_enabled: bool
cost_estimate: str
deployment_date: str
@dataclass
class LoggingConfiguration:
log_type: str
service: str
destination: str
retention_days: int
encryption_enabled: bool
analysis_tools: List[str]
alerting_enabled: bool
cost_per_month: str
class SecurityToolsDeploymentManager:
"""
Comprehensive security tools deployment and management system
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.organizations_client = boto3.client('organizations', region_name=region)
self.guardduty_client = boto3.client('guardduty', region_name=region)
self.securityhub_client = boto3.client('securityhub', region_name=region)
self.detective_client = boto3.client('detective', region_name=region)
self.config_client = boto3.client('config', region_name=region)
self.cloudtrail_client = boto3.client('cloudtrail', region_name=region)
self.logs_client = boto3.client('logs', region_name=region)
self.s3_client = boto3.client('s3', region_name=region)
self.lambda_client = boto3.client('lambda', region_name=region)
self.dynamodb = boto3.resource('dynamodb', region_name=region)
# DynamoDB tables for tool management
self.tools_table = self.dynamodb.Table('security-tools-inventory')
self.logging_table = self.dynamodb.Table('logging-configurations')
# Define security tool catalog
self.security_tools_catalog = self._define_security_tools_catalog()
self.logging_configurations = self._define_logging_configurations()
def _define_security_tools_catalog(self) -> Dict[str, Dict[str, Any]]:
"""
Define comprehensive catalog of security tools for incident response
"""
return {
'guardduty': {
'description': 'Threat detection service using machine learning',
'service_name': 'guardduty',
'deployment_priority': 1,
'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
'dependencies': [],
'configuration': {
'finding_publishing_frequency': 'FIFTEEN_MINUTES',
'enable_s3_protection': True,
'enable_kubernetes_protection': True,
'enable_malware_protection': True,
'enable_rds_protection': True,
'enable_lambda_protection': True
},
'automation_capabilities': [
'Automatic threat detection',
'Finding severity scoring',
'Threat intelligence integration',
'Anomaly detection'
],
'integration_points': ['Security Hub', 'Detective', 'EventBridge'],
'cost_factors': ['Data processed', 'VPC Flow Logs', 'DNS logs', 'S3 data events']
},
'security_hub': {
'description': 'Centralized security findings management',
'service_name': 'securityhub',
'deployment_priority': 2,
'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
'dependencies': ['guardduty', 'config'],
'configuration': {
'enable_default_standards': True,
'standards': [
'AWS Foundational Security Standard',
'CIS AWS Foundations Benchmark',
'PCI DSS'
],
'auto_enable_controls': True,
'finding_aggregation': True
},
'automation_capabilities': [
'Centralized finding management',
'Compliance scoring',
'Custom insights',
'Automated remediation triggers'
],
'integration_points': ['GuardDuty', 'Config', 'Inspector', 'Macie'],
'cost_factors': ['Security checks', 'Finding ingestion', 'Compliance scans']
},
'detective': {
'description': 'Security investigation and analysis service',
'service_name': 'detective',
'deployment_priority': 3,
'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
'dependencies': ['guardduty', 'cloudtrail'],
'configuration': {
'data_sources': ['VPC Flow Logs', 'DNS logs', 'CloudTrail'],
'retention_period': 365,
'enable_organization_graph': True,
'member_accounts': []
},
'automation_capabilities': [
'Behavior graph analysis',
'Investigation visualizations',
'Root cause analysis',
'Timeline reconstruction'
],
'integration_points': ['GuardDuty', 'Security Hub', 'CloudTrail'],
'cost_factors': ['Data ingested', 'Graph storage', 'Analysis queries']
},
'config': {
'description': 'Configuration compliance and change tracking',
'service_name': 'config',
'deployment_priority': 2,
'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
'dependencies': [],
'configuration': {
'delivery_channel': 's3://config-bucket-{account_id}',
'recording_group': {
'all_supported': True,
'include_global_resource_types': True,
'resource_types': []
},
'rules': [
'encrypted-volumes',
'root-access-key-check',
's3-bucket-public-access-prohibited',
'iam-password-policy'
]
},
'automation_capabilities': [
'Configuration drift detection',
'Compliance monitoring',
'Change tracking',
'Automated remediation'
],
'integration_points': ['Security Hub', 'Systems Manager', 'Lambda'],
'cost_factors': ['Configuration items', 'Rule evaluations', 'S3 storage']
},
'cloudtrail': {
'description': 'API activity logging and monitoring',
'service_name': 'cloudtrail',
'deployment_priority': 1,
'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
'dependencies': [],
'configuration': {
'multi_region_trail': True,
'include_global_services': True,
'enable_log_file_validation': True,
's3_bucket': 'cloudtrail-logs-{account_id}',
'kms_key_id': 'arn:aws:kms:region:account:key/key-id',
'event_selectors': [
{
'read_write_type': 'All',
'include_management_events': True,
'data_resources': [
{
'type': 'AWS::S3::Object',
'values': ['arn:aws:s3:::sensitive-bucket/*']
}
]
}
],
'insights_selectors': [
{'insight_type': 'ApiCallRateInsight'}
]
},
'automation_capabilities': [
'Real-time API monitoring',
'Anomaly detection',
'Event correlation',
'Automated alerting'
],
'integration_points': ['CloudWatch', 'Detective', 'Athena', 'OpenSearch'],
'cost_factors': ['Management events', 'Data events', 'Insights', 'S3 storage']
},
'inspector': {
'description': 'Vulnerability assessment service',
'service_name': 'inspector2',
'deployment_priority': 3,
'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
'dependencies': [],
'configuration': {
'scan_types': ['ECR', 'EC2', 'Lambda'],
'auto_enable': True,
'finding_aggregation': True,
'suppression_rules': []
},
'automation_capabilities': [
'Continuous vulnerability scanning',
'Risk-based prioritization',
'Integration with CI/CD',
'Automated reporting'
],
'integration_points': ['Security Hub', 'Systems Manager', 'ECR'],
'cost_factors': ['Images scanned', 'EC2 instances', 'Lambda functions']
},
'macie': {
'description': 'Data security and privacy service',
'service_name': 'macie2',
'deployment_priority': 4,
'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
'dependencies': [],
'configuration': {
'finding_publishing_frequency': 'FIFTEEN_MINUTES',
'classification_jobs': [],
'custom_data_identifiers': [],
'allow_lists': []
},
'automation_capabilities': [
'Sensitive data discovery',
'Data classification',
'Privacy risk assessment',
'Automated data protection'
],
'integration_points': ['Security Hub', 'EventBridge', 'S3'],
'cost_factors': ['S3 buckets monitored', 'Objects analyzed', 'Classification jobs']
},
'systems_manager': {
'description': 'Operational management and automation',
'service_name': 'ssm',
'deployment_priority': 2,
'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
'dependencies': [],
'configuration': {
'patch_manager': {
'enable_default_patch_baselines': True,
'maintenance_windows': []
},
'session_manager': {
'enable_logging': True,
'log_destination': 's3://session-logs-{account_id}'
},
'automation_documents': [
'AWSIncidents-CriticalIncidentRunbookTemplate',
'AWSSupport-TroubleshootConnectivityToRDS'
]
},
'automation_capabilities': [
'Automated patching',
'Secure remote access',
'Runbook automation',
'Inventory management'
],
'integration_points': ['Config', 'CloudWatch', 'EventBridge'],
'cost_factors': ['API requests', 'Automation executions', 'Parameter Store usage']
}
}
def _define_logging_configurations(self) -> Dict[str, LoggingConfiguration]:
"""
Define comprehensive logging configurations for incident response
"""
return {
'cloudtrail_management': LoggingConfiguration(
log_type='API Activity',
service='CloudTrail',
destination='S3 + CloudWatch Logs',
retention_days=2555, # 7 years
encryption_enabled=True,
analysis_tools=['Athena', 'Detective', 'CloudWatch Insights'],
alerting_enabled=True,
cost_per_month='$50-200'
),
'vpc_flow_logs': LoggingConfiguration(
log_type='Network Traffic',
service='VPC',
destination='S3 + CloudWatch Logs',
retention_days=90,
encryption_enabled=True,
analysis_tools=['Athena', 'Detective', 'VPC Flow Logs Insights'],
alerting_enabled=True,
cost_per_month='$20-100'
),
'dns_logs': LoggingConfiguration(
log_type='DNS Queries',
service='Route 53 Resolver',
destination='S3 + CloudWatch Logs',
retention_days=90,
encryption_enabled=True,
analysis_tools=['Athena', 'Detective'],
alerting_enabled=True,
cost_per_month='$10-50'
),
'application_logs': LoggingConfiguration(
log_type='Application Events',
service='CloudWatch Logs',
destination='CloudWatch Logs',
retention_days=365,
encryption_enabled=True,
analysis_tools=['CloudWatch Insights', 'OpenSearch'],
alerting_enabled=True,
cost_per_month='$30-150'
),
'load_balancer_logs': LoggingConfiguration(
log_type='Load Balancer Access',
service='ELB',
destination='S3',
retention_days=90,
encryption_enabled=True,
analysis_tools=['Athena', 'OpenSearch'],
alerting_enabled=True,
cost_per_month='$5-25'
),
's3_access_logs': LoggingConfiguration(
log_type='S3 Access',
service='S3',
destination='S3',
retention_days=365,
encryption_enabled=True,
analysis_tools=['Athena', 'Macie'],
alerting_enabled=True,
cost_per_month='$10-50'
)
}
def deploy_security_tools_suite(self,
target_accounts: List[str],
regions: List[str],
tool_selection: List[str] = None) -> Dict[str, Any]:
"""
Deploy comprehensive security tools suite across multiple accounts and regions
"""
try:
if tool_selection is None:
tool_selection = list(self.security_tools_catalog.keys())
deployment_results = []
# Sort tools by deployment priority
sorted_tools = sorted(
[(name, config) for name, config in self.security_tools_catalog.items() if name in tool_selection],
key=lambda x: x[1]['deployment_priority']
)
for tool_name, tool_config in sorted_tools:
for account_id in target_accounts:
for region in regions:
if region in tool_config['regions']:
deployment_result = self._deploy_security_tool(
tool_name, tool_config, account_id, region
)
deployment_results.append(deployment_result)
# Set up cross-service integrations
integration_results = self._setup_tool_integrations(target_accounts, regions, tool_selection)
# Configure automation and alerting
automation_results = self._setup_security_automation(target_accounts, regions, tool_selection)
successful_deployments = sum(1 for result in deployment_results if result['status'] == 'success')
return {
'status': 'success',
'total_deployments': len(deployment_results),
'successful_deployments': successful_deployments,
'deployment_results': deployment_results,
'integration_results': integration_results,
'automation_results': automation_results,
'summary': {
'accounts': len(target_accounts),
'regions': len(regions),
'tools_deployed': len(tool_selection)
}
}
except Exception as e:
logger.error(f"Error deploying security tools suite: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def _deploy_security_tool(self,
tool_name: str,
tool_config: Dict[str, Any],
account_id: str,
region: str) -> Dict[str, Any]:
"""
Deploy individual security tool in specific account and region
"""
try:
deployment_result = {
'tool_name': tool_name,
'account_id': account_id,
'region': region,
'deployment_time': datetime.utcnow().isoformat()
}
if tool_name == 'guardduty':
result = self._deploy_guardduty(tool_config, account_id, region)
elif tool_name == 'security_hub':
result = self._deploy_security_hub(tool_config, account_id, region)
elif tool_name == 'detective':
result = self._deploy_detective(tool_config, account_id, region)
elif tool_name == 'config':
result = self._deploy_config(tool_config, account_id, region)
elif tool_name == 'cloudtrail':
result = self._deploy_cloudtrail(tool_config, account_id, region)
elif tool_name == 'inspector':
result = self._deploy_inspector(tool_config, account_id, region)
elif tool_name == 'macie':
result = self._deploy_macie(tool_config, account_id, region)
elif tool_name == 'systems_manager':
result = self._deploy_systems_manager(tool_config, account_id, region)
else:
result = {'status': 'error', 'message': f'Unknown tool: {tool_name}'}
deployment_result.update(result)
# Store deployment information
security_tool = SecurityTool(
tool_name=tool_name,
service_name=tool_config['service_name'],
deployment_status=result['status'],
configuration=tool_config['configuration'],
regions=[region],
dependencies=tool_config['dependencies'],
automation_enabled=True,
monitoring_enabled=True,
cost_estimate=tool_config.get('cost_factors', 'Variable'),
deployment_date=datetime.utcnow().isoformat()
)
self.tools_table.put_item(
Item={
**asdict(security_tool),
'deployment_key': f"{tool_name}_{account_id}_{region}"
}
)
return deployment_result
except Exception as e:
logger.error(f"Error deploying {tool_name}: {str(e)}")
return {
'tool_name': tool_name,
'account_id': account_id,
'region': region,
'status': 'error',
'message': str(e)
}
def _deploy_guardduty(self, config: Dict[str, Any], account_id: str, region: str) -> Dict[str, Any]:
"""Deploy Amazon GuardDuty"""
try:
# Create GuardDuty detector
detector_response = self.guardduty_client.create_detector(
Enable=True,
FindingPublishingFrequency=config['configuration']['finding_publishing_frequency'],
DataSources={
'S3Logs': {'Enable': config['configuration']['enable_s3_protection']},
'KubernetesAuditLogs': {'Enable': config['configuration']['enable_kubernetes_protection']},
'MalwareProtection': {'ScanEc2InstanceWithFindings': {'EbsVolumes': True}}
},
Tags={
'Purpose': 'IncidentResponse',
'DeployedBy': 'SecurityToolsManager',
'Account': account_id,
'Region': region
}
)
detector_id = detector_response['DetectorId']
return {
'status': 'success',
'detector_id': detector_id,
'features_enabled': list(config['configuration'].keys()),
'message': f'GuardDuty deployed successfully in {region}'
}
except Exception as e:
return {
'status': 'error',
'message': f'Failed to deploy GuardDuty: {str(e)}'
}
def _deploy_security_hub(self, config: Dict[str, Any], account_id: str, region: str) -> Dict[str, Any]:
"""Deploy AWS Security Hub"""
try:
# Enable Security Hub
hub_response = self.securityhub_client.enable_security_hub(
Tags={
'Purpose': 'IncidentResponse',
'DeployedBy': 'SecurityToolsManager',
'Account': account_id,
'Region': region
},
EnableDefaultStandards=config['configuration']['enable_default_standards']
)
# Enable additional standards
standards_enabled = []
for standard in config['configuration']['standards']:
try:
# This would enable specific standards - simplified for example
standards_enabled.append(standard)
except Exception as e:
logger.warning(f"Could not enable standard {standard}: {str(e)}")
return {
'status': 'success',
'hub_arn': hub_response['HubArn'],
'standards_enabled': standards_enabled,
'message': f'Security Hub deployed successfully in {region}'
}
except Exception as e:
return {
'status': 'error',
'message': f'Failed to deploy Security Hub: {str(e)}'
}
def _deploy_detective(self, config: Dict[str, Any], account_id: str, region: str) -> Dict[str, Any]:
"""Deploy Amazon Detective"""
try:
# Create Detective graph
graph_response = self.detective_client.create_graph(
Tags={
'Purpose': 'IncidentResponse',
'DeployedBy': 'SecurityToolsManager',
'Account': account_id,
'Region': region
}
)
graph_arn = graph_response['GraphArn']
return {
'status': 'success',
'graph_arn': graph_arn,
'data_sources': config['configuration']['data_sources'],
'message': f'Detective deployed successfully in {region}'
}
except Exception as e:
return {
'status': 'error',
'message': f'Failed to deploy Detective: {str(e)}'
}
def setup_comprehensive_logging(self,
target_accounts: List[str],
regions: List[str]) -> Dict[str, Any]:
"""
Set up comprehensive logging infrastructure for incident response
"""
try:
logging_results = []
for account_id in target_accounts:
for region in regions:
for log_type, log_config in self.logging_configurations.items():
setup_result = self._setup_logging_configuration(
log_type, log_config, account_id, region
)
logging_results.append(setup_result)
successful_setups = sum(1 for result in logging_results if result['status'] == 'success')
return {
'status': 'success',
'total_configurations': len(logging_results),
'successful_configurations': successful_setups,
'logging_results': logging_results,
'summary': {
'accounts': len(target_accounts),
'regions': len(regions),
'log_types': len(self.logging_configurations)
}
}
except Exception as e:
logger.error(f"Error setting up comprehensive logging: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def _setup_logging_configuration(self,
log_type: str,
log_config: LoggingConfiguration,
account_id: str,
region: str) -> Dict[str, Any]:
"""
Set up individual logging configuration
"""
try:
setup_result = {
'log_type': log_type,
'service': log_config.service,
'account_id': account_id,
'region': region,
'setup_time': datetime.utcnow().isoformat()
}
if log_type == 'cloudtrail_management':
result = self._setup_cloudtrail_logging(log_config, account_id, region)
elif log_type == 'vpc_flow_logs':
result = self._setup_vpc_flow_logs(log_config, account_id, region)
elif log_type == 'dns_logs':
result = self._setup_dns_logging(log_config, account_id, region)
elif log_type == 'application_logs':
result = self._setup_application_logging(log_config, account_id, region)
elif log_type == 'load_balancer_logs':
result = self._setup_lb_logging(log_config, account_id, region)
elif log_type == 's3_access_logs':
result = self._setup_s3_logging(log_config, account_id, region)
else:
result = {'status': 'error', 'message': f'Unknown log type: {log_type}'}
setup_result.update(result)
# Store logging configuration
self.logging_table.put_item(
Item={
**asdict(log_config),
'logging_key': f"{log_type}_{account_id}_{region}",
'setup_date': datetime.utcnow().isoformat()
}
)
return setup_result
except Exception as e:
logger.error(f"Error setting up {log_type} logging: {str(e)}")
return {
'log_type': log_type,
'account_id': account_id,
'region': region,
'status': 'error',
'message': str(e)
}
# Example usage
if __name__ == "__main__":
# Initialize security tools deployment manager
tools_manager = SecurityToolsDeploymentManager()
# Deploy security tools suite
deployment_result = tools_manager.deploy_security_tools_suite(
target_accounts=['123456789012', '987654321098'],
regions=['us-east-1', 'us-west-2'],
tool_selection=['guardduty', 'security_hub', 'detective', 'config', 'cloudtrail']
)
print(f"Security tools deployment: {json.dumps(deployment_result, indent=2, default=str)}")
# Set up comprehensive logging
logging_result = tools_manager.setup_comprehensive_logging(
target_accounts=['123456789012', '987654321098'],
regions=['us-east-1', 'us-west-2']
)
print(f"Logging setup: {json.dumps(logging_result, indent=2, default=str)}")Example 2: Automated Incident Response Toolkit with Tagging Strategy
View code
# incident_response_toolkit.py
import boto3
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class IncidentResponseToolkit:
"""
Automated incident response toolkit with comprehensive tagging strategy
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.lambda_client = boto3.client('lambda', region_name=region)
self.stepfunctions_client = boto3.client('stepfunctions', region_name=region)
self.eventbridge_client = boto3.client('events', region_name=region)
self.sns_client = boto3.client('sns', region_name=region)
self.ssm_client = boto3.client('ssm', region_name=region)
self.organizations_client = boto3.client('organizations', region_name=region)
self.resourcegroupstaggingapi_client = boto3.client('resourcegroupstaggingapi', region_name=region)
# Define tagging strategy
self.tagging_strategy = self._define_tagging_strategy()
self.automation_tools = self._define_automation_tools()
def _define_tagging_strategy(self) -> Dict[str, Dict[str, Any]]:
"""
Define comprehensive tagging strategy for incident response
"""
return {
'mandatory_tags': {
'Environment': {
'description': 'Deployment environment',
'values': ['Production', 'Staging', 'Development', 'Test'],
'enforcement': 'required',
'automation_trigger': True
},
'Owner': {
'description': 'Resource owner or team',
'values': ['SecurityTeam', 'DevOpsTeam', 'ApplicationTeam'],
'enforcement': 'required',
'automation_trigger': True
},
'CostCenter': {
'description': 'Cost allocation identifier',
'values': ['CC-001', 'CC-002', 'CC-003'],
'enforcement': 'required',
'automation_trigger': False
},
'DataClassification': {
'description': 'Data sensitivity level',
'values': ['Public', 'Internal', 'Confidential', 'Restricted'],
'enforcement': 'required',
'automation_trigger': True
}
},
'incident_response_tags': {
'IncidentResponseRole': {
'description': 'Role in incident response',
'values': ['Critical', 'Important', 'Supporting', 'NonCritical'],
'enforcement': 'recommended',
'automation_trigger': True
},
'BackupRequired': {
'description': 'Backup requirement for incident recovery',
'values': ['Yes', 'No'],
'enforcement': 'recommended',
'automation_trigger': True
},
'MonitoringLevel': {
'description': 'Level of monitoring required',
'values': ['High', 'Medium', 'Low'],
'enforcement': 'recommended',
'automation_trigger': True
},
'ComplianceFramework': {
'description': 'Applicable compliance frameworks',
'values': ['SOC2', 'PCI-DSS', 'HIPAA', 'GDPR', 'None'],
'enforcement': 'optional',
'automation_trigger': True
}
},
'automation_tags': {
'AutomatedResponse': {
'description': 'Automated response enabled',
'values': ['Enabled', 'Disabled'],
'enforcement': 'optional',
'automation_trigger': True
},
'IsolationGroup': {
'description': 'Isolation group for containment',
'values': ['WebTier', 'AppTier', 'DataTier', 'Management'],
'enforcement': 'optional',
'automation_trigger': True
},
'RecoveryPriority': {
'description': 'Recovery priority order',
'values': ['P1', 'P2', 'P3', 'P4'],
'enforcement': 'optional',
'automation_trigger': True
}
}
}
def _define_automation_tools(self) -> Dict[str, Dict[str, Any]]:
"""
Define automation tools for incident response
"""
return {
'threat_detection_automation': {
'description': 'Automated threat detection and initial response',
'triggers': ['GuardDuty Finding', 'Security Hub Finding', 'CloudWatch Alarm'],
'actions': [
'Isolate affected resources',
'Create forensic snapshots',
'Notify incident response team',
'Initiate investigation workflow'
],
'lambda_functions': [
'threat-detector-processor',
'resource-isolator',
'forensic-snapshot-creator',
'incident-notifier'
]
},
'compliance_violation_automation': {
'description': 'Automated compliance violation response',
'triggers': ['Config Rule Violation', 'Security Hub Compliance Finding'],
'actions': [
'Assess violation severity',
'Apply automatic remediation',
'Generate compliance report',
'Notify compliance team'
],
'lambda_functions': [
'compliance-assessor',
'auto-remediator',
'compliance-reporter',
'compliance-notifier'
]
},
'resource_tagging_automation': {
'description': 'Automated resource tagging enforcement',
'triggers': ['Resource Creation', 'Tag Policy Violation'],
'actions': [
'Validate required tags',
'Apply default tags',
'Generate tagging report',
'Notify resource owners'
],
'lambda_functions': [
'tag-validator',
'tag-applicator',
'tagging-reporter',
'tag-notifier'
]
},
'incident_orchestration': {
'description': 'Orchestrated incident response workflow',
'triggers': ['Manual Incident Declaration', 'Automated Incident Detection'],
'actions': [
'Assess incident severity',
'Assemble response team',
'Execute response playbook',
'Track incident progress'
],
'step_functions': [
'incident-response-orchestrator',
'team-assembly-workflow',
'playbook-executor',
'progress-tracker'
]
}
}
def deploy_incident_response_automation(self,
automation_types: List[str],
target_accounts: List[str]) -> Dict[str, Any]:
"""
Deploy incident response automation tools
"""
try:
deployment_results = []
for automation_type in automation_types:
if automation_type not in self.automation_tools:
continue
automation_config = self.automation_tools[automation_type]
for account_id in target_accounts:
deployment_result = self._deploy_automation_tool(
automation_type, automation_config, account_id
)
deployment_results.append(deployment_result)
successful_deployments = sum(1 for result in deployment_results if result['status'] == 'success')
return {
'status': 'success',
'total_deployments': len(deployment_results),
'successful_deployments': successful_deployments,
'deployment_results': deployment_results
}
except Exception as e:
logger.error(f"Error deploying incident response automation: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def _deploy_automation_tool(self,
automation_type: str,
automation_config: Dict[str, Any],
account_id: str) -> Dict[str, Any]:
"""
Deploy individual automation tool
"""
try:
deployment_result = {
'automation_type': automation_type,
'account_id': account_id,
'deployment_time': datetime.utcnow().isoformat()
}
# Deploy Lambda functions
lambda_results = []
for function_name in automation_config.get('lambda_functions', []):
lambda_result = self._deploy_lambda_function(function_name, automation_type, account_id)
lambda_results.append(lambda_result)
# Deploy Step Functions
stepfunction_results = []
for workflow_name in automation_config.get('step_functions', []):
stepfunction_result = self._deploy_step_function(workflow_name, automation_type, account_id)
stepfunction_results.append(stepfunction_result)
# Set up EventBridge rules
eventbridge_results = []
for trigger in automation_config.get('triggers', []):
eventbridge_result = self._setup_eventbridge_rule(trigger, automation_type, account_id)
eventbridge_results.append(eventbridge_result)
deployment_result.update({
'status': 'success',
'lambda_functions': lambda_results,
'step_functions': stepfunction_results,
'eventbridge_rules': eventbridge_results,
'message': f'Successfully deployed {automation_type} automation'
})
return deployment_result
except Exception as e:
logger.error(f"Error deploying {automation_type}: {str(e)}")
return {
'automation_type': automation_type,
'account_id': account_id,
'status': 'error',
'message': str(e)
}
def _deploy_lambda_function(self, function_name: str, automation_type: str, account_id: str) -> Dict[str, Any]:
"""
Deploy Lambda function for automation
"""
try:
# Example Lambda function code for threat detection
if function_name == 'threat-detector-processor':
function_code = '''
import json
import boto3
def lambda_handler(event, context):
"""Process GuardDuty findings and initiate response"""
# Parse GuardDuty finding
finding = event.get('detail', {})
finding_type = finding.get('type', '')
severity = finding.get('severity', 0)
# Determine response based on severity and type
if severity >= 7.0: # High severity
response_actions = [
'isolate_resource',
'create_snapshot',
'notify_team'
]
elif severity >= 4.0: # Medium severity
response_actions = [
'create_snapshot',
'notify_team'
]
else: # Low severity
response_actions = [
'log_finding'
]
# Execute response actions
results = []
for action in response_actions:
try:
if action == 'isolate_resource':
result = isolate_affected_resource(finding)
elif action == 'create_snapshot':
result = create_forensic_snapshot(finding)
elif action == 'notify_team':
result = notify_incident_team(finding)
elif action == 'log_finding':
result = log_security_finding(finding)
results.append({'action': action, 'result': result})
except Exception as e:
results.append({'action': action, 'error': str(e)})
return {
'statusCode': 200,
'body': {
'finding_id': finding.get('id'),
'severity': severity,
'actions_taken': results
}
}
def isolate_affected_resource(finding):
"""Isolate affected EC2 instance or other resource"""
ec2 = boto3.client('ec2')
# Extract resource information from finding
resource = finding.get('resource', {})
instance_id = resource.get('instanceDetails', {}).get('instanceId')
if instance_id:
# Create isolation security group
isolation_sg = ec2.create_security_group(
GroupName=f'isolation-{instance_id}',
Description='Isolation security group for incident response'
)
# Apply isolation security group
ec2.modify_instance_attribute(
InstanceId=instance_id,
Groups=[isolation_sg['GroupId']]
)
return {'isolated_instance': instance_id, 'isolation_sg': isolation_sg['GroupId']}
return {'message': 'No instance to isolate'}
def create_forensic_snapshot(finding):
"""Create forensic snapshot of affected resource"""
ec2 = boto3.client('ec2')
resource = finding.get('resource', {})
instance_id = resource.get('instanceDetails', {}).get('instanceId')
if instance_id:
# Get instance volumes
response = ec2.describe_instances(InstanceIds=[instance_id])
snapshots = []
for reservation in response['Reservations']:
for instance in reservation['Instances']:
for block_device in instance.get('BlockDeviceMappings', []):
volume_id = block_device['Ebs']['VolumeId']
snapshot = ec2.create_snapshot(
VolumeId=volume_id,
Description=f'Forensic snapshot for finding {finding.get("id")}'
)
snapshots.append(snapshot['SnapshotId'])
return {'snapshots_created': snapshots}
return {'message': 'No volumes to snapshot'}
def notify_incident_team(finding):
"""Notify incident response team"""
sns = boto3.client('sns')
message = {
'finding_id': finding.get('id'),
'type': finding.get('type'),
'severity': finding.get('severity'),
'description': finding.get('description'),
'resource': finding.get('resource', {})
}
sns.publish(
TopicArn='arn:aws:sns:us-east-1:123456789012:incident-response-alerts',
Message=json.dumps(message, indent=2),
Subject=f'Security Finding: {finding.get("type")}'
)
return {'notification_sent': True}
def log_security_finding(finding):
"""Log security finding for tracking"""
print(f"Security finding logged: {finding.get('id')} - {finding.get('type')}")
return {'logged': True}
'''
else:
function_code = f'''
import json
def lambda_handler(event, context):
"""Generic automation function for {function_name}"""
print(f"Processing event for {function_name}: {{event}}")
return {{
'statusCode': 200,
'body': {{
'function': '{function_name}',
'automation_type': '{automation_type}',
'processed': True
}}
}}
'''
# Create Lambda function
response = self.lambda_client.create_function(
FunctionName=function_name,
Runtime='python3.9',
Role=f'arn:aws:iam::{account_id}:role/IncidentResponseLambdaRole',
Handler='index.lambda_handler',
Code={'ZipFile': function_code.encode()},
Description=f'Incident response automation function: {function_name}',
Timeout=300,
MemorySize=256,
Tags={
'Purpose': 'IncidentResponse',
'AutomationType': automation_type,
'DeployedBy': 'IncidentResponseToolkit'
}
)
return {
'function_name': function_name,
'function_arn': response['FunctionArn'],
'status': 'deployed'
}
except Exception as e:
return {
'function_name': function_name,
'status': 'error',
'message': str(e)
}
def implement_tagging_strategy(self,
target_accounts: List[str],
resource_types: List[str] = None) -> Dict[str, Any]:
"""
Implement comprehensive tagging strategy across accounts
"""
try:
if resource_types is None:
resource_types = [
'ec2:instance',
'ec2:volume',
'ec2:security-group',
's3:bucket',
'rds:db',
'lambda:function'
]
implementation_results = []
for account_id in target_accounts:
# Create tag policies
tag_policy_result = self._create_tag_policies(account_id)
implementation_results.append(tag_policy_result)
# Deploy tag enforcement automation
enforcement_result = self._deploy_tag_enforcement(account_id, resource_types)
implementation_results.append(enforcement_result)
# Set up tag compliance monitoring
monitoring_result = self._setup_tag_monitoring(account_id, resource_types)
implementation_results.append(monitoring_result)
successful_implementations = sum(1 for result in implementation_results if result['status'] == 'success')
return {
'status': 'success',
'total_implementations': len(implementation_results),
'successful_implementations': successful_implementations,
'implementation_results': implementation_results,
'tagging_strategy': self.tagging_strategy
}
except Exception as e:
logger.error(f"Error implementing tagging strategy: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def _create_tag_policies(self, account_id: str) -> Dict[str, Any]:
"""
Create tag policies for enforcement
"""
try:
# Create tag policy document
tag_policy = {
"tags": {}
}
# Add mandatory tags to policy
for tag_key, tag_config in self.tagging_strategy['mandatory_tags'].items():
if tag_config['enforcement'] == 'required':
tag_policy["tags"][tag_key] = {
"tag_key": {
"@@assign": tag_key
},
"tag_value": {
"@@assign": tag_config['values']
},
"enforced_for": {
"@@assign": [
"ec2:instance",
"ec2:volume",
"s3:bucket",
"rds:db-instance"
]
}
}
return {
'account_id': account_id,
'status': 'success',
'tag_policy': tag_policy,
'message': 'Tag policies created successfully'
}
except Exception as e:
return {
'account_id': account_id,
'status': 'error',
'message': str(e)
}
def audit_resource_tags(self,
account_id: str,
resource_types: List[str] = None) -> Dict[str, Any]:
"""
Audit resource tags for compliance with tagging strategy
"""
try:
if resource_types is None:
resource_types = ['ec2:instance', 's3:bucket', 'rds:db']
audit_results = {
'account_id': account_id,
'audit_date': datetime.utcnow().isoformat(),
'resource_compliance': {},
'missing_tags': {},
'compliance_summary': {}
}
for resource_type in resource_types:
# Get resources of this type
resources = self._get_resources_by_type(resource_type)
compliant_resources = 0
non_compliant_resources = 0
missing_tags_for_type = []
for resource in resources:
resource_arn = resource['ResourceARN']
resource_tags = {tag['Key']: tag['Value'] for tag in resource.get('Tags', [])}
# Check mandatory tags
missing_mandatory_tags = []
for tag_key, tag_config in self.tagging_strategy['mandatory_tags'].items():
if tag_config['enforcement'] == 'required' and tag_key not in resource_tags:
missing_mandatory_tags.append(tag_key)
if missing_mandatory_tags:
non_compliant_resources += 1
missing_tags_for_type.append({
'resource_arn': resource_arn,
'missing_tags': missing_mandatory_tags
})
else:
compliant_resources += 1
audit_results['resource_compliance'][resource_type] = {
'total_resources': len(resources),
'compliant_resources': compliant_resources,
'non_compliant_resources': non_compliant_resources,
'compliance_percentage': (compliant_resources / len(resources) * 100) if resources else 100
}
audit_results['missing_tags'][resource_type] = missing_tags_for_type
# Calculate overall compliance
total_resources = sum(compliance['total_resources'] for compliance in audit_results['resource_compliance'].values())
total_compliant = sum(compliance['compliant_resources'] for compliance in audit_results['resource_compliance'].values())
audit_results['compliance_summary'] = {
'total_resources': total_resources,
'compliant_resources': total_compliant,
'overall_compliance_percentage': (total_compliant / total_resources * 100) if total_resources else 100
}
return {
'status': 'success',
'audit_results': audit_results
}
except Exception as e:
logger.error(f"Error auditing resource tags: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def _get_resources_by_type(self, resource_type: str) -> List[Dict[str, Any]]:
"""
Get resources by type using Resource Groups Tagging API
"""
try:
response = self.resourcegroupstaggingapi_client.get_resources(
ResourceTypeFilters=[resource_type],
ResourcesPerPage=100
)
return response.get('ResourceTagMappingList', [])
except Exception as e:
logger.error(f"Error getting resources of type {resource_type}: {str(e)}")
return []
# Example usage and demonstration
def demonstrate_incident_response_toolkit():
"""
Demonstrate incident response toolkit deployment and usage
"""
toolkit = IncidentResponseToolkit()
# Deploy automation tools
automation_result = toolkit.deploy_incident_response_automation(
automation_types=['threat_detection_automation', 'compliance_violation_automation'],
target_accounts=['123456789012', '987654321098']
)
print(f"Automation deployment: {json.dumps(automation_result, indent=2, default=str)}")
# Implement tagging strategy
tagging_result = toolkit.implement_tagging_strategy(
target_accounts=['123456789012', '987654321098'],
resource_types=['ec2:instance', 's3:bucket', 'rds:db']
)
print(f"Tagging strategy implementation: {json.dumps(tagging_result, indent=2, default=str)}")
# Audit resource tags
audit_result = toolkit.audit_resource_tags(
account_id='123456789012',
resource_types=['ec2:instance', 's3:bucket']
)
print(f"Tag audit results: {json.dumps(audit_result, indent=2, default=str)}")
if __name__ == "__main__":
demonstrate_incident_response_toolkit()Resources
Related Well-Architected Best Practices
- SEC04-BP01 Configure service and application logging
- SEC04-BP02 Capture logs, findings, and metrics in standardized locations
Related Documents
- Logging strategies for security incident response
- Incident response cloud capability definitions
- Tagging your AWS resources
- Implement AWS resource tagging strategy using AWS Tag Policies and Service Control Policies (SCPs)
- AWS Security Incident Response Guide
Related Examples
- Threat Detection and Response with Amazon GuardDuty and Amazon Detective
- Security Hub Workshop
- Vulnerability Management with Amazon Inspector
AWS Security Services for Pre-deployment
Detection Services:
- Amazon GuardDuty - Threat detection using machine learning
- AWS Security Hub - Centralized security findings management
- Amazon Detective - Security investigation and analysis
- Amazon Inspector - Vulnerability assessment and management
- Amazon Macie - Data security and privacy service
- AWS Config - Configuration compliance monitoring
Logging and Monitoring:
- AWS CloudTrail - API activity logging
- Amazon CloudWatch - Monitoring and alerting
- AWS CloudTrail Insights - Anomaly detection in API activity
- Amazon CloudWatch Anomaly Detection - Machine learning-based anomaly detection
- VPC Flow Logs - Network traffic logging
Automation and Orchestration:
- AWS Lambda - Serverless automation functions
- AWS Step Functions - Workflow orchestration
- Amazon EventBridge - Event-driven automation
- AWS Systems Manager - Operational automation
- AWS Systems Manager Incident Manager - Incident management automation
Management and Governance:
- AWS Organizations - Multi-account management
- AWS Resource Groups Tagging API - Resource tagging management
- AWS Tag Policies - Tagging governance
Security Tool Deployment Checklist
Pre-deployment Planning:
- Define security tool requirements based on threat model
- Identify target accounts and regions for deployment
- Plan integration points between security services
- Design automation workflows and response procedures
- Establish logging and monitoring requirements
Core Security Services:
- Deploy Amazon GuardDuty in all active regions
- Enable AWS Security Hub with appropriate standards
- Configure Amazon Detective for investigation capabilities
- Set up AWS Config for compliance monitoring
- Implement comprehensive CloudTrail logging
- Deploy Amazon Inspector for vulnerability scanning
Logging Infrastructure:
- Configure CloudTrail for API activity logging
- Enable VPC Flow Logs for network monitoring
- Set up DNS query logging with Route 53 Resolver
- Configure application logging with CloudWatch Logs
- Enable load balancer access logging
- Implement S3 access logging for sensitive buckets
Automation Framework:
- Deploy Lambda functions for automated response
- Create Step Functions workflows for complex procedures
- Configure EventBridge rules for event-driven automation
- Set up SNS topics for notification and alerting
- Implement Systems Manager automation documents
Tagging Strategy:
- Define mandatory and optional tags
- Create tag policies for enforcement
- Deploy tag compliance monitoring
- Implement automated tag application
- Set up tag audit and reporting
Tagging Strategy Best Practices
Mandatory Tags:
- Environment: Production, Staging, Development, Test
- Owner: Team or individual responsible for the resource
- CostCenter: For cost allocation and chargeback
- DataClassification: Public, Internal, Confidential, Restricted
Incident Response Tags:
- IncidentResponseRole: Critical, Important, Supporting, NonCritical
- BackupRequired: Yes, No
- MonitoringLevel: High, Medium, Low
- ComplianceFramework: SOC2, PCI-DSS, HIPAA, GDPR
Automation Tags:
- AutomatedResponse: Enabled, Disabled
- IsolationGroup: WebTier, AppTier, DataTier, Management
- RecoveryPriority: P1, P2, P3, P4
Automation Patterns
Threat Detection Automation:
- GuardDuty finding → EventBridge → Lambda → Automated response
- Security Hub finding → Step Functions → Investigation workflow
- CloudWatch alarm → SNS → Incident notification
Compliance Automation:
- Config rule violation → Lambda → Automatic remediation
- Tag policy violation → EventBridge → Tag enforcement
- Security standard failure → Systems Manager → Remediation runbook
Incident Response Automation:
- Manual incident declaration → Step Functions → Response orchestration
- Automated threat detection → Lambda → Containment actions
- Forensic evidence collection → Systems Manager → Evidence preservation
Cost Optimization for Security Tools
GuardDuty Cost Factors:
- CloudTrail events processed
- VPC Flow Logs analyzed
- DNS logs processed
- S3 data events monitored
Security Hub Cost Factors:
- Security checks performed
- Findings ingested from integrated services
- Compliance scans executed
Detective Cost Factors:
- Data ingested from sources
- Behavior graph storage
- Investigation queries performed
Config Cost Factors:
- Configuration items recorded
- Rule evaluations performed
- S3 storage for configuration history
Monitoring and Alerting Setup
Critical Alerts:
- High-severity GuardDuty findings
- Security Hub compliance failures
- Config rule violations
- Unauthorized API activity
Operational Alerts:
- Service deployment failures
- Log ingestion issues
- Automation execution failures
- Tag compliance violations
Performance Monitoring:
- Lambda function execution metrics
- Step Functions workflow success rates
- EventBridge rule processing times
- API throttling and error rates
Testing and Validation
Functional Testing:
- Verify security service deployment across all regions
- Test automation workflows with simulated events
- Validate logging and monitoring configurations
- Confirm alert delivery and escalation procedures
Security Testing:
- Conduct red team exercises to test detection capabilities
- Simulate security incidents to validate response procedures
- Test forensic evidence collection and preservation
- Verify compliance monitoring and reporting accuracy
Performance Testing:
- Load test automation functions with high event volumes
- Validate scaling behavior under stress conditions
- Test failover and recovery procedures
- Monitor resource utilization and cost impact