Skip to content
SEC10

SEC10-BP06: Pre-deploy tools

One-Click Remediation

Deploy CloudFormation stacks to implement this best practice with a single click.

Incident Response Tooling
Pre-deploy core incident response tooling, logging destinations, and automation hooks for investigations
Launch Stack

Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.

SEC10-BP06: Pre-deploy tools

Overview

Verify that security personnel have the right tools pre-deployed to reduce the time for investigation through to recovery.

Implementation Guidance

To automate security response and operations functions, you can use a comprehensive set of APIs and tools from AWS. You can fully automate identity management, network security, data protection, and monitoring capabilities and deliver them using popular software development methods that you already have in place.

When you build security automation, your system can monitor, review, and initiate a response, rather than having people monitor your security position and manually react to events. If your incident response teams continue to respond to alerts in the same way, they risk alert fatigue. Over time, the team can become desensitized to alerts and can either make mistakes handling ordinary situations or miss unusual alerts.

Automation helps avoid alert fatigue by using functions that process the repetitive and ordinary alerts, leaving humans to handle the sensitive and unique incidents. Integrating anomaly detection systems, such as Amazon GuardDuty, AWS CloudTrail Insights, and Amazon CloudWatch Anomaly Detection, can reduce the burden of common threshold-based alerts.

You can improve manual processes by programmatically automating steps in the process. After you define the remediation pattern to an event, you can decompose that pattern into actionable logic, and write the code to perform that logic. Responders can then run that code to remediate the issue. Over time, you can automate more and more steps, and ultimately automatically handle whole classes of common incidents.

During a security investigation, you need to be able to review relevant logs to record and understand the full scope and timeline of the incident. Logs are also required for alert generation, indicating certain actions of interest have happened. It is critical to select, enable, store, and set up querying and retrieval mechanisms, and set up alerting. Additionally, an effective way to provide tools to search log data is Amazon Detective.

AWS offers over 200 cloud services and thousands of features. We recommend that you review the services that can support and simplify your incident response strategy. In addition to logging, you should develop and implement a tagging strategy. Tagging can help provide context around the purpose of an AWS resource. Tagging can also be used for automation.

Implementation Steps

Select and set up logs for analysis and alerting

See the following documentation on configuring logging for incident response:

Enable security services to support detection and response

AWS provides native detective, preventative, and responsive capabilities, and other services can be used to architect custom security solutions. For a list of the most relevant services for security incident response, see Cloud capability definitions.

Develop and implement a tagging strategy

Obtaining contextual information on the business use case and relevant internal stakeholders surrounding an AWS resource can be difficult. One way to do this is in the form of tags, which assign metadata to your AWS resources and consist of a user-defined key and value. You can create tags to categorize resources by purpose, owner, environment, type of data processed, and other criteria of your choice.

Having a consistent tagging strategy can speed up response times and minimize time spent on organizational context by allowing you to quickly identify and discern contextual information about an AWS resource. Tags can also serve as a mechanism to initiate response automations.

For more detail on what to tag, see Tagging your AWS resources. You’ll want to first define the tags you want to implement across your organization. After that, you’ll implement and enforce this tagging strategy. For more detail on implementation and enforcement, see Implement AWS resource tagging strategy using AWS Tag Policies and Service Control Policies (SCPs).

Implementation Examples

Example 1: Comprehensive Security Tools Deployment Framework

View code
# security_tools_deployment.py
import boto3
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SecurityTool:
    tool_name: str
    service_name: str
    deployment_status: str
    configuration: Dict[str, Any]
    regions: List[str]
    dependencies: List[str]
    automation_enabled: bool
    monitoring_enabled: bool
    cost_estimate: str
    deployment_date: str

@dataclass
class LoggingConfiguration:
    log_type: str
    service: str
    destination: str
    retention_days: int
    encryption_enabled: bool
    analysis_tools: List[str]
    alerting_enabled: bool
    cost_per_month: str

