COST04-BP01 - Track resources over their lifetime
Implementation guidance
Resource lifecycle tracking provides the foundation for effective decommissioning by maintaining comprehensive visibility into all resources, their usage patterns, dependencies, and business context throughout their entire lifecycle.
Tracking Framework Principles
Comprehensive Coverage: Track all resources across all accounts, regions, and services to ensure no resources are overlooked during decommissioning activities.
Lifecycle Visibility: Maintain visibility into resource status from creation through active use to eventual decommissioning.
Business Context: Include business context such as project association, ownership, and purpose to enable informed decommissioning decisions.
Automated Discovery: Use automated tools to continuously discover and catalog resources to maintain accurate and up-to-date inventory.
Resource Tracking Components
Resource Inventory: Comprehensive catalog of all resources with metadata including creation date, owner, purpose, and current status.
Usage Monitoring: Continuous monitoring of resource utilization patterns to identify underutilized or unused resources.
Dependency Mapping: Documentation of resource relationships and dependencies to understand impact of decommissioning decisions.
Cost Attribution: Association of costs with resources to enable cost-based decommissioning prioritization.
AWS Services to Consider
AWS Config
Automatically discover and track resource configurations and changes. Use Config to maintain comprehensive resource inventory and track configuration drift.
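As a minimal sketch of Config-based discovery, the helper below pages through the discovered-resource inventory for one resource type. The function name is illustrative, and the client is injectable so the logic can be exercised without live credentials:

```python
def list_config_resources(resource_type, config_client=None):
    """Return all AWS Config resource identifiers of one type, following pagination."""
    if config_client is None:
        import boto3  # deferred so the helper can be tested with a stub client
        config_client = boto3.client('config')

    resources, token = [], None
    while True:
        kwargs = {'resourceType': resource_type}
        if token:
            kwargs['nextToken'] = token
        page = config_client.list_discovered_resources(**kwargs)
        resources.extend(page.get('resourceIdentifiers', []))
        token = page.get('nextToken')
        if not token:
            return resources
```

For example, `list_config_resources('AWS::EC2::Instance')` returns every instance identifier the Config recorder has cataloged in the current region.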
AWS Systems Manager Inventory
Collect detailed information about resources and their configurations. Use Systems Manager to gather metadata and track resource attributes.
AWS Resource Groups
Organize resources into logical groups for tracking and management. Use resource groups to track related resources and their lifecycle status.
Amazon CloudWatch
Monitor resource utilization and performance metrics. Use CloudWatch to track usage patterns and identify decommissioning candidates.
AWS CloudTrail
Track resource creation, modification, and access activities. Use CloudTrail to understand resource usage patterns and ownership.
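One way to recover ownership for an untagged resource is to look up the earliest CloudTrail event that references it (note `lookup_events` only covers the trailing 90 days). This helper is a sketch with an illustrative name, written so it can be tested with a stub client:

```python
def find_resource_creator(resource_id, cloudtrail_client=None):
    """Return the name and caller of the earliest CloudTrail event touching a resource."""
    if cloudtrail_client is None:
        import boto3  # deferred so the helper can be tested with a stub client
        cloudtrail_client = boto3.client('cloudtrail')

    events, token = [], None
    while True:
        kwargs = {'LookupAttributes': [
            {'AttributeKey': 'ResourceName', 'AttributeValue': resource_id}
        ]}
        if token:
            kwargs['NextToken'] = token
        page = cloudtrail_client.lookup_events(**kwargs)
        events.extend(page.get('Events', []))
        token = page.get('NextToken')
        if not token:
            break

    if not events:
        return None
    earliest = min(events, key=lambda e: e['EventTime'])
    return {'event_name': earliest.get('EventName'), 'username': earliest.get('Username')}
```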
Amazon DynamoDB
Store resource tracking data and metadata. Use DynamoDB for fast access to resource information and lifecycle status.
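The code examples later in this section read and write a table named ResourceTracking keyed by ResourceId and ResourceType. As a sketch, the table could be provisioned like this (the table name, key schema, and billing mode are assumptions consistent with those examples, not a prescribed design):

```python
def resource_tracking_table_spec(table_name='ResourceTracking'):
    """Build CreateTable arguments for a tracking table keyed by resource ID and type."""
    return {
        'TableName': table_name,
        'KeySchema': [
            {'AttributeName': 'ResourceId', 'KeyType': 'HASH'},
            {'AttributeName': 'ResourceType', 'KeyType': 'RANGE'},
        ],
        'AttributeDefinitions': [
            {'AttributeName': 'ResourceId', 'AttributeType': 'S'},
            {'AttributeName': 'ResourceType', 'AttributeType': 'S'},
        ],
        'BillingMode': 'PAY_PER_REQUEST',  # avoids capacity planning for spiky discovery runs
    }


def create_tracking_table(dynamodb_client=None):
    """Create the tracking table; enable TTL on the 'TTL' attribute once it is ACTIVE."""
    if dynamodb_client is None:
        import boto3
        dynamodb_client = boto3.client('dynamodb')
    return dynamodb_client.create_table(**resource_tracking_table_spec())
```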
Implementation Steps
1. Design Tracking Architecture
- Define resource tracking requirements and scope
- Design data model for resource lifecycle information
- Plan integration with existing systems and tools
- Establish data retention and archival policies
2. Implement Resource Discovery
- Set up automated resource discovery across all accounts
- Configure resource inventory collection and updates
- Implement resource classification and categorization
- Create resource ownership and accountability frameworks
3. Deploy Monitoring Infrastructure
- Set up utilization monitoring for all resource types
- Configure performance and usage metric collection
- Implement dependency discovery and mapping
- Create cost attribution and tracking mechanisms
4. Create Tracking Dashboards
- Build comprehensive resource inventory dashboards
- Create lifecycle status and utilization reports
- Implement alerting for tracking anomalies
- Set up automated reporting and notifications
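A dashboard can be generated directly from the tracked inventory. The sketch below builds a CloudWatch dashboard body with one CPU widget per tracked EC2 instance and publishes it with `put_dashboard`; the function names and layout are illustrative:

```python
import json


def build_inventory_dashboard(instance_ids, region='us-east-1'):
    """Build a CloudWatch dashboard body with one CPU widget per tracked instance."""
    widgets = []
    for i, instance_id in enumerate(instance_ids):
        widgets.append({
            'type': 'metric',
            'x': (i % 2) * 12,   # two widgets per row
            'y': (i // 2) * 6,
            'width': 12,
            'height': 6,
            'properties': {
                'metrics': [['AWS/EC2', 'CPUUtilization', 'InstanceId', instance_id]],
                'period': 3600,
                'stat': 'Average',
                'region': region,
                'title': f'{instance_id} CPU'
            }
        })
    return json.dumps({'widgets': widgets})


def publish_dashboard(instance_ids, name='ResourceTracking', cloudwatch_client=None):
    """Publish (or overwrite) the inventory dashboard."""
    if cloudwatch_client is None:
        import boto3
        cloudwatch_client = boto3.client('cloudwatch')
    return cloudwatch_client.put_dashboard(
        DashboardName=name,
        DashboardBody=build_inventory_dashboard(instance_ids)
    )
```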
5. Establish Governance Processes
- Create resource lifecycle management policies
- Implement ownership and accountability procedures
- Set up regular review and validation processes
- Create audit and compliance reporting capabilities
6. Enable Continuous Improvement
- Monitor tracking system effectiveness and accuracy
- Gather feedback from stakeholders and users
- Refine tracking processes based on lessons learned
- Expand tracking coverage to new services and use cases
Resource Tracking Implementation
Automated Resource Discovery
import boto3
import json
from datetime import datetime, timedelta

from botocore.exceptions import ClientError


class ResourceTracker:
    def __init__(self):
        self.config = boto3.client('config')
        self.ec2 = boto3.client('ec2')
        self.rds = boto3.client('rds')
        self.s3 = boto3.client('s3')
        self.dynamodb = boto3.resource('dynamodb')
        self.tracking_table = self.dynamodb.Table('ResourceTracking')

    def discover_all_resources(self):
        """Discover and catalog all resources across services"""
        resources = {}

        # Discover EC2 resources
        resources['ec2'] = self.discover_ec2_resources()

        # Discover RDS resources
        resources['rds'] = self.discover_rds_resources()

        # Discover S3 resources
        resources['s3'] = self.discover_s3_resources()

        # Store tracking information
        self.store_resource_tracking(resources)

        return resources

    def discover_ec2_resources(self):
        """Discover EC2 instances and related resources"""
        ec2_resources = []

        # Get all instances (paginate so large fleets are fully covered)
        for page in self.ec2.get_paginator('describe_instances').paginate():
            for reservation in page['Reservations']:
                for instance in reservation['Instances']:
                    resource_info = {
                        'resource_id': instance['InstanceId'],
                        'resource_type': 'EC2Instance',
                        'state': instance['State']['Name'],
                        'launch_time': instance['LaunchTime'].isoformat(),
                        'instance_type': instance['InstanceType'],
                        'tags': {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])},
                        'vpc_id': instance.get('VpcId'),
                        'subnet_id': instance.get('SubnetId'),
                        'security_groups': [sg['GroupId'] for sg in instance.get('SecurityGroups', [])],
                        'discovered_at': datetime.now().isoformat()
                    }

                    # Add business context from tags
                    resource_info['owner'] = resource_info['tags'].get('Owner', 'Unknown')
                    resource_info['project'] = resource_info['tags'].get('Project', 'Unknown')
                    resource_info['environment'] = resource_info['tags'].get('Environment', 'Unknown')
                    resource_info['cost_center'] = resource_info['tags'].get('CostCenter', 'Unknown')

                    ec2_resources.append(resource_info)

        # Get EBS volumes
        for page in self.ec2.get_paginator('describe_volumes').paginate():
            for volume in page['Volumes']:
                resource_info = {
                    'resource_id': volume['VolumeId'],
                    'resource_type': 'EBSVolume',
                    'state': volume['State'],
                    'create_time': volume['CreateTime'].isoformat(),
                    'size': volume['Size'],
                    'volume_type': volume['VolumeType'],
                    'tags': {tag['Key']: tag['Value'] for tag in volume.get('Tags', [])},
                    'attachments': volume.get('Attachments', []),
                    'discovered_at': datetime.now().isoformat()
                }

                # Add business context
                resource_info['owner'] = resource_info['tags'].get('Owner', 'Unknown')
                resource_info['project'] = resource_info['tags'].get('Project', 'Unknown')

                ec2_resources.append(resource_info)

        return ec2_resources

    def discover_rds_resources(self):
        """Discover RDS instances and clusters"""
        rds_resources = []

        # Get RDS instances
        for page in self.rds.get_paginator('describe_db_instances').paginate():
            for instance in page['DBInstances']:
                # Get tags for the instance
                tags_response = self.rds.list_tags_for_resource(
                    ResourceName=instance['DBInstanceArn']
                )
                tags = {tag['Key']: tag['Value'] for tag in tags_response['TagList']}

                resource_info = {
                    'resource_id': instance['DBInstanceIdentifier'],
                    'resource_type': 'RDSInstance',
                    'state': instance['DBInstanceStatus'],
                    'create_time': instance['InstanceCreateTime'].isoformat(),
                    'engine': instance['Engine'],
                    'instance_class': instance['DBInstanceClass'],
                    'allocated_storage': instance['AllocatedStorage'],
                    'tags': tags,
                    'vpc_id': instance.get('DBSubnetGroup', {}).get('VpcId'),
                    'discovered_at': datetime.now().isoformat()
                }

                # Add business context
                resource_info['owner'] = tags.get('Owner', 'Unknown')
                resource_info['project'] = tags.get('Project', 'Unknown')
                resource_info['environment'] = tags.get('Environment', 'Unknown')

                rds_resources.append(resource_info)

        return rds_resources

    def discover_s3_resources(self):
        """Discover S3 buckets"""
        s3_resources = []

        # Get all buckets
        buckets = self.s3.list_buckets()
        for bucket in buckets['Buckets']:
            bucket_name = bucket['Name']

            try:
                # Get bucket tags (untagged buckets raise NoSuchTagSet)
                tags_response = self.s3.get_bucket_tagging(Bucket=bucket_name)
                tags = {tag['Key']: tag['Value'] for tag in tags_response['TagSet']}
            except ClientError:
                tags = {}

            try:
                # Get bucket location (a null LocationConstraint means us-east-1)
                location = self.s3.get_bucket_location(Bucket=bucket_name)
                region = location['LocationConstraint'] or 'us-east-1'
            except ClientError:
                region = 'Unknown'

            resource_info = {
                'resource_id': bucket_name,
                'resource_type': 'S3Bucket',
                'create_time': bucket['CreationDate'].isoformat(),
                'region': region,
                'tags': tags,
                'discovered_at': datetime.now().isoformat()
            }

            # Add business context
            resource_info['owner'] = tags.get('Owner', 'Unknown')
            resource_info['project'] = tags.get('Project', 'Unknown')
            resource_info['environment'] = tags.get('Environment', 'Unknown')

            s3_resources.append(resource_info)

        return s3_resources

    def store_resource_tracking(self, resources):
        """Store resource tracking information in DynamoDB"""
        for service, service_resources in resources.items():
            for resource in service_resources:
                try:
                    # Calculate resource age
                    if 'create_time' in resource:
                        create_time = datetime.fromisoformat(resource['create_time'].replace('Z', '+00:00'))
                        resource['age_days'] = (datetime.now() - create_time.replace(tzinfo=None)).days
                    elif 'launch_time' in resource:
                        launch_time = datetime.fromisoformat(resource['launch_time'].replace('Z', '+00:00'))
                        resource['age_days'] = (datetime.now() - launch_time.replace(tzinfo=None)).days

                    # Store in DynamoDB
                    self.tracking_table.put_item(
                        Item={
                            'ResourceId': resource['resource_id'],
                            'ResourceType': resource['resource_type'],
                            'ServiceCategory': service,
                            'TrackingData': resource,
                            'LastUpdated': datetime.now().isoformat(),
                            'TTL': int((datetime.now() + timedelta(days=365)).timestamp())
                        }
                    )
                except Exception as e:
                    print(f"Error storing resource {resource['resource_id']}: {str(e)}")
Usage Monitoring Integration
import io
import zipfile

import boto3


def implement_usage_monitoring():
    """Implement comprehensive usage monitoring for tracked resources"""

    # Lambda function source for usage monitoring
    lambda_code = '''
import json
from datetime import datetime, timedelta
from decimal import Decimal

import boto3


def lambda_handler(event, context):
    """Monitor resource usage and update tracking data"""
    cloudwatch = boto3.client('cloudwatch')
    dynamodb = boto3.resource('dynamodb')
    tracking_table = dynamodb.Table('ResourceTracking')

    # Get all tracked resources (scan pages until the table is exhausted)
    response = tracking_table.scan()
    resources = response['Items']
    while 'LastEvaluatedKey' in response:
        response = tracking_table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        resources.extend(response['Items'])

    for resource in resources:
        resource_id = resource['ResourceId']
        resource_type = resource['ResourceType']

        # Get usage metrics based on resource type
        usage_data = get_resource_usage_metrics(resource_id, resource_type, cloudwatch)

        # DynamoDB rejects Python floats, so convert metric values to Decimal
        usage_data = json.loads(json.dumps(usage_data), parse_float=Decimal)

        # Update tracking data with usage information
        tracking_table.update_item(
            Key={
                'ResourceId': resource_id,
                'ResourceType': resource_type
            },
            UpdateExpression='SET UsageData = :usage, LastMonitored = :timestamp',
            ExpressionAttributeValues={
                ':usage': usage_data,
                ':timestamp': datetime.now().isoformat()
            }
        )

    return {'statusCode': 200, 'body': json.dumps(f'Monitored {len(resources)} resources')}


def get_resource_usage_metrics(resource_id, resource_type, cloudwatch):
    """Get usage metrics for specific resource types"""
    end_time = datetime.now()
    start_time = end_time - timedelta(days=7)  # Last 7 days
    usage_data = {}

    try:
        if resource_type == 'EC2Instance':
            # Get CPU utilization
            cpu_response = cloudwatch.get_metric_statistics(
                Namespace='AWS/EC2',
                MetricName='CPUUtilization',
                Dimensions=[{'Name': 'InstanceId', 'Value': resource_id}],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,
                Statistics=['Average']
            )
            if cpu_response['Datapoints']:
                avg_cpu = sum(dp['Average'] for dp in cpu_response['Datapoints']) / len(cpu_response['Datapoints'])
                usage_data['avg_cpu_utilization'] = avg_cpu
                usage_data['max_cpu_utilization'] = max(dp['Average'] for dp in cpu_response['Datapoints'])
                usage_data['cpu_datapoints'] = len(cpu_response['Datapoints'])

            # Get network metrics
            network_response = cloudwatch.get_metric_statistics(
                Namespace='AWS/EC2',
                MetricName='NetworkIn',
                Dimensions=[{'Name': 'InstanceId', 'Value': resource_id}],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,
                Statistics=['Sum']
            )
            if network_response['Datapoints']:
                usage_data['total_network_in'] = sum(dp['Sum'] for dp in network_response['Datapoints'])

        elif resource_type == 'RDSInstance':
            # Get database connections
            conn_response = cloudwatch.get_metric_statistics(
                Namespace='AWS/RDS',
                MetricName='DatabaseConnections',
                Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': resource_id}],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,
                Statistics=['Average']
            )
            if conn_response['Datapoints']:
                avg_connections = sum(dp['Average'] for dp in conn_response['Datapoints']) / len(conn_response['Datapoints'])
                usage_data['avg_connections'] = avg_connections
                usage_data['max_connections'] = max(dp['Average'] for dp in conn_response['Datapoints'])

        elif resource_type == 'S3Bucket':
            # Get bucket size (S3 storage metrics are emitted once per day)
            size_response = cloudwatch.get_metric_statistics(
                Namespace='AWS/S3',
                MetricName='BucketSizeBytes',
                Dimensions=[
                    {'Name': 'BucketName', 'Value': resource_id},
                    {'Name': 'StorageType', 'Value': 'StandardStorage'}
                ],
                StartTime=start_time,
                EndTime=end_time,
                Period=86400,  # Daily
                Statistics=['Average']
            )
            if size_response['Datapoints']:
                # Datapoints are not guaranteed to be ordered, so pick the newest explicitly
                latest = max(size_response['Datapoints'], key=lambda dp: dp['Timestamp'])
                usage_data['bucket_size_bytes'] = latest['Average']

        # Add common usage indicators
        usage_data['monitoring_period_days'] = 7
        usage_data['last_monitored'] = datetime.now().isoformat()

        # Determine usage status
        usage_data['usage_status'] = determine_usage_status(resource_type, usage_data)
    except Exception as e:
        usage_data['error'] = str(e)
        usage_data['usage_status'] = 'monitoring_error'

    return usage_data


def determine_usage_status(resource_type, usage_data):
    """Determine usage status based on metrics"""
    if resource_type == 'EC2Instance':
        avg_cpu = usage_data.get('avg_cpu_utilization', 0)
        if avg_cpu < 5:
            return 'unused'
        elif avg_cpu < 20:
            return 'underutilized'
        else:
            return 'active'
    elif resource_type == 'RDSInstance':
        avg_connections = usage_data.get('avg_connections', 0)
        if avg_connections < 1:
            return 'unused'
        elif avg_connections < 5:
            return 'underutilized'
        else:
            return 'active'
    elif resource_type == 'S3Bucket':
        if usage_data.get('bucket_size_bytes', 0) == 0:
            return 'empty'
        else:
            return 'active'
    return 'unknown'
'''

    # Package the source as a deployment archive; Code.ZipFile expects zip bytes, not raw source
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w') as archive:
        archive.writestr('lambda_function.py', lambda_code)

    # Create Lambda function
    lambda_client = boto3.client('lambda')
    try:
        lambda_client.create_function(
            FunctionName='ResourceUsageMonitoring',
            Runtime='python3.12',
            Role='arn:aws:iam::ACCOUNT:role/ResourceTrackingRole',
            Handler='lambda_function.lambda_handler',
            Code={'ZipFile': zip_buffer.getvalue()},
            Description='Monitor usage for tracked resources',
            Timeout=300
        )

        # Set up scheduled execution
        events_client = boto3.client('events')
        events_client.put_rule(
            Name='ResourceUsageMonitoringSchedule',
            ScheduleExpression='rate(1 day)',  # Daily monitoring
            Description='Trigger daily resource usage monitoring'
        )
        events_client.put_targets(
            Rule='ResourceUsageMonitoringSchedule',
            Targets=[
                {
                    'Id': '1',
                    'Arn': 'arn:aws:lambda:REGION:ACCOUNT:function:ResourceUsageMonitoring'
                }
            ]
        )

        # Allow EventBridge to invoke the function
        lambda_client.add_permission(
            FunctionName='ResourceUsageMonitoring',
            StatementId='AllowEventBridgeInvoke',
            Action='lambda:InvokeFunction',
            Principal='events.amazonaws.com'
        )

        print("Set up resource usage monitoring")
    except Exception as e:
        print(f"Error setting up usage monitoring: {str(e)}")
Dependency Mapping and Analysis
Resource Dependency Discovery
import boto3
from datetime import datetime, timedelta


def implement_dependency_mapping():
    """Implement comprehensive dependency mapping for resources"""

    class DependencyMapper:
        def __init__(self):
            self.ec2 = boto3.client('ec2')
            self.elbv2 = boto3.client('elbv2')
            self.rds = boto3.client('rds')
            self.dynamodb = boto3.resource('dynamodb')
            self.dependency_table = self.dynamodb.Table('ResourceDependencies')

        def map_all_dependencies(self):
            """Map dependencies for all tracked resources"""
            dependencies = {}

            # Map EC2 dependencies
            dependencies.update(self.map_ec2_dependencies())

            # Map Load Balancer dependencies
            dependencies.update(self.map_load_balancer_dependencies())

            # Map RDS dependencies
            dependencies.update(self.map_rds_dependencies())

            # Store dependency information
            self.store_dependencies(dependencies)

            return dependencies

        def map_ec2_dependencies(self):
            """Map EC2 instance dependencies"""
            dependencies = {}

            # Get all instances
            instances = self.ec2.describe_instances()
            for reservation in instances['Reservations']:
                for instance in reservation['Instances']:
                    instance_id = instance['InstanceId']
                    instance_dependencies = {
                        'resource_id': instance_id,
                        'resource_type': 'EC2Instance',
                        'dependencies': [],
                        'dependents': []
                    }

                    # VPC dependency
                    if 'VpcId' in instance:
                        instance_dependencies['dependencies'].append({
                            'resource_id': instance['VpcId'],
                            'resource_type': 'VPC',
                            'dependency_type': 'network'
                        })

                    # Subnet dependency
                    if 'SubnetId' in instance:
                        instance_dependencies['dependencies'].append({
                            'resource_id': instance['SubnetId'],
                            'resource_type': 'Subnet',
                            'dependency_type': 'network'
                        })

                    # Security Group dependencies
                    for sg in instance.get('SecurityGroups', []):
                        instance_dependencies['dependencies'].append({
                            'resource_id': sg['GroupId'],
                            'resource_type': 'SecurityGroup',
                            'dependency_type': 'security'
                        })

                    # EBS Volume dependencies
                    for bdm in instance.get('BlockDeviceMappings', []):
                        if 'Ebs' in bdm:
                            instance_dependencies['dependencies'].append({
                                'resource_id': bdm['Ebs']['VolumeId'],
                                'resource_type': 'EBSVolume',
                                'dependency_type': 'storage'
                            })

                    dependencies[instance_id] = instance_dependencies

            return dependencies

        def map_load_balancer_dependencies(self):
            """Map load balancer dependencies"""
            dependencies = {}

            # Get all load balancers
            load_balancers = self.elbv2.describe_load_balancers()
            for lb in load_balancers['LoadBalancers']:
                lb_arn = lb['LoadBalancerArn']
                lb_name = lb['LoadBalancerName']
                lb_dependencies = {
                    'resource_id': lb_name,
                    'resource_type': 'LoadBalancer',
                    'dependencies': [],
                    'dependents': []
                }

                # Subnet dependencies (each availability-zone entry carries a SubnetId)
                for az in lb.get('AvailabilityZones', []):
                    if 'SubnetId' in az:
                        lb_dependencies['dependencies'].append({
                            'resource_id': az['SubnetId'],
                            'resource_type': 'Subnet',
                            'dependency_type': 'network'
                        })

                # Security Group dependencies
                for sg_id in lb.get('SecurityGroups', []):
                    lb_dependencies['dependencies'].append({
                        'resource_id': sg_id,
                        'resource_type': 'SecurityGroup',
                        'dependency_type': 'security'
                    })

                # Target Group dependencies
                target_groups = self.elbv2.describe_target_groups(
                    LoadBalancerArn=lb_arn
                )
                for tg in target_groups['TargetGroups']:
                    lb_dependencies['dependents'].append({
                        'resource_id': tg['TargetGroupName'],
                        'resource_type': 'TargetGroup',
                        'dependency_type': 'routing'
                    })

                dependencies[lb_name] = lb_dependencies

            return dependencies

        def map_rds_dependencies(self):
            """Map RDS instance dependencies"""
            dependencies = {}

            for instance in self.rds.describe_db_instances()['DBInstances']:
                db_id = instance['DBInstanceIdentifier']
                db_dependencies = {
                    'resource_id': db_id,
                    'resource_type': 'RDSInstance',
                    'dependencies': [],
                    'dependents': []
                }

                # Security Group dependencies
                for sg in instance.get('VpcSecurityGroups', []):
                    db_dependencies['dependencies'].append({
                        'resource_id': sg['VpcSecurityGroupId'],
                        'resource_type': 'SecurityGroup',
                        'dependency_type': 'security'
                    })

                # VPC dependency via the subnet group
                vpc_id = instance.get('DBSubnetGroup', {}).get('VpcId')
                if vpc_id:
                    db_dependencies['dependencies'].append({
                        'resource_id': vpc_id,
                        'resource_type': 'VPC',
                        'dependency_type': 'network'
                    })

                dependencies[db_id] = db_dependencies

            return dependencies

        def store_dependencies(self, dependencies):
            """Store dependency information in DynamoDB"""
            for resource_id, dependency_info in dependencies.items():
                try:
                    self.dependency_table.put_item(
                        Item={
                            'ResourceId': resource_id,
                            'ResourceType': dependency_info['resource_type'],
                            'Dependencies': dependency_info['dependencies'],
                            'Dependents': dependency_info['dependents'],
                            'LastUpdated': datetime.now().isoformat(),
                            'TTL': int((datetime.now() + timedelta(days=90)).timestamp())
                        }
                    )
                except Exception as e:
                    print(f"Error storing dependencies for {resource_id}: {str(e)}")

    # Initialize and run dependency mapping
    mapper = DependencyMapper()
    dependencies = mapper.map_all_dependencies()
    return dependencies
Common Challenges and Solutions
Challenge: Resource Discovery Across Multiple Accounts
Solution: Use AWS Organizations and cross-account roles for centralized discovery. Implement automated discovery tools that can access multiple accounts. Create standardized tagging and naming conventions across accounts.
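The cross-account pattern can be sketched as follows, assuming each member account exposes a standardized role (the role name 'ResourceDiscoveryRole' is a placeholder you would replace with your own convention):

```python
def assume_discovery_role(account_id, role_name='ResourceDiscoveryRole', sts_client=None):
    """Assume the standardized discovery role in a member account."""
    if sts_client is None:
        import boto3  # deferred so the helper can be tested with a stub client
        sts_client = boto3.client('sts')
    response = sts_client.assume_role(
        RoleArn=f'arn:aws:iam::{account_id}:role/{role_name}',
        RoleSessionName='resource-tracking-discovery'
    )
    return response['Credentials']


def client_for_account(service, account_id, region, sts_client=None):
    """Build a service client scoped to one member account's temporary credentials."""
    import boto3
    credentials = assume_discovery_role(account_id, sts_client=sts_client)
    return boto3.client(
        service,
        region_name=region,
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken']
    )
```

Iterating `client_for_account('ec2', account_id, region)` over the account list from AWS Organizations gives the ResourceTracker class above a per-account client without long-lived credentials.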
Challenge: Tracking Dynamic Resources
Solution: Implement real-time discovery and tracking updates. Use event-driven tracking with Amazon EventBridge (formerly CloudWatch Events). Create automated processes for tracking short-lived resources.
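As a sketch of the event-driven approach, the following registers an EventBridge rule that fires on EC2 instance state transitions and routes them to an updater target (the rule name, target ID, and chosen states are illustrative):

```python
import json

# Pattern for the instance lifecycle transitions the tracker cares about
EC2_STATE_PATTERN = {
    'source': ['aws.ec2'],
    'detail-type': ['EC2 Instance State-change Notification'],
    'detail': {'state': ['running', 'stopped', 'terminated']},
}


def register_state_change_rule(target_arn, rule_name='ResourceTrackingStateChanges',
                               events_client=None):
    """Create an EventBridge rule that routes EC2 state changes to a tracking updater."""
    if events_client is None:
        import boto3  # deferred so the helper can be tested with a stub client
        events_client = boto3.client('events')
    events_client.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(EC2_STATE_PATTERN),
        State='ENABLED',
        Description='Update resource tracking data on EC2 state changes'
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{'Id': 'tracking-updater', 'Arn': target_arn}]
    )
```

The target would typically be a Lambda function that upserts the affected instance into the tracking table, so short-lived instances are captured even between daily discovery runs.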
Challenge: Maintaining Data Quality
Solution: Implement comprehensive data validation and quality checks. Use automated reconciliation processes. Create feedback loops for data accuracy improvement.
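A reconciliation pass reduces to comparing the IDs found by a fresh discovery run against the IDs currently in the tracking store, as in this sketch (the report keys are illustrative):

```python
def reconcile_inventory(discovered_ids, tracked_ids):
    """Compare a fresh discovery run against the tracking store."""
    discovered, tracked = set(discovered_ids), set(tracked_ids)
    return {
        'missing_from_tracking': sorted(discovered - tracked),  # found live, never recorded
        'stale_in_tracking': sorted(tracked - discovered),      # recorded, no longer found
        'in_sync': sorted(discovered & tracked),
    }
```

Running this after each discovery cycle and alerting on non-empty `missing_from_tracking` or `stale_in_tracking` gives the feedback loop described above.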
Challenge: Scalability of Tracking Systems
Solution: Use scalable storage and processing solutions. Implement efficient data structures and indexing. Use managed services for large-scale data processing.
Challenge: Integration with Existing Systems
Solution: Design flexible integration architectures. Use standard APIs and data formats. Implement gradual migration strategies for existing systems.