Skip to content
COST04

COST04-BP04 - Decommission resources automatically

One-Click Remediation

Deploy CloudFormation stacks to implement this best practice with a single click.

Auto-Stop Dev Instances
Lambda + EventBridge to auto-stop tagged dev EC2 instances outside business hours
Launch Stack

Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.

Implementation guidance

Automated decommissioning enables organizations to systematically identify and remove unused or underutilized resources without manual intervention, reducing costs and operational overhead while maintaining safety and compliance requirements.

Automation Principles

Policy-Driven: Use clearly defined policies and criteria to determine when resources should be automatically decommissioned.

Safety First: Implement comprehensive safety checks and validation to prevent accidental decommissioning of critical resources.

Gradual Implementation: Start with low-risk scenarios and gradually expand automation to more complex use cases.

Monitoring and Alerting: Maintain visibility into automated decommissioning activities with comprehensive logging and alerting.

Automation Components

Resource Discovery: Automated identification of resources that meet decommissioning criteria.

Policy Evaluation: Systematic evaluation of resources against decommissioning policies and rules.

Safety Validation: Automated checks to ensure resources can be safely decommissioned.

Execution Engine: Automated execution of decommissioning procedures with proper error handling.

AWS Services to Consider

AWS Lambda

Implement automated decommissioning logic and workflows. Use Lambda for event-driven and scheduled decommissioning tasks.

Amazon EventBridge

Trigger automated decommissioning based on events and schedules. Use EventBridge for coordinating complex automation workflows.

AWS Step Functions

Orchestrate complex automated decommissioning workflows. Use Step Functions for multi-step automation with error handling.

Amazon CloudWatch

Monitor resource utilization and trigger automated decommissioning. Use CloudWatch metrics and alarms for automation triggers.

AWS Config

Evaluate resource compliance with decommissioning policies. Use Config rules for automated policy evaluation and remediation.

AWS Systems Manager

Automate resource management and decommissioning tasks. Use Systems Manager for coordinated automation across multiple resources.

Implementation Steps

1. Define Automation Policies

  • Establish clear criteria for automated decommissioning
  • Define safety checks and validation requirements
  • Create exception handling and escalation procedures
  • Document automation policies and approval processes

2. Implement Resource Discovery

  • Create automated resource discovery and classification
  • Implement utilization monitoring and analysis
  • Set up policy evaluation and scoring systems
  • Create candidate identification and prioritization

3. Build Safety Validation

  • Implement dependency checking and impact analysis
  • Create business criticality assessment automation
  • Set up stakeholder notification and approval workflows
  • Design rollback and recovery mechanisms

4. Deploy Automation Engine

  • Create automated decommissioning execution workflows
  • Implement error handling and exception management
  • Set up comprehensive logging and audit trails
  • Create monitoring and alerting for automation activities

5. Enable Gradual Rollout

  • Start with low-risk, non-critical resources
  • Implement pilot programs and validation phases
  • Gradually expand automation scope and complexity
  • Create feedback loops for continuous improvement

6. Monitor and Optimize

  • Track automation effectiveness and accuracy
  • Monitor false positives and safety incidents
  • Gather feedback from stakeholders and users
  • Continuously refine automation policies and procedures

Automated Decommissioning Framework

Core Automation Engine

View code
import boto3
import json
from datetime import datetime, timedelta
from enum import Enum

class AutomationRiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class AutomatedDecommissioner:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.rds = boto3.client('rds')
        self.cloudwatch = boto3.client('cloudwatch')
        self.dynamodb = boto3.resource('dynamodb')
        self.sns = boto3.client('sns')
        self.lambda_client = boto3.client('lambda')
        
        # Initialize tables
        self.automation_table = self.dynamodb.Table('AutomatedDecommissioning')
        self.policy_table = self.dynamodb.Table('DecommissioningPolicies')
        self.whitelist_table = self.dynamodb.Table('DecommissioningWhitelist')
    
    def run_automated_decommissioning(self):
        """Main function to run automated decommissioning process"""
        
        execution_log = {
            'execution_id': f"AUTO-{datetime.now().strftime('%Y%m%d%H%M%S')}",
            'start_time': datetime.now().isoformat(),
            'candidates_identified': 0,
            'resources_decommissioned': 0,
            'errors': [],
            'status': 'running'
        }
        
        try:
            # Step 1: Discover decommissioning candidates
            candidates = self.discover_decommissioning_candidates()
            execution_log['candidates_identified'] = len(candidates)
            execution_log['candidates'] = candidates
            
            # Step 2: Process each candidate
            for candidate in candidates:
                try:
                    result = self.process_decommissioning_candidate(candidate)
                    if result['action_taken']:
                        execution_log['resources_decommissioned'] += 1
                        
                except Exception as e:
                    execution_log['errors'].append({
                        'resource_id': candidate['resource_id'],
                        'error': str(e)
                    })
            
            execution_log['status'] = 'completed'
            
        except Exception as e:
            execution_log['status'] = 'failed'
            execution_log['error'] = str(e)
        
        execution_log['end_time'] = datetime.now().isoformat()
        
        # Store execution log
        self.store_execution_log(execution_log)
        
        # Send summary notification
        self.send_execution_summary(execution_log)
        
        return execution_log
    
    def discover_decommissioning_candidates(self):
        """Discover resources that are candidates for automated decommissioning"""
        
        candidates = []
        
        # Discover EC2 candidates
        ec2_candidates = self.discover_ec2_candidates()
        candidates.extend(ec2_candidates)
        
        # Discover RDS candidates
        rds_candidates = self.discover_rds_candidates()
        candidates.extend(rds_candidates)
        
        # Discover EBS volume candidates
        ebs_candidates = self.discover_ebs_candidates()
        candidates.extend(ebs_candidates)
        
        # Discover S3 bucket candidates
        s3_candidates = self.discover_s3_candidates()
        candidates.extend(s3_candidates)
        
        return candidates
    
    def discover_ec2_candidates(self):
        """Discover EC2 instances that are candidates for decommissioning"""
        
        candidates = []
        
        # Get all instances
        instances = self.ec2.describe_instances()
        
        for reservation in instances['Reservations']:
            for instance in reservation['Instances']:
                if instance['State']['Name'] in ['running', 'stopped']:
                    candidate = self.evaluate_ec2_instance(instance)
                    if candidate['eligible']:
                        candidates.append(candidate)
        
        return candidates
    
    def evaluate_ec2_instance(self, instance):
        """Evaluate EC2 instance for automated decommissioning"""
        
        instance_id = instance['InstanceId']
        
        candidate = {
            'resource_id': instance_id,
            'resource_type': 'EC2Instance',
            'eligible': False,
            'risk_level': AutomationRiskLevel.HIGH.value,
            'reasons': [],
            'safety_checks': {},
            'automation_policy': None
        }
        
        # Check if instance is whitelisted
        if self.is_resource_whitelisted(instance_id):
            candidate['reasons'].append('Resource is whitelisted')
            return candidate
        
        # Get instance metadata
        tags = {tag['Key']: tag['Value'] for tag in instance.get('Tags', [])}
        launch_time = instance['LaunchTime']
        instance_age = (datetime.now(launch_time.tzinfo) - launch_time).days
        
        # Check age-based policies
        age_policy = self.check_age_based_policy(instance_age, tags)
        if age_policy['eligible']:
            candidate['eligible'] = True
            candidate['automation_policy'] = age_policy
            candidate['reasons'].append(f"Instance age ({instance_age} days) exceeds policy threshold")
        
        # Check utilization-based policies
        utilization_policy = self.check_utilization_policy(instance_id, tags)
        if utilization_policy['eligible']:
            candidate['eligible'] = True
            candidate['automation_policy'] = utilization_policy
            candidate['reasons'].append("Low utilization detected")
        
        # Perform safety checks
        candidate['safety_checks'] = self.perform_safety_checks(instance_id, 'EC2Instance', tags)
        
        # Determine risk level
        candidate['risk_level'] = self.calculate_automation_risk_level(candidate)
        
        return candidate
    
    def check_age_based_policy(self, resource_age, tags):
        """Check if resource meets age-based decommissioning policy"""
        
        policy = {
            'eligible': False,
            'policy_type': 'age_based',
            'threshold_days': 0,
            'environment_factor': 1.0
        }
        
        # Get environment-specific thresholds
        environment = tags.get('Environment', 'unknown').lower()
        
        age_thresholds = {
            'sandbox': 7,      # 1 week
            'development': 30, # 1 month
            'testing': 60,     # 2 months
            'staging': 90,     # 3 months
            'production': 365  # 1 year (very conservative)
        }
        
        threshold = age_thresholds.get(environment, 180)  # Default 6 months
        policy['threshold_days'] = threshold
        
        if resource_age > threshold:
            policy['eligible'] = True
        
        return policy
    
    def check_utilization_policy(self, resource_id, tags):
        """Check if resource meets utilization-based decommissioning policy"""
        
        policy = {
            'eligible': False,
            'policy_type': 'utilization_based',
            'avg_cpu_utilization': 0,
            'monitoring_period_days': 14,
            'threshold_percentage': 5
        }
        
        try:
            # Get CPU utilization for the last 14 days
            end_time = datetime.now()
            start_time = end_time - timedelta(days=14)
            
            response = self.cloudwatch.get_metric_statistics(
                Namespace='AWS/EC2',
                MetricName='CPUUtilization',
                Dimensions=[{'Name': 'InstanceId', 'Value': resource_id}],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,  # 1 hour periods
                Statistics=['Average']
            )
            
            if response['Datapoints']:
                avg_cpu = sum(dp['Average'] for dp in response['Datapoints']) / len(response['Datapoints'])
                policy['avg_cpu_utilization'] = avg_cpu
                
                # Check if utilization is below threshold
                environment = tags.get('Environment', 'unknown').lower()
                
                # Environment-specific thresholds
                utilization_thresholds = {
                    'sandbox': 1,      # Very low threshold for sandbox
                    'development': 3,  # Low threshold for dev
                    'testing': 5,      # Standard threshold
                    'staging': 10,     # Higher threshold for staging
                    'production': 20   # Much higher threshold for production
                }
                
                threshold = utilization_thresholds.get(environment, 5)
                policy['threshold_percentage'] = threshold
                
                if avg_cpu < threshold:
                    policy['eligible'] = True
            
        except Exception as e:
            policy['error'] = str(e)
        
        return policy
    
    def perform_safety_checks(self, resource_id, resource_type, tags):
        """Perform comprehensive safety checks before automated decommissioning"""
        
        safety_checks = {
            'whitelist_check': self.is_resource_whitelisted(resource_id),
            'dependency_check': self.check_resource_dependencies(resource_id),
            'business_hours_check': self.is_business_hours(),
            'environment_check': self.check_environment_safety(tags),
            'backup_check': self.check_backup_requirements(resource_id, resource_type),
            'approval_check': self.check_approval_requirements(resource_id, tags)
        }
        
        # Overall safety assessment
        safety_checks['safe_to_automate'] = all([
            not safety_checks['whitelist_check'],  # Not whitelisted
            not safety_checks['dependency_check']['has_critical_dependencies'],
            not safety_checks['business_hours_check'],  # Outside business hours
            safety_checks['environment_check']['safe_environment'],
            safety_checks['backup_check']['backup_not_required'] or safety_checks['backup_check']['backup_exists'],
            not safety_checks['approval_check']['approval_required']
        ])
        
        return safety_checks
    
    def check_resource_dependencies(self, resource_id):
        """Check for resource dependencies that would prevent safe decommissioning"""
        
        dependency_check = {
            'has_dependencies': False,
            'has_critical_dependencies': False,
            'dependency_count': 0,
            'dependencies': []
        }
        
        try:
            # Get dependency information from tracking system
            dependency_table = self.dynamodb.Table('ResourceDependencies')
            response = dependency_table.get_item(Key={'ResourceId': resource_id})
            
            if 'Item' in response:
                dependencies = response['Item'].get('Dependents', [])
                dependency_check['dependency_count'] = len(dependencies)
                dependency_check['dependencies'] = dependencies
                
                if dependencies:
                    dependency_check['has_dependencies'] = True
                    
                    # Check for critical dependencies
                    for dep in dependencies:
                        if dep.get('dependency_type') in ['critical', 'required']:
                            dependency_check['has_critical_dependencies'] = True
                            break
            
        except Exception as e:
            dependency_check['error'] = str(e)
        
        return dependency_check
    
    def check_environment_safety(self, tags):
        """Check if the resource environment is safe for automated decommissioning"""
        
        environment = tags.get('Environment', 'unknown').lower()
        
        # Define safe environments for automation
        safe_environments = ['sandbox', 'development', 'testing', 'dev', 'test']
        
        return {
            'environment': environment,
            'safe_environment': environment in safe_environments,
            'requires_approval': environment in ['staging', 'production', 'prod']
        }
    
    def calculate_automation_risk_level(self, candidate):
        """Calculate risk level for automated decommissioning"""
        
        risk_score = 0
        
        # Safety check scoring
        safety_checks = candidate['safety_checks']
        
        if safety_checks.get('dependency_check', {}).get('has_critical_dependencies'):
            risk_score += 3
        
        if safety_checks.get('environment_check', {}).get('requires_approval'):
            risk_score += 2
        
        if safety_checks.get('backup_check', {}).get('backup_required') and not safety_checks.get('backup_check', {}).get('backup_exists'):
            risk_score += 2
        
        if safety_checks.get('business_hours_check'):
            risk_score += 1
        
        # Policy type scoring
        if candidate.get('automation_policy', {}).get('policy_type') == 'utilization_based':
            risk_score += 1  # Utilization-based is slightly riskier
        
        # Determine risk level
        if risk_score >= 6:
            return AutomationRiskLevel.CRITICAL.value
        elif risk_score >= 4:
            return AutomationRiskLevel.HIGH.value
        elif risk_score >= 2:
            return AutomationRiskLevel.MEDIUM.value
        else:
            return AutomationRiskLevel.LOW.value
    
    def process_decommissioning_candidate(self, candidate):
        """Process a decommissioning candidate based on risk level and policies"""
        
        result = {
            'resource_id': candidate['resource_id'],
            'action_taken': False,
            'action_type': 'none',
            'reason': '',
            'timestamp': datetime.now().isoformat()
        }
        
        # Only proceed if safety checks pass
        if not candidate['safety_checks']['safe_to_automate']:
            result['reason'] = 'Safety checks failed'
            return result
        
        # Process based on risk level
        risk_level = candidate['risk_level']
        
        if risk_level == AutomationRiskLevel.LOW.value:
            # Automatically decommission low-risk resources
            result = self.execute_automated_decommissioning(candidate)
            
        elif risk_level == AutomationRiskLevel.MEDIUM.value:
            # Send notification and wait for approval or auto-approve after delay
            result = self.handle_medium_risk_decommissioning(candidate)
            
        else:
            # High and critical risk resources require manual approval
            result = self.request_manual_approval(candidate)
        
        return result
    
    def execute_automated_decommissioning(self, candidate):
        """Execute automated decommissioning for low-risk resources"""
        
        result = {
            'resource_id': candidate['resource_id'],
            'action_taken': False,
            'action_type': 'automated_decommission',
            'reason': '',
            'timestamp': datetime.now().isoformat()
        }
        
        try:
            resource_type = candidate['resource_type']
            resource_id = candidate['resource_id']
            
            if resource_type == 'EC2Instance':
                # Stop instance first, then schedule termination
                self.ec2.stop_instances(InstanceIds=[resource_id])
                
                # Schedule termination after a grace period
                self.schedule_delayed_termination(resource_id, hours=24)
                
                result['action_taken'] = True
                result['reason'] = 'Instance stopped, termination scheduled in 24 hours'
                
            elif resource_type == 'EBSVolume':
                # Create snapshot before deletion
                snapshot = self.ec2.create_snapshot(
                    VolumeId=resource_id,
                    Description=f'Automated backup before decommissioning {resource_id}'
                )
                
                # Schedule volume deletion after snapshot completion
                self.schedule_volume_deletion(resource_id, snapshot['SnapshotId'])
                
                result['action_taken'] = True
                result['reason'] = 'Snapshot created, volume deletion scheduled'
            
            # Log the action
            self.log_automation_action(candidate, result)
            
        except Exception as e:
            result['reason'] = f'Error during automated decommissioning: {str(e)}'
        
        return result
    
    def schedule_delayed_termination(self, instance_id, hours=24):
        """Schedule delayed termination of an instance"""
        
        # Use EventBridge to schedule delayed termination
        eventbridge = boto3.client('events')
        
        # Schedule rule for delayed execution
        rule_name = f'delayed-termination-{instance_id}'
        
        eventbridge.put_rule(
            Name=rule_name,
            ScheduleExpression=f'rate({hours} hours)',
            Description=f'Delayed termination for instance {instance_id}',
            State='ENABLED'
        )
        
        # Add target to execute termination
        eventbridge.put_targets(
            Rule=rule_name,
            Targets=[
                {
                    'Id': '1',
                    'Arn': 'arn:aws:lambda:REGION:ACCOUNT:function:ExecuteDelayedTermination',
                    'Input': json.dumps({
                        'instance_id': instance_id,
                        'action': 'terminate',
                        'scheduled_time': (datetime.now() + timedelta(hours=hours)).isoformat()
                    })
                }
            ]
        )