class SecurityToolsDeploymentManager:
    """
    Comprehensive security tools deployment and management system
    """
    
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.organizations_client = boto3.client('organizations', region_name=region)
        self.guardduty_client = boto3.client('guardduty', region_name=region)
        self.securityhub_client = boto3.client('securityhub', region_name=region)
        self.detective_client = boto3.client('detective', region_name=region)
        self.config_client = boto3.client('config', region_name=region)
        self.cloudtrail_client = boto3.client('cloudtrail', region_name=region)
        self.logs_client = boto3.client('logs', region_name=region)
        self.s3_client = boto3.client('s3', region_name=region)
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.dynamodb = boto3.resource('dynamodb', region_name=region)
        
        # DynamoDB tables for tool management
        self.tools_table = self.dynamodb.Table('security-tools-inventory')
        self.logging_table = self.dynamodb.Table('logging-configurations')
        
        # Define security tool catalog
        self.security_tools_catalog = self._define_security_tools_catalog()
        self.logging_configurations = self._define_logging_configurations()
    
    def _define_security_tools_catalog(self) -> Dict[str, Dict[str, Any]]:
        """
        Define comprehensive catalog of security tools for incident response
        """
        return {
            'guardduty': {
                'description': 'Threat detection service using machine learning',
                'service_name': 'guardduty',
                'deployment_priority': 1,
                'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
                'dependencies': [],
                'configuration': {
                    'finding_publishing_frequency': 'FIFTEEN_MINUTES',
                    'enable_s3_protection': True,
                    'enable_kubernetes_protection': True,
                    'enable_malware_protection': True,
                    'enable_rds_protection': True,
                    'enable_lambda_protection': True
                },
                'automation_capabilities': [
                    'Automatic threat detection',
                    'Finding severity scoring',
                    'Threat intelligence integration',
                    'Anomaly detection'
                ],
                'integration_points': ['Security Hub', 'Detective', 'EventBridge'],
                'cost_factors': ['Data processed', 'VPC Flow Logs', 'DNS logs', 'S3 data events']
            },
            'security_hub': {
                'description': 'Centralized security findings management',
                'service_name': 'securityhub',
                'deployment_priority': 2,
                'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
                'dependencies': ['guardduty', 'config'],
                'configuration': {
                    'enable_default_standards': True,
                    'standards': [
                        'AWS Foundational Security Standard',
                        'CIS AWS Foundations Benchmark',
                        'PCI DSS'
                    ],
                    'auto_enable_controls': True,
                    'finding_aggregation': True
                },
                'automation_capabilities': [
                    'Centralized finding management',
                    'Compliance scoring',
                    'Custom insights',
                    'Automated remediation triggers'
                ],
                'integration_points': ['GuardDuty', 'Config', 'Inspector', 'Macie'],
                'cost_factors': ['Security checks', 'Finding ingestion', 'Compliance scans']
            },
            'detective': {
                'description': 'Security investigation and analysis service',
                'service_name': 'detective',
                'deployment_priority': 3,
                'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
                'dependencies': ['guardduty', 'cloudtrail'],
                'configuration': {
                    'data_sources': ['VPC Flow Logs', 'DNS logs', 'CloudTrail'],
                    'retention_period': 365,
                    'enable_organization_graph': True,
                    'member_accounts': []
                },
                'automation_capabilities': [
                    'Behavior graph analysis',
                    'Investigation visualizations',
                    'Root cause analysis',
                    'Timeline reconstruction'
                ],
                'integration_points': ['GuardDuty', 'Security Hub', 'CloudTrail'],
                'cost_factors': ['Data ingested', 'Graph storage', 'Analysis queries']
            },
            'config': {
                'description': 'Configuration compliance and change tracking',
                'service_name': 'config',
                'deployment_priority': 2,
                'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
                'dependencies': [],
                'configuration': {
                    'delivery_channel': 's3://config-bucket-{account_id}',
                    'recording_group': {
                        'all_supported': True,
                        'include_global_resource_types': True,
                        'resource_types': []
                    },
                    'rules': [
                        'encrypted-volumes',
                        'root-access-key-check',
                        's3-bucket-public-access-prohibited',
                        'iam-password-policy'
                    ]
                },
                'automation_capabilities': [
                    'Configuration drift detection',
                    'Compliance monitoring',
                    'Change tracking',
                    'Automated remediation'
                ],
                'integration_points': ['Security Hub', 'Systems Manager', 'Lambda'],
                'cost_factors': ['Configuration items', 'Rule evaluations', 'S3 storage']
            },
            'cloudtrail': {
                'description': 'API activity logging and monitoring',
                'service_name': 'cloudtrail',
                'deployment_priority': 1,
                'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
                'dependencies': [],
                'configuration': {
                    'multi_region_trail': True,
                    'include_global_services': True,
                    'enable_log_file_validation': True,
                    's3_bucket': 'cloudtrail-logs-{account_id}',
                    'kms_key_id': 'arn:aws:kms:region:account:key/key-id',
                    'event_selectors': [
                        {
                            'read_write_type': 'All',
                            'include_management_events': True,
                            'data_resources': [
                                {
                                    'type': 'AWS::S3::Object',
                                    'values': ['arn:aws:s3:::sensitive-bucket/*']
                                }
                            ]
                        }
                    ],
                    'insights_selectors': [
                        {'insight_type': 'ApiCallRateInsight'}
                    ]
                },
                'automation_capabilities': [
                    'Real-time API monitoring',
                    'Anomaly detection',
                    'Event correlation',
                    'Automated alerting'
                ],
                'integration_points': ['CloudWatch', 'Detective', 'Athena', 'OpenSearch'],
                'cost_factors': ['Management events', 'Data events', 'Insights', 'S3 storage']
            },
            'inspector': {
                'description': 'Vulnerability assessment service',
                'service_name': 'inspector2',
                'deployment_priority': 3,
                'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
                'dependencies': [],
                'configuration': {
                    'scan_types': ['ECR', 'EC2', 'Lambda'],
                    'auto_enable': True,
                    'finding_aggregation': True,
                    'suppression_rules': []
                },
                'automation_capabilities': [
                    'Continuous vulnerability scanning',
                    'Risk-based prioritization',
                    'Integration with CI/CD',
                    'Automated reporting'
                ],
                'integration_points': ['Security Hub', 'Systems Manager', 'ECR'],
                'cost_factors': ['Images scanned', 'EC2 instances', 'Lambda functions']
            },
            'macie': {
                'description': 'Data security and privacy service',
                'service_name': 'macie2',
                'deployment_priority': 4,
                'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
                'dependencies': [],
                'configuration': {
                    'finding_publishing_frequency': 'FIFTEEN_MINUTES',
                    'classification_jobs': [],
                    'custom_data_identifiers': [],
                    'allow_lists': []
                },
                'automation_capabilities': [
                    'Sensitive data discovery',
                    'Data classification',
                    'Privacy risk assessment',
                    'Automated data protection'
                ],
                'integration_points': ['Security Hub', 'EventBridge', 'S3'],
                'cost_factors': ['S3 buckets monitored', 'Objects analyzed', 'Classification jobs']
            },
            'systems_manager': {
                'description': 'Operational management and automation',
                'service_name': 'ssm',
                'deployment_priority': 2,
                'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
                'dependencies': [],
                'configuration': {
                    'patch_manager': {
                        'enable_default_patch_baselines': True,
                        'maintenance_windows': []
                    },
                    'session_manager': {
                        'enable_logging': True,
                        'log_destination': 's3://session-logs-{account_id}'
                    },
                    'automation_documents': [
                        'AWSIncidents-CriticalIncidentRunbookTemplate',
                        'AWSSupport-TroubleshootConnectivityToRDS'
                    ]
                },
                'automation_capabilities': [
                    'Automated patching',
                    'Secure remote access',
                    'Runbook automation',
                    'Inventory management'
                ],
                'integration_points': ['Config', 'CloudWatch', 'EventBridge'],
                'cost_factors': ['API requests', 'Automation executions', 'Parameter Store usage']
            }
        }
    
    def _define_logging_configurations(self) -> Dict[str, LoggingConfiguration]:
        """
        Define comprehensive logging configurations for incident response
        """
        return {
            'cloudtrail_management': LoggingConfiguration(
                log_type='API Activity',
                service='CloudTrail',
                destination='S3 + CloudWatch Logs',
                retention_days=2555,  # 7 years
                encryption_enabled=True,
                analysis_tools=['Athena', 'Detective', 'CloudWatch Insights'],
                alerting_enabled=True,
                cost_per_month='$50-200'
            ),
            'vpc_flow_logs': LoggingConfiguration(
                log_type='Network Traffic',
                service='VPC',
                destination='S3 + CloudWatch Logs',
                retention_days=90,
                encryption_enabled=True,
                analysis_tools=['Athena', 'Detective', 'VPC Flow Logs Insights'],
                alerting_enabled=True,
                cost_per_month='$20-100'
            ),
            'dns_logs': LoggingConfiguration(
                log_type='DNS Queries',
                service='Route 53 Resolver',
                destination='S3 + CloudWatch Logs',
                retention_days=90,
                encryption_enabled=True,
                analysis_tools=['Athena', 'Detective'],
                alerting_enabled=True,
                cost_per_month='$10-50'
            ),
            'application_logs': LoggingConfiguration(
                log_type='Application Events',
                service='CloudWatch Logs',
                destination='CloudWatch Logs',
                retention_days=365,
                encryption_enabled=True,
                analysis_tools=['CloudWatch Insights', 'OpenSearch'],
                alerting_enabled=True,
                cost_per_month='$30-150'
            ),
            'load_balancer_logs': LoggingConfiguration(
                log_type='Load Balancer Access',
                service='ELB',
                destination='S3',
                retention_days=90,
                encryption_enabled=True,
                analysis_tools=['Athena', 'OpenSearch'],
                alerting_enabled=True,
                cost_per_month='$5-25'
            ),
            's3_access_logs': LoggingConfiguration(
                log_type='S3 Access',
                service='S3',
                destination='S3',
                retention_days=365,
                encryption_enabled=True,
                analysis_tools=['Athena', 'Macie'],
                alerting_enabled=True,
                cost_per_month='$10-50'
            )
        }
    
    def deploy_security_tools_suite(self, 
                                   target_accounts: List[str],
                                   regions: List[str],
                                   tool_selection: List[str] = None) -> Dict[str, Any]:
        """
        Deploy comprehensive security tools suite across multiple accounts and regions
        """
        try:
            if tool_selection is None:
                tool_selection = list(self.security_tools_catalog.keys())
            
            deployment_results = []
            
            # Sort tools by deployment priority
            sorted_tools = sorted(
                [(name, config) for name, config in self.security_tools_catalog.items() if name in tool_selection],
                key=lambda x: x[1]['deployment_priority']
            )
            
            for tool_name, tool_config in sorted_tools:
                for account_id in target_accounts:
                    for region in regions:
                        if region in tool_config['regions']:
                            deployment_result = self._deploy_security_tool(
                                tool_name, tool_config, account_id, region
                            )
                            deployment_results.append(deployment_result)
            
            # Set up cross-service integrations
            integration_results = self._setup_tool_integrations(target_accounts, regions, tool_selection)
            
            # Configure automation and alerting
            automation_results = self._setup_security_automation(target_accounts, regions, tool_selection)
            
            successful_deployments = sum(1 for result in deployment_results if result['status'] == 'success')
            
            return {
                'status': 'success',
                'total_deployments': len(deployment_results),
                'successful_deployments': successful_deployments,
                'deployment_results': deployment_results,
                'integration_results': integration_results,
                'automation_results': automation_results,
                'summary': {
                    'accounts': len(target_accounts),
                    'regions': len(regions),
                    'tools_deployed': len(tool_selection)
                }
            }
            
        except Exception as e:
            logger.error(f"Error deploying security tools suite: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def _deploy_security_tool(self, 
                             tool_name: str,
                             tool_config: Dict[str, Any],
                             account_id: str,
                             region: str) -> Dict[str, Any]:
        """
        Deploy individual security tool in specific account and region
        """
        try:
            deployment_result = {
                'tool_name': tool_name,
                'account_id': account_id,
                'region': region,
                'deployment_time': datetime.utcnow().isoformat()
            }
            
            if tool_name == 'guardduty':
                result = self._deploy_guardduty(tool_config, account_id, region)
            elif tool_name == 'security_hub':
                result = self._deploy_security_hub(tool_config, account_id, region)
            elif tool_name == 'detective':
                result = self._deploy_detective(tool_config, account_id, region)
            elif tool_name == 'config':
                result = self._deploy_config(tool_config, account_id, region)
            elif tool_name == 'cloudtrail':
                result = self._deploy_cloudtrail(tool_config, account_id, region)
            elif tool_name == 'inspector':
                result = self._deploy_inspector(tool_config, account_id, region)
            elif tool_name == 'macie':
                result = self._deploy_macie(tool_config, account_id, region)
            elif tool_name == 'systems_manager':
                result = self._deploy_systems_manager(tool_config, account_id, region)
            else:
                result = {'status': 'error', 'message': f'Unknown tool: {tool_name}'}
            
            deployment_result.update(result)
            
            # Store deployment information
            security_tool = SecurityTool(
                tool_name=tool_name,
                service_name=tool_config['service_name'],
                deployment_status=result['status'],
                configuration=tool_config['configuration'],
                regions=[region],
                dependencies=tool_config['dependencies'],
                automation_enabled=True,
                monitoring_enabled=True,
                cost_estimate=tool_config.get('cost_factors', 'Variable'),
                deployment_date=datetime.utcnow().isoformat()
            )
            
            self.tools_table.put_item(
                Item={
                    **asdict(security_tool),
                    'deployment_key': f"{tool_name}_{account_id}_{region}"
                }
            )
            
            return deployment_result
            
        except Exception as e:
            logger.error(f"Error deploying {tool_name}: {str(e)}")
            return {
                'tool_name': tool_name,
                'account_id': account_id,
                'region': region,
                'status': 'error',
                'message': str(e)
            }
    
    def _deploy_guardduty(self, config: Dict[str, Any], account_id: str, region: str) -> Dict[str, Any]:
        """Deploy Amazon GuardDuty"""
        try:
            # Create GuardDuty detector
            detector_response = self.guardduty_client.create_detector(
                Enable=True,
                FindingPublishingFrequency=config['configuration']['finding_publishing_frequency'],
                DataSources={
                    'S3Logs': {'Enable': config['configuration']['enable_s3_protection']},
                    'KubernetesAuditLogs': {'Enable': config['configuration']['enable_kubernetes_protection']},
                    'MalwareProtection': {'ScanEc2InstanceWithFindings': {'EbsVolumes': True}}
                },
                Tags={
                    'Purpose': 'IncidentResponse',
                    'DeployedBy': 'SecurityToolsManager',
                    'Account': account_id,
                    'Region': region
                }
            )
            
            detector_id = detector_response['DetectorId']
            
            return {
                'status': 'success',
                'detector_id': detector_id,
                'features_enabled': list(config['configuration'].keys()),
                'message': f'GuardDuty deployed successfully in {region}'
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Failed to deploy GuardDuty: {str(e)}'
            }
    
    def _deploy_security_hub(self, config: Dict[str, Any], account_id: str, region: str) -> Dict[str, Any]:
        """Deploy AWS Security Hub"""
        try:
            # Enable Security Hub
            hub_response = self.securityhub_client.enable_security_hub(
                Tags={
                    'Purpose': 'IncidentResponse',
                    'DeployedBy': 'SecurityToolsManager',
                    'Account': account_id,
                    'Region': region
                },
                EnableDefaultStandards=config['configuration']['enable_default_standards']
            )
            
            # Enable additional standards
            standards_enabled = []
            for standard in config['configuration']['standards']:
                try:
                    # This would enable specific standards - simplified for example
                    standards_enabled.append(standard)
                except Exception as e:
                    logger.warning(f"Could not enable standard {standard}: {str(e)}")
            
            return {
                'status': 'success',
                'hub_arn': hub_response['HubArn'],
                'standards_enabled': standards_enabled,
                'message': f'Security Hub deployed successfully in {region}'
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Failed to deploy Security Hub: {str(e)}'
            }
    
    def _deploy_detective(self, config: Dict[str, Any], account_id: str, region: str) -> Dict[str, Any]:
        """Deploy Amazon Detective"""
        try:
            # Create Detective graph
            graph_response = self.detective_client.create_graph(
                Tags={
                    'Purpose': 'IncidentResponse',
                    'DeployedBy': 'SecurityToolsManager',
                    'Account': account_id,
                    'Region': region
                }
            )
            
            graph_arn = graph_response['GraphArn']
            
            return {
                'status': 'success',
                'graph_arn': graph_arn,
                'data_sources': config['configuration']['data_sources'],
                'message': f'Detective deployed successfully in {region}'
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': f'Failed to deploy Detective: {str(e)}'
            }
    
    def setup_comprehensive_logging(self, 
                                   target_accounts: List[str],
                                   regions: List[str]) -> Dict[str, Any]:
        """
        Set up comprehensive logging infrastructure for incident response
        """
        try:
            logging_results = []
            
            for account_id in target_accounts:
                for region in regions:
                    for log_type, log_config in self.logging_configurations.items():
                        setup_result = self._setup_logging_configuration(
                            log_type, log_config, account_id, region
                        )
                        logging_results.append(setup_result)
            
            successful_setups = sum(1 for result in logging_results if result['status'] == 'success')
            
            return {
                'status': 'success',
                'total_configurations': len(logging_results),
                'successful_configurations': successful_setups,
                'logging_results': logging_results,
                'summary': {
                    'accounts': len(target_accounts),
                    'regions': len(regions),
                    'log_types': len(self.logging_configurations)
                }
            }
            
        except Exception as e:
            logger.error(f"Error setting up comprehensive logging: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def _setup_logging_configuration(self, 
                                   log_type: str,
                                   log_config: LoggingConfiguration,
                                   account_id: str,
                                   region: str) -> Dict[str, Any]:
        """
        Set up individual logging configuration
        """
        try:
            setup_result = {
                'log_type': log_type,
                'service': log_config.service,
                'account_id': account_id,
                'region': region,
                'setup_time': datetime.utcnow().isoformat()
            }
            
            if log_type == 'cloudtrail_management':
                result = self._setup_cloudtrail_logging(log_config, account_id, region)
            elif log_type == 'vpc_flow_logs':
                result = self._setup_vpc_flow_logs(log_config, account_id, region)
            elif log_type == 'dns_logs':
                result = self._setup_dns_logging(log_config, account_id, region)
            elif log_type == 'application_logs':
                result = self._setup_application_logging(log_config, account_id, region)
            elif log_type == 'load_balancer_logs':
                result = self._setup_lb_logging(log_config, account_id, region)
            elif log_type == 's3_access_logs':
                result = self._setup_s3_logging(log_config, account_id, region)
            else:
                result = {'status': 'error', 'message': f'Unknown log type: {log_type}'}
            
            setup_result.update(result)
            
            # Store logging configuration
            self.logging_table.put_item(
                Item={
                    **asdict(log_config),
                    'logging_key': f"{log_type}_{account_id}_{region}",
                    'setup_date': datetime.utcnow().isoformat()
                }
            )
            
            return setup_result
            
        except Exception as e:
            logger.error(f"Error setting up {log_type} logging: {str(e)}")
            return {
                'log_type': log_type,
                'account_id': account_id,
                'region': region,
                'status': 'error',
                'message': str(e)
            }

