SEC07-BP04: Define scalable data lifecycle management
Overview
Effective data lifecycle management ensures that data is handled appropriately throughout its entire lifecycle, from creation to deletion, based on its classification level, business value, and regulatory requirements. Scalable lifecycle management automates data transitions, retention policies, and disposal processes while maintaining security and compliance requirements.
This best practice focuses on implementing automated, classification-aware lifecycle policies that can scale with your data growth while ensuring appropriate protection, cost optimization, and regulatory compliance throughout the data’s lifecycle.
Implementation Guidance
1. Define Data Lifecycle Stages
Establish clear lifecycle stages for different data classifications (a minimal code sketch follows this list):
- Creation: Initial data ingestion and classification
- Active Use: Data in regular use with full accessibility
- Infrequent Access: Data accessed less frequently but still needed
- Archive: Long-term storage for compliance or historical purposes
- Disposal: Secure deletion when data is no longer needed
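To make these stages actionable in automation, it helps to represent them in code. The following is a minimal, illustrative sketch; the stage names, actions, and storage classes are assumptions to adapt to your own environment:

# lifecycle_stages.py - illustrative mapping of lifecycle stages to handling actions
from enum import Enum

class LifecycleStage(Enum):
    CREATION = "creation"
    ACTIVE_USE = "active_use"
    INFREQUENT_ACCESS = "infrequent_access"
    ARCHIVE = "archive"
    DISPOSAL = "disposal"

# Default handling per stage; the storage classes are S3 examples and would differ for other services
STAGE_HANDLING = {
    LifecycleStage.CREATION: {"action": "classify_and_tag", "storage_class": "STANDARD"},
    LifecycleStage.ACTIVE_USE: {"action": "retain", "storage_class": "STANDARD"},
    LifecycleStage.INFREQUENT_ACCESS: {"action": "transition", "storage_class": "STANDARD_IA"},
    LifecycleStage.ARCHIVE: {"action": "transition", "storage_class": "DEEP_ARCHIVE"},
    LifecycleStage.DISPOSAL: {"action": "expire", "storage_class": None},
}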
2. Implement Classification-Based Lifecycle Policies
Create lifecycle policies that align with data classification levels (a compact policy lookup sketch follows this list):
- Public Data: Basic lifecycle with cost optimization focus
- Internal Data: Standard retention with access controls
- Confidential Data: Extended retention with enhanced security
- Restricted Data: Maximum retention with comprehensive audit trails
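Example 1 below implements this mapping in full. As a compact sketch, the classification-to-retention lookup can be as simple as the following; the day counts are assumptions aligned with the policies used later on this page:

# classification_policy_defaults.py - illustrative retention and transition defaults per classification
CLASSIFICATION_POLICY_DEFAULTS = {
    "public":       {"ia_days": 30,  "glacier_days": 90,   "expiration_days": 2555},
    "internal":     {"ia_days": 90,  "glacier_days": 365,  "expiration_days": 2555},
    "confidential": {"ia_days": 180, "glacier_days": 730,  "expiration_days": 3650},
    "restricted":   {"ia_days": 365, "glacier_days": 1095, "expiration_days": None},  # no automatic expiration
}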
3. Automate Lifecycle Transitions
Deploy automated systems for lifecycle management (a minimal scheduling sketch follows this list):
- Policy-Driven Automation: Automatic transitions based on predefined rules
- Event-Driven Processing: Lifecycle actions triggered by business events
- Scheduled Operations: Regular lifecycle maintenance and cleanup
- Exception Handling: Manage special cases and manual overrides
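As a minimal sketch of the scheduled-operations pattern, the snippet below creates a daily EventBridge rule that invokes a lifecycle maintenance Lambda function. The function name and ARN are hypothetical, and the function also needs a resource-based permission allowing EventBridge to invoke it (see the CloudFormation template in Example 3):

# schedule_lifecycle_maintenance.py - minimal sketch: daily EventBridge trigger for lifecycle maintenance
import boto3

events = boto3.client("events")

# Hypothetical Lambda function that runs lifecycle maintenance (audits, transitions, cleanup)
maintenance_function_arn = "arn:aws:lambda:us-east-1:111122223333:function:lifecycle-maintenance"

events.put_rule(
    Name="lifecycle-maintenance-schedule",
    ScheduleExpression="rate(1 day)",
    State="ENABLED",
)
events.put_targets(
    Rule="lifecycle-maintenance-schedule",
    Targets=[{"Id": "lifecycle-maintenance", "Arn": maintenance_function_arn}],
)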
4. Ensure Compliance Throughout Lifecycle
Maintain compliance requirements across all lifecycle stages (an Object Lock retention sketch follows this list):
- Retention Requirements: Meet legal and regulatory retention periods
- Data Sovereignty: Ensure data remains in required geographic locations
- Audit Trails: Maintain complete records of lifecycle actions
- Secure Disposal: Implement certified data destruction methods
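Where regulations require immutable retention, S3 Object Lock can back the retention requirement with write-once-read-many (WORM) protection. The sketch below assumes a bucket that was created with Object Lock enabled and uses a 7-year default retention purely as an example value:

# enforce_retention_lock.py - minimal sketch: default WORM retention on a compliance bucket
import boto3

s3 = boto3.client("s3")

s3.put_object_lock_configuration(
    Bucket="my-compliance-bucket",  # hypothetical; the bucket must have been created with Object Lock enabled
    ObjectLockConfiguration={
        "ObjectLockEnabled": "Enabled",
        "Rule": {
            # COMPLIANCE mode prevents retention from being shortened or removed by any user
            "DefaultRetention": {"Mode": "COMPLIANCE", "Years": 7}
        },
    },
)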
5. Optimize Costs Across Lifecycle
Balance security requirements with cost optimization (an Intelligent-Tiering sketch follows this list):
- Storage Tiering: Move data to appropriate storage classes
- Compression and Deduplication: Reduce storage costs while maintaining accessibility
- Resource Optimization: Right-size compute and storage resources
- Monitoring and Alerting: Track lifecycle costs and performance
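For buckets with unpredictable access, S3 Intelligent-Tiering can handle storage tiering automatically. A minimal sketch of enabling its optional archive tiers is shown below; the bucket name and configuration ID are placeholders, and 90/180 days are the minimum values those tiers accept:

# enable_intelligent_tiering.py - minimal sketch: opt a bucket into Intelligent-Tiering archive tiers
import boto3

s3 = boto3.client("s3")

s3.put_bucket_intelligent_tiering_configuration(
    Bucket="my-data-bucket",  # hypothetical bucket name
    Id="archive-tiering",
    IntelligentTieringConfiguration={
        "Id": "archive-tiering",
        "Status": "Enabled",
        "Tierings": [
            {"Days": 90, "AccessTier": "ARCHIVE_ACCESS"},
            {"Days": 180, "AccessTier": "DEEP_ARCHIVE_ACCESS"},
        ],
    },
)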
6. Enable Lifecycle Governance and Monitoring
Implement governance controls for lifecycle management (a metrics-publishing sketch follows this list):
- Policy Management: Centralized lifecycle policy definition and updates
- Compliance Monitoring: Continuous monitoring of lifecycle compliance
- Performance Metrics: Track lifecycle efficiency and effectiveness
- Stakeholder Reporting: Regular reports on lifecycle management status
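One lightweight way to feed compliance monitoring and stakeholder reporting is to publish audit results as CloudWatch custom metrics, which can then drive alarms and dashboards. The namespace and metric names below are assumptions for illustration:

# publish_lifecycle_metrics.py - minimal sketch: custom metrics for lifecycle compliance reporting
import boto3

cloudwatch = boto3.client("cloudwatch")

def publish_compliance_metrics(compliant_buckets: int, non_compliant_buckets: int) -> None:
    """Publish lifecycle compliance counts so alarms and dashboards can track them over time."""
    cloudwatch.put_metric_data(
        Namespace="DataLifecycle",  # hypothetical custom namespace
        MetricData=[
            {"MetricName": "CompliantBuckets", "Value": compliant_buckets, "Unit": "Count"},
            {"MetricName": "NonCompliantBuckets", "Value": non_compliant_buckets, "Unit": "Count"},
        ],
    )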
Implementation Examples
Example 1: Classification-Based S3 Lifecycle Management
# s3_lifecycle_manager.py
import boto3
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class LifecycleRule:
rule_id: str
classification_level: str
transitions: List[Dict[str, Any]]
expiration_days: Optional[int]
noncurrent_version_expiration_days: Optional[int]
abort_incomplete_multipart_upload_days: int
filter_tags: Dict[str, str]
@dataclass
class LifecyclePolicy:
policy_name: str
description: str
rules: List[LifecycleRule]
compliance_frameworks: List[str]
class S3LifecycleManager:
"""
Manages S3 lifecycle policies based on data classification
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.s3_client = boto3.client('s3', region_name=region)
self.dynamodb = boto3.resource('dynamodb', region_name=region)
# Lifecycle tracking table
self.lifecycle_table = self.dynamodb.Table('data-lifecycle-tracking')
# Classification-based lifecycle policies
self.lifecycle_policies = self._define_classification_lifecycle_policies()
def _define_classification_lifecycle_policies(self) -> Dict[str, LifecyclePolicy]:
"""
Define lifecycle policies for each classification level
"""
return {
'public': LifecyclePolicy(
policy_name='PublicDataLifecycle',
description='Lifecycle policy for public data with cost optimization focus',
rules=[
LifecycleRule(
rule_id='public-data-lifecycle',
classification_level='public',
transitions=[
{'Days': 30, 'StorageClass': 'STANDARD_IA'},
{'Days': 90, 'StorageClass': 'GLACIER'},
{'Days': 365, 'StorageClass': 'DEEP_ARCHIVE'}
],
expiration_days=2555, # 7 years
noncurrent_version_expiration_days=30,
abort_incomplete_multipart_upload_days=7,
filter_tags={'DataClassification': 'public'}
)
],
compliance_frameworks=[]
),
'internal': LifecyclePolicy(
policy_name='InternalDataLifecycle',
description='Lifecycle policy for internal data with standard retention',
rules=[
LifecycleRule(
rule_id='internal-data-lifecycle',
classification_level='internal',
transitions=[
{'Days': 90, 'StorageClass': 'STANDARD_IA'},
{'Days': 365, 'StorageClass': 'GLACIER'},
{'Days': 1095, 'StorageClass': 'DEEP_ARCHIVE'} # 3 years
],
expiration_days=2555, # 7 years
noncurrent_version_expiration_days=90,
abort_incomplete_multipart_upload_days=7,
filter_tags={'DataClassification': 'internal'}
)
],
compliance_frameworks=['SOX']
),
'confidential': LifecyclePolicy(
policy_name='ConfidentialDataLifecycle',
description='Lifecycle policy for confidential data with extended retention',
rules=[
LifecycleRule(
rule_id='confidential-data-lifecycle',
classification_level='confidential',
transitions=[
{'Days': 180, 'StorageClass': 'STANDARD_IA'},
{'Days': 730, 'StorageClass': 'GLACIER'}, # 2 years
{'Days': 1825, 'StorageClass': 'DEEP_ARCHIVE'} # 5 years
],
expiration_days=3650, # 10 years
noncurrent_version_expiration_days=365,
abort_incomplete_multipart_upload_days=14,
filter_tags={'DataClassification': 'confidential'}
)
],
compliance_frameworks=['GDPR', 'HIPAA']
),
'restricted': LifecyclePolicy(
policy_name='RestrictedDataLifecycle',
description='Lifecycle policy for restricted data with maximum retention',
rules=[
LifecycleRule(
rule_id='restricted-data-lifecycle',
classification_level='restricted',
transitions=[
{'Days': 365, 'StorageClass': 'STANDARD_IA'}, # 1 year
{'Days': 1095, 'StorageClass': 'GLACIER'}, # 3 years
{'Days': 2555, 'StorageClass': 'DEEP_ARCHIVE'} # 7 years
],
expiration_days=None, # No automatic expiration
noncurrent_version_expiration_days=730, # 2 years
abort_incomplete_multipart_upload_days=30,
filter_tags={'DataClassification': 'restricted'}
)
],
compliance_frameworks=['PCI_DSS', 'HIPAA', 'GDPR']
)
}
def apply_lifecycle_policy_to_bucket(self, bucket_name: str, classification_level: str) -> Dict[str, Any]:
"""
Apply classification-based lifecycle policy to S3 bucket
"""
try:
if classification_level not in self.lifecycle_policies:
return {
'status': 'error',
'message': f'Unknown classification level: {classification_level}'
}
policy = self.lifecycle_policies[classification_level]
# Convert lifecycle rules to S3 format
s3_rules = []
for rule in policy.rules:
s3_rule = {
'ID': rule.rule_id,
'Status': 'Enabled',
'Filter': {
'And': {
'Tags': [
{'Key': k, 'Value': v} for k, v in rule.filter_tags.items()
]
}
},
'Transitions': rule.transitions,
'AbortIncompleteMultipartUpload': {
'DaysAfterInitiation': rule.abort_incomplete_multipart_upload_days
}
}
# Add expiration if specified
if rule.expiration_days:
s3_rule['Expiration'] = {'Days': rule.expiration_days}
# Add noncurrent version expiration
if rule.noncurrent_version_expiration_days:
s3_rule['NoncurrentVersionExpiration'] = {
'NoncurrentDays': rule.noncurrent_version_expiration_days
}
s3_rules.append(s3_rule)
# Apply lifecycle configuration
self.s3_client.put_bucket_lifecycle_configuration(
Bucket=bucket_name,
LifecycleConfiguration={'Rules': s3_rules}
)
# Track lifecycle policy application
self._track_lifecycle_policy_application(bucket_name, policy)
logger.info(f"Applied {classification_level} lifecycle policy to bucket {bucket_name}")
return {
'status': 'success',
'bucket': bucket_name,
'classification': classification_level,
'policy_name': policy.policy_name,
'rules_applied': len(s3_rules)
}
except Exception as e:
logger.error(f"Error applying lifecycle policy to {bucket_name}: {str(e)}")
return {
'status': 'error',
'bucket': bucket_name,
'message': str(e)
}
def _track_lifecycle_policy_application(self, bucket_name: str, policy: LifecyclePolicy):
"""
Track lifecycle policy application in DynamoDB
"""
try:
self.lifecycle_table.put_item(
Item={
'resource_arn': f'arn:aws:s3:::{bucket_name}',
'policy_application_timestamp': datetime.utcnow().isoformat(),
'policy_name': policy.policy_name,
'classification_level': policy.rules[0].classification_level,
'compliance_frameworks': policy.compliance_frameworks,
'rules_count': len(policy.rules),
'ttl': int((datetime.utcnow() + timedelta(days=365)).timestamp())
}
)
except Exception as e:
logger.error(f"Error tracking lifecycle policy application: {str(e)}")
def create_custom_lifecycle_policy(self,
policy_name: str,
classification_level: str,
custom_rules: List[Dict[str, Any]]) -> LifecyclePolicy:
"""
Create custom lifecycle policy for specific requirements
"""
rules = []
for rule_config in custom_rules:
rule = LifecycleRule(
rule_id=rule_config.get('rule_id', f'{policy_name}-rule'),
classification_level=classification_level,
transitions=rule_config.get('transitions', []),
expiration_days=rule_config.get('expiration_days'),
noncurrent_version_expiration_days=rule_config.get('noncurrent_version_expiration_days'),
abort_incomplete_multipart_upload_days=rule_config.get('abort_incomplete_multipart_upload_days', 7),
filter_tags=rule_config.get('filter_tags', {'DataClassification': classification_level})
)
rules.append(rule)
custom_policy = LifecyclePolicy(
policy_name=policy_name,
description=f'Custom lifecycle policy for {classification_level} data',
rules=rules,
compliance_frameworks=[]
)
return custom_policy
def audit_bucket_lifecycle_compliance(self, bucket_name: str) -> Dict[str, Any]:
"""
Audit bucket lifecycle configuration for compliance
"""
try:
# Get bucket lifecycle configuration
try:
lifecycle_response = self.s3_client.get_bucket_lifecycle_configuration(Bucket=bucket_name)
rules = lifecycle_response.get('Rules', [])
except self.s3_client.exceptions.ClientError as e:
# boto3's S3 client does not model NoSuchLifecycleConfiguration, so match on the error code
if e.response['Error']['Code'] != 'NoSuchLifecycleConfiguration':
raise
return {
'bucket': bucket_name,
'compliance_status': 'non_compliant',
'issues': ['No lifecycle configuration found'],
'recommendations': ['Apply appropriate lifecycle policy based on data classification']
}
# Get bucket classification
try:
tags_response = self.s3_client.get_bucket_tagging(Bucket=bucket_name)
tags = {tag['Key']: tag['Value'] for tag in tags_response['TagSet']}
classification = tags.get('DataClassification', 'unknown')
except self.s3_client.exceptions.ClientError:
# NoSuchTagSet is not a modeled boto3 exception; treat any tagging lookup failure as unclassified
classification = 'unknown'
# Audit compliance
audit_results = {
'bucket': bucket_name,
'classification': classification,
'rules_count': len(rules),
'compliance_status': 'compliant',
'issues': [],
'recommendations': []
}
if classification == 'unknown':
audit_results['issues'].append('Bucket classification not found')
audit_results['recommendations'].append('Apply data classification tags')
audit_results['compliance_status'] = 'non_compliant'
if classification in self.lifecycle_policies:
expected_policy = self.lifecycle_policies[classification]
# Check if lifecycle rules match expected policy
compliance_issues = self._validate_lifecycle_rules(rules, expected_policy)
if compliance_issues:
audit_results['issues'].extend(compliance_issues)
audit_results['compliance_status'] = 'non_compliant'
audit_results['recommendations'].append(f'Update lifecycle policy to match {classification} requirements')
return audit_results
except Exception as e:
logger.error(f"Error auditing bucket lifecycle compliance: {str(e)}")
return {
'bucket': bucket_name,
'compliance_status': 'error',
'error': str(e)
}
def _validate_lifecycle_rules(self, actual_rules: List[Dict[str, Any]], expected_policy: LifecyclePolicy) -> List[str]:
"""
Validate actual lifecycle rules against expected policy
"""
issues = []
if len(actual_rules) != len(expected_policy.rules):
issues.append(f'Expected {len(expected_policy.rules)} rules, found {len(actual_rules)}')
for expected_rule in expected_policy.rules:
# Find matching actual rule
matching_rule = None
for actual_rule in actual_rules:
if actual_rule.get('ID') == expected_rule.rule_id:
matching_rule = actual_rule
break
if not matching_rule:
issues.append(f'Missing lifecycle rule: {expected_rule.rule_id}')
continue
# Validate transitions
actual_transitions = matching_rule.get('Transitions', [])
if len(actual_transitions) != len(expected_rule.transitions):
issues.append(f'Rule {expected_rule.rule_id}: Expected {len(expected_rule.transitions)} transitions, found {len(actual_transitions)}')
# Validate expiration
if expected_rule.expiration_days:
actual_expiration = matching_rule.get('Expiration', {}).get('Days')
if actual_expiration != expected_rule.expiration_days:
issues.append(f'Rule {expected_rule.rule_id}: Expected expiration {expected_rule.expiration_days} days, found {actual_expiration}')
return issues
def generate_lifecycle_report(self, bucket_names: Optional[List[str]] = None) -> Dict[str, Any]:
"""
Generate comprehensive lifecycle management report
"""
try:
if bucket_names is None:
# Get all buckets
buckets_response = self.s3_client.list_buckets()
bucket_names = [bucket['Name'] for bucket in buckets_response['Buckets']]
report = {
'report_timestamp': datetime.utcnow().isoformat(),
'total_buckets': len(bucket_names),
'compliance_summary': {
'compliant': 0,
'non_compliant': 0,
'error': 0
},
'classification_distribution': {},
'bucket_details': [],
'recommendations': []
}
for bucket_name in bucket_names:
audit_result = self.audit_bucket_lifecycle_compliance(bucket_name)
report['bucket_details'].append(audit_result)
# Update compliance summary
status = audit_result.get('compliance_status', 'error')
report['compliance_summary'][status] += 1
# Update classification distribution
classification = audit_result.get('classification', 'unknown')
report['classification_distribution'][classification] = report['classification_distribution'].get(classification, 0) + 1
# Generate overall recommendations
if report['compliance_summary']['non_compliant'] > 0:
report['recommendations'].append('Review and update lifecycle policies for non-compliant buckets')
if report['classification_distribution'].get('unknown', 0) > 0:
report['recommendations'].append('Apply data classification tags to unclassified buckets')
return report
except Exception as e:
logger.error(f"Error generating lifecycle report: {str(e)}")
return {
'error': str(e),
'report_timestamp': datetime.utcnow().isoformat()
}
def optimize_lifecycle_costs(self, bucket_name: str) -> Dict[str, Any]:
"""
Analyze and optimize lifecycle costs for a bucket
"""
try:
# Get bucket size and access patterns (simplified)
cloudwatch = boto3.client('cloudwatch', region_name=self.region)
# Get bucket size metrics
size_response = cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName='BucketSizeBytes',
Dimensions=[
{'Name': 'BucketName', 'Value': bucket_name},
{'Name': 'StorageType', 'Value': 'StandardStorage'}
],
StartTime=datetime.utcnow() - timedelta(days=30),
EndTime=datetime.utcnow(),
Period=86400, # 1 day
Statistics=['Average']
)
# Calculate current storage costs (simplified)
current_size_gb = 0
if size_response['Datapoints']:
current_size_bytes = size_response['Datapoints'][-1]['Average']
current_size_gb = current_size_bytes / (1024**3)
# Get current lifecycle configuration
try:
lifecycle_response = self.s3_client.get_bucket_lifecycle_configuration(Bucket=bucket_name)
has_lifecycle = True
except self.s3_client.exceptions.ClientError:
# NoSuchLifecycleConfiguration is not a modeled boto3 exception; treat lookup errors as "no policy"
has_lifecycle = False
# Calculate potential savings
optimization_results = {
'bucket': bucket_name,
'current_size_gb': round(current_size_gb, 2),
'has_lifecycle_policy': has_lifecycle,
'optimization_recommendations': [],
'estimated_monthly_savings': 0
}
if not has_lifecycle:
# Estimate savings from implementing lifecycle policy
standard_cost_per_gb = 0.023 # Approximate S3 Standard cost
ia_cost_per_gb = 0.0125 # Approximate S3 IA cost
glacier_cost_per_gb = 0.004 # Approximate Glacier cost
# Assume 30% moves to IA after 30 days, 50% to Glacier after 90 days
potential_savings = (current_size_gb * 0.3 * (standard_cost_per_gb - ia_cost_per_gb)) + \
(current_size_gb * 0.5 * (standard_cost_per_gb - glacier_cost_per_gb))
optimization_results['estimated_monthly_savings'] = round(potential_savings, 2)
optimization_results['optimization_recommendations'].append(
'Implement lifecycle policy to transition data to lower-cost storage classes'
)
if current_size_gb > 1000: # Large bucket
optimization_results['optimization_recommendations'].append(
'Consider implementing Intelligent Tiering for automatic cost optimization'
)
return optimization_results
except Exception as e:
logger.error(f"Error optimizing lifecycle costs: {str(e)}")
return {
'bucket': bucket_name,
'error': str(e)
}
def schedule_lifecycle_maintenance(self, schedule_expression: str = 'rate(1 day)') -> Dict[str, Any]:
"""
Schedule automated lifecycle maintenance using EventBridge and Lambda
"""
try:
# This would create EventBridge rule and Lambda function for maintenance
# Implementation would include:
# 1. Create Lambda function for lifecycle maintenance
# 2. Create EventBridge rule with schedule
# 3. Set up permissions
maintenance_config = {
'schedule_expression': schedule_expression,
'lambda_function': 'lifecycle-maintenance-function',
'eventbridge_rule': 'lifecycle-maintenance-schedule',
'status': 'configured'
}
logger.info(f"Scheduled lifecycle maintenance with expression: {schedule_expression}")
return maintenance_config
except Exception as e:
logger.error(f"Error scheduling lifecycle maintenance: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
# Example usage
if __name__ == "__main__":
# Initialize lifecycle manager
manager = S3LifecycleManager()
# Apply lifecycle policy to a bucket
result = manager.apply_lifecycle_policy_to_bucket('my-data-bucket', 'confidential')
print(f"Lifecycle policy application result: {result}")
# Audit bucket compliance
audit_result = manager.audit_bucket_lifecycle_compliance('my-data-bucket')
print(f"Audit result: {audit_result}")
# Generate lifecycle report
report = manager.generate_lifecycle_report(['my-data-bucket', 'another-bucket'])
print(f"Lifecycle report: {json.dumps(report, indent=2)}")
# Optimize costs
cost_optimization = manager.optimize_lifecycle_costs('my-data-bucket')
print(f"Cost optimization: {cost_optimization}")Example 2: Multi-Service Data Lifecycle Orchestration
# multi_service_lifecycle.py
import boto3
import json
from typing import Dict, List, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
@dataclass
class DataAsset:
resource_arn: str
service_type: str
classification: str
creation_date: datetime
last_accessed: datetime
size_bytes: int
compliance_requirements: List[str]
class MultiServiceLifecycleManager:
"""
Manages data lifecycle across multiple AWS services
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.s3_client = boto3.client('s3', region_name=region)
self.rds_client = boto3.client('rds', region_name=region)
self.dynamodb_client = boto3.client('dynamodb', region_name=region)
self.backup_client = boto3.client('backup', region_name=region)
self.stepfunctions_client = boto3.client('stepfunctions', region_name=region)
# Service-specific lifecycle policies
self.service_policies = {
's3': self._get_s3_lifecycle_policies(),
'rds': self._get_rds_lifecycle_policies(),
'dynamodb': self._get_dynamodb_lifecycle_policies()
}
def _get_s3_lifecycle_policies(self) -> Dict[str, Dict[str, Any]]:
"""S3 lifecycle policies by classification"""
return {
'public': {
'transitions': [
{'days': 30, 'storage_class': 'STANDARD_IA'},
{'days': 90, 'storage_class': 'GLACIER'},
{'days': 365, 'storage_class': 'DEEP_ARCHIVE'}
],
'expiration_days': 2555
},
'confidential': {
'transitions': [
{'days': 180, 'storage_class': 'STANDARD_IA'},
{'days': 730, 'storage_class': 'GLACIER'},
{'days': 1825, 'storage_class': 'DEEP_ARCHIVE'}
],
'expiration_days': 3650
},
'restricted': {
'transitions': [
{'days': 365, 'storage_class': 'STANDARD_IA'},
{'days': 1095, 'storage_class': 'GLACIER'}
],
'expiration_days': None # No automatic expiration
}
}
def _get_rds_lifecycle_policies(self) -> Dict[str, Dict[str, Any]]:
"""RDS lifecycle policies by classification"""
return {
'public': {
'backup_retention_days': 7,
'snapshot_retention_days': 30,
'archive_after_days': 365
},
'confidential': {
'backup_retention_days': 35,
'snapshot_retention_days': 365,
'archive_after_days': 1095
},
'restricted': {
'backup_retention_days': 35,
'snapshot_retention_days': 2555,
'archive_after_days': None
}
}
def _get_dynamodb_lifecycle_policies(self) -> Dict[str, Dict[str, Any]]:
"""DynamoDB lifecycle policies by classification"""
return {
'public': {
'point_in_time_recovery': False,
'backup_retention_days': 30
},
'confidential': {
'point_in_time_recovery': True,
'backup_retention_days': 365
},
'restricted': {
'point_in_time_recovery': True,
'backup_retention_days': 2555
}
}
def discover_data_assets(self) -> List[DataAsset]:
"""Discover data assets across AWS services"""
assets = []
# Discover S3 buckets
try:
buckets = self.s3_client.list_buckets()['Buckets']
for bucket in buckets:
classification = self._get_resource_classification(f"arn:aws:s3:::{bucket['Name']}")
size = self._get_s3_bucket_size(bucket['Name'])
asset = DataAsset(
resource_arn=f"arn:aws:s3:::{bucket['Name']}",
service_type='s3',
classification=classification,
creation_date=bucket['CreationDate'],
last_accessed=datetime.utcnow(), # Simplified
size_bytes=size,
compliance_requirements=self._get_compliance_requirements(classification)
)
assets.append(asset)
except Exception as e:
logger.error(f"Error discovering S3 assets: {str(e)}")
# Discover RDS instances
try:
db_instances = self.rds_client.describe_db_instances()['DBInstances']
for db in db_instances:
classification = self._get_resource_classification(db['DBInstanceArn'])
asset = DataAsset(
resource_arn=db['DBInstanceArn'],
service_type='rds',
classification=classification,
creation_date=db['InstanceCreateTime'],
last_accessed=datetime.utcnow(), # Simplified
size_bytes=db.get('AllocatedStorage', 0) * 1024**3, # Convert GB to bytes
compliance_requirements=self._get_compliance_requirements(classification)
)
assets.append(asset)
except Exception as e:
logger.error(f"Error discovering RDS assets: {str(e)}")
return assets
def apply_lifecycle_policies(self, assets: List[DataAsset]) -> Dict[str, Any]:
"""Apply lifecycle policies to discovered assets"""
results = {
'total_assets': len(assets),
'policies_applied': 0,
'errors': [],
'service_breakdown': {}
}
for asset in assets:
try:
if asset.service_type == 's3':
self._apply_s3_lifecycle_policy(asset)
elif asset.service_type == 'rds':
self._apply_rds_lifecycle_policy(asset)
elif asset.service_type == 'dynamodb':
self._apply_dynamodb_lifecycle_policy(asset)
results['policies_applied'] += 1
# Update service breakdown
service = asset.service_type
if service not in results['service_breakdown']:
results['service_breakdown'][service] = 0
results['service_breakdown'][service] += 1
except Exception as e:
results['errors'].append({
'resource_arn': asset.resource_arn,
'error': str(e)
})
return results
def _apply_s3_lifecycle_policy(self, asset: DataAsset):
"""Apply S3 lifecycle policy"""
bucket_name = asset.resource_arn.split(':')[-1]
policy = self.service_policies['s3'].get(asset.classification, self.service_policies['s3']['public'])
rules = [{
'ID': f'{asset.classification}-lifecycle',
'Status': 'Enabled',
'Filter': {'Tag': {'Key': 'DataClassification', 'Value': asset.classification}},
'Transitions': [
{'Days': t['days'], 'StorageClass': t['storage_class']}
for t in policy['transitions']
]
}]
if policy['expiration_days']:
rules[0]['Expiration'] = {'Days': policy['expiration_days']}
self.s3_client.put_bucket_lifecycle_configuration(
Bucket=bucket_name,
LifecycleConfiguration={'Rules': rules}
)
def _apply_rds_lifecycle_policy(self, asset: DataAsset):
"""Apply RDS lifecycle policy"""
db_identifier = asset.resource_arn.split(':')[-1]
policy = self.service_policies['rds'].get(asset.classification, self.service_policies['rds']['public'])
# Update backup retention
self.rds_client.modify_db_instance(
DBInstanceIdentifier=db_identifier,
BackupRetentionPeriod=policy['backup_retention_days'],
ApplyImmediately=False
)
def _apply_dynamodb_lifecycle_policy(self, asset: DataAsset):
"""Apply DynamoDB lifecycle policy"""
table_name = asset.resource_arn.split('/')[-1]
policy = self.service_policies['dynamodb'].get(asset.classification, self.service_policies['dynamodb']['public'])
# Update point-in-time recovery
if policy['point_in_time_recovery']:
self.dynamodb_client.update_continuous_backups(
TableName=table_name,
PointInTimeRecoverySpecification={'PointInTimeRecoveryEnabled': True}
)
def _get_resource_classification(self, resource_arn: str) -> str:
"""Get resource classification from tags"""
try:
if 's3' in resource_arn:
bucket_name = resource_arn.split(':')[-1]
tags = self.s3_client.get_bucket_tagging(Bucket=bucket_name)['TagSet']
for tag in tags:
if tag['Key'] == 'DataClassification':
return tag['Value']
except Exception:
pass
return 'internal' # Default classification
def _get_s3_bucket_size(self, bucket_name: str) -> int:
"""Get S3 bucket size in bytes"""
try:
cloudwatch = boto3.client('cloudwatch', region_name=self.region)
response = cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName='BucketSizeBytes',
Dimensions=[
{'Name': 'BucketName', 'Value': bucket_name},
{'Name': 'StorageType', 'Value': 'StandardStorage'}
],
StartTime=datetime.utcnow() - timedelta(days=2),
EndTime=datetime.utcnow(),
Period=86400,
Statistics=['Average']
)
if response['Datapoints']:
return int(response['Datapoints'][-1]['Average'])
except Exception:
pass
return 0
def _get_compliance_requirements(self, classification: str) -> List[str]:
"""Get compliance requirements for classification"""
compliance_map = {
'public': [],
'internal': ['SOX'],
'confidential': ['GDPR', 'HIPAA'],
'restricted': ['PCI_DSS', 'HIPAA', 'GDPR']
}
return compliance_map.get(classification, [])
def create_lifecycle_workflow(self) -> Dict[str, Any]:
"""Create Step Functions workflow for lifecycle management"""
workflow_definition = {
"Comment": "Multi-service data lifecycle management workflow",
"StartAt": "DiscoverAssets",
"States": {
"DiscoverAssets": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:discover-data-assets",
"Next": "ApplyPolicies"
},
"ApplyPolicies": {
"Type": "Map",
"ItemsPath": "$.assets",
"Iterator": {
"StartAt": "ApplyLifecyclePolicy",
"States": {
"ApplyLifecyclePolicy": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:apply-lifecycle-policy",
"End": True
}
}
},
"Next": "GenerateReport"
},
"GenerateReport": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:generate-lifecycle-report",
"End": True
}
}
}
try:
response = self.stepfunctions_client.create_state_machine(
name='DataLifecycleManagement',
definition=json.dumps(workflow_definition),
roleArn=f'arn:aws:iam::{self._get_account_id()}:role/StepFunctionsLifecycleRole'
)
return {
'state_machine_arn': response['stateMachineArn'],
'status': 'created'
}
except Exception as e:
return {
'status': 'error',
'message': str(e)
}
def _get_account_id(self) -> str:
"""Get AWS account ID"""
return boto3.client('sts').get_caller_identity()['Account']
# Example usage
if __name__ == "__main__":
manager = MultiServiceLifecycleManager()
# Discover assets
assets = manager.discover_data_assets()
print(f"Discovered {len(assets)} data assets")
# Apply lifecycle policies
results = manager.apply_lifecycle_policies(assets)
print(f"Applied policies to {results['policies_applied']} assets")
# Create workflow
workflow_result = manager.create_lifecycle_workflow()
print(f"Workflow creation: {workflow_result}")Example 3: Compliance-Driven Lifecycle Automation
# compliance-lifecycle-automation.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Compliance-driven data lifecycle automation'
Parameters:
Environment:
Type: String
Default: 'prod'
AllowedValues: ['dev', 'staging', 'prod']
ComplianceFramework:
Type: String
Default: 'GDPR'
AllowedValues: ['GDPR', 'HIPAA', 'PCI_DSS', 'SOX']
Resources:
# DynamoDB table for lifecycle tracking
LifecycleTrackingTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub '${Environment}-lifecycle-tracking'
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: resource_arn
AttributeType: S
- AttributeName: lifecycle_stage
AttributeType: S
- AttributeName: compliance_deadline
AttributeType: S
KeySchema:
- AttributeName: resource_arn
KeyType: HASH
- AttributeName: lifecycle_stage
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: ComplianceDeadlineIndex
KeySchema:
- AttributeName: compliance_deadline
KeyType: HASH
Projection:
ProjectionType: ALL
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
# Lambda function for lifecycle enforcement
LifecycleEnforcementFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${Environment}-lifecycle-enforcement'
Runtime: python3.9
Handler: index.lambda_handler
Role: !GetAtt LifecycleEnforcementRole.Arn
Timeout: 900
Environment:
Variables:
TRACKING_TABLE: !Ref LifecycleTrackingTable
COMPLIANCE_FRAMEWORK: !Ref ComplianceFramework
Code:
ZipFile: |
import boto3
import json
import os
from datetime import datetime, timedelta
def lambda_handler(event, context):
"""Enforce lifecycle policies based on compliance requirements"""
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TRACKING_TABLE'])
compliance_framework = os.environ['COMPLIANCE_FRAMEWORK']
# Define compliance-based retention periods
retention_policies = {
'GDPR': {
'personal_data': 2555, # 7 years max
'consent_records': 1095, # 3 years
'processing_logs': 365 # 1 year
},
'HIPAA': {
'phi_data': 2190, # 6 years
'audit_logs': 2190, # 6 years
'access_logs': 2190 # 6 years
},
'PCI_DSS': {
'cardholder_data': 365, # 1 year
'audit_trails': 365, # 1 year
'vulnerability_scans': 365 # 1 year
}
}
try:
# Get resources approaching compliance deadlines
deadline_threshold = (datetime.utcnow() + timedelta(days=30)).isoformat()
# A Query cannot use '<' on a partition key, so scan the sparse index and filter on the deadline
response = table.scan(
IndexName='ComplianceDeadlineIndex',
FilterExpression='compliance_deadline < :threshold',
ExpressionAttributeValues={':threshold': deadline_threshold}
)
results = {
'resources_processed': 0,
'actions_taken': [],
'errors': []
}
for item in response['Items']:
try:
resource_arn = item['resource_arn']
data_type = item.get('data_type', 'general')
# Apply compliance-specific actions
if 's3' in resource_arn:
action_result = apply_s3_compliance_action(
s3_client, resource_arn, data_type, compliance_framework
)
results['actions_taken'].append(action_result)
results['resources_processed'] += 1
except Exception as e:
results['errors'].append({
'resource': item.get('resource_arn', 'unknown'),
'error': str(e)
})
return {
'statusCode': 200,
'body': json.dumps(results)
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def apply_s3_compliance_action(s3_client, resource_arn, data_type, framework):
"""Apply compliance-specific action to S3 resource"""
bucket_name = resource_arn.split(':')[-1]
if framework == 'GDPR' and data_type == 'personal_data':
# Apply GDPR-specific lifecycle policy
lifecycle_config = {
'Rules': [{
'ID': 'GDPR-PersonalData-Lifecycle',
'Status': 'Enabled',
'Filter': {'Tag': {'Key': 'DataType', 'Value': 'personal_data'}},
'Expiration': {'Days': 2555} # 7 years max retention
}]
}
s3_client.put_bucket_lifecycle_configuration(
Bucket=bucket_name,
LifecycleConfiguration=lifecycle_config
)
return {
'resource': resource_arn,
'action': 'GDPR lifecycle policy applied',
'retention_days': 2555
}
return {
'resource': resource_arn,
'action': 'no action required',
'framework': framework
}
# IAM role for lifecycle enforcement
LifecycleEnforcementRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: LifecycleEnforcementPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- dynamodb:Query
- dynamodb:GetItem
- dynamodb:PutItem
- dynamodb:UpdateItem
- s3:PutLifecycleConfiguration
- s3:GetLifecycleConfiguration
- s3:PutBucketTagging
- s3:GetBucketTagging
- rds:ModifyDBInstance
- rds:DescribeDBInstances
Resource: '*'
# EventBridge rule for scheduled enforcement
LifecycleEnforcementSchedule:
Type: AWS::Events::Rule
Properties:
Name: !Sub '${Environment}-lifecycle-enforcement-schedule'
ScheduleExpression: 'rate(1 day)'
State: ENABLED
Targets:
- Arn: !GetAtt LifecycleEnforcementFunction.Arn
Id: LifecycleEnforcementTarget
# Permission for EventBridge to invoke Lambda
LifecycleEnforcementPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref LifecycleEnforcementFunction
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceArn: !GetAtt LifecycleEnforcementSchedule.Arn
Outputs:
TrackingTableName:
Description: 'Name of the lifecycle tracking table'
Value: !Ref LifecycleTrackingTable
Export:
Name: !Sub '${AWS::StackName}-TrackingTable'
EnforcementFunctionArn:
Description: 'ARN of the lifecycle enforcement function'
Value: !GetAtt LifecycleEnforcementFunction.Arn
Export:
Name: !Sub '${AWS::StackName}-EnforcementFunction'
Example 4: Cost-Optimized Lifecycle Management
# cost_optimized_lifecycle.py
import boto3
from typing import Dict, List, Any
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class CostOptimizedLifecycleManager:
"""
Cost-optimized data lifecycle management with intelligent tiering
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.s3_client = boto3.client('s3', region_name=region)
self.cloudwatch = boto3.client('cloudwatch', region_name=region)
self.cost_explorer = boto3.client('ce', region_name='us-east-1') # Cost Explorer is only in us-east-1
# Storage class pricing (approximate, per GB per month)
self.storage_pricing = {
'STANDARD': 0.023,
'STANDARD_IA': 0.0125,
'ONEZONE_IA': 0.01,
'GLACIER': 0.004,
'DEEP_ARCHIVE': 0.00099,
'INTELLIGENT_TIERING': 0.0125 # Base price, auto-optimizes
}
def analyze_access_patterns(self, bucket_name: str, days: int = 30) -> Dict[str, Any]:
"""Analyze S3 bucket access patterns for lifecycle optimization"""
try:
# Get access metrics from CloudWatch
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=days)
# Approximate activity using object-count metrics as a rough proxy (true request metrics such as AllRequests require S3 request metrics to be enabled on the bucket)
requests_response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName='NumberOfObjects',
Dimensions=[{'Name': 'BucketName', 'Value': bucket_name}],
StartTime=start_time,
EndTime=end_time,
Period=86400, # Daily
Statistics=['Average']
)
# Get bucket size
size_response = self.cloudwatch.get_metric_statistics(
Namespace='AWS/S3',
MetricName='BucketSizeBytes',
Dimensions=[
{'Name': 'BucketName', 'Value': bucket_name},
{'Name': 'StorageType', 'Value': 'StandardStorage'}
],
StartTime=start_time,
EndTime=end_time,
Period=86400,
Statistics=['Average']
)
# Calculate access frequency
total_requests = sum(dp['Average'] for dp in requests_response['Datapoints'])
avg_daily_requests = total_requests / days if days > 0 else 0
# Get current size
current_size_bytes = 0
if size_response['Datapoints']:
current_size_bytes = size_response['Datapoints'][-1]['Average']
current_size_gb = current_size_bytes / (1024**3)
# Determine access pattern
if avg_daily_requests > 100:
access_pattern = 'frequent'
elif avg_daily_requests > 10:
access_pattern = 'moderate'
elif avg_daily_requests > 1:
access_pattern = 'infrequent'
else:
access_pattern = 'rare'
return {
'bucket': bucket_name,
'size_gb': round(current_size_gb, 2),
'avg_daily_requests': round(avg_daily_requests, 2),
'access_pattern': access_pattern,
'analysis_period_days': days
}
except Exception as e:
logger.error(f"Error analyzing access patterns for {bucket_name}: {str(e)}")
return {
'bucket': bucket_name,
'error': str(e)
}
def recommend_optimal_lifecycle_policy(self, bucket_name: str, classification: str) -> Dict[str, Any]:
"""Recommend optimal lifecycle policy based on access patterns and classification"""
try:
# Analyze access patterns
access_analysis = self.analyze_access_patterns(bucket_name)
if 'error' in access_analysis:
return access_analysis
access_pattern = access_analysis['access_pattern']
size_gb = access_analysis['size_gb']
# Base recommendations on access pattern and classification
recommendations = {
'bucket': bucket_name,
'classification': classification,
'access_pattern': access_pattern,
'size_gb': size_gb,
'recommended_policy': {},
'cost_analysis': {},
'rationale': []
}
# Define lifecycle policy based on access pattern and classification
if access_pattern == 'frequent':
if size_gb > 1000: # Large bucket
policy = {
'intelligent_tiering': True,
'transitions': [
{'days': 90, 'storage_class': 'GLACIER'},
{'days': 365, 'storage_class': 'DEEP_ARCHIVE'}
]
}
recommendations['rationale'].append('Large frequently accessed bucket - use Intelligent Tiering')
else:
policy = {
'transitions': [
{'days': 30, 'storage_class': 'STANDARD_IA'},
{'days': 180, 'storage_class': 'GLACIER'}
]
}
recommendations['rationale'].append('Frequently accessed bucket - delayed transitions')
elif access_pattern == 'moderate':
policy = {
'transitions': [
{'days': 30, 'storage_class': 'STANDARD_IA'},
{'days': 90, 'storage_class': 'GLACIER'},
{'days': 365, 'storage_class': 'DEEP_ARCHIVE'}
]
}
recommendations['rationale'].append('Moderately accessed bucket - standard transitions')
elif access_pattern == 'infrequent':
policy = {
'transitions': [
{'days': 30, 'storage_class': 'STANDARD_IA'},  # S3 requires at least 30 days before transitioning to STANDARD_IA
{'days': 90, 'storage_class': 'GLACIER'},
{'days': 180, 'storage_class': 'DEEP_ARCHIVE'}
]
}
recommendations['rationale'].append('Infrequently accessed bucket - aggressive transitions')
else: # rare access
policy = {
'transitions': [
{'days': 1, 'storage_class': 'GLACIER'},
{'days': 30, 'storage_class': 'DEEP_ARCHIVE'}
]
}
recommendations['rationale'].append('Rarely accessed bucket - immediate archival')
# Adjust for classification requirements
if classification == 'restricted':
# Restricted data may need longer retention in accessible tiers
policy['transitions'] = [t for t in policy['transitions'] if t['days'] >= 30]
recommendations['rationale'].append('Restricted classification - extended accessible retention')
# Calculate cost savings
cost_analysis = self._calculate_cost_savings(size_gb, policy)
recommendations['recommended_policy'] = policy
recommendations['cost_analysis'] = cost_analysis
return recommendations
except Exception as e:
logger.error(f"Error recommending lifecycle policy for {bucket_name}: {str(e)}")
return {
'bucket': bucket_name,
'error': str(e)
}
def _calculate_cost_savings(self, size_gb: float, policy: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate potential cost savings from lifecycle policy"""
try:
# Current cost (all in STANDARD)
current_monthly_cost = size_gb * self.storage_pricing['STANDARD']
# Projected cost with lifecycle policy
projected_cost = 0
remaining_data = size_gb
if policy.get('intelligent_tiering'):
# Assume 70% stays in Standard, 30% moves to IA automatically
projected_cost += (remaining_data * 0.7 * self.storage_pricing['STANDARD'])
projected_cost += (remaining_data * 0.3 * self.storage_pricing['STANDARD_IA'])
else:
# Calculate based on transitions
current_tier_cost = self.storage_pricing['STANDARD']
for transition in policy.get('transitions', []):
# Assume 50% of data transitions at each stage
transitioning_data = remaining_data * 0.5
staying_data = remaining_data * 0.5
projected_cost += staying_data * current_tier_cost
remaining_data = transitioning_data
current_tier_cost = self.storage_pricing[transition['storage_class']]
# Add cost for final tier
projected_cost += remaining_data * current_tier_cost
monthly_savings = current_monthly_cost - projected_cost
annual_savings = monthly_savings * 12
savings_percentage = (monthly_savings / current_monthly_cost * 100) if current_monthly_cost > 0 else 0
return {
'current_monthly_cost': round(current_monthly_cost, 2),
'projected_monthly_cost': round(projected_cost, 2),
'monthly_savings': round(monthly_savings, 2),
'annual_savings': round(annual_savings, 2),
'savings_percentage': round(savings_percentage, 1)
}
except Exception as e:
logger.error(f"Error calculating cost savings: {str(e)}")
return {'error': str(e)}
def implement_cost_optimized_policy(self, bucket_name: str, policy: Dict[str, Any]) -> Dict[str, Any]:
"""Implement cost-optimized lifecycle policy"""
try:
rules = []
# Convert recommendation-format transitions ({'days', 'storage_class'}) to the S3 API key format
api_transitions = [
{'Days': t['days'], 'StorageClass': t['storage_class']}
for t in policy.get('transitions', [])
]
if policy.get('intelligent_tiering'):
# Enable Intelligent-Tiering immediately, keeping any additional archival transitions
rules.append({
'ID': 'IntelligentTieringRule',
'Status': 'Enabled',
'Filter': {'Prefix': ''},
'Transitions': [
{'Days': 0, 'StorageClass': 'INTELLIGENT_TIERING'}
] + api_transitions
})
else:
# Standard lifecycle transitions
rules.append({
'ID': 'CostOptimizedLifecycle',
'Status': 'Enabled',
'Filter': {'Prefix': ''},
'Transitions': api_transitions,
'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 7}
})
# Apply lifecycle configuration
self.s3_client.put_bucket_lifecycle_configuration(
Bucket=bucket_name,
LifecycleConfiguration={'Rules': rules}
)
return {
'status': 'success',
'bucket': bucket_name,
'rules_applied': len(rules),
'policy_type': 'intelligent_tiering' if policy.get('intelligent_tiering') else 'standard'
}
except Exception as e:
logger.error(f"Error implementing lifecycle policy for {bucket_name}: {str(e)}")
return {
'status': 'error',
'bucket': bucket_name,
'error': str(e)
}
# Example usage
if __name__ == "__main__":
manager = CostOptimizedLifecycleManager()
# Analyze access patterns
analysis = manager.analyze_access_patterns('my-data-bucket')
print(f"Access analysis: {analysis}")
# Get recommendations
recommendations = manager.recommend_optimal_lifecycle_policy('my-data-bucket', 'confidential')
print(f"Recommendations: {recommendations}")
# Implement policy
if 'recommended_policy' in recommendations:
implementation = manager.implement_cost_optimized_policy(
'my-data-bucket',
recommendations['recommended_policy']
)
print(f"Implementation result: {implementation}")Relevant AWS Services
Core Lifecycle Services
- Amazon S3: Object lifecycle management with storage class transitions
- AWS Backup: Centralized backup across AWS services with lifecycle policies (see the sketch after this list)
- Amazon EBS: Snapshot lifecycle management
- Amazon RDS: Automated backups and snapshot retention
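For example, AWS Backup expresses retention directly in its backup plan rules. The sketch below is a minimal, illustrative plan (the plan name, vault, and schedule are assumptions); AWS Backup requires recovery points to remain in cold storage for at least 90 days, so the deletion age is set well beyond the cold-storage transition:

# create_backup_plan.py - minimal sketch: AWS Backup plan with cold-storage and deletion lifecycle
import boto3

backup = boto3.client("backup")

backup.create_backup_plan(
    BackupPlan={
        "BackupPlanName": "classification-aware-backups",  # hypothetical plan name
        "Rules": [
            {
                "RuleName": "daily-backups",
                "TargetBackupVaultName": "Default",
                "ScheduleExpression": "cron(0 5 ? * * *)",  # daily at 05:00 UTC
                "Lifecycle": {
                    "MoveToColdStorageAfterDays": 30,
                    "DeleteAfterDays": 365,
                },
            }
        ],
    }
)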
Automation Services
- AWS Lambda: Serverless functions for lifecycle automation
- Amazon EventBridge: Event-driven lifecycle triggers
- AWS Step Functions: Complex lifecycle workflow orchestration
- AWS Systems Manager: Automated lifecycle maintenance
Monitoring and Analytics
- Amazon CloudWatch: Metrics and monitoring for lifecycle decisions
- AWS Cost Explorer: Cost analysis and optimization (see the sketch after this list)
- AWS CloudTrail: Audit trails for lifecycle actions
- Amazon QuickSight: Lifecycle reporting and dashboards
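As a small example of the cost-analysis side, the Cost Explorer API can report last month's S3 spend, which is useful for validating that lifecycle transitions are actually reducing storage costs. This is a sketch, assuming Cost Explorer is enabled in the account:

# s3_spend_last_month.py - minimal sketch: query last month's S3 cost with Cost Explorer
import boto3
from datetime import date, timedelta

ce = boto3.client("ce", region_name="us-east-1")  # the Cost Explorer API is served from us-east-1

first_of_this_month = date.today().replace(day=1)
first_of_last_month = (first_of_this_month - timedelta(days=1)).replace(day=1)

response = ce.get_cost_and_usage(
    TimePeriod={"Start": first_of_last_month.isoformat(), "End": first_of_this_month.isoformat()},
    Granularity="MONTHLY",
    Metrics=["UnblendedCost"],
    Filter={"Dimensions": {"Key": "SERVICE", "Values": ["Amazon Simple Storage Service"]}},
)
print(response["ResultsByTime"][0]["Total"]["UnblendedCost"]["Amount"])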
Compliance Services
- AWS Config: Configuration compliance monitoring
- AWS Security Hub: Centralized compliance findings
- Amazon Macie: Data classification for lifecycle decisions (see the sketch after this list)
- AWS Artifact: Compliance documentation and reports
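Macie findings can seed the DataClassification tags that the lifecycle policies on this page key off. A minimal sketch of a one-time classification job is shown below; it assumes Macie is already enabled in the account and uses a hypothetical bucket name:

# run_macie_classification.py - minimal sketch: one-time Macie job to classify a bucket's objects
import boto3

macie = boto3.client("macie2")
account_id = boto3.client("sts").get_caller_identity()["Account"]

macie.create_classification_job(
    jobType="ONE_TIME",
    name="lifecycle-classification-scan",  # hypothetical job name
    s3JobDefinition={
        "bucketDefinitions": [
            {"accountId": account_id, "buckets": ["my-data-bucket"]}  # hypothetical bucket
        ]
    },
)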
Benefits of Scalable Data Lifecycle Management
Cost Benefits
- Storage Optimization: Automatic transitions to lower-cost storage classes
- Resource Efficiency: Right-sizing based on actual usage patterns
- Predictable Costs: Automated cost management and optimization
- Waste Reduction: Elimination of unnecessary data retention
Operational Benefits
- Automation: Reduced manual effort for lifecycle management
- Scalability: Handles growing data volumes automatically
- Consistency: Uniform lifecycle policies across all data assets
- Efficiency: Streamlined data management processes
Compliance Benefits
- Regulatory Adherence: Automated compliance with retention requirements
- Audit Readiness: Complete lifecycle audit trails
- Risk Management: Controlled data disposal and retention
- Policy Enforcement: Consistent application of lifecycle policies
Security Benefits
- Data Protection: Appropriate security controls throughout lifecycle
- Access Control: Lifecycle-aware access management
- Secure Disposal: Certified data destruction methods
- Compliance Monitoring: Continuous lifecycle compliance validation
Related Resources
- AWS Well-Architected Framework - Data Lifecycle Management
- Amazon S3 Lifecycle Management
- AWS Backup User Guide
- AWS Cost Optimization Best Practices
- AWS Compliance Center
- AWS Security Blog - Data Lifecycle Management