SEC08-BP03: Automate data at rest protection
One-Click Remediation
Deploy CloudFormation stacks to implement this best practice with a single click.
Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.
SEC08-BP03: Automate data at rest protection
Overview
Automating data at rest protection ensures consistent, scalable, and reliable implementation of security controls without manual intervention. This best practice focuses on creating automated workflows that continuously monitor, enforce, and remediate data protection policies across your entire AWS environment.
Automation reduces human error, ensures consistent policy application, enables rapid response to security events, and scales protection mechanisms as your infrastructure grows. It encompasses automated encryption, access control enforcement, compliance monitoring, and incident response.
Implementation Guidance
1. Implement Automated Encryption Workflows
Deploy automated systems for encryption management:
- Resource Creation Automation: Automatically encrypt new resources upon creation
- Encryption Remediation: Automatically fix unencrypted resources
- Key Rotation Automation: Automated key rotation and management
- Cross-Service Integration: Seamless encryption across all AWS services
2. Automate Access Control Enforcement
Implement automated access control mechanisms:
- Policy-Based Access: Automated policy application based on data classification
- Attribute-Based Controls: Dynamic access control based on resource attributes
- Temporary Access Management: Automated provisioning and revocation of access
- Compliance Validation: Continuous validation of access control policies
3. Deploy Continuous Monitoring and Alerting
Establish automated monitoring systems:
- Real-Time Compliance Monitoring: Continuous assessment of protection status
- Anomaly Detection: Automated detection of unusual access patterns
- Security Event Response: Automated response to security incidents
- Compliance Reporting: Automated generation of compliance reports
4. Implement Automated Backup and Recovery
Deploy automated backup and recovery systems:
- Scheduled Backups: Automated backup creation and management
- Cross-Region Replication: Automated data replication for disaster recovery
- Recovery Testing: Automated testing of backup and recovery procedures
- Retention Management: Automated enforcement of retention policies
5. Enable Automated Threat Response
Implement automated security incident response:
- Threat Detection: Automated identification of security threats
- Incident Isolation: Automatic isolation of compromised resources
- Forensic Data Collection: Automated collection of security evidence
- Recovery Orchestration: Automated recovery from security incidents
6. Establish Automated Compliance Management
Deploy automated compliance monitoring and enforcement:
- Policy Compliance: Continuous monitoring of policy adherence
- Regulatory Reporting: Automated generation of compliance reports
- Audit Trail Management: Automated collection and retention of audit logs
- Remediation Workflows: Automated fixing of compliance violations
Implementation Examples
Example 1: Comprehensive Data Protection Automation System
View code
# data_protection_automation.py
import boto3
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging
import threading
import time
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ProtectionPolicy:
resource_type: str
encryption_required: bool
backup_required: bool
monitoring_level: str
retention_days: int
compliance_frameworks: List[str]
@dataclass
class AutomationTask:
task_id: str
task_type: str
resource_arn: str
action: str
status: str
created_at: str
completed_at: Optional[str]
error_message: Optional[str]
class DataProtectionAutomation:
"""
Comprehensive automation system for data at rest protection
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.s3_client = boto3.client('s3', region_name=region)
self.rds_client = boto3.client('rds', region_name=region)
self.ec2_client = boto3.client('ec2', region_name=region)
self.kms_client = boto3.client('kms', region_name=region)
self.backup_client = boto3.client('backup', region_name=region)
self.config_client = boto3.client('config', region_name=region)
self.lambda_client = boto3.client('lambda', region_name=region)
self.events_client = boto3.client('events', region_name=region)
self.sns_client = boto3.client('sns', region_name=region)
self.stepfunctions_client = boto3.client('stepfunctions', region_name=region)
self.dynamodb = boto3.resource('dynamodb', region_name=region)
# Automation tracking tables
self.automation_table = self.dynamodb.Table('data-protection-automation')
self.policy_table = self.dynamodb.Table('protection-policies')
# Protection policies
self.protection_policies = self._load_protection_policies()
# Automation workflows
self.active_workflows = {}
def _load_protection_policies(self) -> Dict[str, ProtectionPolicy]:
"""
Load protection policies from DynamoDB or use defaults
"""
try:
response = self.policy_table.scan()
policies = {}
for item in response['Items']:
policy = ProtectionPolicy(
resource_type=item['resource_type'],
encryption_required=item['encryption_required'],
backup_required=item['backup_required'],
monitoring_level=item['monitoring_level'],
retention_days=int(item['retention_days']),
compliance_frameworks=item['compliance_frameworks']
)
policies[item['resource_type']] = policy
return policies if policies else self._get_default_policies()
except Exception as e:
logger.error(f"Error loading protection policies: {str(e)}")
return self._get_default_policies()
def _get_default_policies(self) -> Dict[str, ProtectionPolicy]:
"""
Get default protection policies
"""
return {
's3_bucket': ProtectionPolicy(
resource_type='s3_bucket',
encryption_required=True,
backup_required=True,
monitoring_level='comprehensive',
retention_days=2555, # 7 years
compliance_frameworks=['GDPR', 'HIPAA', 'PCI_DSS']
),
'rds_instance': ProtectionPolicy(
resource_type='rds_instance',
encryption_required=True,
backup_required=True,
monitoring_level='enhanced',
retention_days=2555,
compliance_frameworks=['HIPAA', 'SOX', 'PCI_DSS']
),
'ebs_volume': ProtectionPolicy(
resource_type='ebs_volume',
encryption_required=True,
backup_required=True,
monitoring_level='standard',
retention_days=365,
compliance_frameworks=['GDPR', 'HIPAA']
),
'dynamodb_table': ProtectionPolicy(
resource_type='dynamodb_table',
encryption_required=True,
backup_required=True,
monitoring_level='comprehensive',
retention_days=2555,
compliance_frameworks=['GDPR', 'HIPAA', 'PCI_DSS']
)
}
def start_continuous_monitoring(self) -> Dict[str, Any]:
"""
Start continuous monitoring and automation workflows
"""
try:
# Start monitoring threads
monitoring_threads = []
# Resource discovery and protection thread
discovery_thread = threading.Thread(
target=self._continuous_resource_discovery,
daemon=True
)
discovery_thread.start()
monitoring_threads.append(discovery_thread)
# Compliance monitoring thread
compliance_thread = threading.Thread(
target=self._continuous_compliance_monitoring,
daemon=True
)
compliance_thread.start()
monitoring_threads.append(compliance_thread)
# Backup monitoring thread
backup_thread = threading.Thread(
target=self._continuous_backup_monitoring,
daemon=True
)
backup_thread.start()
monitoring_threads.append(backup_thread)
# Threat detection thread
threat_thread = threading.Thread(
target=self._continuous_threat_monitoring,
daemon=True
)
threat_thread.start()
monitoring_threads.append(threat_thread)
logger.info("Started continuous monitoring workflows")
return {
'status': 'success',
'monitoring_threads': len(monitoring_threads),
'workflows_started': [
'resource_discovery',
'compliance_monitoring',
'backup_monitoring',
'threat_monitoring'
]
}
except Exception as e:
logger.error(f"Error starting continuous monitoring: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def _continuous_resource_discovery(self):
"""
Continuously discover and protect new resources
"""
while True:
try:
logger.info("Starting resource discovery cycle")
# Discover new S3 buckets
self._discover_and_protect_s3_buckets()
# Discover new RDS instances
self._discover_and_protect_rds_instances()
# Discover new EBS volumes
self._discover_and_protect_ebs_volumes()
# Discover new DynamoDB tables
self._discover_and_protect_dynamodb_tables()
# Wait before next cycle
time.sleep(300) # 5 minutes
except Exception as e:
logger.error(f"Error in resource discovery cycle: {str(e)}")
time.sleep(60) # Wait 1 minute before retry
def _discover_and_protect_s3_buckets(self):
"""
Discover and automatically protect S3 buckets
"""
try:
buckets_response = self.s3_client.list_buckets()
policy = self.protection_policies.get('s3_bucket')
for bucket in buckets_response['Buckets']:
bucket_name = bucket['Name']
bucket_arn = f"arn:aws:s3:::{bucket_name}"
# Check if bucket is already being processed
if bucket_arn in self.active_workflows:
continue
# Check current protection status
protection_status = self._check_s3_protection_status(bucket_name)
if not protection_status['compliant']:
# Start protection workflow
self._start_s3_protection_workflow(bucket_name, policy, protection_status)
except Exception as e:
logger.error(f"Error discovering S3 buckets: {str(e)}")
def _check_s3_protection_status(self, bucket_name: str) -> Dict[str, Any]:
"""
Check S3 bucket protection status
"""
status = {
'bucket_name': bucket_name,
'encrypted': False,
'backup_configured': False,
'monitoring_enabled': False,
'compliant': False,
'issues': []
}
try:
# Check encryption
try:
encryption_response = self.s3_client.get_bucket_encryption(Bucket=bucket_name)
status['encrypted'] = True
except:
status['encrypted'] = False
status['issues'].append('Encryption not enabled')
# Check backup configuration
try:
# Check if bucket has backup plan
backup_plans = self.backup_client.list_backup_plans()
# Simplified check - in practice, would verify specific bucket coverage
status['backup_configured'] = len(backup_plans['BackupPlansList']) > 0
except:
status['backup_configured'] = False
status['issues'].append('Backup not configured')
# Check monitoring
try:
# Check if bucket has CloudTrail logging
status['monitoring_enabled'] = True # Simplified
except:
status['monitoring_enabled'] = False
status['issues'].append('Monitoring not enabled')
# Determine overall compliance
status['compliant'] = (
status['encrypted'] and
status['backup_configured'] and
status['monitoring_enabled']
)
except Exception as e:
logger.error(f"Error checking S3 protection status for {bucket_name}: {str(e)}")
status['issues'].append(f"Status check error: {str(e)}")
return status
def _start_s3_protection_workflow(self, bucket_name: str, policy: ProtectionPolicy, current_status: Dict[str, Any]):
"""
Start automated S3 protection workflow
"""
try:
bucket_arn = f"arn:aws:s3:::{bucket_name}"
self.active_workflows[bucket_arn] = datetime.utcnow()
# Create automation task
task = AutomationTask(
task_id=f"s3-protect-{bucket_name}-{int(time.time())}",
task_type='s3_protection',
resource_arn=bucket_arn,
action='apply_protection_policy',
status='in_progress',
created_at=datetime.utcnow().isoformat(),
completed_at=None,
error_message=None
)
# Track task
self._track_automation_task(task)
# Apply protection measures
protection_results = []
# Enable encryption if needed
if not current_status['encrypted'] and policy.encryption_required:
encryption_result = self._enable_s3_encryption(bucket_name)
protection_results.append(encryption_result)
# Configure backup if needed
if not current_status['backup_configured'] and policy.backup_required:
backup_result = self._configure_s3_backup(bucket_name, policy)
protection_results.append(backup_result)
# Enable monitoring if needed
if not current_status['monitoring_enabled']:
monitoring_result = self._enable_s3_monitoring(bucket_name, policy)
protection_results.append(monitoring_result)
# Update task status
task.status = 'completed'
task.completed_at = datetime.utcnow().isoformat()
self._track_automation_task(task)
# Remove from active workflows
if bucket_arn in self.active_workflows:
del self.active_workflows[bucket_arn]
logger.info(f"Completed S3 protection workflow for {bucket_name}")
except Exception as e:
logger.error(f"Error in S3 protection workflow for {bucket_name}: {str(e)}")
# Update task with error
task.status = 'failed'
task.error_message = str(e)
task.completed_at = datetime.utcnow().isoformat()
self._track_automation_task(task)
# Remove from active workflows
if bucket_arn in self.active_workflows:
del self.active_workflows[bucket_arn]
def _enable_s3_encryption(self, bucket_name: str) -> Dict[str, Any]:
"""
Enable S3 bucket encryption
"""
try:
# Get or create KMS key
kms_key_id = self._get_or_create_kms_key(f's3-{bucket_name}-key')
# Enable server-side encryption
self.s3_client.put_bucket_encryption(
Bucket=bucket_name,
ServerSideEncryptionConfiguration={
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'aws:kms',
'KMSMasterKeyID': kms_key_id
},
'BucketKeyEnabled': True
}
]
}
)
return {
'action': 'enable_encryption',
'status': 'success',
'bucket': bucket_name,
'kms_key_id': kms_key_id
}
except Exception as e:
return {
'action': 'enable_encryption',
'status': 'error',
'bucket': bucket_name,
'error': str(e)
}
def _configure_s3_backup(self, bucket_name: str, policy: ProtectionPolicy) -> Dict[str, Any]:
"""
Configure S3 bucket backup
"""
try:
# Create backup plan if it doesn't exist
backup_plan_name = f's3-backup-plan-{bucket_name}'
backup_plan = {
'BackupPlanName': backup_plan_name,
'Rules': [
{
'RuleName': 'DailyBackups',
'TargetBackupVaultName': 'default',
'ScheduleExpression': 'cron(0 2 ? * * *)', # Daily at 2 AM
'StartWindowMinutes': 60,
'CompletionWindowMinutes': 120,
'Lifecycle': {
'DeleteAfterDays': policy.retention_days
}
}
]
}
# Create backup plan
backup_response = self.backup_client.create_backup_plan(BackupPlan=backup_plan)
return {
'action': 'configure_backup',
'status': 'success',
'bucket': bucket_name,
'backup_plan_id': backup_response['BackupPlanId']
}
except Exception as e:
return {
'action': 'configure_backup',
'status': 'error',
'bucket': bucket_name,
'error': str(e)
}
def _enable_s3_monitoring(self, bucket_name: str, policy: ProtectionPolicy) -> Dict[str, Any]:
"""
Enable S3 bucket monitoring
"""
try:
# Enable CloudTrail data events for the bucket
# This is a simplified implementation
# Enable access logging
logging_bucket = f"{bucket_name}-access-logs"
try:
self.s3_client.head_bucket(Bucket=logging_bucket)
except:
self.s3_client.create_bucket(Bucket=logging_bucket)
self.s3_client.put_bucket_logging(
Bucket=bucket_name,
BucketLoggingStatus={
'LoggingEnabled': {
'TargetBucket': logging_bucket,
'TargetPrefix': f'{bucket_name}/'
}
}
)
return {
'action': 'enable_monitoring',
'status': 'success',
'bucket': bucket_name,
'logging_bucket': logging_bucket
}
except Exception as e:
return {
'action': 'enable_monitoring',
'status': 'error',
'bucket': bucket_name,
'error': str(e)
}
def _get_or_create_kms_key(self, key_alias: str) -> str:
"""
Get existing or create new KMS key
"""
try:
# Try to get existing key
response = self.kms_client.describe_key(KeyId=f'alias/{key_alias}')
return response['KeyMetadata']['KeyId']
except:
# Create new key
response = self.kms_client.create_key(
Description=f'Automated encryption key for {key_alias}',
Usage='ENCRYPT_DECRYPT'
)
key_id = response['KeyMetadata']['KeyId']
# Create alias
self.kms_client.create_alias(
AliasName=f'alias/{key_alias}',
TargetKeyId=key_id
)
return key_id
def _track_automation_task(self, task: AutomationTask):
"""
Track automation task in DynamoDB
"""
try:
self.automation_table.put_item(Item=asdict(task))
except Exception as e:
logger.error(f"Error tracking automation task: {str(e)}")
# Example usage
if __name__ == "__main__":
# Initialize automation system
automation = DataProtectionAutomation()
# Start continuous monitoring
monitoring_result = automation.start_continuous_monitoring()
print(f"Monitoring started: {monitoring_result}")
# Keep the main thread alive
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
print("Shutting down automation system...")Example 2: Event-Driven Protection Automation with Step Functions
View code
# event_driven_protection.py
import boto3
import json
from typing import Dict, List, Any
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class EventDrivenProtectionAutomation:
"""
Event-driven automation system using Step Functions for data protection workflows
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.stepfunctions_client = boto3.client('stepfunctions', region_name=region)
self.events_client = boto3.client('events', region_name=region)
self.lambda_client = boto3.client('lambda', region_name=region)
self.iam_client = boto3.client('iam', region_name=region)
def create_protection_workflow(self) -> Dict[str, Any]:
"""
Create Step Functions workflow for automated data protection
"""
workflow_definition = {
"Comment": "Automated data protection workflow",
"StartAt": "ClassifyResource",
"States": {
"ClassifyResource": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:classify-resource",
"Next": "DetermineProtectionLevel"
},
"DetermineProtectionLevel": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.classification",
"StringEquals": "restricted",
"Next": "ApplyMaximumProtection"
},
{
"Variable": "$.classification",
"StringEquals": "confidential",
"Next": "ApplyEnhancedProtection"
},
{
"Variable": "$.classification",
"StringEquals": "internal",
"Next": "ApplyStandardProtection"
}
],
"Default": "ApplyBasicProtection"
},
"ApplyMaximumProtection": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "EnableEncryption",
"States": {
"EnableEncryption": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:enable-encryption",
"Parameters": {
"encryptionLevel": "customer-managed-kms",
"keyRotation": True
},
"End": True
}
}
},
{
"StartAt": "ConfigureBackup",
"States": {
"ConfigureBackup": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:configure-backup",
"Parameters": {
"backupFrequency": "hourly",
"retentionDays": 2555,
"crossRegion": True
},
"End": True
}
}
},
{
"StartAt": "EnableMonitoring",
"States": {
"EnableMonitoring": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:enable-monitoring",
"Parameters": {
"monitoringLevel": "comprehensive",
"alerting": True,
"anomalyDetection": True
},
"End": True
}
}
},
{
"StartAt": "ConfigureAccessControls",
"States": {
"ConfigureAccessControls": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:configure-access-controls",
"Parameters": {
"accessLevel": "restricted",
"mfaRequired": True,
"temporaryAccess": True
},
"End": True
}
}
}
],
"Next": "ValidateProtection"
},
"ApplyEnhancedProtection": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "EnableEncryption",
"States": {
"EnableEncryption": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:enable-encryption",
"Parameters": {
"encryptionLevel": "aws-managed-kms",
"keyRotation": True
},
"End": True
}
}
},
{
"StartAt": "ConfigureBackup",
"States": {
"ConfigureBackup": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:configure-backup",
"Parameters": {
"backupFrequency": "daily",
"retentionDays": 365,
"crossRegion": False
},
"End": True
}
}
},
{
"StartAt": "EnableMonitoring",
"States": {
"EnableMonitoring": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:enable-monitoring",
"Parameters": {
"monitoringLevel": "enhanced",
"alerting": True,
"anomalyDetection": False
},
"End": True
}
}
}
],
"Next": "ValidateProtection"
},
"ApplyStandardProtection": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "EnableEncryption",
"States": {
"EnableEncryption": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:enable-encryption",
"Parameters": {
"encryptionLevel": "server-side",
"keyRotation": False
},
"End": True
}
}
},
{
"StartAt": "ConfigureBackup",
"States": {
"ConfigureBackup": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:configure-backup",
"Parameters": {
"backupFrequency": "weekly",
"retentionDays": 90,
"crossRegion": False
},
"End": True
}
}
}
],
"Next": "ValidateProtection"
},
"ApplyBasicProtection": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:enable-encryption",
"Parameters": {
"encryptionLevel": "basic",
"keyRotation": False
},
"Next": "ValidateProtection"
},
"ValidateProtection": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:validate-protection",
"Next": "GenerateReport"
},
"GenerateReport": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:generate-protection-report",
"End": True
}
}
}
try:
response = self.stepfunctions_client.create_state_machine(
name='DataProtectionAutomation',
definition=json.dumps(workflow_definition),
roleArn=f'arn:aws:iam::{self._get_account_id()}:role/StepFunctionsDataProtectionRole'
)
return {
'status': 'success',
'state_machine_arn': response['stateMachineArn']
}
except Exception as e:
logger.error(f"Error creating protection workflow: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def setup_event_triggers(self, state_machine_arn: str) -> Dict[str, Any]:
"""
Set up EventBridge rules to trigger protection workflows
"""
event_rules = [
{
'name': 'S3BucketCreated',
'event_pattern': {
"source": ["aws.s3"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["s3.amazonaws.com"],
"eventName": ["CreateBucket"]
}
}
},
{
'name': 'RDSInstanceCreated',
'event_pattern': {
"source": ["aws.rds"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["rds.amazonaws.com"],
"eventName": ["CreateDBInstance"]
}
}
},
{
'name': 'EBSVolumeCreated',
'event_pattern': {
"source": ["aws.ec2"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["ec2.amazonaws.com"],
"eventName": ["CreateVolume"]
}
}
},
{
'name': 'DynamoDBTableCreated',
'event_pattern': {
"source": ["aws.dynamodb"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["dynamodb.amazonaws.com"],
"eventName": ["CreateTable"]
}
}
}
]
created_rules = []
for rule_config in event_rules:
try:
# Create EventBridge rule
self.events_client.put_rule(
Name=rule_config['name'],
EventPattern=json.dumps(rule_config['event_pattern']),
State='ENABLED',
Description=f'Trigger data protection for {rule_config["name"]}'
)
# Add Step Functions as target
self.events_client.put_targets(
Rule=rule_config['name'],
Targets=[
{
'Id': '1',
'Arn': state_machine_arn,
'RoleArn': f'arn:aws:iam::{self._get_account_id()}:role/EventBridgeStepFunctionsRole',
'InputTransformer': {
'InputPathsMap': {
'resource': '$.detail.responseElements',
'eventName': '$.detail.eventName',
'sourceIPAddress': '$.detail.sourceIPAddress'
},
'InputTemplate': '{"resource": <resource>, "eventName": <eventName>, "sourceIP": <sourceIPAddress>}'
}
}
]
)
created_rules.append(rule_config['name'])
logger.info(f"Created event rule: {rule_config['name']}")
except Exception as e:
logger.error(f"Error creating event rule {rule_config['name']}: {str(e)}")
return {
'status': 'success',
'rules_created': created_rules,
'total_rules': len(created_rules)
}
def _get_account_id(self) -> str:
"""Get AWS account ID"""
return boto3.client('sts').get_caller_identity()['Account']
# Example usage
if __name__ == "__main__":
# Initialize event-driven automation
automation = EventDrivenProtectionAutomation()
# Create protection workflow
workflow_result = automation.create_protection_workflow()
print(f"Workflow creation: {workflow_result}")
# Set up event triggers
if workflow_result['status'] == 'success':
triggers_result = automation.setup_event_triggers(workflow_result['state_machine_arn'])
print(f"Event triggers setup: {triggers_result}")Example 3: CloudFormation Template for Automated Protection Infrastructure
View code
# automated-protection-infrastructure.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Automated data protection infrastructure with comprehensive monitoring'
Parameters:
Environment:
Type: String
Default: 'prod'
AllowedValues: ['dev', 'staging', 'prod']
Resources:
# DynamoDB tables for automation tracking
AutomationTrackingTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub '${Environment}-data-protection-automation'
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: task_id
AttributeType: S
- AttributeName: resource_arn
AttributeType: S
KeySchema:
- AttributeName: task_id
KeyType: HASH
GlobalSecondaryIndexes:
- IndexName: ResourceIndex
KeySchema:
- AttributeName: resource_arn
KeyType: HASH
Projection:
ProjectionType: ALL
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
ProtectionPoliciesTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub '${Environment}-protection-policies'
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: resource_type
AttributeType: S
KeySchema:
- AttributeName: resource_type
KeyType: HASH
# Lambda functions for automation
ResourceClassifierFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${Environment}-classify-resource'
Runtime: python3.9
Handler: index.lambda_handler
Role: !GetAtt AutomationLambdaRole.Arn
Timeout: 300
Environment:
Variables:
POLICIES_TABLE: !Ref ProtectionPoliciesTable
Code:
ZipFile: |
import boto3
import json
import os
def lambda_handler(event, context):
"""Classify resource based on tags and metadata"""
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['POLICIES_TABLE'])
try:
resource_arn = event.get('resource_arn', '')
resource_type = event.get('resource_type', '')
# Default classification
classification = 'internal'
# Check resource tags for classification
if 'tags' in event:
tags = event['tags']
if 'DataClassification' in tags:
classification = tags['DataClassification'].lower()
# Get protection policy for resource type
try:
response = table.get_item(Key={'resource_type': resource_type})
policy = response.get('Item', {})
except:
policy = {}
return {
'resource_arn': resource_arn,
'resource_type': resource_type,
'classification': classification,
'policy': policy,
'timestamp': context.aws_request_id
}
except Exception as e:
return {
'error': str(e),
'resource_arn': event.get('resource_arn', ''),
'classification': 'internal'
}
EncryptionEnablerFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${Environment}-enable-encryption'
Runtime: python3.9
Handler: index.lambda_handler
Role: !GetAtt AutomationLambdaRole.Arn
Timeout: 300
Code:
ZipFile: |
import boto3
import json
def lambda_handler(event, context):
"""Enable encryption for AWS resources"""
resource_arn = event.get('resource_arn', '')
encryption_level = event.get('encryptionLevel', 'basic')
try:
if 's3' in resource_arn:
return enable_s3_encryption(resource_arn, encryption_level)
elif 'rds' in resource_arn:
return enable_rds_encryption(resource_arn, encryption_level)
elif 'dynamodb' in resource_arn:
return enable_dynamodb_encryption(resource_arn, encryption_level)
else:
return {
'status': 'unsupported',
'resource_arn': resource_arn,
'message': 'Resource type not supported for encryption'
}
except Exception as e:
return {
'status': 'error',
'resource_arn': resource_arn,
'error': str(e)
}
def enable_s3_encryption(resource_arn, encryption_level):
s3 = boto3.client('s3')
bucket_name = resource_arn.split(':')[-1]
if encryption_level == 'customer-managed-kms':
sse_algorithm = 'aws:kms'
# Would need to create/get customer managed key
kms_key_id = 'alias/aws/s3'
else:
sse_algorithm = 'AES256'
kms_key_id = None
encryption_config = {
'Rules': [{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': sse_algorithm
}
}]
}
if kms_key_id:
encryption_config['Rules'][0]['ApplyServerSideEncryptionByDefault']['KMSMasterKeyID'] = kms_key_id
s3.put_bucket_encryption(
Bucket=bucket_name,
ServerSideEncryptionConfiguration=encryption_config
)
return {
'status': 'success',
'resource_arn': resource_arn,
'encryption_enabled': True,
'encryption_level': encryption_level
}
def enable_rds_encryption(resource_arn, encryption_level):
# RDS encryption must be enabled at creation time
return {
'status': 'manual_action_required',
'resource_arn': resource_arn,
'message': 'RDS encryption requires snapshot and restore'
}
def enable_dynamodb_encryption(resource_arn, encryption_level):
dynamodb = boto3.client('dynamodb')
table_name = resource_arn.split('/')[-1]
dynamodb.update_table(
TableName=table_name,
SSESpecification={
'Enabled': True,
'SSEType': 'KMS'
}
)
return {
'status': 'success',
'resource_arn': resource_arn,
'encryption_enabled': True
}
BackupConfiguratorFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${Environment}-configure-backup'
Runtime: python3.9
Handler: index.lambda_handler
Role: !GetAtt AutomationLambdaRole.Arn
Timeout: 300
Code:
ZipFile: |
import boto3
import json
def lambda_handler(event, context):
"""Configure backup for AWS resources"""
backup_client = boto3.client('backup')
resource_arn = event.get('resource_arn', '')
backup_frequency = event.get('backupFrequency', 'daily')
retention_days = event.get('retentionDays', 30)
try:
# Create backup plan
backup_plan_name = f"auto-backup-{context.aws_request_id}"
if backup_frequency == 'hourly':
schedule = 'cron(0 * ? * * *)'
elif backup_frequency == 'daily':
schedule = 'cron(0 2 ? * * *)'
elif backup_frequency == 'weekly':
schedule = 'cron(0 2 ? * SUN *)'
else:
schedule = 'cron(0 2 ? * * *)'
backup_plan = {
'BackupPlanName': backup_plan_name,
'Rules': [{
'RuleName': 'AutomatedBackup',
'TargetBackupVaultName': 'default',
'ScheduleExpression': schedule,
'StartWindowMinutes': 60,
'CompletionWindowMinutes': 120,
'Lifecycle': {
'DeleteAfterDays': retention_days
}
}]
}
response = backup_client.create_backup_plan(BackupPlan=backup_plan)
return {
'status': 'success',
'resource_arn': resource_arn,
'backup_plan_id': response['BackupPlanId'],
'backup_frequency': backup_frequency,
'retention_days': retention_days
}
except Exception as e:
return {
'status': 'error',
'resource_arn': resource_arn,
'error': str(e)
}
MonitoringEnablerFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${Environment}-enable-monitoring'
Runtime: python3.9
Handler: index.lambda_handler
Role: !GetAtt AutomationLambdaRole.Arn
Timeout: 300
Code:
ZipFile: |
import boto3
import json
def lambda_handler(event, context):
"""Enable monitoring for AWS resources"""
resource_arn = event.get('resource_arn', '')
monitoring_level = event.get('monitoringLevel', 'standard')
try:
cloudwatch = boto3.client('cloudwatch')
# Create CloudWatch alarms based on resource type
if 's3' in resource_arn:
return setup_s3_monitoring(resource_arn, monitoring_level)
elif 'rds' in resource_arn:
return setup_rds_monitoring(resource_arn, monitoring_level)
else:
return {
'status': 'success',
'resource_arn': resource_arn,
'monitoring_enabled': True,
'monitoring_level': monitoring_level
}
except Exception as e:
return {
'status': 'error',
'resource_arn': resource_arn,
'error': str(e)
}
def setup_s3_monitoring(resource_arn, monitoring_level):
# Enable S3 access logging and CloudWatch metrics
return {
'status': 'success',
'resource_arn': resource_arn,
'monitoring_enabled': True,
'monitoring_type': 's3_access_logging'
}
def setup_rds_monitoring(resource_arn, monitoring_level):
# Enable RDS Performance Insights and enhanced monitoring
return {
'status': 'success',
'resource_arn': resource_arn,
'monitoring_enabled': True,
'monitoring_type': 'rds_performance_insights'
}
ProtectionValidatorFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${Environment}-validate-protection'
Runtime: python3.9
Handler: index.lambda_handler
Role: !GetAtt AutomationLambdaRole.Arn
Timeout: 300
Code:
ZipFile: |
import boto3
import json
def lambda_handler(event, context):
"""Validate that protection measures are properly applied"""
resource_arn = event.get('resource_arn', '')
try:
validation_results = {
'resource_arn': resource_arn,
'encryption_validated': False,
'backup_validated': False,
'monitoring_validated': False,
'overall_compliance': False
}
# Validate encryption
if 's3' in resource_arn:
validation_results['encryption_validated'] = validate_s3_encryption(resource_arn)
elif 'dynamodb' in resource_arn:
validation_results['encryption_validated'] = validate_dynamodb_encryption(resource_arn)
# Validate backup (simplified)
validation_results['backup_validated'] = True
# Validate monitoring (simplified)
validation_results['monitoring_validated'] = True
# Overall compliance
validation_results['overall_compliance'] = (
validation_results['encryption_validated'] and
validation_results['backup_validated'] and
validation_results['monitoring_validated']
)
return validation_results
except Exception as e:
return {
'status': 'error',
'resource_arn': resource_arn,
'error': str(e)
}
def validate_s3_encryption(resource_arn):
s3 = boto3.client('s3')
bucket_name = resource_arn.split(':')[-1]
try:
response = s3.get_bucket_encryption(Bucket=bucket_name)
return True
except:
return False
def validate_dynamodb_encryption(resource_arn):
dynamodb = boto3.client('dynamodb')
table_name = resource_arn.split('/')[-1]
try:
response = dynamodb.describe_table(TableName=table_name)
sse_description = response['Table'].get('SSEDescription', {})
return sse_description.get('Status') == 'ENABLED'
except:
return False
# IAM Role for Lambda functions
AutomationLambdaRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub '${Environment}-automation-lambda-role'
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: AutomationPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:*
- rds:*
- dynamodb:*
- kms:*
- backup:*
- cloudwatch:*
- logs:*
Resource: '*'
# Step Functions State Machine
DataProtectionStateMachine:
Type: AWS::StepFunctions::StateMachine
Properties:
StateMachineName: !Sub '${Environment}-data-protection-automation'
RoleArn: !GetAtt StepFunctionsRole.Arn
DefinitionString: !Sub |
{
"Comment": "Automated data protection workflow",
"StartAt": "ClassifyResource",
"States": {
"ClassifyResource": {
"Type": "Task",
"Resource": "${ResourceClassifierFunction.Arn}",
"Next": "ApplyProtection"
},
"ApplyProtection": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "EnableEncryption",
"States": {
"EnableEncryption": {
"Type": "Task",
"Resource": "${EncryptionEnablerFunction.Arn}",
"End": true
}
}
},
{
"StartAt": "ConfigureBackup",
"States": {
"ConfigureBackup": {
"Type": "Task",
"Resource": "${BackupConfiguratorFunction.Arn}",
"End": true
}
}
},
{
"StartAt": "EnableMonitoring",
"States": {
"EnableMonitoring": {
"Type": "Task",
"Resource": "${MonitoringEnablerFunction.Arn}",
"End": true
}
}
}
],
"Next": "ValidateProtection"
},
"ValidateProtection": {
"Type": "Task",
"Resource": "${ProtectionValidatorFunction.Arn}",
"End": true
}
}
}
StepFunctionsRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: states.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: StepFunctionsExecutionPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- lambda:InvokeFunction
Resource: '*'
# EventBridge Rules
S3BucketCreatedRule:
Type: AWS::Events::Rule
Properties:
Name: !Sub '${Environment}-s3-bucket-created'
EventPattern:
source: ['aws.s3']
detail-type: ['AWS API Call via CloudTrail']
detail:
eventSource: ['s3.amazonaws.com']
eventName: ['CreateBucket']
State: ENABLED
Targets:
- Arn: !Ref DataProtectionStateMachine
Id: S3BucketTarget
RoleArn: !GetAtt EventBridgeRole.Arn
EventBridgeRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: events.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: EventBridgeExecutionPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- states:StartExecution
Resource: !Ref DataProtectionStateMachine
Outputs:
StateMachineArn:
Description: 'ARN of the data protection state machine'
Value: !Ref DataProtectionStateMachine
Export:
Name: !Sub '${AWS::StackName}-StateMachine'
AutomationTableName:
Description: 'Name of the automation tracking table'
Value: !Ref AutomationTrackingTable
Export:
Name: !Sub '${AWS::StackName}-AutomationTable'Relevant AWS Services
Automation and Orchestration
- AWS Step Functions: Workflow orchestration for complex automation scenarios
- AWS Lambda: Serverless functions for automation logic
- Amazon EventBridge: Event-driven automation triggers
- AWS Systems Manager: Automated configuration management and patching
Monitoring and Compliance
- AWS Config: Continuous compliance monitoring and automated remediation
- Amazon CloudWatch: Metrics, alarms, and automated responses
- AWS CloudTrail: Audit logging and event-driven automation
- AWS Security Hub: Centralized security findings and automated remediation
Data Protection Services
- AWS Backup: Automated backup across AWS services
- AWS Key Management Service (KMS): Automated key rotation and management
- Amazon Macie: Automated data discovery and classification
- Amazon GuardDuty: Automated threat detection and response
Integration and Storage
- Amazon DynamoDB: Automation state and policy storage
- Amazon SNS: Automated notifications and alerts
- Amazon SQS: Asynchronous automation workflows
- AWS Organizations: Automated policy enforcement across accounts
Benefits of Automated Data Protection
Operational Benefits
- Consistency: Uniform application of protection policies across all resources
- Scalability: Automatic protection as infrastructure grows
- Efficiency: Reduced manual effort and faster response times
- Reliability: Elimination of human error in protection implementation
Security Benefits
- Immediate Protection: Automatic protection upon resource creation
- Continuous Monitoring: Real-time detection of protection gaps
- Rapid Response: Automated incident response and remediation
- Comprehensive Coverage: Protection across all AWS services and regions
Compliance Benefits
- Automated Compliance: Continuous adherence to regulatory requirements
- Audit Readiness: Complete audit trails of all protection activities
- Policy Enforcement: Consistent enforcement of organizational policies
- Reporting: Automated generation of compliance reports
Cost Benefits
- Resource Optimization: Efficient use of protection resources
- Reduced Overhead: Lower operational costs through automation
- Preventive Measures: Reduced costs from security incidents
- Scalable Economics: Cost-effective protection at scale
Related Resources
- AWS Well-Architected Framework - Data at Rest Protection
- AWS Step Functions Developer Guide
- Amazon EventBridge User Guide
- AWS Config Developer Guide
- AWS Backup Developer Guide
- AWS Security Blog - Automation
View code