SEC06-BP05: Automate compute protection

Automate your compute protection to improve your security posture, reduce human error, and scale your security operations. Use automation to consistently apply security configurations, respond to security events, and maintain compliance across your compute resources. Implement automated security scanning, patch management, and incident response to ensure comprehensive protection.

Implementation guidance

Automating compute protection is essential for maintaining a robust security posture at scale. By implementing automated security controls, monitoring, and response mechanisms, you can ensure consistent protection across all compute resources while reducing the burden on security teams and minimizing the risk of human error.

Key steps for implementing this best practice:

  1. Implement automated security configuration management:
    • Use Infrastructure as Code (IaC) for consistent security configurations
    • Automate security baseline deployment and enforcement
    • Implement configuration drift detection and remediation
    • Use policy-as-code for security compliance validation
    • Establish automated security configuration testing
  2. Deploy automated threat detection and response:
    • Implement endpoint detection and response (EDR) solutions
    • Configure automated malware detection and quarantine
    • Set up behavioral analysis and anomaly detection
    • Implement automated incident response workflows
    • Configure real-time threat intelligence integration
  3. Establish automated vulnerability management:
    • Implement continuous vulnerability scanning
    • Automate patch deployment and testing
    • Configure automated security updates
    • Set up vulnerability prioritization and remediation workflows
    • Implement automated compliance reporting
  4. Configure automated monitoring and alerting:
    • Implement comprehensive security event monitoring
    • Set up automated log analysis and correlation
    • Configure intelligent alerting and notification systems
    • Implement automated security metrics collection
    • Establish automated compliance monitoring
  5. Implement automated backup and recovery:
    • Configure automated backup scheduling and execution
    • Implement automated backup verification and testing
    • Set up automated disaster recovery procedures
    • Configure automated failover and failback processes
    • Implement automated recovery validation
  6. Establish automated security orchestration:
    • Implement Security Orchestration, Automation, and Response (SOAR)
    • Configure automated playbook execution
    • Set up automated evidence collection and preservation
    • Implement automated communication and notification workflows
    • Configure automated reporting and documentation

Implementation examples

Example 1: Automated security configuration with AWS Config

python import boto3 import json from datetime import datetime import os

class AutomatedThreatResponse: “"”Automated threat detection and response system”””

def __init__(self):
    self.guardduty = boto3.client('guardduty')
    self.ec2 = boto3.client('ec2')
    self.sns = boto3.client('sns')
    self.ssm = boto3.client('ssm')
    self.lambda_client = boto3.client('lambda')
    