# Example usage
if __name__ == "__main__":
    # Initialize security tools deployment manager
    tools_manager = SecurityToolsDeploymentManager()
    
    # Deploy security tools suite
    deployment_result = tools_manager.deploy_security_tools_suite(
        target_accounts=['123456789012', '987654321098'],
        regions=['us-east-1', 'us-west-2'],
        tool_selection=['guardduty', 'security_hub', 'detective', 'config', 'cloudtrail']
    )
    print(f"Security tools deployment: {json.dumps(deployment_result, indent=2, default=str)}")
    
    # Set up comprehensive logging
    logging_result = tools_manager.setup_comprehensive_logging(
        target_accounts=['123456789012', '987654321098'],
        regions=['us-east-1', 'us-west-2']
    )
    print(f"Logging setup: {json.dumps(logging_result, indent=2, default=str)}")

Example 2: Automated Incident Response Toolkit with Tagging Strategy

View code
# incident_response_toolkit.py
import boto3
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class IncidentResponseToolkit:
    """
    Automated incident response toolkit with comprehensive tagging strategy
    """
    
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.stepfunctions_client = boto3.client('stepfunctions', region_name=region)
        self.eventbridge_client = boto3.client('events', region_name=region)
        self.sns_client = boto3.client('sns', region_name=region)
        self.ssm_client = boto3.client('ssm', region_name=region)
        self.organizations_client = boto3.client('organizations', region_name=region)
        self.resourcegroupstaggingapi_client = boto3.client('resourcegroupstaggingapi', region_name=region)
        
        # Define tagging strategy
        self.tagging_strategy = self._define_tagging_strategy()
        self.automation_tools = self._define_automation_tools()
    
    def _define_tagging_strategy(self) -> Dict[str, Dict[str, Any]]:
        """
        Define comprehensive tagging strategy for incident response
        """
        return {
            'mandatory_tags': {
                'Environment': {
                    'description': 'Deployment environment',
                    'values': ['Production', 'Staging', 'Development', 'Test'],
                    'enforcement': 'required',
                    'automation_trigger': True
                },
                'Owner': {
                    'description': 'Resource owner or team',
                    'values': ['SecurityTeam', 'DevOpsTeam', 'ApplicationTeam'],
                    'enforcement': 'required',
                    'automation_trigger': True
                },
                'CostCenter': {
                    'description': 'Cost allocation identifier',
                    'values': ['CC-001', 'CC-002', 'CC-003'],
                    'enforcement': 'required',
                    'automation_trigger': False
                },
                'DataClassification': {
                    'description': 'Data sensitivity level',
                    'values': ['Public', 'Internal', 'Confidential', 'Restricted'],
                    'enforcement': 'required',
                    'automation_trigger': True
                }
            },
            'incident_response_tags': {
                'IncidentResponseRole': {
                    'description': 'Role in incident response',
                    'values': ['Critical', 'Important', 'Supporting', 'NonCritical'],
                    'enforcement': 'recommended',
                    'automation_trigger': True
                },
                'BackupRequired': {
                    'description': 'Backup requirement for incident recovery',
                    'values': ['Yes', 'No'],
                    'enforcement': 'recommended',
                    'automation_trigger': True
                },
                'MonitoringLevel': {
                    'description': 'Level of monitoring required',
                    'values': ['High', 'Medium', 'Low'],
                    'enforcement': 'recommended',
                    'automation_trigger': True
                },
                'ComplianceFramework': {
                    'description': 'Applicable compliance frameworks',
                    'values': ['SOC2', 'PCI-DSS', 'HIPAA', 'GDPR', 'None'],
                    'enforcement': 'optional',
                    'automation_trigger': True
                }
            },
            'automation_tags': {
                'AutomatedResponse': {
                    'description': 'Automated response enabled',
                    'values': ['Enabled', 'Disabled'],
                    'enforcement': 'optional',
                    'automation_trigger': True
                },
                'IsolationGroup': {
                    'description': 'Isolation group for containment',
                    'values': ['WebTier', 'AppTier', 'DataTier', 'Management'],
                    'enforcement': 'optional',
                    'automation_trigger': True
                },
                'RecoveryPriority': {
                    'description': 'Recovery priority order',
                    'values': ['P1', 'P2', 'P3', 'P4'],
                    'enforcement': 'optional',
                    'automation_trigger': True
                }
            }
        }
    
    def _define_automation_tools(self) -> Dict[str, Dict[str, Any]]:
        """
        Define automation tools for incident response
        """
        return {
            'threat_detection_automation': {
                'description': 'Automated threat detection and initial response',
                'triggers': ['GuardDuty Finding', 'Security Hub Finding', 'CloudWatch Alarm'],
                'actions': [
                    'Isolate affected resources',
                    'Create forensic snapshots',
                    'Notify incident response team',
                    'Initiate investigation workflow'
                ],
                'lambda_functions': [
                    'threat-detector-processor',
                    'resource-isolator',
                    'forensic-snapshot-creator',
                    'incident-notifier'
                ]
            },
            'compliance_violation_automation': {
                'description': 'Automated compliance violation response',
                'triggers': ['Config Rule Violation', 'Security Hub Compliance Finding'],
                'actions': [
                    'Assess violation severity',
                    'Apply automatic remediation',
                    'Generate compliance report',
                    'Notify compliance team'
                ],
                'lambda_functions': [
                    'compliance-assessor',
                    'auto-remediator',
                    'compliance-reporter',
                    'compliance-notifier'
                ]
            },
            'resource_tagging_automation': {
                'description': 'Automated resource tagging enforcement',
                'triggers': ['Resource Creation', 'Tag Policy Violation'],
                'actions': [
                    'Validate required tags',
                    'Apply default tags',
                    'Generate tagging report',
                    'Notify resource owners'
                ],
                'lambda_functions': [
                    'tag-validator',
                    'tag-applicator',
                    'tagging-reporter',
                    'tag-notifier'
                ]
            },
            'incident_orchestration': {
                'description': 'Orchestrated incident response workflow',
                'triggers': ['Manual Incident Declaration', 'Automated Incident Detection'],
                'actions': [
                    'Assess incident severity',
                    'Assemble response team',
                    'Execute response playbook',
                    'Track incident progress'
                ],
                'step_functions': [
                    'incident-response-orchestrator',
                    'team-assembly-workflow',
                    'playbook-executor',
                    'progress-tracker'
                ]
            }
        }
    
    def deploy_incident_response_automation(self, 
                                          automation_types: List[str],
                                          target_accounts: List[str]) -> Dict[str, Any]:
        """
        Deploy incident response automation tools
        """
        try:
            deployment_results = []
            
            for automation_type in automation_types:
                if automation_type not in self.automation_tools:
                    continue
                
                automation_config = self.automation_tools[automation_type]
                
                for account_id in target_accounts:
                    deployment_result = self._deploy_automation_tool(
                        automation_type, automation_config, account_id
                    )
                    deployment_results.append(deployment_result)
            
            successful_deployments = sum(1 for result in deployment_results if result['status'] == 'success')
            
            return {
                'status': 'success',
                'total_deployments': len(deployment_results),
                'successful_deployments': successful_deployments,
                'deployment_results': deployment_results
            }
            
        except Exception as e:
            logger.error(f"Error deploying incident response automation: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def _deploy_automation_tool(self, 
                               automation_type: str,
                               automation_config: Dict[str, Any],
                               account_id: str) -> Dict[str, Any]:
        """
        Deploy individual automation tool
        """
        try:
            deployment_result = {
                'automation_type': automation_type,
                'account_id': account_id,
                'deployment_time': datetime.utcnow().isoformat()
            }
            
            # Deploy Lambda functions
            lambda_results = []
            for function_name in automation_config.get('lambda_functions', []):
                lambda_result = self._deploy_lambda_function(function_name, automation_type, account_id)
                lambda_results.append(lambda_result)
            
            # Deploy Step Functions
            stepfunction_results = []
            for workflow_name in automation_config.get('step_functions', []):
                stepfunction_result = self._deploy_step_function(workflow_name, automation_type, account_id)
                stepfunction_results.append(stepfunction_result)
            
            # Set up EventBridge rules
            eventbridge_results = []
            for trigger in automation_config.get('triggers', []):
                eventbridge_result = self._setup_eventbridge_rule(trigger, automation_type, account_id)
                eventbridge_results.append(eventbridge_result)
            
            deployment_result.update({
                'status': 'success',
                'lambda_functions': lambda_results,
                'step_functions': stepfunction_results,
                'eventbridge_rules': eventbridge_results,
                'message': f'Successfully deployed {automation_type} automation'
            })
            
            return deployment_result
            
        except Exception as e:
            logger.error(f"Error deploying {automation_type}: {str(e)}")
            return {
                'automation_type': automation_type,
                'account_id': account_id,
                'status': 'error',
                'message': str(e)
            }
    
    def _deploy_lambda_function(self, function_name: str, automation_type: str, account_id: str) -> Dict[str, Any]:
        """
        Deploy Lambda function for automation
        """
        try:
            # Example Lambda function code for threat detection
            if function_name == 'threat-detector-processor':
                function_code = '''
import json
import boto3

def lambda_handler(event, context):
    """Process GuardDuty findings and initiate response"""
    
    # Parse GuardDuty finding
    finding = event.get('detail', {})
    finding_type = finding.get('type', '')
    severity = finding.get('severity', 0)
    
    # Determine response based on severity and type
    if severity >= 7.0:  # High severity
        response_actions = [
            'isolate_resource',
            'create_snapshot',
            'notify_team'
        ]
    elif severity >= 4.0:  # Medium severity
        response_actions = [
            'create_snapshot',
            'notify_team'
        ]
    else:  # Low severity
        response_actions = [
            'log_finding'
        ]
    
    # Execute response actions
    results = []
    for action in response_actions:
        try:
            if action == 'isolate_resource':
                result = isolate_affected_resource(finding)
            elif action == 'create_snapshot':
                result = create_forensic_snapshot(finding)
            elif action == 'notify_team':
                result = notify_incident_team(finding)
            elif action == 'log_finding':
                result = log_security_finding(finding)
            
            results.append({'action': action, 'result': result})
        except Exception as e:
            results.append({'action': action, 'error': str(e)})
    
    return {
        'statusCode': 200,
        'body': {
            'finding_id': finding.get('id'),
            'severity': severity,
            'actions_taken': results
        }
    }

def isolate_affected_resource(finding):
    """Isolate affected EC2 instance or other resource"""
    ec2 = boto3.client('ec2')
    
    # Extract resource information from finding
    resource = finding.get('resource', {})
    instance_id = resource.get('instanceDetails', {}).get('instanceId')
    
    if instance_id:
        # Create isolation security group
        isolation_sg = ec2.create_security_group(
            GroupName=f'isolation-{instance_id}',
            Description='Isolation security group for incident response'
        )
        
        # Apply isolation security group
        ec2.modify_instance_attribute(
            InstanceId=instance_id,
            Groups=[isolation_sg['GroupId']]
        )
        
        return {'isolated_instance': instance_id, 'isolation_sg': isolation_sg['GroupId']}
    
    return {'message': 'No instance to isolate'}

def create_forensic_snapshot(finding):
    """Create forensic snapshot of affected resource"""
    ec2 = boto3.client('ec2')
    
    resource = finding.get('resource', {})
    instance_id = resource.get('instanceDetails', {}).get('instanceId')
    
    if instance_id:
        # Get instance volumes
        response = ec2.describe_instances(InstanceIds=[instance_id])
        
        snapshots = []
        for reservation in response['Reservations']:
            for instance in reservation['Instances']:
                for block_device in instance.get('BlockDeviceMappings', []):
                    volume_id = block_device['Ebs']['VolumeId']
                    
                    snapshot = ec2.create_snapshot(
                        VolumeId=volume_id,
                        Description=f'Forensic snapshot for finding {finding.get("id")}'
                    )
                    snapshots.append(snapshot['SnapshotId'])
        
        return {'snapshots_created': snapshots}
    
    return {'message': 'No volumes to snapshot'}

def notify_incident_team(finding):
    """Notify incident response team"""
    sns = boto3.client('sns')
    
    message = {
        'finding_id': finding.get('id'),
        'type': finding.get('type'),
        'severity': finding.get('severity'),
        'description': finding.get('description'),
        'resource': finding.get('resource', {})
    }
    
    sns.publish(
        TopicArn='arn:aws:sns:us-east-1:123456789012:incident-response-alerts',
        Message=json.dumps(message, indent=2),
        Subject=f'Security Finding: {finding.get("type")}'
    )
    
    return {'notification_sent': True}

def log_security_finding(finding):
    """Log security finding for tracking"""
    print(f"Security finding logged: {finding.get('id')} - {finding.get('type')}")
    return {'logged': True}
                '''
            else:
                function_code = f'''
import json

def lambda_handler(event, context):
    """Generic automation function for {function_name}"""
    print(f"Processing event for {function_name}: {{event}}")
    
    return {{
        'statusCode': 200,
        'body': {{
            'function': '{function_name}',
            'automation_type': '{automation_type}',
            'processed': True
        }}
    }}
                '''
            
            # Create Lambda function
            response = self.lambda_client.create_function(
                FunctionName=function_name,
                Runtime='python3.9',
                Role=f'arn:aws:iam::{account_id}:role/IncidentResponseLambdaRole',
                Handler='index.lambda_handler',
                Code={'ZipFile': function_code.encode()},
                Description=f'Incident response automation function: {function_name}',
                Timeout=300,
                MemorySize=256,
                Tags={
                    'Purpose': 'IncidentResponse',
                    'AutomationType': automation_type,
                    'DeployedBy': 'IncidentResponseToolkit'
                }
            )
            
            return {
                'function_name': function_name,
                'function_arn': response['FunctionArn'],
                'status': 'deployed'
            }
            
        except Exception as e:
            return {
                'function_name': function_name,
                'status': 'error',
                'message': str(e)
            }
    
    def implement_tagging_strategy(self, 
                                 target_accounts: List[str],
                                 resource_types: List[str] = None) -> Dict[str, Any]:
        """
        Implement comprehensive tagging strategy across accounts
        """
        try:
            if resource_types is None:
                resource_types = [
                    'ec2:instance',
                    'ec2:volume',
                    'ec2:security-group',
                    's3:bucket',
                    'rds:db',
                    'lambda:function'
                ]
            
            implementation_results = []
            
            for account_id in target_accounts:
                # Create tag policies
                tag_policy_result = self._create_tag_policies(account_id)
                implementation_results.append(tag_policy_result)
                
                # Deploy tag enforcement automation
                enforcement_result = self._deploy_tag_enforcement(account_id, resource_types)
                implementation_results.append(enforcement_result)
                
                # Set up tag compliance monitoring
                monitoring_result = self._setup_tag_monitoring(account_id, resource_types)
                implementation_results.append(monitoring_result)
            
            successful_implementations = sum(1 for result in implementation_results if result['status'] == 'success')
            
            return {
                'status': 'success',
                'total_implementations': len(implementation_results),
                'successful_implementations': successful_implementations,
                'implementation_results': implementation_results,
                'tagging_strategy': self.tagging_strategy
            }
            
        except Exception as e:
            logger.error(f"Error implementing tagging strategy: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def _create_tag_policies(self, account_id: str) -> Dict[str, Any]:
        """
        Create tag policies for enforcement
        """
        try:
            # Create tag policy document
            tag_policy = {
                "tags": {}
            }
            
            # Add mandatory tags to policy
            for tag_key, tag_config in self.tagging_strategy['mandatory_tags'].items():
                if tag_config['enforcement'] == 'required':
                    tag_policy["tags"][tag_key] = {
                        "tag_key": {
                            "@@assign": tag_key
                        },
                        "tag_value": {
                            "@@assign": tag_config['values']
                        },
                        "enforced_for": {
                            "@@assign": [
                                "ec2:instance",
                                "ec2:volume",
                                "s3:bucket",
                                "rds:db-instance"
                            ]
                        }
                    }
            
            return {
                'account_id': account_id,
                'status': 'success',
                'tag_policy': tag_policy,
                'message': 'Tag policies created successfully'
            }
            
        except Exception as e:
            return {
                'account_id': account_id,
                'status': 'error',
                'message': str(e)
            }
    
    def audit_resource_tags(self, 
                           account_id: str,
                           resource_types: List[str] = None) -> Dict[str, Any]:
        """
        Audit resource tags for compliance with tagging strategy
        """
        try:
            if resource_types is None:
                resource_types = ['ec2:instance', 's3:bucket', 'rds:db']
            
            audit_results = {
                'account_id': account_id,
                'audit_date': datetime.utcnow().isoformat(),
                'resource_compliance': {},
                'missing_tags': {},
                'compliance_summary': {}
            }
            
            for resource_type in resource_types:
                # Get resources of this type
                resources = self._get_resources_by_type(resource_type)
                
                compliant_resources = 0
                non_compliant_resources = 0
                missing_tags_for_type = []
                
                for resource in resources:
                    resource_arn = resource['ResourceARN']
                    resource_tags = {tag['Key']: tag['Value'] for tag in resource.get('Tags', [])}
                    
                    # Check mandatory tags
                    missing_mandatory_tags = []
                    for tag_key, tag_config in self.tagging_strategy['mandatory_tags'].items():
                        if tag_config['enforcement'] == 'required' and tag_key not in resource_tags:
                            missing_mandatory_tags.append(tag_key)
                    
                    if missing_mandatory_tags:
                        non_compliant_resources += 1
                        missing_tags_for_type.append({
                            'resource_arn': resource_arn,
                            'missing_tags': missing_mandatory_tags
                        })
                    else:
                        compliant_resources += 1
                
                audit_results['resource_compliance'][resource_type] = {
                    'total_resources': len(resources),
                    'compliant_resources': compliant_resources,
                    'non_compliant_resources': non_compliant_resources,
                    'compliance_percentage': (compliant_resources / len(resources) * 100) if resources else 100
                }
                
                audit_results['missing_tags'][resource_type] = missing_tags_for_type
            
            # Calculate overall compliance
            total_resources = sum(compliance['total_resources'] for compliance in audit_results['resource_compliance'].values())
            total_compliant = sum(compliance['compliant_resources'] for compliance in audit_results['resource_compliance'].values())
            
            audit_results['compliance_summary'] = {
                'total_resources': total_resources,
                'compliant_resources': total_compliant,
                'overall_compliance_percentage': (total_compliant / total_resources * 100) if total_resources else 100
            }
            
            return {
                'status': 'success',
                'audit_results': audit_results
            }
            
        except Exception as e:
            logger.error(f"Error auditing resource tags: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def _get_resources_by_type(self, resource_type: str) -> List[Dict[str, Any]]:
        """
        Get resources by type using Resource Groups Tagging API
        """
        try:
            response = self.resourcegroupstaggingapi_client.get_resources(
                ResourceTypeFilters=[resource_type],
                ResourcesPerPage=100
            )
            return response.get('ResourceTagMappingList', [])
        except Exception as e:
            logger.error(f"Error getting resources of type {resource_type}: {str(e)}")
            return []

# Example usage and demonstration
def demonstrate_incident_response_toolkit():
    """
    Demonstrate incident response toolkit deployment and usage
    """
    toolkit = IncidentResponseToolkit()
    
    # Deploy automation tools
    automation_result = toolkit.deploy_incident_response_automation(
        automation_types=['threat_detection_automation', 'compliance_violation_automation'],
        target_accounts=['123456789012', '987654321098']
    )
    print(f"Automation deployment: {json.dumps(automation_result, indent=2, default=str)}")
    
    # Implement tagging strategy
    tagging_result = toolkit.implement_tagging_strategy(
        target_accounts=['123456789012', '987654321098'],
        resource_types=['ec2:instance', 's3:bucket', 'rds:db']
    )
    print(f"Tagging strategy implementation: {json.dumps(tagging_result, indent=2, default=str)}")
    
    # Audit resource tags
    audit_result = toolkit.audit_resource_tags(
        account_id='123456789012',
        resource_types=['ec2:instance', 's3:bucket']
    )
    print(f"Tag audit results: {json.dumps(audit_result, indent=2, default=str)}")

if __name__ == "__main__":
    demonstrate_incident_response_toolkit()

Resources

AWS Security Services for Pre-deployment

Detection Services:

Logging and Monitoring:

Automation and Orchestration:

Management and Governance:

Security Tool Deployment Checklist

Pre-deployment Planning:

  • Define security tool requirements based on threat model
  • Identify target accounts and regions for deployment
  • Plan integration points between security services
  • Design automation workflows and response procedures
  • Establish logging and monitoring requirements

Core Security Services:

  • Deploy Amazon GuardDuty in all active regions
  • Enable AWS Security Hub with appropriate standards
  • Configure Amazon Detective for investigation capabilities
  • Set up AWS Config for compliance monitoring
  • Implement comprehensive CloudTrail logging
  • Deploy Amazon Inspector for vulnerability scanning

Logging Infrastructure:

  • Configure CloudTrail for API activity logging
  • Enable VPC Flow Logs for network monitoring
  • Set up DNS query logging with Route 53 Resolver
  • Configure application logging with CloudWatch Logs
  • Enable load balancer access logging
  • Implement S3 access logging for sensitive buckets

Automation Framework:

  • Deploy Lambda functions for automated response
  • Create Step Functions workflows for complex procedures
  • Configure EventBridge rules for event-driven automation
  • Set up SNS topics for notification and alerting
  • Implement Systems Manager automation documents

Tagging Strategy:

  • Define mandatory and optional tags
  • Create tag policies for enforcement
  • Deploy tag compliance monitoring
  • Implement automated tag application
  • Set up tag audit and reporting

Tagging Strategy Best Practices

Mandatory Tags:

  • Environment: Production, Staging, Development, Test
  • Owner: Team or individual responsible for the resource
  • CostCenter: For cost allocation and chargeback
  • DataClassification: Public, Internal, Confidential, Restricted

Incident Response Tags:

  • IncidentResponseRole: Critical, Important, Supporting, NonCritical
  • BackupRequired: Yes, No
  • MonitoringLevel: High, Medium, Low
  • ComplianceFramework: SOC2, PCI-DSS, HIPAA, GDPR

Automation Tags:

  • AutomatedResponse: Enabled, Disabled
  • IsolationGroup: WebTier, AppTier, DataTier, Management
  • RecoveryPriority: P1, P2, P3, P4

Automation Patterns

Threat Detection Automation:

  • GuardDuty finding → EventBridge → Lambda → Automated response
  • Security Hub finding → Step Functions → Investigation workflow
  • CloudWatch alarm → SNS → Incident notification

Compliance Automation:

  • Config rule violation → Lambda → Automatic remediation
  • Tag policy violation → EventBridge → Tag enforcement
  • Security standard failure → Systems Manager → Remediation runbook

Incident Response Automation:

  • Manual incident declaration → Step Functions → Response orchestration
  • Automated threat detection → Lambda → Containment actions
  • Forensic evidence collection → Systems Manager → Evidence preservation

Cost Optimization for Security Tools

GuardDuty Cost Factors:

  • CloudTrail events processed
  • VPC Flow Logs analyzed
  • DNS logs processed
  • S3 data events monitored

Security Hub Cost Factors:

  • Security checks performed
  • Findings ingested from integrated services
  • Compliance scans executed

Detective Cost Factors:

  • Data ingested from sources
  • Behavior graph storage
  • Investigation queries performed

Config Cost Factors:

  • Configuration items recorded
  • Rule evaluations performed
  • S3 storage for configuration history

Monitoring and Alerting Setup

Critical Alerts:

  • High-severity GuardDuty findings
  • Security Hub compliance failures
  • Config rule violations
  • Unauthorized API activity

Operational Alerts:

  • Service deployment failures
  • Log ingestion issues
  • Automation execution failures
  • Tag compliance violations

Performance Monitoring:

  • Lambda function execution metrics
  • Step Functions workflow success rates
  • EventBridge rule processing times
  • API throttling and error rates

Testing and Validation

Functional Testing:

  • Verify security service deployment across all regions
  • Test automation workflows with simulated events
  • Validate logging and monitoring configurations
  • Confirm alert delivery and escalation procedures

Security Testing:

  • Conduct red team exercises to test detection capabilities
  • Simulate security incidents to validate response procedures
  • Test forensic evidence collection and preservation
  • Verify compliance monitoring and reporting accuracy

Performance Testing:

  • Load test automation functions with high event volumes
  • Validate scaling behavior under stress conditions
  • Test failover and recovery procedures
  • Monitor resource utilization and cost impact