Automated Policy Engine

View code
def create_automated_policy_engine():
    """Create comprehensive automated policy engine"""
    
    # Lambda function for policy evaluation
    lambda_code = '''
import boto3
import json
from datetime import datetime, timedelta

def lambda_handler(event, context):
    """Evaluate resources against automated decommissioning policies"""
    
    # Initialize clients
    dynamodb = boto3.resource('dynamodb')
    policy_table = dynamodb.Table('DecommissioningPolicies')
    
    # Get active policies
    policies = get_active_policies(policy_table)
    
    # Evaluate each resource type
    results = {}
    
    for policy in policies:
        policy_results = evaluate_policy(policy)
        results[policy['PolicyId']] = policy_results
    
    return {
        'statusCode': 200,
        'body': json.dumps(results)
    }

def get_active_policies(policy_table):
    """Get all active decommissioning policies"""
    
    response = policy_table.scan(
        FilterExpression='PolicyStatus = :status',
        ExpressionAttributeValues={':status': 'active'}
    )
    
    return response['Items']

def evaluate_policy(policy):
    """Evaluate a specific decommissioning policy"""
    
    policy_type = policy['PolicyType']
    
    if policy_type == 'age_based':
        return evaluate_age_based_policy(policy)
    elif policy_type == 'utilization_based':
        return evaluate_utilization_based_policy(policy)
    elif policy_type == 'cost_based':
        return evaluate_cost_based_policy(policy)
    else:
        return {'error': f'Unknown policy type: {policy_type}'}

def evaluate_age_based_policy(policy):
    """Evaluate age-based decommissioning policy"""
    
    # Implementation for age-based policy evaluation
    candidates = []
    
    # Get resources older than threshold
    threshold_days = policy['Parameters']['ThresholdDays']
    cutoff_date = datetime.now() - timedelta(days=threshold_days)
    
    # Query resources based on age
    # Implementation would depend on resource tracking system
    
    return {
        'policy_id': policy['PolicyId'],
        'candidates_found': len(candidates),
        'candidates': candidates
    }
'''
    
    # Create Lambda function
    lambda_client = boto3.client('lambda')
    
    try:
        lambda_client.create_function(
            FunctionName='AutomatedPolicyEngine',
            Runtime='python3.9',
            Role='arn:aws:iam::ACCOUNT:role/AutomatedDecommissioningRole',
            Handler='lambda_function.lambda_handler',
            Code={'ZipFile': lambda_code.encode()},
            Description='Automated policy engine for resource decommissioning',
            Timeout=300
        )
        
        print("Created automated policy engine")
        
    except Exception as e:
        print(f"Error creating policy engine: {str(e)}")

Common Challenges and Solutions

Challenge: False Positives in Automated Detection

Solution: Implement comprehensive safety checks and validation. Use machine learning to improve detection accuracy over time. Create feedback loops to learn from false positives.

Challenge: Stakeholder Trust in Automation

Solution: Start with low-risk scenarios and gradually build trust. Provide comprehensive visibility and control. Implement easy override and rollback mechanisms.

Challenge: Complex Dependency Management

Solution: Implement sophisticated dependency mapping and analysis. Use gradual automation rollout. Create comprehensive testing and validation procedures.

Challenge: Compliance and Audit Requirements

Solution: Implement comprehensive audit logging for all automated activities. Create detailed documentation and approval trails. Use automated compliance checking and reporting.

Challenge: Balancing Automation and Safety

Solution: Use risk-based automation approaches. Implement multiple safety checks and validation layers. Create clear escalation and override procedures.