def lambda_handler(self, event, context):
    """Main Lambda handler for GuardDuty findings"""
    
    try:
        # Parse GuardDuty finding
        detail = event.get('detail', {})
        finding_type = detail.get('type', '')
        severity = detail.get('severity', 0)
        
        print(f"Processing GuardDuty finding: {finding_type} (Severity: {severity})")
        
        # Extract relevant information
        finding_info = self.extract_finding_info(detail)
        
        # Determine response actions based on finding type and severity
        response_actions = self.determine_response_actions(finding_type, severity, finding_info)
        
        # Execute response actions
        results = []
        for action in response_actions:
            result = self.execute_response_action(action, finding_info)
            results.append(result)
        
        # Send notification
        self.send_notification(finding_type, severity, finding_info, results)
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'finding_type': finding_type,
                'severity': severity,
                'actions_executed': len(results),
                'results': results
            })
        }
        
    except Exception as e:
        print(f"Error processing GuardDuty finding: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def extract_finding_info(self, detail):
    """Extract relevant information from GuardDuty finding"""
    
    finding_info = {
        'id': detail.get('id', ''),
        'type': detail.get('type', ''),
        'severity': detail.get('severity', 0),
        'title': detail.get('title', ''),
        'description': detail.get('description', ''),
        'created_at': detail.get('createdAt', ''),
        'updated_at': detail.get('updatedAt', ''),
        'region': detail.get('region', ''),
        'account_id': detail.get('accountId', ''),
        'resource': {},
        'service': {}
    }
    
    # Extract resource information
    if 'resource' in detail:
        resource = detail['resource']
        finding_info['resource'] = {
            'type': resource.get('resourceType', ''),
            'instance_id': resource.get('instanceDetails', {}).get('instanceId', ''),
            'instance_type': resource.get('instanceDetails', {}).get('instanceType', ''),
            'availability_zone': resource.get('instanceDetails', {}).get('availabilityZone', ''),
            'private_ip': resource.get('instanceDetails', {}).get('networkInterfaces', [{}])[0].get('privateIpAddress', ''),
            'public_ip': resource.get('instanceDetails', {}).get('networkInterfaces', [{}])[0].get('publicIp', '')
        }
    
    # Extract service information
    if 'service' in detail:
        service = detail['service']
        finding_info['service'] = {
            'action': service.get('action', {}),
            'remote_ip': service.get('remoteIpDetails', {}).get('ipAddressV4', ''),
            'remote_country': service.get('remoteIpDetails', {}).get('country', {}).get('countryName', ''),
            'remote_org': service.get('remoteIpDetails', {}).get('organization', {}).get('org', '')
        }
    
    return finding_info

def determine_response_actions(self, finding_type, severity, finding_info):
    """Determine appropriate response actions based on finding characteristics"""
    
    actions = []
    
    # High severity findings require immediate action
    if severity >= 7.0:
        actions.extend([
            'isolate_instance',
            'create_forensic_snapshot',
            'block_malicious_ip',
            'send_high_priority_alert'
        ])
    
    # Medium severity findings require monitoring and investigation
    elif severity >= 4.0:
        actions.extend([
            'enhance_monitoring',
            'collect_evidence',
            'send_medium_priority_alert'
        ])
    
    # Specific actions based on finding type
    if 'Backdoor' in finding_type:
        actions.extend(['isolate_instance', 'scan_for_malware'])
    
    if 'CryptoCurrency' in finding_type:
        actions.extend(['block_mining_traffic', 'check_cpu_usage'])
    
    if 'Trojan' in finding_type:
        actions.extend(['quarantine_files', 'full_system_scan'])
    
    if 'Recon' in finding_type:
        actions.extend(['block_source_ip', 'enhance_network_monitoring'])
    
    # Remove duplicates and return
    return list(set(actions))

def execute_response_action(self, action, finding_info):
    """Execute a specific response action"""
    
    try:
        if action == 'isolate_instance':
            return self.isolate_instance(finding_info['resource']['instance_id'])
        
        elif action == 'create_forensic_snapshot':
            return self.create_forensic_snapshot(finding_info['resource']['instance_id'])
        
        elif action == 'block_malicious_ip':
            return self.block_malicious_ip(finding_info['service']['remote_ip'])
        
        elif action == 'enhance_monitoring':
            return self.enhance_monitoring(finding_info['resource']['instance_id'])
        
        elif action == 'collect_evidence':
            return self.collect_evidence(finding_info)
        
        elif action == 'scan_for_malware':
            return self.scan_for_malware(finding_info['resource']['instance_id'])
        
        elif action == 'quarantine_files':
            return self.quarantine_suspicious_files(finding_info['resource']['instance_id'])
        
        else:
            return {'action': action, 'status': 'not_implemented', 'message': 'Action not implemented'}
    
    except Exception as e:
        return {'action': action, 'status': 'error', 'message': str(e)}

def isolate_instance(self, instance_id):
    """Isolate EC2 instance by changing security group"""
    
    if not instance_id:
        return {'action': 'isolate_instance', 'status': 'skipped', 'message': 'No instance ID provided'}
    
    try:
        # Get current instance details
        response = self.ec2.describe_instances(InstanceIds=[instance_id])
        
        if not response['Reservations']:
            return {'action': 'isolate_instance', 'status': 'error', 'message': 'Instance not found'}
        
        instance = response['Reservations'][0]['Instances'][0]
        
        # Create or get isolation security group
        isolation_sg_id = self.get_or_create_isolation_sg()
        
        # Change instance security group
        self.ec2.modify_instance_attribute(
            InstanceId=instance_id,
            Groups=[isolation_sg_id]
        )
        
        # Tag instance as isolated
        self.ec2.create_tags(
            Resources=[instance_id],
            Tags=[
                {'Key': 'SecurityStatus', 'Value': 'Isolated'},
                {'Key': 'IsolationTime', 'Value': datetime.utcnow().isoformat()},
                {'Key': 'IsolationReason', 'Value': 'GuardDuty Finding'}
            ]
        )
        
        return {
            'action': 'isolate_instance',
            'status': 'success',
            'message': f'Instance {instance_id} isolated successfully'
        }
    
    except Exception as e:
        return {'action': 'isolate_instance', 'status': 'error', 'message': str(e)}

def get_or_create_isolation_sg(self):
    """Get or create isolation security group"""
    
    try:
        # Try to find existing isolation security group
        response = self.ec2.describe_security_groups(
            Filters=[
                {'Name': 'group-name', 'Values': ['isolation-sg']},
                {'Name': 'description', 'Values': ['Isolation security group for compromised instances']}
            ]
        )
        
        if response['SecurityGroups']:
            return response['SecurityGroups'][0]['GroupId']
        
        # Create new isolation security group
        vpc_response = self.ec2.describe_vpcs(Filters=[{'Name': 'isDefault', 'Values': ['true']}])
        vpc_id = vpc_response['Vpcs'][0]['VpcId']
        
        sg_response = self.ec2.create_security_group(
            GroupName='isolation-sg',
            Description='Isolation security group for compromised instances',
            VpcId=vpc_id
        )
        
        sg_id = sg_response['GroupId']
        
        # Tag the security group
        self.ec2.create_tags(
            Resources=[sg_id],
            Tags=[
                {'Key': 'Name', 'Value': 'Isolation-SG'},
                {'Key': 'Purpose', 'Value': 'Instance-Isolation'}
            ]
        )
        
        return sg_id
    
    except Exception as e:
        print(f"Error creating isolation security group: {e}")
        raise

def create_forensic_snapshot(self, instance_id):
    """Create forensic snapshot of instance volumes"""
    
    if not instance_id:
        return {'action': 'create_forensic_snapshot', 'status': 'skipped', 'message': 'No instance ID provided'}
    
    try:
        # Get instance volumes
        response = self.ec2.describe_instances(InstanceIds=[instance_id])
        instance = response['Reservations'][0]['Instances'][0]
        
        snapshots_created = []
        
        for bdm in instance.get('BlockDeviceMappings', []):
            volume_id = bdm['Ebs']['VolumeId']
            
            # Create snapshot
            snapshot_response = self.ec2.create_snapshot(
                VolumeId=volume_id,
                Description=f'Forensic snapshot of {volume_id} from instance {instance_id}'
            )
            
            snapshot_id = snapshot_response['SnapshotId']
            snapshots_created.append(snapshot_id)
            
            # Tag snapshot
            self.ec2.create_tags(
                Resources=[snapshot_id],
                Tags=[
                    {'Key': 'Purpose', 'Value': 'Forensic'},
                    {'Key': 'SourceInstance', 'Value': instance_id},
                    {'Key': 'SourceVolume', 'Value': volume_id},
                    {'Key': 'CreatedBy', 'Value': 'AutomatedThreatResponse'},
                    {'Key': 'CreationTime', 'Value': datetime.utcnow().isoformat()}
                ]
            )
        
        return {
            'action': 'create_forensic_snapshot',
            'status': 'success',
            'message': f'Created {len(snapshots_created)} forensic snapshots',
            'snapshots': snapshots_created
        }
    
    except Exception as e:
        return {'action': 'create_forensic_snapshot', 'status': 'error', 'message': str(e)}

def block_malicious_ip(self, ip_address):
    """Block malicious IP address using security groups"""
    
    if not ip_address:
        return {'action': 'block_malicious_ip', 'status': 'skipped', 'message': 'No IP address provided'}
    
    try:
        # Get or create blocking security group
        blocking_sg_id = self.get_or_create_blocking_sg()
        
        # Add rule to block the IP
        self.ec2.authorize_security_group_ingress(
            GroupId=blocking_sg_id,
            IpPermissions=[
                {
                    'IpProtocol': '-1',
                    'IpRanges': [
                        {
                            'CidrIp': f'{ip_address}/32',
                            'Description': f'Blocked malicious IP - {datetime.utcnow().isoformat()}'
                        }
                    ]
                }
            ]
        )
        
        return {
            'action': 'block_malicious_ip',
            'status': 'success',
            'message': f'Blocked IP address {ip_address}'
        }
    
    except self.ec2.exceptions.ClientError as e:
        if 'InvalidPermission.Duplicate' in str(e):
            return {
                'action': 'block_malicious_ip',
                'status': 'already_blocked',
                'message': f'IP address {ip_address} already blocked'
            }
        else:
            return {'action': 'block_malicious_ip', 'status': 'error', 'message': str(e)}

def get_or_create_blocking_sg(self):
    """Get or create security group for blocking malicious IPs"""
    
    try:
        # Try to find existing blocking security group
        response = self.ec2.describe_security_groups(
            Filters=[
                {'Name': 'group-name', 'Values': ['malicious-ip-blocker']},
                {'Name': 'description', 'Values': ['Security group for blocking malicious IP addresses']}
            ]
        )
        
        if response['SecurityGroups']:
            return response['SecurityGroups'][0]['GroupId']
        
        # Create new blocking security group
        vpc_response = self.ec2.describe_vpcs(Filters=[{'Name': 'isDefault', 'Values': ['true']}])
        vpc_id = vpc_response['Vpcs'][0]['VpcId']
        
        sg_response = self.ec2.create_security_group(
            GroupName='malicious-ip-blocker',
            Description='Security group for blocking malicious IP addresses',
            VpcId=vpc_id
        )
        
        return sg_response['GroupId']
    
    except Exception as e:
        print(f"Error creating blocking security group: {e}")
        raise

def send_notification(self, finding_type, severity, finding_info, results):
    """Send notification about the automated response"""
    
    message = f""" Automated Threat Response Executed

Finding Details:

  • Type: {finding_type}
  • Severity: {severity}
  • Instance: {finding_info[‘resource’][‘instance_id’]}
  • Remote IP: {finding_info[‘service’][‘remote_ip’]}
  • Country: {finding_info[‘service’][‘remote_country’]}

Actions Taken: “””

    for result in results:
        status_emoji = "✅" if result['status'] == 'success' else "❌" if result['status'] == 'error' else "⚠️"
        message += f"{status_emoji} {result['action']}: {result['message']}\n"
    
    message += f"\nTimestamp: {datetime.utcnow().isoformat()}Z"
    
    try:
        topic_arn = os.environ.get('SNS_TOPIC_ARN', 'arn:aws:sns:us-west-2:123456789012:ThreatResponseAlerts')
        
        self.sns.publish(
            TopicArn=topic_arn,
            Subject=f'Automated Threat Response: {finding_type}',
            Message=message
        )
    
    except Exception as e:
        print(f"Error sending notification: {e}")

Lambda deployment package would include this handler

def lambda_handler(event, context): “"”Lambda entry point””” threat_response = AutomatedThreatResponse() return threat_response.lambda_handler(event, context)

Example 3: Automated patch management with Systems Manager

AWS services to consider

AWS Config

Enables you to assess, audit, and evaluate the configurations of your AWS resources. Provides automated remediation capabilities for configuration compliance.

Amazon GuardDuty

Provides intelligent threat detection for your AWS accounts and workloads. Integrates with automated response systems for immediate threat mitigation.

AWS Systems Manager

Gives you visibility and control of your infrastructure on AWS. Provides automation capabilities for patch management, configuration management, and incident response.

AWS Lambda

Lets you run code without provisioning or managing servers. Essential for implementing automated response functions and security orchestration workflows.

Amazon EventBridge

A serverless event bus that makes it easy to connect applications together. Enables automated response to security events from multiple AWS services.

Amazon CloudWatch

Monitors your AWS resources and applications in real time. Provides metrics, alarms, and automated actions for security monitoring and response.

Benefits of automating compute protection

  • Consistent security posture: Automated controls ensure uniform security configurations across all compute resources
  • Rapid threat response: Automated systems can respond to threats in seconds rather than minutes or hours
  • Reduced human error: Automation eliminates mistakes that can occur during manual security operations
  • Scalable protection: Automated security scales with your infrastructure growth without proportional increases in security staff
  • 24/7 monitoring: Automated systems provide continuous protection without requiring human oversight
  • Improved compliance: Automated compliance monitoring and remediation helps maintain regulatory adherence
  • Cost efficiency: Reduces operational costs through automated security operations and faster incident resolution