SEC06-BP05: Automate compute protection
Automate your compute protection to improve your security posture, reduce human error, and scale your security operations. Use automation to consistently apply security configurations, respond to security events, and maintain compliance across your compute resources. Implement automated security scanning, patch management, and incident response to ensure comprehensive protection.
Implementation guidance
Automating compute protection is essential for maintaining a robust security posture at scale. By implementing automated security controls, monitoring, and response mechanisms, you can ensure consistent protection across all compute resources while reducing the burden on security teams and minimizing the risk of human error.
Key steps for implementing this best practice:
- Implement automated security configuration management:
- Use Infrastructure as Code (IaC) for consistent security configurations
- Automate security baseline deployment and enforcement
- Implement configuration drift detection and remediation
- Use policy-as-code for security compliance validation
- Establish automated security configuration testing
- Deploy automated threat detection and response:
- Implement endpoint detection and response (EDR) solutions
- Configure automated malware detection and quarantine
- Set up behavioral analysis and anomaly detection
- Implement automated incident response workflows
- Configure real-time threat intelligence integration
- Establish automated vulnerability management:
- Implement continuous vulnerability scanning
- Automate patch deployment and testing
- Configure automated security updates
- Set up vulnerability prioritization and remediation workflows
- Implement automated compliance reporting
- Configure automated monitoring and alerting:
- Implement comprehensive security event monitoring
- Set up automated log analysis and correlation
- Configure intelligent alerting and notification systems
- Implement automated security metrics collection
- Establish automated compliance monitoring
- Implement automated backup and recovery:
- Configure automated backup scheduling and execution
- Implement automated backup verification and testing
- Set up automated disaster recovery procedures
- Configure automated failover and failback processes
- Implement automated recovery validation
- Establish automated security orchestration:
- Implement Security Orchestration, Automation, and Response (SOAR)
- Configure automated playbook execution
- Set up automated evidence collection and preservation
- Implement automated communication and notification workflows
- Configure automated reporting and documentation
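The drift detection step in the list above can be illustrated with a minimal sketch. It assumes a hypothetical baseline of three settings (`imds_v2_required`, `ebs_encrypted`, `ssh_open_to_world`); the setting names and the `detect_drift` helper are illustrative and do not come from any AWS API. In practice the observed values would come from a configuration inventory rather than a literal dictionary.

```python
from typing import Any

# Hypothetical hardened baseline; keys and values are illustrative only.
BASELINE: dict[str, Any] = {
    "imds_v2_required": True,
    "ebs_encrypted": True,
    "ssh_open_to_world": False,
}


def detect_drift(baseline: dict[str, Any], observed: dict[str, Any]) -> dict[str, tuple[Any, Any]]:
    """Return {setting: (expected, actual)} for every setting that deviates.

    A setting missing from `observed` is reported with an actual value of None.
    """
    return {
        key: (expected, observed.get(key))
        for key, expected in baseline.items()
        if observed.get(key) != expected
    }


# Example: one setting has drifted and one was never reported.
observed = {"imds_v2_required": True, "ebs_encrypted": False}
drift = detect_drift(BASELINE, observed)
for setting, (expected, actual) in sorted(drift.items()):
    print(f"DRIFT {setting}: expected {expected!r}, found {actual!r}")
```

A remediation workflow could take the returned dictionary as input and either reapply the baseline or open a ticket, depending on how much automatic change the team is willing to allow.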
Implementation examples
Example 1: Automated security configuration with AWS Config
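A minimal sketch of this kind of automation, assuming boto3, an AWS Config recorder that is already enabled in the account, and the AWS managed rule `EC2_IMDSV2_CHECK`. The rule name `ec2-imdsv2-required` and both helper functions are hypothetical names chosen for illustration. The client is passed in as a parameter so the functions can be exercised without AWS credentials.

```python
def ensure_imdsv2_rule(config_client, rule_name="ec2-imdsv2-required"):
    """Create or update a managed AWS Config rule that flags EC2 instances
    not enforcing IMDSv2. The rule name is a hypothetical example."""
    config_client.put_config_rule(
        ConfigRule={
            "ConfigRuleName": rule_name,
            "Source": {"Owner": "AWS", "SourceIdentifier": "EC2_IMDSV2_CHECK"},
            "Scope": {"ComplianceResourceTypes": ["AWS::EC2::Instance"]},
        }
    )
    return rule_name


def noncompliant_resources(config_client, rule_name):
    """Return the IDs of resources the rule currently marks NON_COMPLIANT."""
    resource_ids = []
    kwargs = {"ConfigRuleName": rule_name, "ComplianceTypes": ["NON_COMPLIANT"]}
    while True:
        page = config_client.get_compliance_details_by_config_rule(**kwargs)
        for result in page.get("EvaluationResults", []):
            qualifier = result["EvaluationResultIdentifier"]["EvaluationResultQualifier"]
            resource_ids.append(qualifier["ResourceId"])
        token = page.get("NextToken")
        if not token:
            return resource_ids
        # Follow pagination until every result has been collected
        kwargs["NextToken"] = token


# Usage (requires AWS credentials):
#   import boto3
#   config = boto3.client("config")
#   rule = ensure_imdsv2_rule(config)
#   print(noncompliant_resources(config, rule))
```

The list of non-compliant resources could feed a remediation step, for example a Systems Manager Automation document, though that wiring is outside this sketch.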
Example 2: Automated threat response with GuardDuty and Lambda
```python
import json
import os
from datetime import datetime, timezone

import boto3


def utc_timestamp():
    """Return the current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


class AutomatedThreatResponse:
    """Automated threat detection and response system"""

    def __init__(self):
        self.guardduty = boto3.client('guardduty')
        self.ec2 = boto3.client('ec2')
        self.sns = boto3.client('sns')
        self.ssm = boto3.client('ssm')
        self.lambda_client = boto3.client('lambda')

    def lambda_handler(self, event, context):
        """Main Lambda handler for GuardDuty findings"""
        try:
            # Parse GuardDuty finding
            detail = event.get('detail', {})
            finding_type = detail.get('type', '')
            severity = detail.get('severity', 0)
            print(f"Processing GuardDuty finding: {finding_type} (Severity: {severity})")

            # Extract relevant information
            finding_info = self.extract_finding_info(detail)

            # Determine response actions based on finding type and severity
            response_actions = self.determine_response_actions(finding_type, severity, finding_info)

            # Execute response actions
            results = []
            for action in response_actions:
                result = self.execute_response_action(action, finding_info)
                results.append(result)

            # Send notification
            self.send_notification(finding_type, severity, finding_info, results)

            return {
                'statusCode': 200,
                'body': json.dumps({
                    'finding_type': finding_type,
                    'severity': severity,
                    'actions_executed': len(results),
                    'results': results
                })
            }
        except Exception as e:
            print(f"Error processing GuardDuty finding: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({'error': str(e)})
            }

    def extract_finding_info(self, detail):
        """Extract relevant information from GuardDuty finding"""
        finding_info = {
            'id': detail.get('id', ''),
            'type': detail.get('type', ''),
            'severity': detail.get('severity', 0),
            'title': detail.get('title', ''),
            'description': detail.get('description', ''),
            'created_at': detail.get('createdAt', ''),
            'updated_at': detail.get('updatedAt', ''),
            'region': detail.get('region', ''),
            'account_id': detail.get('accountId', ''),
            'resource': {},
            'service': {}
        }

        # Extract resource information
        if 'resource' in detail:
            resource = detail['resource']
            instance = resource.get('instanceDetails', {})
            # Guard against a missing or empty interface list
            interfaces = instance.get('networkInterfaces') or [{}]
            finding_info['resource'] = {
                'type': resource.get('resourceType', ''),
                'instance_id': instance.get('instanceId', ''),
                'instance_type': instance.get('instanceType', ''),
                'availability_zone': instance.get('availabilityZone', ''),
                'private_ip': interfaces[0].get('privateIpAddress', ''),
                'public_ip': interfaces[0].get('publicIp', '')
            }

        # Extract service information
        if 'service' in detail:
            service = detail['service']
            action = service.get('action', {})
            # GuardDuty nests remoteIpDetails under the action-specific key,
            # for example action['networkConnectionAction']
            remote = next(
                (value['remoteIpDetails'] for value in action.values()
                 if isinstance(value, dict) and 'remoteIpDetails' in value),
                {}
            )
            finding_info['service'] = {
                'action': action,
                'remote_ip': remote.get('ipAddressV4', ''),
                'remote_country': remote.get('country', {}).get('countryName', ''),
                'remote_org': remote.get('organization', {}).get('org', '')
            }

        return finding_info

    def determine_response_actions(self, finding_type, severity, finding_info):
        """Determine appropriate response actions based on finding characteristics"""
        actions = []

        # High severity findings require immediate action
        if severity >= 7.0:
            actions.extend([
                'isolate_instance',
                'create_forensic_snapshot',
                'block_malicious_ip',
                'send_high_priority_alert'
            ])
        # Medium severity findings require monitoring and investigation
        elif severity >= 4.0:
            actions.extend([
                'enhance_monitoring',
                'collect_evidence',
                'send_medium_priority_alert'
            ])

        # Specific actions based on finding type
        if 'Backdoor' in finding_type:
            actions.extend(['isolate_instance', 'scan_for_malware'])
        if 'CryptoCurrency' in finding_type:
            actions.extend(['block_mining_traffic', 'check_cpu_usage'])
        if 'Trojan' in finding_type:
            actions.extend(['quarantine_files', 'full_system_scan'])
        if 'Recon' in finding_type:
            actions.extend(['block_source_ip', 'enhance_network_monitoring'])

        # Remove duplicates while keeping the order in which actions were added
        return list(dict.fromkeys(actions))

    def execute_response_action(self, action, finding_info):
        """Execute a specific response action"""
        instance_id = finding_info['resource'].get('instance_id', '')
        remote_ip = finding_info['service'].get('remote_ip', '')
        try:
            if action == 'isolate_instance':
                return self.isolate_instance(instance_id)
            elif action == 'create_forensic_snapshot':
                return self.create_forensic_snapshot(instance_id)
            elif action == 'block_malicious_ip':
                return self.block_malicious_ip(remote_ip)
            else:
                # enhance_monitoring, collect_evidence, scan_for_malware,
                # quarantine_files, and the remaining actions have no handler
                # in this example
                return {'action': action, 'status': 'not_implemented', 'message': 'Action not implemented'}
        except Exception as e:
            return {'action': action, 'status': 'error', 'message': str(e)}

    def isolate_instance(self, instance_id):
        """Isolate EC2 instance by changing security group"""
        if not instance_id:
            return {'action': 'isolate_instance', 'status': 'skipped', 'message': 'No instance ID provided'}
        try:
            # Get current instance details
            response = self.ec2.describe_instances(InstanceIds=[instance_id])
            if not response['Reservations']:
                return {'action': 'isolate_instance', 'status': 'error', 'message': 'Instance not found'}
            instance = response['Reservations'][0]['Instances'][0]

            # Create or get the isolation security group in the instance's VPC
            isolation_sg_id = self.get_or_create_isolation_sg(instance['VpcId'])

            # Change instance security group
            self.ec2.modify_instance_attribute(
                InstanceId=instance_id,
                Groups=[isolation_sg_id]
            )

            # Tag instance as isolated
            self.ec2.create_tags(
                Resources=[instance_id],
                Tags=[
                    {'Key': 'SecurityStatus', 'Value': 'Isolated'},
                    {'Key': 'IsolationTime', 'Value': utc_timestamp()},
                    {'Key': 'IsolationReason', 'Value': 'GuardDuty Finding'}
                ]
            )
            return {
                'action': 'isolate_instance',
                'status': 'success',
                'message': f'Instance {instance_id} isolated successfully'
            }
        except Exception as e:
            return {'action': 'isolate_instance', 'status': 'error', 'message': str(e)}

    def get_or_create_isolation_sg(self, vpc_id):
        """Get or create the isolation security group in the given VPC"""
        try:
            # Try to find existing isolation security group
            response = self.ec2.describe_security_groups(
                Filters=[
                    {'Name': 'group-name', 'Values': ['isolation-sg']},
                    {'Name': 'vpc-id', 'Values': [vpc_id]}
                ]
            )
            if response['SecurityGroups']:
                return response['SecurityGroups'][0]['GroupId']

            # Create new isolation security group
            sg_response = self.ec2.create_security_group(
                GroupName='isolation-sg',
                Description='Isolation security group for compromised instances',
                VpcId=vpc_id
            )
            sg_id = sg_response['GroupId']

            # New security groups allow all outbound traffic by default;
            # remove that rule so the group permits no traffic at all
            self.ec2.revoke_security_group_egress(
                GroupId=sg_id,
                IpPermissions=[{'IpProtocol': '-1', 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}]
            )

            # Tag the security group
            self.ec2.create_tags(
                Resources=[sg_id],
                Tags=[
                    {'Key': 'Name', 'Value': 'Isolation-SG'},
                    {'Key': 'Purpose', 'Value': 'Instance-Isolation'}
                ]
            )
            return sg_id
        except Exception as e:
            print(f"Error creating isolation security group: {e}")
            raise

    def create_forensic_snapshot(self, instance_id):
        """Create forensic snapshot of instance volumes"""
        if not instance_id:
            return {'action': 'create_forensic_snapshot', 'status': 'skipped', 'message': 'No instance ID provided'}
        try:
            # Get instance volumes
            response = self.ec2.describe_instances(InstanceIds=[instance_id])
            if not response['Reservations']:
                return {'action': 'create_forensic_snapshot', 'status': 'error', 'message': 'Instance not found'}
            instance = response['Reservations'][0]['Instances'][0]

            snapshots_created = []
            for bdm in instance.get('BlockDeviceMappings', []):
                volume_id = bdm.get('Ebs', {}).get('VolumeId')
                if not volume_id:
                    continue  # Skip mappings that are not EBS volumes

                # Create snapshot
                snapshot_response = self.ec2.create_snapshot(
                    VolumeId=volume_id,
                    Description=f'Forensic snapshot of {volume_id} from instance {instance_id}'
                )
                snapshot_id = snapshot_response['SnapshotId']
                snapshots_created.append(snapshot_id)

                # Tag snapshot
                self.ec2.create_tags(
                    Resources=[snapshot_id],
                    Tags=[
                        {'Key': 'Purpose', 'Value': 'Forensic'},
                        {'Key': 'SourceInstance', 'Value': instance_id},
                        {'Key': 'SourceVolume', 'Value': volume_id},
                        {'Key': 'CreatedBy', 'Value': 'AutomatedThreatResponse'},
                        {'Key': 'CreationTime', 'Value': utc_timestamp()}
                    ]
                )
            return {
                'action': 'create_forensic_snapshot',
                'status': 'success',
                'message': f'Created {len(snapshots_created)} forensic snapshots',
                'snapshots': snapshots_created
            }
        except Exception as e:
            return {'action': 'create_forensic_snapshot', 'status': 'error', 'message': str(e)}

    def block_malicious_ip(self, ip_address):
        """Block a malicious IP address with a network ACL deny entry.

        Security groups only support allow rules, so adding the address to a
        security group would permit its traffic instead of blocking it.
        """
        if not ip_address:
            return {'action': 'block_malicious_ip', 'status': 'skipped', 'message': 'No IP address provided'}

        # Assumption: BLOCKING_NACL_ID holds the ID of the network ACL attached
        # to the affected subnets; this variable is not part of the original example
        nacl_id = os.environ.get('BLOCKING_NACL_ID')
        if not nacl_id:
            return {'action': 'block_malicious_ip', 'status': 'skipped', 'message': 'No network ACL configured'}

        try:
            cidr = f'{ip_address}/32'
            response = self.ec2.describe_network_acls(NetworkAclIds=[nacl_id])
            ingress = [e for e in response['NetworkAcls'][0]['Entries'] if not e['Egress']]

            if any(e.get('CidrBlock') == cidr and e['RuleAction'] == 'deny' for e in ingress):
                return {
                    'action': 'block_malicious_ip',
                    'status': 'already_blocked',
                    'message': f'IP address {ip_address} already blocked'
                }

            # Use the lowest free rule number below 100 so the deny entry is
            # evaluated before the default allow rule (100)
            used = {e['RuleNumber'] for e in ingress}
            rule_number = next((n for n in range(1, 100) if n not in used), None)
            if rule_number is None:
                return {'action': 'block_malicious_ip', 'status': 'error', 'message': 'No free rule number below 100'}

            self.ec2.create_network_acl_entry(
                NetworkAclId=nacl_id,
                RuleNumber=rule_number,
                Protocol='-1',
                RuleAction='deny',
                Egress=False,
                CidrBlock=cidr
            )
            return {
                'action': 'block_malicious_ip',
                'status': 'success',
                'message': f'Blocked IP address {ip_address}'
            }
        except Exception as e:
            return {'action': 'block_malicious_ip', 'status': 'error', 'message': str(e)}

    def send_notification(self, finding_type, severity, finding_info, results):
        """Send notification about the automated response"""
        resource = finding_info.get('resource', {})
        service = finding_info.get('service', {})
        message = f"""Automated Threat Response Executed

Finding Details:
- Type: {finding_type}
- Severity: {severity}
- Instance: {resource.get('instance_id', '')}
- Remote IP: {service.get('remote_ip', '')}
- Country: {service.get('remote_country', '')}

Actions Taken:
"""
        for result in results:
            status_emoji = "✅" if result['status'] == 'success' else "❌" if result['status'] == 'error' else "⚠️"
            message += f"{status_emoji} {result['action']}: {result['message']}\n"
        message += f"\nTimestamp: {utc_timestamp()}"

        try:
            topic_arn = os.environ.get('SNS_TOPIC_ARN', 'arn:aws:sns:us-west-2:123456789012:ThreatResponseAlerts')
            self.sns.publish(
                TopicArn=topic_arn,
                # SNS limits the subject to 100 characters
                Subject=f'Automated Threat Response: {finding_type}'[:100],
                Message=message
            )
        except Exception as e:
            print(f"Error sending notification: {e}")


# Lambda deployment package would include this handler
def lambda_handler(event, context):
    """Lambda entry point"""
    threat_response = AutomatedThreatResponse()
    return threat_response.lambda_handler(event, context)
```
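The handler above only runs if GuardDuty findings are routed to it. A minimal sketch of that wiring with Amazon EventBridge follows, assuming boto3 and a function that is already deployed. The rule name `guardduty-threat-response`, the target ID, and both helper functions are hypothetical. The severity threshold of 4.0 mirrors the medium-severity cutoff used in `determine_response_actions`.

```python
import json


def guardduty_event_pattern(min_severity=4.0):
    """Build an EventBridge pattern matching GuardDuty findings at or above
    the given severity."""
    return {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
        "detail": {"severity": [{"numeric": [">=", min_severity]}]},
    }


def route_findings_to_lambda(events_client, lambda_arn, rule_name="guardduty-threat-response"):
    """Create or update the rule and point it at the response function.
    The rule name and target ID are hypothetical examples."""
    events_client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(guardduty_event_pattern()),
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "threat-response-lambda", "Arn": lambda_arn}],
    )
    return rule_name


# Usage (requires AWS credentials):
#   import boto3
#   route_findings_to_lambda(boto3.client("events"), "<function ARN>")
```

The function also needs a resource-based permission that lets `events.amazonaws.com` invoke it, which this sketch leaves out.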
Example 3: Automated patch management with Systems Manager
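A minimal sketch of this pattern, assuming boto3, instances managed by Systems Manager, and the AWS-provided `AWS-RunPatchBaseline` document. The `PatchGroup` tag key, the concurrency and error thresholds, and both helper functions are assumptions chosen for illustration. The client is passed in so the request can be built and inspected without AWS credentials.

```python
VALID_OPERATIONS = ("Scan", "Install")


def build_patch_command(patch_group, operation="Scan", tag_key="PatchGroup"):
    """Build send_command arguments that run AWS-RunPatchBaseline against
    every instance tagged with the given patch group.
    The tag key and rollout thresholds are illustrative assumptions."""
    if operation not in VALID_OPERATIONS:
        raise ValueError(f"operation must be one of {VALID_OPERATIONS}")
    return {
        "DocumentName": "AWS-RunPatchBaseline",
        "Targets": [{"Key": f"tag:{tag_key}", "Values": [patch_group]}],
        "Parameters": {"Operation": [operation]},
        # Limit the blast radius: patch 10% of targets at a time and stop
        # the rollout if more than 5% of them fail
        "MaxConcurrency": "10%",
        "MaxErrors": "5%",
        "Comment": f"Automated patch {operation.lower()} for {patch_group}",
    }


def run_patch_baseline(ssm_client, patch_group, operation="Scan"):
    """Send the patch command and return its command ID for later tracking."""
    response = ssm_client.send_command(**build_patch_command(patch_group, operation))
    return response["Command"]["CommandId"]


# Usage (requires AWS credentials):
#   import boto3
#   command_id = run_patch_baseline(boto3.client("ssm"), "web-servers", "Scan")
```

Running `Scan` on a schedule and reserving `Install` for a maintenance window is one way to separate detection from change, so that patch installation and any resulting reboots stay within a controlled period.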
AWS services to consider
Benefits of automating compute protection
- Consistent security posture: Automated controls ensure uniform security configurations across all compute resources
- Rapid threat response: Automated systems can respond to threats in seconds rather than minutes or hours
- Reduced human error: Automation removes many of the mistakes that occur during manual, repetitive security operations
- Scalable protection: Automated security scales with your infrastructure growth without proportional increases in security staff
- 24/7 monitoring: Automated systems provide continuous protection without requiring constant human attention
- Improved compliance: Automated compliance monitoring and remediation helps maintain regulatory adherence
- Cost efficiency: Reduces operational costs through automated security operations and faster incident resolution
Related resources