
REL01-BP05: Automate quota management

Overview

Implement fully automated quota management systems that proactively monitor, analyze, and adjust service quotas without manual intervention. Automate the entire quota lifecycle from monitoring and alerting to increase requests and approval workflows, ensuring optimal resource availability while minimizing operational overhead.

Implementation Steps

1. Deploy Intelligent Quota Automation

  • Implement machine learning-based quota prediction and management
  • Set up automated quota increase workflows with approval chains
  • Create self-healing quota management systems
  • Establish automated quota optimization and right-sizing
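
The right-sizing step can be sketched as a pure function: derive a target quota from observed peak usage plus headroom, and flag quotas far above that target as over-provisioned. This is a minimal illustration; the headroom factor, floor, and 2x over-provisioning ratio are assumptions to tune per service.

```python
import math
from dataclasses import dataclass

@dataclass
class RightSizingResult:
    recommended_quota: float
    over_provisioned: bool

def right_size_quota(current_quota: float, peak_usage: float,
                     headroom: float = 1.3, floor: float = 10.0) -> RightSizingResult:
    """Recommend a quota sized to peak usage plus headroom, never below a floor."""
    recommended = max(math.ceil(peak_usage * headroom), floor)
    # Flag quotas more than twice the recommendation as over-provisioned
    return RightSizingResult(recommended, current_quota > recommended * 2)
```

A quota of 1000 against a peak of 120 would be flagged for right-sizing down toward 156, while a quota of 150 would not.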

2. Integrate with Infrastructure Automation

  • Embed quota management in CI/CD pipelines and deployment processes
  • Implement quota-aware infrastructure provisioning and scaling
  • Create automated quota validation for infrastructure changes
  • Set up dynamic quota adjustment based on workload patterns
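
A quota-aware pipeline gate can be a small pure check run before deployment: project each planned resource delta onto current usage and fail when the projection crosses a safety margin of the limit. A minimal sketch; the quota keys and the 90% margin are illustrative assumptions, and a real gate would pull usage and limits from Service Quotas and CloudWatch.

```python
from typing import Dict, List

def validate_deployment_quotas(planned_delta: Dict[str, int],
                               current_usage: Dict[str, float],
                               quota_values: Dict[str, float],
                               safety_margin: float = 0.9) -> List[str]:
    """Return quota violations a deployment would cause; an empty list means safe to proceed."""
    violations = []
    for quota_key, delta in planned_delta.items():
        projected = current_usage.get(quota_key, 0.0) + delta
        limit = quota_values.get(quota_key)
        if limit is not None and projected > limit * safety_margin:
            violations.append(
                f"{quota_key}: projected {projected} exceeds {safety_margin:.0%} of limit {limit}"
            )
    return violations
```

Wiring this into CI/CD is then a matter of failing the stage when the returned list is non-empty.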

3. Establish Event-Driven Quota Management

  • Implement real-time quota adjustment based on usage patterns
  • Set up automated responses to quota threshold breaches
  • Create event-driven quota coordination across accounts and regions
  • Establish automated disaster recovery quota pre-warming
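
An event-driven design typically publishes breaches to an EventBridge bus that routes them to handlers. A minimal sketch of building the event entry; the source name, bus name, and the `L-1216C47A` quota code (EC2 On-Demand vCPUs) are illustrative assumptions, while the detail type matches the `quota_threshold_exceeded` handler key used later on this page.

```python
import json
from datetime import datetime, timezone

def build_quota_breach_event(service_code: str, quota_code: str, region: str,
                             utilization: float, bus_name: str = "quota-events") -> dict:
    """Build an EventBridge PutEvents entry for a quota threshold breach."""
    return {
        "Source": "custom.quota-monitor",          # assumed source name
        "DetailType": "quota_threshold_exceeded",  # routed to the matching handler
        "EventBusName": bus_name,                  # assumed custom bus
        "Detail": json.dumps({
            "service_code": service_code,
            "quota_code": quota_code,
            "region": region,
            "utilization_percentage": utilization,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }),
    }

# Publishing is then one call (requires AWS credentials):
# boto3.client("events").put_events(
#     Entries=[build_quota_breach_event("ec2", "L-1216C47A", "us-east-1", 92.5)])
```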

4. Create Autonomous Quota Governance

  • Implement automated quota policy enforcement and compliance
  • Set up automated quota cost optimization and budget management
  • Create automated quota audit trails and reporting
  • Establish automated quota security and access controls
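
Policy enforcement can be expressed as a guard that every automated increase must pass before execution. A minimal sketch, assuming three policy dimensions (a hard quota ceiling, a monthly budget, and an approval threshold); real governance would likely add per-service and per-account policies.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

@dataclass
class QuotaPolicy:
    max_quota_value: float         # hard ceiling any automation may request
    monthly_budget: float          # budget available for quota-driven cost growth
    require_approval_above: float  # estimated cost above which a human must approve

def enforce_quota_policy(requested_value: float, estimated_monthly_cost: float,
                         policy: QuotaPolicy) -> Tuple[bool, Optional[str]]:
    """Return (allowed, denial_reason); denial_reason is None when allowed."""
    if requested_value > policy.max_quota_value:
        return False, "exceeds policy ceiling"
    if estimated_monthly_cost > policy.monthly_budget:
        return False, "exceeds monthly budget"
    if estimated_monthly_cost > policy.require_approval_above:
        return False, "manual approval required"
    return True, None
```

Denial reasons feed the audit trail, so every blocked automation is attributable to a specific policy rule.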

5. Deploy Predictive Quota Management

  • Implement forecasting models for quota demand prediction
  • Set up automated capacity planning and quota pre-allocation
  • Create seasonal and trend-based quota adjustment automation
  • Establish automated quota buffer management and optimization
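
Before reaching for the ML models shown in Example 1, a least-squares trend over recent daily usage already answers the core forecasting question: how many days until the quota is hit? A minimal, dependency-free sketch under the assumption of roughly linear growth.

```python
from typing import List, Optional

def days_until_exhaustion(daily_usage: List[float], quota_value: float) -> Optional[float]:
    """Fit a least-squares line to daily usage; return projected days until the
    quota is reached, or None when there is no upward trend to project."""
    n = len(daily_usage)
    if n < 2:
        return None  # not enough history to fit a trend
    xs = list(range(n))
    x_mean = sum(xs) / n
    y_mean = sum(daily_usage) / n
    denom = sum((x - x_mean) ** 2 for x in xs)
    slope = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, daily_usage)) / denom
    if slope <= 0:
        return None  # usage flat or declining; no exhaustion predicted
    return (quota_value - daily_usage[-1]) / slope
```

Usage growing by 10 units per day against a quota of 100, currently at 50, projects exhaustion in 5 days, which is the signal to pre-allocate.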

6. Integrate Cross-Service Quota Orchestration

  • Implement automated quota coordination across multiple AWS services
  • Set up automated quota dependency management and resolution
  • Create automated quota impact analysis and mitigation
  • Establish automated quota rollback and recovery procedures
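
Dependency resolution across services reduces to ordering the increase requests so prerequisites land first. A minimal sketch using the standard library's topological sort; the `ecs:tasks` / `ec2:enis` pairing is a hypothetical example of one quota that only helps once another is raised.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Dict, List, Set

def quota_increase_order(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Order quota increases so each quota's prerequisites come first.

    dependencies maps a quota to the quotas it depends on, e.g. raising an
    ECS task quota may first require a larger ENI quota (assumed example).
    """
    return list(TopologicalSorter(dependencies).static_order())
```

`TopologicalSorter` also raises `CycleError` on circular dependencies, which surfaces misconfigured rules before any request is submitted.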

Implementation Examples

Example 1: Intelligent Quota Automation Engine

import boto3
import json
import logging
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
import joblib
import os

class AutomationLevel(Enum):
    MONITOR_ONLY = "monitor_only"
    ALERT_ONLY = "alert_only"
    AUTO_REQUEST = "auto_request"
    AUTO_APPROVE = "auto_approve"
    FULL_AUTO = "full_auto"

@dataclass
class QuotaAutomationRule:
    service_code: str
    quota_code: str
    region: str
    automation_level: AutomationLevel
    threshold_warning: float = 70.0
    threshold_critical: float = 85.0
    threshold_emergency: float = 95.0
    auto_increase_multiplier: float = 1.5
    max_auto_increase: float = 10000
    approval_required: bool = False
    business_hours_only: bool = False
    cost_threshold: float = 1000.0

class IntelligentQuotaAutomationEngine:
    def __init__(self, config: Dict):
        self.config = config
        self.service_quotas = boto3.client('service-quotas')
        self.support = boto3.client('support')
        self.cloudwatch = boto3.client('cloudwatch')
        self.sns = boto3.client('sns')
        self.stepfunctions = boto3.client('stepfunctions')
        self.dynamodb = boto3.resource('dynamodb')
        
        # Initialize tables
        self.quota_table = self.dynamodb.Table(config['quota_table_name'])
        self.rules_table = self.dynamodb.Table(config['rules_table_name'])
        self.automation_log_table = self.dynamodb.Table(config['automation_log_table_name'])
        
        # ML models for prediction
        self.models = {}
        self.load_prediction_models()
        
    def load_prediction_models(self):
        """Load pre-trained ML models for quota prediction"""
        model_path = self.config.get('model_path', '/tmp/models')
        
        try:
            if os.path.exists(f"{model_path}/quota_predictor.joblib"):
                self.models['quota_predictor'] = joblib.load(f"{model_path}/quota_predictor.joblib")
                logging.info("Loaded quota prediction model")
            else:
                # Create and train a simple model if none exists
                self.models['quota_predictor'] = self.create_default_model()
                logging.info("Created default quota prediction model")
        except Exception as e:
            logging.error(f"Error loading models: {str(e)}")
            self.models['quota_predictor'] = self.create_default_model()
    
    def create_default_model(self):
        """Create a default prediction model"""
        return RandomForestRegressor(n_estimators=100, random_state=42)
    
    async def run_automation_cycle(self) -> Dict:
        """Run complete automation cycle"""
        cycle_start = datetime.utcnow()
        results = {
            'cycle_start': cycle_start.isoformat(),
            'quotas_processed': 0,
            'automations_executed': 0,
            'errors': [],
            'actions_taken': []
        }
        
        try:
            # Get all automation rules
            automation_rules = await self.get_automation_rules()
            
            # Process each rule
            for rule in automation_rules:
                try:
                    await self.process_automation_rule(rule, results)
                    results['quotas_processed'] += 1
                except Exception as e:
                    error_msg = f"Error processing rule {rule.service_code}/{rule.quota_code}: {str(e)}"
                    logging.error(error_msg)
                    results['errors'].append(error_msg)
            
            # Update ML models with new data
            await self.update_prediction_models()
            
            # Generate automation report
            await self.generate_automation_report(results)
            
        except Exception as e:
            logging.error(f"Error in automation cycle: {str(e)}")
            results['errors'].append(str(e))
        
        results['cycle_duration'] = (datetime.utcnow() - cycle_start).total_seconds()
        return results
    
    async def get_automation_rules(self) -> List[QuotaAutomationRule]:
        """Get all automation rules from DynamoDB"""
        rules = []
        
        try:
            # Paginate: a single Scan call returns at most 1 MB of items
            response = self.rules_table.scan()
            items = response['Items']
            while 'LastEvaluatedKey' in response:
                response = self.rules_table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
                items.extend(response['Items'])
            
            for item in items:
                rule = QuotaAutomationRule(
                    service_code=item['service_code'],
                    quota_code=item['quota_code'],
                    region=item['region'],
                    automation_level=AutomationLevel(item['automation_level']),
                    threshold_warning=float(item.get('threshold_warning', 70.0)),
                    threshold_critical=float(item.get('threshold_critical', 85.0)),
                    threshold_emergency=float(item.get('threshold_emergency', 95.0)),
                    auto_increase_multiplier=float(item.get('auto_increase_multiplier', 1.5)),
                    max_auto_increase=float(item.get('max_auto_increase', 10000)),
                    approval_required=item.get('approval_required', False),
                    business_hours_only=item.get('business_hours_only', False),
                    cost_threshold=float(item.get('cost_threshold', 1000.0))
                )
                rules.append(rule)
                
        except Exception as e:
            logging.error(f"Error getting automation rules: {str(e)}")
        
        return rules
    
    async def process_automation_rule(self, rule: QuotaAutomationRule, results: Dict):
        """Process individual automation rule"""
        # Get current quota data
        quota_data = await self.get_quota_data(rule.service_code, rule.quota_code, rule.region)
        
        if not quota_data:
            return
        
        current_usage = quota_data['current_usage']
        quota_value = quota_data['quota_value']
        if quota_value <= 0:
            return  # skip malformed records to avoid division by zero
        utilization = (current_usage / quota_value) * 100
        
        # Predict future usage
        predicted_usage = await self.predict_quota_usage(rule, quota_data)
        predicted_utilization = (predicted_usage / quota_value) * 100
        
        # Determine action based on automation level and thresholds
        action_needed = self.determine_automation_action(rule, utilization, predicted_utilization)
        
        if action_needed:
            await self.execute_automation_action(rule, quota_data, action_needed, results)
    
    async def get_quota_data(self, service_code: str, quota_code: str, region: str) -> Optional[Dict]:
        """Get current quota data"""
        try:
            quota_id = f"{service_code}#{quota_code}#{region}"
            
            # Get latest quota data
            response = self.quota_table.query(
                KeyConditionExpression='quota_id = :quota_id',
                ScanIndexForward=False,
                Limit=1,
                ExpressionAttributeValues={':quota_id': quota_id}
            )
            
            if response['Items']:
                return response['Items'][0]
                
        except Exception as e:
            logging.error(f"Error getting quota data: {str(e)}")
        
        return None
    
    async def predict_quota_usage(self, rule: QuotaAutomationRule, quota_data: Dict) -> float:
        """Predict future quota usage using ML models"""
        try:
            # Get historical data for prediction
            historical_data = await self.get_historical_quota_data(
                rule.service_code, rule.quota_code, rule.region, days=30
            )
            
            if len(historical_data) < 7:  # Need minimum data points for the ML model
                # Too little history: fall back to a flat 10% growth assumption
                current_usage = quota_data['current_usage']
                return current_usage * 1.1
            
            # Prepare features for ML model
            features = self.prepare_prediction_features(historical_data)
            
            # Use ML model for prediction
            model = self.models.get('quota_predictor')
            if model and len(features) > 0:
                # Predict usage for next 7 days
                prediction = model.predict([features[-1]])[0]
                return max(prediction, quota_data['current_usage'])
            
        except Exception as e:
            logging.error(f"Error predicting quota usage: {str(e)}")
        
        # Fallback to simple growth calculation
        return quota_data['current_usage'] * 1.2
    
    async def get_historical_quota_data(self, service_code: str, quota_code: str, 
                                      region: str, days: int = 30) -> List[Dict]:
        """Get historical quota data for trend analysis"""
        try:
            quota_id = f"{service_code}#{quota_code}#{region}"
            start_time = int((datetime.utcnow() - timedelta(days=days)).timestamp())
            
            response = self.quota_table.query(
                KeyConditionExpression='quota_id = :quota_id AND #ts >= :start_time',
                ExpressionAttributeNames={'#ts': 'timestamp'},
                ExpressionAttributeValues={
                    ':quota_id': quota_id,
                    ':start_time': start_time
                }
            )
            
            return response['Items']
            
        except Exception as e:
            logging.error(f"Error getting historical data: {str(e)}")
            return []
    
    def prepare_prediction_features(self, historical_data: List[Dict]) -> List[List[float]]:
        """Prepare features for ML prediction"""
        if not historical_data:
            return []
        
        # Convert to DataFrame for easier processing
        df = pd.DataFrame(historical_data)
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
        df = df.sort_values('timestamp')
        
        features = []
        
        for i in range(len(df)):
            if i >= 6:  # Need at least 7 data points for features
                # Create features: usage trend, day of week, hour, etc.
                recent_usage = df['current_usage'].iloc[i-6:i+1].values
                usage_trend = np.polyfit(range(7), recent_usage, 1)[0]
                avg_usage = np.mean(recent_usage)
                max_usage = np.max(recent_usage)
                min_usage = np.min(recent_usage)
                
                timestamp = df['timestamp'].iloc[i]
                day_of_week = timestamp.weekday()
                hour_of_day = timestamp.hour
                
                feature_vector = [
                    usage_trend,
                    avg_usage,
                    max_usage,
                    min_usage,
                    day_of_week,
                    hour_of_day,
                    df['utilization_percentage'].iloc[i]
                ]
                
                features.append(feature_vector)
        
        return features
    
    def determine_automation_action(self, rule: QuotaAutomationRule, 
                                  current_utilization: float, 
                                  predicted_utilization: float) -> Optional[str]:
        """Determine what automation action to take"""
        max_utilization = max(current_utilization, predicted_utilization)
        
        # Check business hours constraint
        if rule.business_hours_only and not self.is_business_hours():
            if max_utilization >= rule.threshold_emergency:
                return "emergency_increase"  # Override business hours for emergencies
            return None
        
        # Determine action based on automation level and thresholds
        if rule.automation_level == AutomationLevel.MONITOR_ONLY:
            return None
        
        if max_utilization >= rule.threshold_emergency:
            if rule.automation_level in [AutomationLevel.AUTO_APPROVE, AutomationLevel.FULL_AUTO]:
                return "emergency_increase"
            elif rule.automation_level == AutomationLevel.AUTO_REQUEST:
                return "request_increase"
            else:
                return "alert_emergency"
        
        elif max_utilization >= rule.threshold_critical:
            if rule.automation_level in [AutomationLevel.AUTO_APPROVE, AutomationLevel.FULL_AUTO]:
                return "auto_increase"
            elif rule.automation_level == AutomationLevel.AUTO_REQUEST:
                return "request_increase"
            else:
                return "alert_critical"
        
        elif max_utilization >= rule.threshold_warning:
            if rule.automation_level == AutomationLevel.FULL_AUTO:
                return "preemptive_increase"
            else:
                return "alert_warning"
        
        return None
    
    def is_business_hours(self) -> bool:
        """Check if current time is within business hours"""
        now = datetime.utcnow()
        # Assume business hours are 9 AM to 6 PM UTC, Monday to Friday
        return (now.weekday() < 5 and 9 <= now.hour < 18)
    
    async def execute_automation_action(self, rule: QuotaAutomationRule, 
                                      quota_data: Dict, action: str, results: Dict):
        """Execute the determined automation action"""
        try:
            action_result = None
            
            if action in ["emergency_increase", "auto_increase", "preemptive_increase"]:
                action_result = await self.execute_quota_increase(rule, quota_data, action)
            elif action == "request_increase":
                action_result = await self.request_quota_increase(rule, quota_data)
            elif action.startswith("alert_"):
                action_result = await self.send_quota_alert(rule, quota_data, action)
            
            if action_result:
                results['automations_executed'] += 1
                results['actions_taken'].append({
                    'rule': f"{rule.service_code}/{rule.quota_code}/{rule.region}",
                    'action': action,
                    'result': action_result,
                    'timestamp': datetime.utcnow().isoformat()
                })
                
                # Log automation action
                await self.log_automation_action(rule, action, action_result)
            
        except Exception as e:
            error_msg = f"Error executing action {action}: {str(e)}"
            logging.error(error_msg)
            results['errors'].append(error_msg)
    
    async def execute_quota_increase(self, rule: QuotaAutomationRule, 
                                   quota_data: Dict, action_type: str) -> Dict:
        """Execute automated quota increase"""
        current_value = quota_data['quota_value']
        
        # Calculate new quota value
        if action_type == "emergency_increase":
            new_value = min(current_value * 2.0, rule.max_auto_increase)
        elif action_type == "auto_increase":
            new_value = min(current_value * rule.auto_increase_multiplier, rule.max_auto_increase)
        else:  # preemptive_increase
            new_value = min(current_value * 1.2, rule.max_auto_increase)
        
        # Check cost implications
        estimated_cost = await self.estimate_quota_cost(rule, current_value, new_value)
        
        if estimated_cost > rule.cost_threshold and rule.approval_required:
            # Trigger approval workflow
            return await self.trigger_approval_workflow(rule, quota_data, new_value, estimated_cost)
        
        # Execute quota increase
        try:
            response = self.service_quotas.request_service_quota_increase(
                ServiceCode=rule.service_code,
                QuotaCode=rule.quota_code,
                DesiredValue=new_value
            )
            
            return {
                'status': 'success',
                'action': 'quota_increased',
                'old_value': current_value,
                'new_value': new_value,
                'request_id': response.get('RequestedQuota', {}).get('Id'),
                'estimated_cost': estimated_cost
            }
            
        except Exception as e:
            # Fall back to a support case when the Service Quotas API rejects the request
            logging.error(f"Quota increase request failed: {str(e)}")
            case_id = await self.create_support_case(rule, quota_data, new_value)
            
            return {
                'status': 'support_case_created',
                'action': 'support_case',
                'old_value': current_value,
                'new_value': new_value,
                'case_id': case_id,
                'estimated_cost': estimated_cost
            }
    
    async def estimate_quota_cost(self, rule: QuotaAutomationRule, 
                                current_value: float, new_value: float) -> float:
        """Estimate cost impact of quota increase"""
        # This is a simplified cost estimation
        # In practice, you would integrate with AWS Pricing API or Cost Explorer
        
        service_cost_per_unit = {
            'ec2': 0.10,  # per instance hour
            'lambda': 0.0000002,  # per request
            'rds': 0.20,  # per instance hour
            's3': 0.023,  # per GB
        }
        
        base_cost = service_cost_per_unit.get(rule.service_code, 0.01)
        increase_amount = new_value - current_value
        
        # Estimate monthly cost impact
        estimated_monthly_cost = increase_amount * base_cost * 24 * 30
        
        return estimated_monthly_cost
    
    async def trigger_approval_workflow(self, rule: QuotaAutomationRule, 
                                      quota_data: Dict, new_value: float, 
                                      estimated_cost: float) -> Dict:
        """Trigger Step Functions workflow for approval"""
        try:
            workflow_input = {
                # asdict() leaves the Enum value intact, and DynamoDB items
                # carry Decimal values, so serialize with default=str below
                'rule': {**asdict(rule), 'automation_level': rule.automation_level.value},
                'quota_data': quota_data,
                'new_value': new_value,
                'estimated_cost': estimated_cost,
                'timestamp': datetime.utcnow().isoformat()
            }
            
            response = self.stepfunctions.start_execution(
                stateMachineArn=self.config['approval_workflow_arn'],
                input=json.dumps(workflow_input, default=str)
            )
            
            return {
                'status': 'approval_pending',
                'action': 'approval_workflow_triggered',
                'execution_arn': response['executionArn'],
                'estimated_cost': estimated_cost
            }
            
        except Exception as e:
            logging.error(f"Error triggering approval workflow: {str(e)}")
            return {
                'status': 'error',
                'action': 'approval_workflow_failed',
                'error': str(e)
            }
    
    async def create_support_case(self, rule: QuotaAutomationRule, 
                                quota_data: Dict, new_value: float) -> Optional[str]:
        """Create automated support case for quota increase"""
        try:
            case_body = f"""
Automated Quota Increase Request

Service: {rule.service_code}
Quota Code: {rule.quota_code}
Region: {rule.region}
Current Limit: {quota_data['quota_value']}
Requested Limit: {new_value}
Current Usage: {quota_data['current_usage']}
Utilization: {quota_data['utilization_percentage']:.1f}%

This request was automatically generated by our intelligent quota management system
based on usage patterns and predictive analysis.

Justification:
- Current utilization exceeds safe operating thresholds
- Predictive models indicate continued growth
- Automated system determined quota increase is necessary
- Request follows established automation policies

Business Impact:
- Prevents service disruptions and availability issues
- Maintains application performance standards
- Supports business growth and scaling requirements
            """.strip()
            
            response = self.support.create_case(
                subject=f"Automated Quota Increase: {rule.service_code} - {rule.quota_code}",
                serviceCode='service-limit-increase',
                severityCode='normal',
                categoryCode='service-limit-increase',
                communicationBody=case_body,
                ccEmailAddresses=self.config.get('notification_emails', []),
                language='en'
            )
            
            return response['caseId']
            
        except Exception as e:
            logging.error(f"Error creating support case: {str(e)}")
            return None
    
    async def log_automation_action(self, rule: QuotaAutomationRule, 
                                  action: str, result: Dict):
        """Log automation action to DynamoDB"""
        try:
            log_item = {
                'log_id': f"{rule.service_code}#{rule.quota_code}#{rule.region}#{int(datetime.utcnow().timestamp())}",
                'timestamp': int(datetime.utcnow().timestamp()),
                'service_code': rule.service_code,
                'quota_code': rule.quota_code,
                'region': rule.region,
                'automation_level': rule.automation_level.value,
                'action_taken': action,
                'result': json.dumps(result),
                'ttl': int((datetime.utcnow() + timedelta(days=365)).timestamp())
            }
            
            self.automation_log_table.put_item(Item=log_item)
            
        except Exception as e:
            logging.error(f"Error logging automation action: {str(e)}")

# Usage example
async def main():
    config = {
        'quota_table_name': 'quota-monitoring',
        'rules_table_name': 'quota-automation-rules',
        'automation_log_table_name': 'quota-automation-log',
        'approval_workflow_arn': 'arn:aws:states:us-east-1:123456789012:stateMachine:QuotaApprovalWorkflow',
        'notification_emails': ['admin@company.com'],
        'model_path': '/tmp/models'
    }
    
    engine = IntelligentQuotaAutomationEngine(config)
    results = await engine.run_automation_cycle()
    
    print(f"Automation cycle completed:")
    print(f"- Quotas processed: {results['quotas_processed']}")
    print(f"- Automations executed: {results['automations_executed']}")
    print(f"- Errors: {len(results['errors'])}")
    print(f"- Duration: {results['cycle_duration']:.2f} seconds")

if __name__ == "__main__":
    asyncio.run(main())

Example 2: Event-Driven Quota Automation System

import boto3
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
import asyncio
import aiohttp

@dataclass
class QuotaEvent:
    event_type: str
    service_code: str
    quota_code: str
    region: str
    account_id: str
    current_usage: float
    quota_value: float
    utilization_percentage: float
    timestamp: datetime
    metadata: Optional[Dict] = None

class EventDrivenQuotaAutomation:
    def __init__(self, config: Dict):
        self.config = config
        self.eventbridge = boto3.client('events')
        self.lambda_client = boto3.client('lambda')
        self.service_quotas = boto3.client('service-quotas')
        self.dynamodb = boto3.resource('dynamodb')
        self.sns = boto3.client('sns')
        
        # Initialize tables
        self.events_table = self.dynamodb.Table(config['events_table_name'])
        self.automation_state_table = self.dynamodb.Table(config['automation_state_table_name'])
        
        # Event handlers
        self.event_handlers = {
            'quota_threshold_exceeded': self.handle_threshold_exceeded,
            'quota_usage_spike': self.handle_usage_spike,
            'quota_prediction_alert': self.handle_prediction_alert,
            'infrastructure_scaling': self.handle_infrastructure_scaling,
            'disaster_recovery_triggered': self.handle_disaster_recovery,
            'cost_optimization_required': self.handle_cost_optimization
        }
    
    async def process_quota_event(self, event: QuotaEvent) -> Dict:
        """Process incoming quota event"""
        try:
            # Log event
            await self.log_quota_event(event)
            
            # Get automation state
            automation_state = await self.get_automation_state(
                event.service_code, event.quota_code, event.region
            )
            
            # Check if event should trigger automation
            if not self.should_process_event(event, automation_state):
                return {'status': 'skipped', 'reason': 'automation_conditions_not_met'}
            
            # Route to appropriate handler
            handler = self.event_handlers.get(event.event_type)
            if handler:
                result = await handler(event, automation_state)
                
                # Update automation state
                await self.update_automation_state(event, result)
                
                return result
            else:
                return {'status': 'error', 'reason': f'no_handler_for_event_type_{event.event_type}'}
                
        except Exception as e:
            logging.error(f"Error processing quota event: {str(e)}")
            return {'status': 'error', 'reason': str(e)}
    
    async def handle_threshold_exceeded(self, event: QuotaEvent, state: Dict) -> Dict:
        """Handle quota threshold exceeded events"""
        actions_taken = []
        
        try:
            # Immediate response based on utilization level
            if event.utilization_percentage >= 95:
                # Emergency scaling
                result = await self.emergency_quota_increase(event)
                actions_taken.append(result)
                
                # Trigger infrastructure scaling if available
                scaling_result = await self.trigger_infrastructure_scaling(event)
                if scaling_result:
                    actions_taken.append(scaling_result)
                    
            elif event.utilization_percentage >= 85:
                # Automated quota increase
                result = await self.automated_quota_increase(event)
                actions_taken.append(result)
                
            elif event.utilization_percentage >= 70:
                # Predictive scaling preparation
                result = await self.prepare_predictive_scaling(event)
                actions_taken.append(result)
            
            # Send notifications
            await self.send_event_notification(event, actions_taken)
            
            return {
                'status': 'success',
                'event_type': event.event_type,
                'actions_taken': actions_taken,
                'timestamp': datetime.utcnow().isoformat()
            }
            
        except Exception as e:
            logging.error(f"Error handling threshold exceeded: {str(e)}")
            return {'status': 'error', 'reason': str(e)}
    
    async def handle_usage_spike(self, event: QuotaEvent, state: Dict) -> Dict:
        """Handle sudden usage spike events"""
        try:
            # Analyze spike pattern
            spike_analysis = await self.analyze_usage_spike(event)
            
            actions_taken = []
            
            if spike_analysis['severity'] == 'high':
                # Immediate quota buffer increase
                buffer_result = await self.increase_quota_buffer(event, multiplier=1.5)
                actions_taken.append(buffer_result)
                
                # Scale out infrastructure if possible
                scale_result = await self.scale_out_infrastructure(event)
                if scale_result:
                    actions_taken.append(scale_result)
                    
            elif spike_analysis['severity'] == 'medium':
                # Moderate quota adjustment
                adjust_result = await self.adjust_quota_proactively(event, multiplier=1.2)
                actions_taken.append(adjust_result)
            
            # Set up enhanced monitoring
            monitoring_result = await self.enable_enhanced_monitoring(event)
            actions_taken.append(monitoring_result)
            
            return {
                'status': 'success',
                'event_type': event.event_type,
                'spike_analysis': spike_analysis,
                'actions_taken': actions_taken
            }
            
        except Exception as e:
            logging.error(f"Error handling usage spike: {str(e)}")
            return {'status': 'error', 'reason': str(e)}
    
    async def handle_prediction_alert(self, event: QuotaEvent, state: Dict) -> Dict:
        """Handle predictive quota alerts"""
        try:
            # Get prediction details from metadata (may be absent)
            prediction_data = (event.metadata or {}).get('prediction', {})
            predicted_utilization = prediction_data.get('predicted_utilization_7d', 0)
            confidence = prediction_data.get('confidence', 0)
            
            actions_taken = []
            
            if confidence > 0.8 and predicted_utilization > 80:
                # High confidence prediction - take proactive action
                proactive_result = await self.proactive_quota_adjustment(event, prediction_data)
                actions_taken.append(proactive_result)
                
                # Schedule capacity planning review
                planning_result = await self.schedule_capacity_planning(event, prediction_data)
                actions_taken.append(planning_result)
                
            elif confidence > 0.6 and predicted_utilization > 70:
                # Medium confidence - prepare for potential increase
                preparation_result = await self.prepare_quota_increase(event, prediction_data)
                actions_taken.append(preparation_result)
            
            return {
                'status': 'success',
                'event_type': event.event_type,
                'prediction_data': prediction_data,
                'actions_taken': actions_taken
            }
            
        except Exception as e:
            logging.error(f"Error handling prediction alert: {str(e)}")
            return {'status': 'error', 'reason': str(e)}
    
    async def handle_infrastructure_scaling(self, event: QuotaEvent, state: Dict) -> Dict:
        """Handle infrastructure scaling events"""
        try:
            scaling_metadata = event.metadata.get('scaling', {})
            scaling_direction = scaling_metadata.get('direction', 'up')
            scaling_factor = scaling_metadata.get('factor', 1.0)
            
            actions_taken = []
            
            if scaling_direction == 'up':
                # Pre-emptively increase quotas for scaling up
                quota_result = await self.preemptive_quota_increase(event, scaling_factor)
                actions_taken.append(quota_result)
                
                # Coordinate with other services that might be affected
                coordination_result = await self.coordinate_cross_service_quotas(event, scaling_factor)
                actions_taken.append(coordination_result)
                
            elif scaling_direction == 'down':
                # Optimize quotas for scaling down
                optimization_result = await self.optimize_quotas_for_scale_down(event, scaling_factor)
                actions_taken.append(optimization_result)
            
            return {
                'status': 'success',
                'event_type': event.event_type,
                'scaling_metadata': scaling_metadata,
                'actions_taken': actions_taken
            }
            
        except Exception as e:
            logging.error(f"Error handling infrastructure scaling: {str(e)}")
            return {'status': 'error', 'reason': str(e)}
    
    async def handle_disaster_recovery(self, event: QuotaEvent, state: Dict) -> Dict:
        """Handle disaster recovery triggered events"""
        try:
            dr_metadata = event.metadata.get('disaster_recovery', {})
            dr_region = dr_metadata.get('target_region')
            dr_type = dr_metadata.get('type', 'failover')
            
            actions_taken = []
            
            if dr_type == 'failover':
                # Ensure DR region has adequate quotas
                dr_quota_result = await self.ensure_dr_region_quotas(event, dr_region)
                actions_taken.append(dr_quota_result)
                
                # Coordinate quota increases across dependent services
                coordination_result = await self.coordinate_dr_quota_increases(event, dr_region)
                actions_taken.append(coordination_result)
                
            elif dr_type == 'failback':
                # Restore original region quotas
                restore_result = await self.restore_original_region_quotas(event)
                actions_taken.append(restore_result)
            
            return {
                'status': 'success',
                'event_type': event.event_type,
                'dr_metadata': dr_metadata,
                'actions_taken': actions_taken
            }
            
        except Exception as e:
            logging.error(f"Error handling disaster recovery: {str(e)}")
            return {'status': 'error', 'reason': str(e)}
    
    async def emergency_quota_increase(self, event: QuotaEvent) -> Dict:
        """Execute emergency quota increase"""
        try:
            current_quota = event.quota_value
            emergency_quota = current_quota * 2.0  # Double the quota for emergency
            
            # Try Service Quotas API first
            try:
                response = self.service_quotas.request_service_quota_increase(
                    ServiceCode=event.service_code,
                    QuotaCode=event.quota_code,
                    DesiredValue=emergency_quota
                )
                
                return {
                    'action': 'emergency_quota_increase',
                    'method': 'service_quotas_api',
                    'old_value': current_quota,
                    'new_value': emergency_quota,
                    'request_id': response.get('RequestedQuota', {}).get('Id'),
                    'status': 'submitted'
                }
                
            except Exception as api_error:
                # Fall back to support case with high priority
                support_case_id = await self.create_emergency_support_case(event, emergency_quota)
                
                return {
                    'action': 'emergency_quota_increase',
                    'method': 'support_case',
                    'old_value': current_quota,
                    'new_value': emergency_quota,
                    'case_id': support_case_id,
                    'status': 'support_case_created',
                    'api_error': str(api_error)
                }
                
        except Exception as e:
            logging.error(f"Error in emergency quota increase: {str(e)}")
            return {'action': 'emergency_quota_increase', 'status': 'error', 'error': str(e)}
    
    async def trigger_infrastructure_scaling(self, event: QuotaEvent) -> Optional[Dict]:
        """Trigger infrastructure scaling to reduce quota pressure"""
        try:
            # This would integrate with your infrastructure automation
            # For example, triggering Auto Scaling Group scaling, Lambda concurrency adjustments, etc.
            
            scaling_config = self.config.get('infrastructure_scaling', {})
            
            if event.service_code == 'ec2':
                # Trigger EC2 Auto Scaling
                return await self.trigger_ec2_scaling(event, scaling_config)
            elif event.service_code == 'lambda':
                # Adjust Lambda concurrency or trigger additional functions
                return await self.trigger_lambda_scaling(event, scaling_config)
            elif event.service_code == 'rds':
                # Consider read replica scaling or connection pooling
                return await self.trigger_rds_scaling(event, scaling_config)
            
            return None
            
        except Exception as e:
            logging.error(f"Error triggering infrastructure scaling: {str(e)}")
            return {'action': 'infrastructure_scaling', 'status': 'error', 'error': str(e)}
    
    async def analyze_usage_spike(self, event: QuotaEvent) -> Dict:
        """Analyze usage spike characteristics"""
        try:
            # Get recent usage history
            history = await self.get_recent_usage_history(
                event.service_code, event.quota_code, event.region, hours=24
            )
            
            if not history:
                return {'severity': 'unknown', 'pattern': 'insufficient_data'}
            
            # Calculate spike characteristics
            recent_usage = [h['current_usage'] for h in history[-6:]]  # Last 6 data points
            baseline_usage = [h['current_usage'] for h in history[:-6]]  # Earlier data points
            
            if baseline_usage:
                baseline_avg = sum(baseline_usage) / len(baseline_usage)
                current_usage = event.current_usage
                spike_ratio = current_usage / baseline_avg if baseline_avg > 0 else 1
                
                # Determine severity
                if spike_ratio >= 3.0:
                    severity = 'high'
                elif spike_ratio >= 2.0:
                    severity = 'medium'
                else:
                    severity = 'low'
                
                return {
                    'severity': severity,
                    'spike_ratio': spike_ratio,
                    'baseline_avg': baseline_avg,
                    'current_usage': current_usage,
                    'pattern': 'analyzed'
                }
            
            return {'severity': 'unknown', 'pattern': 'insufficient_baseline'}
            
        except Exception as e:
            logging.error(f"Error analyzing usage spike: {str(e)}")
            return {'severity': 'unknown', 'pattern': 'error', 'error': str(e)}
    
    async def coordinate_cross_service_quotas(self, event: QuotaEvent, scaling_factor: float) -> Dict:
        """Coordinate quota increases across related services"""
        try:
            # Define service dependencies
            service_dependencies = {
                'ec2': ['vpc', 'ebs', 'elasticloadbalancing'],
                'lambda': ['logs', 'iam'],
                'rds': ['vpc', 'kms'],
                'ecs': ['ec2', 'elasticloadbalancing', 'logs']
            }
            
            dependent_services = service_dependencies.get(event.service_code, [])
            coordination_results = []
            
            for dependent_service in dependent_services:
                try:
                    # Get related quotas for dependent service
                    related_quotas = await self.get_related_quotas(dependent_service, event.region)
                    
                    for quota in related_quotas:
                        # Check if quota needs adjustment
                        if quota['utilization_percentage'] > 60:  # Proactive threshold
                            increase_result = await self.increase_related_quota(
                                dependent_service, quota, scaling_factor
                            )
                            coordination_results.append(increase_result)
                            
                except Exception as e:
                    logging.error(f"Error coordinating {dependent_service}: {str(e)}")
                    coordination_results.append({
                        'service': dependent_service,
                        'status': 'error',
                        'error': str(e)
                    })
            
            return {
                'action': 'cross_service_coordination',
                'dependent_services': dependent_services,
                'results': coordination_results,
                'status': 'completed'
            }
            
        except Exception as e:
            logging.error(f"Error in cross-service coordination: {str(e)}")
            return {'action': 'cross_service_coordination', 'status': 'error', 'error': str(e)}
    
    async def log_quota_event(self, event: QuotaEvent):
        """Log quota event to DynamoDB"""
        try:
            # DynamoDB does not accept Python floats; convert numeric fields
            # to Decimal (requires `from decimal import Decimal` at module top)
            event_item = {
                'event_id': f"{event.service_code}#{event.quota_code}#{event.region}#{int(event.timestamp.timestamp())}",
                'timestamp': int(event.timestamp.timestamp()),
                'event_type': event.event_type,
                'service_code': event.service_code,
                'quota_code': event.quota_code,
                'region': event.region,
                'account_id': event.account_id,
                'current_usage': Decimal(str(event.current_usage)),
                'quota_value': Decimal(str(event.quota_value)),
                'utilization_percentage': Decimal(str(event.utilization_percentage)),
                'metadata': json.dumps(event.metadata or {}),
                'ttl': int((datetime.utcnow() + timedelta(days=90)).timestamp())
            }
            
            self.events_table.put_item(Item=event_item)
            
        except Exception as e:
            logging.error(f"Error logging quota event: {str(e)}")

# Lambda function handler for EventBridge integration
def lambda_handler(event, context):
    """Lambda handler for processing EventBridge quota events"""
    try:
        config = {
            'events_table_name': os.environ['EVENTS_TABLE_NAME'],
            'automation_state_table_name': os.environ['AUTOMATION_STATE_TABLE_NAME'],
            'infrastructure_scaling': {
                'enabled': os.environ.get('INFRASTRUCTURE_SCALING_ENABLED', 'false').lower() == 'true'
            }
        }
        
        automation = EventDrivenQuotaAutomation(config)
        
        # Parse EventBridge event
        quota_event = QuotaEvent(
            event_type=event['detail']['event_type'],
            service_code=event['detail']['service_code'],
            quota_code=event['detail']['quota_code'],
            region=event['detail']['region'],
            account_id=event['detail']['account_id'],
            current_usage=float(event['detail']['current_usage']),
            quota_value=float(event['detail']['quota_value']),
            utilization_percentage=float(event['detail']['utilization_percentage']),
            timestamp=datetime.fromisoformat(event['detail']['timestamp']),
            metadata=event['detail'].get('metadata', {})
        )
        
        # Process event
        result = asyncio.run(automation.process_quota_event(quota_event))
        
        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
        
    except Exception as e:
        logging.error(f"Error in lambda handler: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

# Usage example
async def main():
    config = {
        'events_table_name': 'quota-events',
        'automation_state_table_name': 'quota-automation-state',
        'infrastructure_scaling': {
            'enabled': True,
            'ec2_asg_names': ['web-tier-asg', 'app-tier-asg'],
            'lambda_functions': ['data-processor', 'api-handler']
        }
    }
    
    automation = EventDrivenQuotaAutomation(config)
    
    # Example event
    test_event = QuotaEvent(
        event_type='quota_threshold_exceeded',
        service_code='ec2',
        quota_code='L-1216C47A',
        region='us-east-1',
        account_id='123456789012',
        current_usage=85.0,
        quota_value=100.0,
        utilization_percentage=85.0,
        timestamp=datetime.utcnow(),
        metadata={'alert_level': 'warning'}
    )
    
    result = await automation.process_quota_event(test_event)
    print(f"Event processing result: {json.dumps(result, indent=2)}")

if __name__ == "__main__":
    asyncio.run(main())
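For reference, the EventBridge envelope that the `lambda_handler` above consumes can be sketched as follows. The `detail` keys mirror the fields the handler parses into a `QuotaEvent`; the envelope-level fields and all values shown here are illustrative, not a documented AWS schema:

```python
from datetime import datetime

# Illustrative EventBridge envelope; "detail" keys match what
# lambda_handler reads, values are sample data only
sample_event = {
    "version": "0",
    "source": "quota.automation",
    "detail-type": "Quota Threshold Exceeded",
    "detail": {
        "event_type": "quota_threshold_exceeded",
        "service_code": "ec2",
        "quota_code": "L-1216C47A",
        "region": "us-east-1",
        "account_id": "123456789012",
        "current_usage": "85.0",
        "quota_value": "100.0",
        "utilization_percentage": "85.0",
        "timestamp": "2024-01-15T12:00:00+00:00",
        "metadata": {"alert_level": "warning"},
    },
}

detail = sample_event["detail"]
# The handler coerces numeric strings and parses the ISO-8601 timestamp
utilization = float(detail["utilization_percentage"])
event_time = datetime.fromisoformat(detail["timestamp"])
print(utilization, event_time.tzinfo is not None)  # → 85.0 True
```

Publishing events in this shape (for example via `events:PutEvents` to the custom bus defined in the Terraform example) is what drives the handlers in this module.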

Example 3: Terraform Infrastructure for Automated Quota Management

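The `aws_lambda_function` resources in this configuration reference pre-built deployment packages (`quota_automation_engine.zip`, `quota_event_processor.zip`). How you build those is up to your pipeline; as one minimal sketch, the handler module can be zipped with Python's standard library so that the `index.lambda_handler` handler setting resolves (paths and module contents here are placeholders):

```python
import pathlib
import zipfile

# Write a placeholder handler module and package it at the archive root,
# so Lambda's handler setting "index.lambda_handler" resolves
build = pathlib.Path("build")
build.mkdir(exist_ok=True)
(build / "index.py").write_text(
    "def lambda_handler(event, context):\n    return {'statusCode': 200}\n"
)

with zipfile.ZipFile(build / "quota_automation_engine.zip", "w",
                     zipfile.ZIP_DEFLATED) as zf:
    zf.write(build / "index.py", arcname="index.py")
```

In practice you would package the real module and its dependencies (for example with a Lambda layer or a container image) rather than a stub like this.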
# Terraform configuration for automated quota management infrastructure

terraform {
  required_version = ">= 1.0"
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

variable "environment" {
  description = "Environment name"
  type        = string
  default     = "production"
}

variable "notification_email" {
  description = "Email for quota notifications"
  type        = string
}

variable "automation_level" {
  description = "Level of automation (monitor, alert, auto_request, auto_approve, full_auto)"
  type        = string
  default     = "auto_request"
  
  validation {
    condition = contains([
      "monitor", "alert", "auto_request", "auto_approve", "full_auto"
    ], var.automation_level)
    error_message = "Automation level must be one of: monitor, alert, auto_request, auto_approve, full_auto."
  }
}

variable "cost_threshold" {
  description = "Cost threshold for automated approvals"
  type        = number
  default     = 1000
}

# Data sources
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}

# DynamoDB Tables
resource "aws_dynamodb_table" "quota_monitoring" {
  name           = "${var.environment}-quota-monitoring"
  billing_mode   = "PAY_PER_REQUEST"
  hash_key       = "quota_id"
  range_key      = "timestamp"

  attribute {
    name = "quota_id"
    type = "S"
  }

  attribute {
    name = "timestamp"
    type = "N"
  }

  attribute {
    name = "service_code"
    type = "S"
  }

  attribute {
    name = "utilization_percentage"
    type = "N"
  }

  global_secondary_index {
    name            = "service-utilization-index"
    hash_key        = "service_code"
    range_key       = "utilization_percentage"
    projection_type = "ALL"
  }

  ttl {
    attribute_name = "ttl"
    enabled        = true
  }

  stream_enabled   = true
  stream_view_type = "NEW_AND_OLD_IMAGES"

  point_in_time_recovery {
    enabled = true
  }

  tags = {
    Environment = var.environment
    Purpose     = "QuotaMonitoring"
  }
}

resource "aws_dynamodb_table" "quota_automation_rules" {
  name         = "${var.environment}-quota-automation-rules"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "rule_id"

  attribute {
    name = "rule_id"
    type = "S"
  }

  attribute {
    name = "service_code"
    type = "S"
  }

  attribute {
    name = "automation_level"
    type = "S"
  }

  global_secondary_index {
    name            = "service-automation-index"
    hash_key        = "service_code"
    range_key       = "automation_level"
    projection_type = "ALL"
  }

  tags = {
    Environment = var.environment
    Purpose     = "QuotaAutomationRules"
  }
}

resource "aws_dynamodb_table" "quota_automation_log" {
  name           = "${var.environment}-quota-automation-log"
  billing_mode   = "PAY_PER_REQUEST"
  hash_key       = "log_id"
  range_key      = "timestamp"

  attribute {
    name = "log_id"
    type = "S"
  }

  attribute {
    name = "timestamp"
    type = "N"
  }

  attribute {
    name = "action_taken"
    type = "S"
  }

  global_secondary_index {
    name            = "action-timestamp-index"
    hash_key        = "action_taken"
    range_key       = "timestamp"
    projection_type = "ALL"
  }

  ttl {
    attribute_name = "ttl"
    enabled        = true
  }

  tags = {
    Environment = var.environment
    Purpose     = "QuotaAutomationLog"
  }
}

resource "aws_dynamodb_table" "quota_events" {
  name           = "${var.environment}-quota-events"
  billing_mode   = "PAY_PER_REQUEST"
  hash_key       = "event_id"
  range_key      = "timestamp"

  attribute {
    name = "event_id"
    type = "S"
  }

  attribute {
    name = "timestamp"
    type = "N"
  }

  attribute {
    name = "event_type"
    type = "S"
  }

  global_secondary_index {
    name            = "event-type-timestamp-index"
    hash_key        = "event_type"
    range_key       = "timestamp"
    projection_type = "ALL"
  }

  ttl {
    attribute_name = "ttl"
    enabled        = true
  }

  stream_enabled   = true
  stream_view_type = "NEW_AND_OLD_IMAGES"

  tags = {
    Environment = var.environment
    Purpose     = "QuotaEvents"
  }
}

# SNS Topics
resource "aws_sns_topic" "quota_alerts" {
  name = "${var.environment}-quota-alerts"

  tags = {
    Environment = var.environment
    Purpose     = "QuotaAlerts"
  }
}

resource "aws_sns_topic_subscription" "quota_alerts_email" {
  topic_arn = aws_sns_topic.quota_alerts.arn
  protocol  = "email"
  endpoint  = var.notification_email
}

resource "aws_sns_topic" "quota_automation_events" {
  name = "${var.environment}-quota-automation-events"

  tags = {
    Environment = var.environment
    Purpose     = "QuotaAutomationEvents"
  }
}

# EventBridge Custom Bus
resource "aws_cloudwatch_event_bus" "quota_automation" {
  name = "${var.environment}-quota-automation"

  tags = {
    Environment = var.environment
    Purpose     = "QuotaAutomation"
  }
}

# EventBridge Rules
resource "aws_cloudwatch_event_rule" "quota_threshold_exceeded" {
  name           = "${var.environment}-quota-threshold-exceeded"
  event_bus_name = aws_cloudwatch_event_bus.quota_automation.name

  event_pattern = jsonencode({
    source      = ["quota.automation"]
    detail-type = ["Quota Threshold Exceeded"]
    detail = {
      utilization_percentage = [{
        numeric = [">", 70]
      }]
    }
  })

  tags = {
    Environment = var.environment
    Purpose     = "QuotaThresholdMonitoring"
  }
}

resource "aws_cloudwatch_event_rule" "quota_usage_spike" {
  name           = "${var.environment}-quota-usage-spike"
  event_bus_name = aws_cloudwatch_event_bus.quota_automation.name

  event_pattern = jsonencode({
    source      = ["quota.automation"]
    detail-type = ["Quota Usage Spike"]
  })

  tags = {
    Environment = var.environment
    Purpose     = "QuotaUsageSpike"
  }
}

# IAM Roles
resource "aws_iam_role" "quota_automation_role" {
  name = "${var.environment}-quota-automation-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = [
            "lambda.amazonaws.com",
            "states.amazonaws.com",
            "events.amazonaws.com"
          ]
        }
      }
    ]
  })

  tags = {
    Environment = var.environment
    Purpose     = "QuotaAutomation"
  }
}

resource "aws_iam_role_policy" "quota_automation_policy" {
  name = "${var.environment}-quota-automation-policy"
  role = aws_iam_role.quota_automation_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # Broad grants for brevity; scope these down to the specific
        # quota, support, and monitoring actions used before production
        Effect = "Allow"
        Action = [
          "servicequotas:*",
          "support:*",
          "cloudwatch:*",
          "logs:*"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "dynamodb:GetItem",
          "dynamodb:PutItem",
          "dynamodb:UpdateItem",
          "dynamodb:DeleteItem",
          "dynamodb:Query",
          "dynamodb:Scan"
        ]
        Resource = [
          aws_dynamodb_table.quota_monitoring.arn,
          aws_dynamodb_table.quota_automation_rules.arn,
          aws_dynamodb_table.quota_automation_log.arn,
          aws_dynamodb_table.quota_events.arn,
          "${aws_dynamodb_table.quota_monitoring.arn}/index/*",
          "${aws_dynamodb_table.quota_automation_rules.arn}/index/*",
          "${aws_dynamodb_table.quota_automation_log.arn}/index/*",
          "${aws_dynamodb_table.quota_events.arn}/index/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "sns:Publish"
        ]
        Resource = [
          aws_sns_topic.quota_alerts.arn,
          aws_sns_topic.quota_automation_events.arn
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "events:PutEvents"
        ]
        Resource = aws_cloudwatch_event_bus.quota_automation.arn
      },
      {
        Effect = "Allow"
        Action = [
          "states:StartExecution"
        ]
        Resource = aws_sfn_state_machine.quota_approval_workflow.arn
      },
      {
        Effect = "Allow"
        Action = [
          "lambda:InvokeFunction"
        ]
        Resource = [
          aws_lambda_function.quota_automation_engine.arn,
          aws_lambda_function.quota_event_processor.arn
        ]
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "quota_automation_basic" {
  role       = aws_iam_role.quota_automation_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

# Lambda Functions
resource "aws_lambda_function" "quota_automation_engine" {
  filename      = "quota_automation_engine.zip"
  function_name = "${var.environment}-quota-automation-engine"
  role          = aws_iam_role.quota_automation_role.arn
  handler       = "index.lambda_handler"
  runtime       = "python3.12"
  timeout       = 900
  memory_size   = 1024

  environment {
    variables = {
      ENVIRONMENT                = var.environment
      QUOTA_TABLE_NAME          = aws_dynamodb_table.quota_monitoring.name
      RULES_TABLE_NAME          = aws_dynamodb_table.quota_automation_rules.name
      AUTOMATION_LOG_TABLE_NAME = aws_dynamodb_table.quota_automation_log.name
      ALERT_TOPIC_ARN           = aws_sns_topic.quota_alerts.arn
      AUTOMATION_EVENTS_TOPIC_ARN = aws_sns_topic.quota_automation_events.arn
      EVENT_BUS_NAME            = aws_cloudwatch_event_bus.quota_automation.name
      # ARN is constructed rather than referenced to avoid a Terraform
      # dependency cycle (the state machine definition references this function)
      APPROVAL_WORKFLOW_ARN = "arn:aws:states:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:stateMachine:${var.environment}-quota-approval-workflow"
      AUTOMATION_LEVEL          = var.automation_level
      COST_THRESHOLD            = var.cost_threshold
    }
  }

  tags = {
    Environment = var.environment
    Purpose     = "QuotaAutomationEngine"
  }
}

resource "aws_lambda_function" "quota_event_processor" {
  filename      = "quota_event_processor.zip"
  function_name = "${var.environment}-quota-event-processor"
  role          = aws_iam_role.quota_automation_role.arn
  handler       = "index.lambda_handler"
  runtime       = "python3.12"
  timeout       = 300
  memory_size   = 512

  environment {
    variables = {
      ENVIRONMENT           = var.environment
      EVENTS_TABLE_NAME     = aws_dynamodb_table.quota_events.name
      AUTOMATION_STATE_TABLE_NAME = aws_dynamodb_table.quota_automation_rules.name
      ALERT_TOPIC_ARN       = aws_sns_topic.quota_alerts.arn
      EVENT_BUS_NAME        = aws_cloudwatch_event_bus.quota_automation.name
    }
  }

  tags = {
    Environment = var.environment
    Purpose     = "QuotaEventProcessor"
  }
}

# EventBridge Targets
resource "aws_cloudwatch_event_target" "quota_threshold_target" {
  rule           = aws_cloudwatch_event_rule.quota_threshold_exceeded.name
  event_bus_name = aws_cloudwatch_event_bus.quota_automation.name
  target_id      = "QuotaThresholdTarget"
  arn            = aws_lambda_function.quota_event_processor.arn
}

resource "aws_cloudwatch_event_target" "quota_spike_target" {
  rule           = aws_cloudwatch_event_rule.quota_usage_spike.name
  event_bus_name = aws_cloudwatch_event_bus.quota_automation.name
  target_id      = "QuotaSpikeTarget"
  arn            = aws_lambda_function.quota_event_processor.arn
}

# Lambda Permissions
resource "aws_lambda_permission" "allow_eventbridge_threshold" {
  statement_id  = "AllowExecutionFromEventBridgeThreshold"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.quota_event_processor.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.quota_threshold_exceeded.arn
}

resource "aws_lambda_permission" "allow_eventbridge_spike" {
  statement_id  = "AllowExecutionFromEventBridgeSpike"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.quota_event_processor.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.quota_usage_spike.arn
}

# Step Functions State Machine for Approval Workflow
resource "aws_sfn_state_machine" "quota_approval_workflow" {
  name     = "${var.environment}-quota-approval-workflow"
  role_arn = aws_iam_role.quota_automation_role.arn

  definition = jsonencode({
    Comment = "Quota increase approval workflow"
    StartAt = "EvaluateRequest"
    States = {
      EvaluateRequest = {
        Type = "Task"
        Resource = aws_lambda_function.quota_automation_engine.arn
        Parameters = {
          "action": "evaluate_request",
          "input.$": "$"
        }
        Next = "CheckApprovalRequired"
      }
      CheckApprovalRequired = {
        Type = "Choice"
        Choices = [
          {
            Variable = "$.approval_required"
            BooleanEquals = true
            Next = "SendApprovalRequest"
          }
        ]
        Default = "AutoApprove"
      }
      SendApprovalRequest = {
        Type = "Task"
        Resource = "arn:aws:states:::sns:publish"
        Parameters = {
          TopicArn = aws_sns_topic.quota_alerts.arn
          Subject = "Quota Increase Approval Required"
          "Message.$" = "$.approval_message"
        }
        Next = "WaitForApproval"
      }
      WaitForApproval = {
        Type = "Wait"
        Seconds = 3600
        Next = "CheckApprovalStatus"
      }
      CheckApprovalStatus = {
        Type = "Task"
        Resource = aws_lambda_function.quota_automation_engine.arn
        Parameters = {
          "action": "check_approval_status",
          "input.$": "$"
        }
        Next = "ApprovalDecision"
      }
      ApprovalDecision = {
        Type = "Choice"
        Choices = [
          {
            Variable = "$.approved"
            BooleanEquals = true
            Next = "ExecuteIncrease"
          }
        ]
        Default = "ApprovalDenied"
      }
      AutoApprove = {
        Type = "Pass"
        Result = {
          "approved": true,
          "approval_method": "automatic"
        }
        Next = "ExecuteIncrease"
      }
      ExecuteIncrease = {
        Type = "Task"
        Resource = aws_lambda_function.quota_automation_engine.arn
        Parameters = {
          "action": "execute_quota_increase",
          "input.$": "$"
        }
        Next = "NotifySuccess"
      }
      NotifySuccess = {
        Type = "Task"
        Resource = "arn:aws:states:::sns:publish"
        Parameters = {
          TopicArn = aws_sns_topic.quota_automation_events.arn
          Subject = "Quota Increase Completed"
          "Message.$" = "$.success_message"
        }
        End = true
      }
      ApprovalDenied = {
        Type = "Task"
        Resource = "arn:aws:states:::sns:publish"
        Parameters = {
          TopicArn = aws_sns_topic.quota_automation_events.arn
          Subject = "Quota Increase Denied"
          "Message.$" = "$.denial_message"
        }
        End = true
      }
    }
  })

  tags = {
    Environment = var.environment
    Purpose     = "QuotaApprovalWorkflow"
  }
}

# EventBridge Schedule Rule for Regular Automation Runs
resource "aws_cloudwatch_event_rule" "quota_automation_schedule" {
  name                = "${var.environment}-quota-automation-schedule"
  description         = "Trigger quota automation engine"
  schedule_expression = "rate(5 minutes)"

  tags = {
    Environment = var.environment
    Purpose     = "QuotaAutomationSchedule"
  }
}

resource "aws_cloudwatch_event_target" "quota_automation_target" {
  rule      = aws_cloudwatch_event_rule.quota_automation_schedule.name
  target_id = "QuotaAutomationTarget"
  arn       = aws_lambda_function.quota_automation_engine.arn
}

resource "aws_lambda_permission" "allow_eventbridge_schedule" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.quota_automation_engine.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.quota_automation_schedule.arn
}

# CloudWatch Dashboard
resource "aws_cloudwatch_dashboard" "quota_automation" {
  dashboard_name = "${var.environment}-quota-automation"

  dashboard_body = jsonencode({
    widgets = [
      {
        type   = "metric"
        x      = 0
        y      = 0
        width  = 12
        height = 6

        properties = {
          metrics = [
            ["AWS/Lambda", "Duration", "FunctionName", aws_lambda_function.quota_automation_engine.function_name],
            [".", "Errors", ".", "."],
            [".", "Invocations", ".", "."]
          ]
          view    = "timeSeries"
          stacked = false
          region  = data.aws_region.current.name
          title   = "Quota Automation Engine Metrics"
          period  = 300
        }
      },
      {
        type   = "metric"
        x      = 12
        y      = 0
        width  = 12
        height = 6

        properties = {
          metrics = [
            ["AWS/Events", "MatchedEvents", "RuleName", aws_cloudwatch_event_rule.quota_threshold_exceeded.name],
            [".", ".", ".", aws_cloudwatch_event_rule.quota_usage_spike.name]
          ]
          view    = "timeSeries"
          stacked = false
          region  = data.aws_region.current.name
          title   = "Quota Events Processed"
          period  = 300
        }
      },
      {
        type   = "log"
        x      = 0
        y      = 6
        width  = 24
        height = 6

        properties = {
          query   = "SOURCE '/aws/lambda/${aws_lambda_function.quota_automation_engine.function_name}' | fields @timestamp, @message | filter @message like /ERROR/ | sort @timestamp desc | limit 20"
          region  = data.aws_region.current.name
          title   = "Recent Automation Errors"
          view    = "table"
        }
      }
    ]
  })
}

# CloudWatch Alarms
resource "aws_cloudwatch_metric_alarm" "quota_automation_errors" {
  alarm_name          = "${var.environment}-quota-automation-errors"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "Errors"
  namespace           = "AWS/Lambda"
  period              = 300
  statistic           = "Sum"
  threshold           = 5
  alarm_description   = "This metric monitors quota automation errors"
  alarm_actions       = [aws_sns_topic.quota_alerts.arn]

  dimensions = {
    FunctionName = aws_lambda_function.quota_automation_engine.function_name
  }

  tags = {
    Environment = var.environment
    Purpose     = "QuotaAutomationMonitoring"
  }
}

# Outputs
output "quota_monitoring_table_name" {
  description = "Name of the quota monitoring DynamoDB table"
  value       = aws_dynamodb_table.quota_monitoring.name
}

output "quota_automation_rules_table_name" {
  description = "Name of the quota automation rules DynamoDB table"
  value       = aws_dynamodb_table.quota_automation_rules.name
}

output "quota_alerts_topic_arn" {
  description = "ARN of the quota alerts SNS topic"
  value       = aws_sns_topic.quota_alerts.arn
}

output "quota_automation_engine_function_name" {
  description = "Name of the quota automation engine Lambda function"
  value       = aws_lambda_function.quota_automation_engine.function_name
}

output "quota_event_bus_name" {
  description = "Name of the quota automation EventBridge bus"
  value       = aws_cloudwatch_event_bus.quota_automation.name
}

output "quota_approval_workflow_arn" {
  description = "ARN of the quota approval Step Functions workflow"
  value       = aws_sfn_state_machine.quota_approval_workflow.arn
}

output "dashboard_url" {
  description = "URL of the quota automation CloudWatch dashboard"
  value       = "https://${data.aws_region.current.name}.console.aws.amazon.com/cloudwatch/home?region=${data.aws_region.current.name}#dashboards:name=${aws_cloudwatch_dashboard.quota_automation.dashboard_name}"
}
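Once the stack above is applied, the monitoring pieces can be spot-checked against the names the outputs report. The following is a sketch, not part of the original module; the helper names are hypothetical, and it assumes default AWS credentials in the deployment region:

```python
# Hypothetical smoke test for the dashboard and alarm defined above.

def dashboard_name(environment: str) -> str:
    # Mirrors dashboard_name = "${var.environment}-quota-automation"
    return f"{environment}-quota-automation"

def alarm_name(environment: str) -> str:
    # Mirrors alarm_name = "${var.environment}-quota-automation-errors"
    return f"{environment}-quota-automation-errors"

def check_monitoring(environment: str) -> None:
    import boto3  # lazy import so the name helpers stay dependency-free
    cw = boto3.client('cloudwatch')
    # get_dashboard raises ResourceNotFound if the dashboard is missing
    cw.get_dashboard(DashboardName=dashboard_name(environment))
    alarms = cw.describe_alarms(
        AlarmNames=[alarm_name(environment)])['MetricAlarms']
    assert alarms, f"alarm {alarm_name(environment)} not found"

# check_monitoring("production")
```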

Example 4: CI/CD Integration for Quota-Aware Deployments

{% raw %}

# GitHub Actions workflow for quota-aware deployments
name: Quota-Aware Deployment Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  AWS_REGION: us-east-1
  ENVIRONMENT: ${{ github.ref == 'refs/heads/main' && 'production' || 'staging' }}

jobs:
  quota-validation:
    name: Validate Quota Requirements
    runs-on: ubuntu-latest
    outputs:
      quota-check-passed: ${{ steps.quota-validation.outputs.passed }}
      required-quotas: ${{ steps.quota-validation.outputs.required-quotas }}
    
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      
      - name: Install dependencies
        run: |
          pip install boto3 pyyaml jinja2
      
      - name: Analyze infrastructure requirements
        id: quota-validation
        run: |
          python .github/scripts/quota_validator.py \
            --environment ${{ env.ENVIRONMENT }} \
            --infrastructure-config infrastructure/config.yaml \
            --output-format github-actions
      
      - name: Upload quota analysis
        uses: actions/upload-artifact@v4
        with:
          name: quota-analysis-${{ env.ENVIRONMENT }}
          path: quota-analysis.json

  quota-preemptive-increase:
    name: Preemptive Quota Increases
    runs-on: ubuntu-latest
    needs: quota-validation
    if: needs.quota-validation.outputs.quota-check-passed == 'false'
    
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Download quota analysis
        uses: actions/download-artifact@v4
        with:
          name: quota-analysis-${{ env.ENVIRONMENT }}
      
      - name: Request quota increases
        run: |
          python .github/scripts/quota_increaser.py \
            --analysis-file quota-analysis.json \
            --environment ${{ env.ENVIRONMENT }} \
            --auto-approve-threshold 1000
      
      - name: Wait for quota increases
        run: |
          python .github/scripts/quota_waiter.py \
            --analysis-file quota-analysis.json \
            --max-wait-time 1800 \
            --check-interval 60

  deploy:
    name: Deploy Infrastructure
    runs-on: ubuntu-latest
    needs: [quota-validation, quota-preemptive-increase]
    if: always() && (needs.quota-validation.outputs.quota-check-passed == 'true' || needs.quota-preemptive-increase.result == 'success')
    
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v2
        with:
          terraform_version: 1.5.0
      
      - name: Terraform Init
        run: terraform init
        working-directory: infrastructure
      
      - name: Terraform Plan with Quota Validation
        run: |
          terraform plan \
            -var="environment=${{ env.ENVIRONMENT }}" \
            -var="enable_quota_validation=true" \
            -out=tfplan
        working-directory: infrastructure
      
      - name: Terraform Apply
        run: terraform apply tfplan
        working-directory: infrastructure
      
      - name: Post-deployment quota monitoring
        run: |
          python .github/scripts/post_deployment_monitor.py \
            --environment ${{ env.ENVIRONMENT }} \
            --deployment-id ${{ github.run_id }}

  quota-optimization:
    name: Post-Deployment Quota Optimization
    runs-on: ubuntu-latest
    needs: deploy
    if: success()
    
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      
      - name: Analyze actual resource usage
        run: |
          python .github/scripts/usage_analyzer.py \
            --environment ${{ env.ENVIRONMENT }} \
            --analysis-period 24h
      
      - name: Optimize quota allocations
        run: |
          python .github/scripts/quota_optimizer.py \
            --environment ${{ env.ENVIRONMENT }} \
            --optimization-strategy cost-aware

{% endraw %}
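The pipeline above also invokes `.github/scripts/quota_waiter.py`, which is not listed here. A minimal sketch of its polling loop, assuming it re-reads `quota-analysis.json` and watches the Service Quotas request history (the function names are illustrative, not from the original scripts):

```python
# Hypothetical core of quota_waiter.py: poll Service Quotas until pending
# increase requests resolve or the --max-wait-time budget is spent.

import time

TERMINAL_STATUSES = {'APPROVED', 'DENIED', 'CASE_CLOSED'}

def open_requests(client, requirements):
    """Return the requirements whose latest increase request is still open."""
    pending = []
    for req in requirements:
        if req.get('sufficient'):
            continue  # quota was already large enough; nothing was requested
        history = client.list_requested_service_quota_change_history_by_quota(
            ServiceCode=req['service_code'], QuotaCode=req['quota_code']
        )['RequestedQuotas']
        history.sort(key=lambda r: r['Created'], reverse=True)  # newest first
        if history and history[0]['Status'] not in TERMINAL_STATUSES:
            pending.append(req)
    return pending

def wait_for_increases(client, requirements, max_wait=1800, interval=60):
    """Block until no requests remain open; return True on success."""
    deadline = time.time() + max_wait
    while time.time() < deadline:
        if not open_requests(client, requirements):
            return True
        time.sleep(interval)
    return False
```

In the workflow this would be called with `wait_for_increases(boto3.client('service-quotas'), analysis['requirements'], args.max_wait_time, args.check_interval)`, exiting nonzero on timeout so the deploy job does not proceed.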

#!/usr/bin/env python3
# .github/scripts/quota_validator.py

import boto3
import yaml
import json
import argparse
import sys
from datetime import datetime
from typing import Dict, List, Tuple
from dataclasses import dataclass

@dataclass
class QuotaRequirement:
    service_code: str
    quota_code: str
    required_value: float
    current_value: float
    buffer_percentage: float = 20.0

class QuotaValidator:
    def __init__(self, region: str):
        self.region = region
        self.service_quotas = boto3.client('service-quotas', region_name=region)
        self.cloudformation = boto3.client('cloudformation', region_name=region)
        
    def analyze_infrastructure_config(self, config_path: str) -> List[QuotaRequirement]:
        """Analyze infrastructure configuration to determine quota requirements"""
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        
        requirements = []
        
        # Analyze EC2 requirements
        if 'ec2' in config:
            ec2_config = config['ec2']
            total_instances = sum(
                asg.get('desired_capacity', 0) for asg in ec2_config.get('auto_scaling_groups', [])
            )
            
            if total_instances > 0:
                current_quota = self.get_current_quota('ec2', 'L-1216C47A')  # Running On-Demand instances
                requirements.append(QuotaRequirement(
                    service_code='ec2',
                    quota_code='L-1216C47A',
                    required_value=total_instances,
                    current_value=current_quota
                ))
        
        # Analyze Lambda requirements
        if 'lambda' in config:
            lambda_config = config['lambda']
            total_concurrent = sum(
                func.get('reserved_concurrency', 0) for func in lambda_config.get('functions', [])
            )
            
            if total_concurrent > 0:
                current_quota = self.get_current_quota('lambda', 'L-B99A9384')  # Concurrent executions
                requirements.append(QuotaRequirement(
                    service_code='lambda',
                    quota_code='L-B99A9384',
                    required_value=total_concurrent,
                    current_value=current_quota
                ))
        
        # Analyze RDS requirements
        if 'rds' in config:
            rds_config = config['rds']
            total_instances = len(rds_config.get('instances', []))
            
            if total_instances > 0:
                current_quota = self.get_current_quota('rds', 'L-7B6409FD')  # DB instances
                requirements.append(QuotaRequirement(
                    service_code='rds',
                    quota_code='L-7B6409FD',
                    required_value=total_instances,
                    current_value=current_quota
                ))
        
        return requirements
    
    def get_current_quota(self, service_code: str, quota_code: str) -> float:
        """Get current quota value"""
        try:
            response = self.service_quotas.get_service_quota(
                ServiceCode=service_code,
                QuotaCode=quota_code
            )
            return response['Quota']['Value']
        except Exception as e:
            print(f"Warning: Could not get quota for {service_code}/{quota_code}: {e}")
            return 0.0
    
    def validate_quotas(self, requirements: List[QuotaRequirement]) -> Tuple[bool, List[Dict]]:
        """Validate if current quotas are sufficient"""
        validation_results = []
        all_passed = True
        
        for req in requirements:
            required_with_buffer = req.required_value * (1 + req.buffer_percentage / 100)
            sufficient = req.current_value >= required_with_buffer
            
            if not sufficient:
                all_passed = False
            
            validation_results.append({
                'service_code': req.service_code,
                'quota_code': req.quota_code,
                'required_value': req.required_value,
                'required_with_buffer': required_with_buffer,
                'current_value': req.current_value,
                'sufficient': sufficient,
                'shortfall': max(0, required_with_buffer - req.current_value)
            })
        
        return all_passed, validation_results

def main():
    parser = argparse.ArgumentParser(description='Validate quota requirements for deployment')
    parser.add_argument('--environment', required=True, help='Environment name')
    parser.add_argument('--infrastructure-config', required=True, help='Infrastructure configuration file')
    parser.add_argument('--output-format', choices=['json', 'github-actions'], default='json')
    
    args = parser.parse_args()
    
    validator = QuotaValidator('us-east-1')  # Could be parameterized
    
    # Analyze requirements
    requirements = validator.analyze_infrastructure_config(args.infrastructure_config)
    
    # Validate quotas
    passed, results = validator.validate_quotas(requirements)
    
    # Output results
    analysis = {
        'environment': args.environment,
        'validation_passed': passed,
        'requirements': results,
        'timestamp': datetime.utcnow().isoformat()
    }
    
    if args.output_format == 'github-actions':
        # The ::set-output workflow command is deprecated; write step outputs
        # to the file named by GITHUB_OUTPUT instead
        import os
        with open(os.environ['GITHUB_OUTPUT'], 'a') as f:
            f.write(f"passed={str(passed).lower()}\n")
            f.write(f"required-quotas={json.dumps(results)}\n")
        
        if not passed:
            print("::warning::Quota validation failed - some quotas need to be increased")
            for result in results:
                if not result['sufficient']:
                    print(f"::warning::Insufficient quota for {result['service_code']}/{result['quota_code']}: need {result['required_with_buffer']}, have {result['current_value']}")
    
    # Save analysis file
    with open('quota-analysis.json', 'w') as f:
        json.dump(analysis, f, indent=2)
    
    if not passed:
        sys.exit(1)

if __name__ == '__main__':
    main()
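The validator's sufficiency check reduces to one line of arithmetic, which is worth seeing in isolation:

```python
# Worked example of the validator's buffer math (pure arithmetic, no AWS calls).

def required_with_buffer(required: float, buffer_pct: float = 20.0) -> float:
    # Mirrors required_value * (1 + buffer_percentage / 100) in QuotaValidator
    return required * (1 + buffer_pct / 100)

# A fleet needing 50 instances must have a quota of at least 60 under the
# default 20% buffer, so a current quota of 64 passes and 55 does not.
assert required_with_buffer(50) == 60.0
assert 64 >= required_with_buffer(50)
assert not (55 >= required_with_buffer(50))
```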
#!/usr/bin/env python3
# .github/scripts/quota_increaser.py

import boto3
import json
import argparse
from typing import Dict, List

class QuotaIncreaser:
    def __init__(self, region: str):
        self.region = region
        self.service_quotas = boto3.client('service-quotas', region_name=region)
        # The AWS Support API endpoint is only available in us-east-1 and
        # requires a Business or Enterprise Support plan
        self.support = boto3.client('support', region_name='us-east-1')
    
    def process_quota_increases(self, analysis_file: str, auto_approve_threshold: float) -> List[Dict]:
        """Process quota increase requests based on analysis"""
        with open(analysis_file, 'r') as f:
            analysis = json.load(f)
        
        results = []
        
        for requirement in analysis['requirements']:
            if not requirement['sufficient']:
                result = self.request_quota_increase(
                    requirement, auto_approve_threshold
                )
                results.append(result)
        
        return results
    
    def request_quota_increase(self, requirement: Dict, auto_approve_threshold: float) -> Dict:
        """Request quota increase for a specific requirement"""
        service_code = requirement['service_code']
        quota_code = requirement['quota_code']
        new_value = requirement['required_with_buffer']
        
        try:
            # Try Service Quotas API first
            response = self.service_quotas.request_service_quota_increase(
                ServiceCode=service_code,
                QuotaCode=quota_code,
                DesiredValue=new_value
            )
            
            return {
                'service_code': service_code,
                'quota_code': quota_code,
                'requested_value': new_value,
                'method': 'service_quotas_api',
                'request_id': response['RequestedQuota']['Id'],
                'status': 'submitted',
                'estimated_cost': self.estimate_cost_impact(requirement)
            }
            
        except Exception as e:
            # Fall back to support case if cost is below threshold
            estimated_cost = self.estimate_cost_impact(requirement)
            
            if estimated_cost <= auto_approve_threshold:
                case_id = self.create_support_case(requirement)
                return {
                    'service_code': service_code,
                    'quota_code': quota_code,
                    'requested_value': new_value,
                    'method': 'support_case',
                    'case_id': case_id,
                    'status': 'support_case_created',
                    'estimated_cost': estimated_cost,
                    'api_error': str(e)
                }
            else:
                return {
                    'service_code': service_code,
                    'quota_code': quota_code,
                    'requested_value': new_value,
                    'method': 'manual_approval_required',
                    'status': 'requires_manual_approval',
                    'estimated_cost': estimated_cost,
                    'reason': f'Cost ${estimated_cost} exceeds auto-approval threshold ${auto_approve_threshold}'
                }
    
    def estimate_cost_impact(self, requirement: Dict) -> float:
        """Estimate cost impact of quota increase"""
        # Simplified cost estimation
        service_costs = {
            'ec2': 0.10,  # per instance hour
            'lambda': 0.0000002,  # per request
            'rds': 0.20,  # per instance hour
        }
        
        service_code = requirement['service_code']
        shortfall = requirement['shortfall']
        
        base_cost = service_costs.get(service_code, 0.01)
        monthly_cost = shortfall * base_cost * 24 * 30
        
        return monthly_cost
    
    def create_support_case(self, requirement: Dict) -> str:
        """Create support case for quota increase"""
        service_code = requirement['service_code']
        quota_code = requirement['quota_code']
        new_value = requirement['required_with_buffer']
        
        case_body = f"""
Automated Quota Increase Request from CI/CD Pipeline

Service: {service_code}
Quota Code: {quota_code}
Current Limit: {requirement['current_value']}
Requested Limit: {new_value}
Required for Deployment: {requirement['required_value']}

This request was automatically generated during our deployment pipeline
to ensure adequate capacity for infrastructure deployment.

Business Justification:
- Required for automated deployment pipeline
- Prevents deployment failures due to quota constraints
- Supports infrastructure scaling requirements
- Part of automated quota management strategy

Please process this request with high priority to avoid deployment delays.
        """.strip()
        
        response = self.support.create_case(
            subject=f"CI/CD Quota Increase: {service_code} - {quota_code}",
            serviceCode='service-limit-increase',
            severityCode='normal',
            categoryCode='service-limit-increase',
            communicationBody=case_body,
            language='en'
        )
        
        return response['caseId']

def main():
    parser = argparse.ArgumentParser(description='Request quota increases based on analysis')
    parser.add_argument('--analysis-file', required=True, help='Quota analysis JSON file')
    parser.add_argument('--environment', required=True, help='Environment name')
    parser.add_argument('--auto-approve-threshold', type=float, default=1000.0, help='Auto-approval cost threshold')
    
    args = parser.parse_args()
    
    increaser = QuotaIncreaser('us-east-1')
    results = increaser.process_quota_increases(args.analysis_file, args.auto_approve_threshold)
    
    print(f"Processed {len(results)} quota increase requests:")
    for result in results:
        print(f"- {result['service_code']}/{result['quota_code']}: {result['status']}")
    
    # Save results for next step
    with open('quota-increase-results.json', 'w') as f:
        json.dump(results, f, indent=2)

if __name__ == '__main__':
    main()

AWS Services Used

  • AWS Service Quotas: Core service for automated quota monitoring and management
  • Amazon EventBridge: Event-driven automation and workflow orchestration
  • AWS Lambda: Serverless execution of automation logic and event processing
  • AWS Step Functions: Complex workflow orchestration for approval processes
  • Amazon DynamoDB: Storage for automation rules, events, and audit trails
  • Amazon SNS: Notification system for alerts and automation events
  • Amazon CloudWatch: Metrics, monitoring, and automated alerting
  • AWS Support API: Automated support case creation for quota increases
  • AWS Systems Manager: Parameter storage and configuration management
  • Amazon S3: Storage for ML models and automation artifacts
  • AWS IAM: Fine-grained access control for automation components
  • AWS CloudFormation (or HashiCorp Terraform): Infrastructure as code with quota awareness
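As a concrete illustration of the EventBridge integration listed above, a monitoring component could publish a custom quota event onto the bus exposed by the `quota_event_bus_name` output. This sketch is illustrative only; the event source and detail-type strings are assumptions, not values taken from the original code:

```python
# Hypothetical emitter for a "quota threshold exceeded" event on the custom bus.

import json
from datetime import datetime, timezone

def build_quota_event(service_code: str, quota_code: str,
                      usage_pct: float, bus_name: str) -> dict:
    """Build a put_events entry describing a quota threshold breach."""
    return {
        'Source': 'custom.quota-monitor',          # assumed source name
        'DetailType': 'Quota Threshold Exceeded',  # assumed detail-type
        'EventBusName': bus_name,
        'Detail': json.dumps({
            'service_code': service_code,
            'quota_code': quota_code,
            'usage_percentage': usage_pct,
            'timestamp': datetime.now(timezone.utc).isoformat(),
        }),
    }

def emit_quota_event(entry: dict) -> None:
    import boto3  # lazy import keeps build_quota_event testable offline
    resp = boto3.client('events').put_events(Entries=[entry])
    if resp['FailedEntryCount']:
        raise RuntimeError(f"failed entries: {resp['Entries']}")

# emit_quota_event(build_quota_event('ec2', 'L-1216C47A', 87.5, bus_name))
```

A rule on the bus (such as the `quota_threshold_exceeded` rule charted in the dashboard above) would match on the source and detail-type and route the event to the automation Lambda.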

Benefits

  • Zero-Touch Operations: Fully automated quota management without manual intervention
  • Predictive Management: ML-based prediction and proactive quota adjustments
  • Event-Driven Response: Real-time response to quota events and threshold breaches
  • Cost-Aware Automation: Intelligent cost consideration in automation decisions
  • CI/CD Integration: Seamless integration with deployment pipelines and infrastructure automation
  • Multi-Account Orchestration: Coordinated automation across complex AWS environments
  • Audit and Compliance: Complete audit trails and governance for all automation actions
  • Self-Healing Systems: Automatic recovery and optimization of quota allocations
  • Business Hours Awareness: Configurable automation behavior based on business requirements
  • Approval Workflows: Flexible approval processes for high-impact quota changes