REL01-BP05 - Automate quota management
Overview
Implement fully automated quota management systems that proactively monitor, analyze, and adjust service quotas without manual intervention. Automate the entire quota lifecycle from monitoring and alerting to increase requests and approval workflows, ensuring optimal resource availability while minimizing operational overhead.
Implementation Steps
1. Deploy Intelligent Quota Automation
- Implement machine learning-based quota prediction and management
- Set up automated quota increase workflows with approval chains
- Create self-healing quota management systems
- Establish automated quota optimization and right-sizing
2. Integrate with Infrastructure Automation
- Embed quota management in CI/CD pipelines and deployment processes
- Implement quota-aware infrastructure provisioning and scaling
- Create automated quota validation for infrastructure changes
- Set up dynamic quota adjustment based on workload patterns
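One way to make provisioning quota-aware is to run a headroom check as a pipeline gate before a deployment proceeds. The sketch below is a minimal illustration, not a definitive implementation: `check_quota_headroom`, `HeadroomResult`, and the 85% ceiling are hypothetical names and values chosen for this example, and the usage and quota figures are assumed to come from your own monitoring data.

```python
from dataclasses import dataclass


@dataclass
class HeadroomResult:
    allowed: bool
    projected_utilization: float  # percent of the quota used after the change
    shortfall: float              # units over the ceiling, 0.0 if the change fits


def check_quota_headroom(current_usage: float, quota_value: float,
                         planned_increase: float,
                         max_utilization: float = 85.0) -> HeadroomResult:
    """Decide whether a planned change fits under a utilization ceiling."""
    if quota_value <= 0:
        raise ValueError("quota_value must be positive")
    projected = current_usage + planned_increase
    ceiling = quota_value * max_utilization / 100
    return HeadroomResult(
        allowed=projected <= ceiling,
        projected_utilization=projected / quota_value * 100,
        shortfall=max(0.0, projected - ceiling),
    )


# Hypothetical pipeline step: 40 units in use, quota of 64, deployment adds 20.
result = check_quota_headroom(current_usage=40, quota_value=64, planned_increase=20)
if not result.allowed:
    print(f"Blocked: projected utilization {result.projected_utilization:.1f}%, "
          f"over the ceiling by {result.shortfall:.1f} units")
```

A pipeline could fail the stage when `allowed` is false and pass `shortfall` to whatever process requests the increase.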
3. Establish Event-Driven Quota Management
- Implement real-time quota adjustment based on usage patterns
- Set up automated responses to quota threshold breaches
- Create event-driven quota coordination across accounts and regions
- Establish automated disaster recovery quota pre-warming
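Disaster recovery pre-warming can start with a comparison of quota values between the primary and recovery Regions. The following sketch assumes the values have already been collected into dictionaries keyed by quota code (for example with the Service Quotas `get_service_quota` call in each Region). The function name `find_dr_quota_gaps`, the `buffer` parameter, and the quota codes shown are placeholders for illustration.

```python
from typing import Dict, List


def find_dr_quota_gaps(primary: Dict[str, float], dr: Dict[str, float],
                       buffer: float = 1.0) -> List[dict]:
    """Return quotas where the DR Region value is below primary * buffer."""
    gaps = []
    for quota_code, primary_value in sorted(primary.items()):
        required = primary_value * buffer
        # A quota missing from the DR data is treated as zero capacity.
        dr_value = dr.get(quota_code, 0.0)
        if dr_value < required:
            gaps.append({
                "quota_code": quota_code,
                "dr_value": dr_value,
                "required": required,
            })
    return gaps


# Placeholder quota codes and values, not real AWS quota codes.
primary = {"L-AAAA1111": 256.0, "L-BBBB2222": 20.0}
dr = {"L-AAAA1111": 64.0, "L-BBBB2222": 20.0}

gaps = find_dr_quota_gaps(primary, dr)
for gap in gaps:
    print(f"{gap['quota_code']}: DR has {gap['dr_value']}, needs {gap['required']}")
```

Each gap could then feed an increase request for the recovery Region ahead of any failover, since increases are not applied instantly.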
4. Create Autonomous Quota Governance
- Implement automated quota policy enforcement and compliance
- Set up automated quota cost optimization and budget management
- Create automated quota audit trails and reporting
- Establish automated quota security and access controls
5. Deploy Predictive Quota Management
- Implement forecasting models for quota demand prediction
- Set up automated capacity planning and quota pre-allocation
- Create seasonal and trend-based quota adjustment automation
- Establish automated quota buffer management and optimization
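A forecasting model does not need to be complex to be useful for pre-allocation. As a minimal sketch, assuming one usage sample per day and roughly linear growth, a least-squares trend line can estimate how many days remain before usage crosses a threshold. The function name `days_until_threshold` and the 85% default are illustrative assumptions; seasonal workloads would need a richer model.

```python
from typing import Optional, Sequence


def days_until_threshold(daily_usage: Sequence[float], quota_value: float,
                         threshold_pct: float = 85.0) -> Optional[float]:
    """Estimate days until usage reaches threshold_pct of the quota.

    daily_usage holds one sample per day, oldest first. Returns 0.0 if the
    latest sample is already at or over the threshold, and None if the
    fitted trend is flat or decreasing.
    """
    n = len(daily_usage)
    if n < 2:
        raise ValueError("need at least two samples")
    limit = quota_value * threshold_pct / 100
    if daily_usage[-1] >= limit:
        return 0.0
    # Ordinary least-squares fit of usage against the day index 0..n-1.
    mean_x = (n - 1) / 2
    mean_y = sum(daily_usage) / n
    sxx = sum((i - mean_x) ** 2 for i in range(n))
    sxy = sum((i - mean_x) * (y - mean_y) for i, y in enumerate(daily_usage))
    slope = sxy / sxx
    if slope <= 0:
        return None
    intercept = mean_y - slope * mean_x
    # Day index where the fitted line reaches the limit, relative to the last sample.
    crossing = (limit - intercept) / slope
    return max(0.0, crossing - (n - 1))


remaining = days_until_threshold([50, 52, 54, 56, 58], quota_value=100)
print(f"Estimated days until 85% utilization: {remaining}")
```

If the estimate is shorter than the typical turnaround for an increase request, that is a signal to request the increase now.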
6. Integrate Cross-Service Quota Orchestration
- Implement automated quota coordination across multiple AWS services
- Set up automated quota dependency management and resolution
- Create automated quota impact analysis and mitigation
- Establish automated quota rollback and recovery procedures
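Dependency resolution across services can be modeled as a topological sort, so that quotas a service relies on are raised before the service itself. The sketch below uses the standard-library `graphlib` module (Python 3.9 or later) and the same example dependency map that appears in Example 2. The function name `order_quota_requests` is hypothetical, and the map is illustrative rather than an authoritative list of AWS service dependencies.

```python
from graphlib import TopologicalSorter
from typing import Dict, List


def order_quota_requests(dependencies: Dict[str, List[str]], root: str) -> List[str]:
    """Return root and everything it depends on, with dependencies first."""
    # Collect every service reachable from root.
    needed, stack = set(), [root]
    while stack:
        service = stack.pop()
        if service not in needed:
            needed.add(service)
            stack.extend(dependencies.get(service, []))
    # TopologicalSorter expects a mapping of node -> predecessors.
    graph = {s: list(dependencies.get(s, [])) for s in sorted(needed)}
    # static_order() raises graphlib.CycleError if the map contains a cycle.
    return list(TopologicalSorter(graph).static_order())


service_dependencies = {
    "ec2": ["vpc", "ebs", "elasticloadbalancing"],
    "lambda": ["logs", "iam"],
    "rds": ["vpc", "kms"],
    "ecs": ["ec2", "elasticloadbalancing", "logs"],
}

order = order_quota_requests(service_dependencies, "ecs")
print(" -> ".join(order))
```

The relative order of services that do not depend on each other is not significant. Only the guarantee that each dependency precedes its dependents matters.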
Implementation Examples
Example 1: Intelligent Quota Automation Engine
import boto3
import json
import logging
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
import joblib
import os
class AutomationLevel(Enum):
MONITOR_ONLY = "monitor_only"
ALERT_ONLY = "alert_only"
AUTO_REQUEST = "auto_request"
AUTO_APPROVE = "auto_approve"
FULL_AUTO = "full_auto"
@dataclass
class QuotaAutomationRule:
service_code: str
quota_code: str
region: str
automation_level: AutomationLevel
threshold_warning: float = 70.0
threshold_critical: float = 85.0
threshold_emergency: float = 95.0
auto_increase_multiplier: float = 1.5
max_auto_increase: float = 10000
approval_required: bool = False
business_hours_only: bool = False
cost_threshold: float = 1000.0
class IntelligentQuotaAutomationEngine:
def __init__(self, config: Dict):
self.config = config
self.service_quotas = boto3.client('service-quotas')
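# The AWS Support API is served from the us-east-1 endpoint and requires a Business, Enterprise On-Ramp, or Enterprise support plan
self.support = boto3.client('support', region_name='us-east-1')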
self.cloudwatch = boto3.client('cloudwatch')
self.sns = boto3.client('sns')
self.stepfunctions = boto3.client('stepfunctions')
self.dynamodb = boto3.resource('dynamodb')
# Initialize tables
self.quota_table = self.dynamodb.Table(config['quota_table_name'])
self.rules_table = self.dynamodb.Table(config['rules_table_name'])
self.automation_log_table = self.dynamodb.Table(config['automation_log_table_name'])
# ML models for prediction
self.models = {}
self.load_prediction_models()
def load_prediction_models(self):
"""Load pre-trained ML models for quota prediction"""
model_path = self.config.get('model_path', '/tmp/models')
try:
if os.path.exists(f"{model_path}/quota_predictor.joblib"):
self.models['quota_predictor'] = joblib.load(f"{model_path}/quota_predictor.joblib")
logging.info("Loaded quota prediction model")
else:
# Create and train a simple model if none exists
self.models['quota_predictor'] = self.create_default_model()
logging.info("Created default quota prediction model")
except Exception as e:
logging.error(f"Error loading models: {str(e)}")
self.models['quota_predictor'] = self.create_default_model()
def create_default_model(self):
"""Create a default prediction model"""
return RandomForestRegressor(n_estimators=100, random_state=42)
async def run_automation_cycle(self) -> Dict:
"""Run complete automation cycle"""
cycle_start = datetime.utcnow()
results = {
'cycle_start': cycle_start.isoformat(),
'quotas_processed': 0,
'automations_executed': 0,
'errors': [],
'actions_taken': []
}
try:
# Get all automation rules
automation_rules = await self.get_automation_rules()
# Process each rule
for rule in automation_rules:
try:
await self.process_automation_rule(rule, results)
results['quotas_processed'] += 1
except Exception as e:
error_msg = f"Error processing rule {rule.service_code}/{rule.quota_code}: {str(e)}"
logging.error(error_msg)
results['errors'].append(error_msg)
# Update ML models with new data
await self.update_prediction_models()
# Generate automation report
await self.generate_automation_report(results)
except Exception as e:
logging.error(f"Error in automation cycle: {str(e)}")
results['errors'].append(str(e))
results['cycle_duration'] = (datetime.utcnow() - cycle_start).total_seconds()
return results
async def get_automation_rules(self) -> List[QuotaAutomationRule]:
"""Get all automation rules from DynamoDB"""
rules = []
try:
response = self.rules_table.scan()
for item in response['Items']:
rule = QuotaAutomationRule(
service_code=item['service_code'],
quota_code=item['quota_code'],
region=item['region'],
automation_level=AutomationLevel(item['automation_level']),
threshold_warning=float(item.get('threshold_warning', 70.0)),
threshold_critical=float(item.get('threshold_critical', 85.0)),
threshold_emergency=float(item.get('threshold_emergency', 95.0)),
auto_increase_multiplier=float(item.get('auto_increase_multiplier', 1.5)),
max_auto_increase=float(item.get('max_auto_increase', 10000)),
approval_required=item.get('approval_required', False),
business_hours_only=item.get('business_hours_only', False),
cost_threshold=float(item.get('cost_threshold', 1000.0))
)
rules.append(rule)
except Exception as e:
logging.error(f"Error getting automation rules: {str(e)}")
return rules
    async def process_automation_rule(self, rule: QuotaAutomationRule, results: Dict):
        """Process individual automation rule"""
        # Get current quota data
        quota_data = await self.get_quota_data(rule.service_code, rule.quota_code, rule.region)
        if not quota_data:
            return
        # DynamoDB returns numbers as Decimal; convert before mixing with float arithmetic
        current_usage = float(quota_data['current_usage'])
        quota_value = float(quota_data['quota_value'])
        if quota_value <= 0:
            logging.warning(f"Skipping {rule.service_code}/{rule.quota_code}: non-positive quota value")
            return
        utilization = (current_usage / quota_value) * 100
        # Predict future usage
        predicted_usage = await self.predict_quota_usage(rule, quota_data)
        predicted_utilization = (float(predicted_usage) / quota_value) * 100
        # Determine action based on automation level and thresholds
        action_needed = self.determine_automation_action(rule, utilization, predicted_utilization)
        if action_needed:
            await self.execute_automation_action(rule, quota_data, action_needed, results)
async def get_quota_data(self, service_code: str, quota_code: str, region: str) -> Optional[Dict]:
"""Get current quota data"""
try:
quota_id = f"{service_code}#{quota_code}#{region}"
# Get latest quota data
response = self.quota_table.query(
KeyConditionExpression='quota_id = :quota_id',
ScanIndexForward=False,
Limit=1,
ExpressionAttributeValues={':quota_id': quota_id}
)
if response['Items']:
return response['Items'][0]
except Exception as e:
logging.error(f"Error getting quota data: {str(e)}")
return None
    async def predict_quota_usage(self, rule: QuotaAutomationRule, quota_data: Dict) -> float:
        """Predict future quota usage using ML models"""
        # DynamoDB returns numbers as Decimal; convert once so the float arithmetic below is safe
        current_usage = float(quota_data['current_usage'])
        try:
            # Get historical data for prediction
            historical_data = await self.get_historical_quota_data(
                rule.service_code, rule.quota_code, rule.region, days=30
            )
            if len(historical_data) < 7:  # Need minimum data points
                # Too little history for the model: assume 10% growth
                return current_usage * 1.1
            # Prepare features for ML model
            features = self.prepare_prediction_features(historical_data)
            # Use ML model for prediction
            model = self.models.get('quota_predictor')
            if model is not None and len(features) > 0:
                # Predict from the most recent feature vector (the model is assumed
                # to be trained on a 7-day horizon). An unfitted default model
                # raises here and is handled by the fallback below.
                prediction = float(model.predict([features[-1]])[0])
                return max(prediction, current_usage)
        except Exception as e:
            logging.error(f"Error predicting quota usage: {str(e)}")
        # Fallback: assume 20% growth
        return current_usage * 1.2
async def get_historical_quota_data(self, service_code: str, quota_code: str,
region: str, days: int = 30) -> List[Dict]:
"""Get historical quota data for trend analysis"""
try:
quota_id = f"{service_code}#{quota_code}#{region}"
start_time = int((datetime.utcnow() - timedelta(days=days)).timestamp())
response = self.quota_table.query(
KeyConditionExpression='quota_id = :quota_id AND #ts >= :start_time',
ExpressionAttributeNames={'#ts': 'timestamp'},
ExpressionAttributeValues={
':quota_id': quota_id,
':start_time': start_time
}
)
return response['Items']
except Exception as e:
logging.error(f"Error getting historical data: {str(e)}")
return []
    def prepare_prediction_features(self, historical_data: List[Dict]) -> List[List[float]]:
        """Prepare features for ML prediction"""
        if not historical_data:
            return []
        # Convert to DataFrame for easier processing
        df = pd.DataFrame(historical_data)
        # DynamoDB numbers arrive as Decimal; cast to native types for pandas and NumPy
        df['timestamp'] = pd.to_datetime(df['timestamp'].map(int), unit='s')
        df['current_usage'] = df['current_usage'].map(float)
        df['utilization_percentage'] = df['utilization_percentage'].map(float)
        df = df.sort_values('timestamp')
        features = []
        # Each feature vector summarizes the 7-sample window ending at row i
        for i in range(6, len(df)):
            recent_usage = df['current_usage'].iloc[i-6:i+1].values
            usage_trend = np.polyfit(range(7), recent_usage, 1)[0]  # slope of a linear fit
            avg_usage = np.mean(recent_usage)
            max_usage = np.max(recent_usage)
            min_usage = np.min(recent_usage)
            timestamp = df['timestamp'].iloc[i]
            day_of_week = timestamp.weekday()
            hour_of_day = timestamp.hour
            feature_vector = [
                float(usage_trend),
                float(avg_usage),
                float(max_usage),
                float(min_usage),
                float(day_of_week),
                float(hour_of_day),
                float(df['utilization_percentage'].iloc[i])
            ]
            features.append(feature_vector)
        return features
    def determine_automation_action(self, rule: QuotaAutomationRule,
                                    current_utilization: float,
                                    predicted_utilization: float) -> Optional[str]:
        """Determine what automation action to take"""
        max_utilization = max(current_utilization, predicted_utilization)
        # Monitor-only rules never trigger an action, including outside business hours
        if rule.automation_level == AutomationLevel.MONITOR_ONLY:
            return None
        # Outside business hours only emergencies are handled. They fall through to
        # the level-based logic below, so an alert-only rule still only alerts.
        if rule.business_hours_only and not self.is_business_hours():
            if max_utilization < rule.threshold_emergency:
                return None
        # Determine action based on automation level and thresholds
        if max_utilization >= rule.threshold_emergency:
            if rule.automation_level in [AutomationLevel.AUTO_APPROVE, AutomationLevel.FULL_AUTO]:
                return "emergency_increase"
            elif rule.automation_level == AutomationLevel.AUTO_REQUEST:
                return "request_increase"
            else:
                return "alert_emergency"
        elif max_utilization >= rule.threshold_critical:
            if rule.automation_level in [AutomationLevel.AUTO_APPROVE, AutomationLevel.FULL_AUTO]:
                return "auto_increase"
            elif rule.automation_level == AutomationLevel.AUTO_REQUEST:
                return "request_increase"
            else:
                return "alert_critical"
        elif max_utilization >= rule.threshold_warning:
            if rule.automation_level == AutomationLevel.FULL_AUTO:
                return "preemptive_increase"
            else:
                return "alert_warning"
        return None
def is_business_hours(self) -> bool:
"""Check if current time is within business hours"""
now = datetime.utcnow()
# Assume business hours are 9 AM to 6 PM UTC, Monday to Friday
return (now.weekday() < 5 and 9 <= now.hour < 18)
async def execute_automation_action(self, rule: QuotaAutomationRule,
quota_data: Dict, action: str, results: Dict):
"""Execute the determined automation action"""
try:
action_result = None
if action in ["emergency_increase", "auto_increase", "preemptive_increase"]:
action_result = await self.execute_quota_increase(rule, quota_data, action)
elif action == "request_increase":
action_result = await self.request_quota_increase(rule, quota_data)
elif action.startswith("alert_"):
action_result = await self.send_quota_alert(rule, quota_data, action)
if action_result:
results['automations_executed'] += 1
results['actions_taken'].append({
'rule': f"{rule.service_code}/{rule.quota_code}/{rule.region}",
'action': action,
'result': action_result,
'timestamp': datetime.utcnow().isoformat()
})
# Log automation action
await self.log_automation_action(rule, action, action_result)
except Exception as e:
error_msg = f"Error executing action {action}: {str(e)}"
logging.error(error_msg)
results['errors'].append(error_msg)
    async def execute_quota_increase(self, rule: QuotaAutomationRule,
                                     quota_data: Dict, action_type: str) -> Dict:
        """Request an automated quota increase"""
        # DynamoDB returns numbers as Decimal; convert before float arithmetic
        current_value = float(quota_data['quota_value'])
        # Calculate new quota value, capped at the rule's maximum
        if action_type == "emergency_increase":
            new_value = min(current_value * 2.0, rule.max_auto_increase)
        elif action_type == "auto_increase":
            new_value = min(current_value * rule.auto_increase_multiplier, rule.max_auto_increase)
        else:  # preemptive_increase
            new_value = min(current_value * 1.2, rule.max_auto_increase)
        if new_value <= current_value:
            # The quota is already at or above the automation cap; nothing to request
            return {
                'status': 'skipped',
                'action': 'max_auto_increase_reached',
                'old_value': current_value,
                'new_value': new_value
            }
        # Check cost implications
        estimated_cost = await self.estimate_quota_cost(rule, current_value, new_value)
        if estimated_cost > rule.cost_threshold and rule.approval_required:
            # Trigger approval workflow
            return await self.trigger_approval_workflow(rule, quota_data, new_value, estimated_cost)
        try:
            # Most quotas are regional, so submit the request in the rule's Region
            service_quotas = boto3.client('service-quotas', region_name=rule.region)
            response = service_quotas.request_service_quota_increase(
                ServiceCode=rule.service_code,
                QuotaCode=rule.quota_code,
                DesiredValue=new_value
            )
            # The call only submits a request; AWS approves or denies it asynchronously
            return {
                'status': 'success',
                'action': 'quota_increase_requested',
                'old_value': current_value,
                'new_value': new_value,
                'request_id': response.get('RequestedQuota', {}).get('Id'),
                'estimated_cost': estimated_cost
            }
        except Exception as e:
            logging.error(f"Quota increase request failed, falling back to a support case: {str(e)}")
            case_id = await self.create_support_case(rule, quota_data, new_value)
            return {
                'status': 'support_case_created',
                'action': 'support_case',
                'old_value': current_value,
                'new_value': new_value,
                'case_id': case_id,
                'estimated_cost': estimated_cost
            }
async def estimate_quota_cost(self, rule: QuotaAutomationRule,
current_value: float, new_value: float) -> float:
"""Estimate cost impact of quota increase"""
# This is a simplified cost estimation
# In practice, you would integrate with AWS Pricing API or Cost Explorer
service_cost_per_unit = {
'ec2': 0.10, # per instance hour
'lambda': 0.0000002, # per request
'rds': 0.20, # per instance hour
's3': 0.023, # per GB
}
base_cost = service_cost_per_unit.get(rule.service_code, 0.01)
increase_amount = new_value - current_value
# Estimate monthly cost impact
estimated_monthly_cost = increase_amount * base_cost * 24 * 30
return estimated_monthly_cost
    async def trigger_approval_workflow(self, rule: QuotaAutomationRule,
                                        quota_data: Dict, new_value: float,
                                        estimated_cost: float) -> Dict:
        """Trigger Step Functions workflow for approval"""
        try:
            # asdict() keeps the Enum member and DynamoDB items contain Decimal values;
            # json.dumps cannot serialize either without help
            rule_dict = asdict(rule)
            rule_dict['automation_level'] = rule.automation_level.value
            workflow_input = {
                'rule': rule_dict,
                'quota_data': quota_data,
                'new_value': new_value,
                'estimated_cost': estimated_cost,
                'timestamp': datetime.utcnow().isoformat()
            }
            response = self.stepfunctions.start_execution(
                stateMachineArn=self.config['approval_workflow_arn'],
                input=json.dumps(workflow_input, default=str)  # Decimal values become strings
            )
            return {
                'status': 'approval_pending',
                'action': 'approval_workflow_triggered',
                'execution_arn': response['executionArn'],
                'estimated_cost': estimated_cost
            }
        except Exception as e:
            logging.error(f"Error triggering approval workflow: {str(e)}")
            return {
                'status': 'error',
                'action': 'approval_workflow_failed',
                'error': str(e)
            }
async def create_support_case(self, rule: QuotaAutomationRule,
quota_data: Dict, new_value: float) -> Optional[str]:
"""Create automated support case for quota increase"""
try:
case_body = f"""
Automated Quota Increase Request
Service: {rule.service_code}
Quota Code: {rule.quota_code}
Region: {rule.region}
Current Limit: {quota_data['quota_value']}
Requested Limit: {new_value}
Current Usage: {quota_data['current_usage']}
Utilization: {quota_data['utilization_percentage']:.1f}%
This request was automatically generated by our intelligent quota management system
based on usage patterns and predictive analysis.
Justification:
- Current utilization exceeds safe operating thresholds
- Predictive models indicate continued growth
- Automated system determined quota increase is necessary
- Request follows established automation policies
Business Impact:
- Prevents service disruptions and availability issues
- Maintains application performance standards
- Supports business growth and scaling requirements
""".strip()
response = self.support.create_case(
subject=f"Automated Quota Increase: {rule.service_code} - {rule.quota_code}",
serviceCode='service-limit-increase',
severityCode='normal',
categoryCode='service-limit-increase',
communicationBody=case_body,
ccEmailAddresses=self.config.get('notification_emails', []),
language='en'
)
return response['caseId']
except Exception as e:
logging.error(f"Error creating support case: {str(e)}")
return None
async def log_automation_action(self, rule: QuotaAutomationRule,
action: str, result: Dict):
"""Log automation action to DynamoDB"""
try:
log_item = {
'log_id': f"{rule.service_code}#{rule.quota_code}#{rule.region}#{int(datetime.utcnow().timestamp())}",
'timestamp': int(datetime.utcnow().timestamp()),
'service_code': rule.service_code,
'quota_code': rule.quota_code,
'region': rule.region,
'automation_level': rule.automation_level.value,
'action_taken': action,
'result': json.dumps(result),
'ttl': int((datetime.utcnow() + timedelta(days=365)).timestamp())
}
self.automation_log_table.put_item(Item=log_item)
except Exception as e:
logging.error(f"Error logging automation action: {str(e)}")
# Usage example
async def main():
config = {
'quota_table_name': 'quota-monitoring',
'rules_table_name': 'quota-automation-rules',
'automation_log_table_name': 'quota-automation-log',
'approval_workflow_arn': 'arn:aws:states:us-east-1:123456789012:stateMachine:QuotaApprovalWorkflow',
'notification_emails': ['admin@company.com'],
'model_path': '/tmp/models'
}
engine = IntelligentQuotaAutomationEngine(config)
results = await engine.run_automation_cycle()
print("Automation cycle completed:")
print(f"- Quotas processed: {results['quotas_processed']}")
print(f"- Automations executed: {results['automations_executed']}")
print(f"- Errors: {len(results['errors'])}")
print(f"- Duration: {results['cycle_duration']:.2f} seconds")
if __name__ == "__main__":
    asyncio.run(main())
Example 2: Event-Driven Quota Automation System
import boto3
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
import asyncio
import os  # lambda_handler reads its configuration from environment variables
import aiohttp
@dataclass
class QuotaEvent:
event_type: str
service_code: str
quota_code: str
region: str
account_id: str
current_usage: float
quota_value: float
utilization_percentage: float
timestamp: datetime
metadata: Optional[Dict] = None
class EventDrivenQuotaAutomation:
def __init__(self, config: Dict):
self.config = config
self.eventbridge = boto3.client('events')
self.lambda_client = boto3.client('lambda')
self.service_quotas = boto3.client('service-quotas')
self.dynamodb = boto3.resource('dynamodb')
self.sns = boto3.client('sns')
# Initialize tables
self.events_table = self.dynamodb.Table(config['events_table_name'])
self.automation_state_table = self.dynamodb.Table(config['automation_state_table_name'])
# Event handlers
self.event_handlers = {
'quota_threshold_exceeded': self.handle_threshold_exceeded,
'quota_usage_spike': self.handle_usage_spike,
'quota_prediction_alert': self.handle_prediction_alert,
'infrastructure_scaling': self.handle_infrastructure_scaling,
'disaster_recovery_triggered': self.handle_disaster_recovery,
'cost_optimization_required': self.handle_cost_optimization
}
async def process_quota_event(self, event: QuotaEvent) -> Dict:
"""Process incoming quota event"""
try:
# Log event
await self.log_quota_event(event)
# Get automation state
automation_state = await self.get_automation_state(
event.service_code, event.quota_code, event.region
)
# Check if event should trigger automation
if not self.should_process_event(event, automation_state):
return {'status': 'skipped', 'reason': 'automation_conditions_not_met'}
# Route to appropriate handler
handler = self.event_handlers.get(event.event_type)
if handler:
result = await handler(event, automation_state)
# Update automation state
await self.update_automation_state(event, result)
return result
else:
return {'status': 'error', 'reason': f'no_handler_for_event_type_{event.event_type}'}
except Exception as e:
logging.error(f"Error processing quota event: {str(e)}")
return {'status': 'error', 'reason': str(e)}
async def handle_threshold_exceeded(self, event: QuotaEvent, state: Dict) -> Dict:
"""Handle quota threshold exceeded events"""
actions_taken = []
try:
# Immediate response based on utilization level
if event.utilization_percentage >= 95:
# Emergency scaling
result = await self.emergency_quota_increase(event)
actions_taken.append(result)
# Trigger infrastructure scaling if available
scaling_result = await self.trigger_infrastructure_scaling(event)
if scaling_result:
actions_taken.append(scaling_result)
elif event.utilization_percentage >= 85:
# Automated quota increase
result = await self.automated_quota_increase(event)
actions_taken.append(result)
elif event.utilization_percentage >= 70:
# Predictive scaling preparation
result = await self.prepare_predictive_scaling(event)
actions_taken.append(result)
# Send notifications
await self.send_event_notification(event, actions_taken)
return {
'status': 'success',
'event_type': event.event_type,
'actions_taken': actions_taken,
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
logging.error(f"Error handling threshold exceeded: {str(e)}")
return {'status': 'error', 'reason': str(e)}
async def handle_usage_spike(self, event: QuotaEvent, state: Dict) -> Dict:
"""Handle sudden usage spike events"""
try:
# Analyze spike pattern
spike_analysis = await self.analyze_usage_spike(event)
actions_taken = []
if spike_analysis['severity'] == 'high':
# Immediate quota buffer increase
buffer_result = await self.increase_quota_buffer(event, multiplier=1.5)
actions_taken.append(buffer_result)
# Scale out infrastructure if possible
scale_result = await self.scale_out_infrastructure(event)
if scale_result:
actions_taken.append(scale_result)
elif spike_analysis['severity'] == 'medium':
# Moderate quota adjustment
adjust_result = await self.adjust_quota_proactively(event, multiplier=1.2)
actions_taken.append(adjust_result)
# Set up enhanced monitoring
monitoring_result = await self.enable_enhanced_monitoring(event)
actions_taken.append(monitoring_result)
return {
'status': 'success',
'event_type': event.event_type,
'spike_analysis': spike_analysis,
'actions_taken': actions_taken
}
except Exception as e:
logging.error(f"Error handling usage spike: {str(e)}")
return {'status': 'error', 'reason': str(e)}
async def handle_prediction_alert(self, event: QuotaEvent, state: Dict) -> Dict:
"""Handle predictive quota alerts"""
try:
# Get prediction details from metadata
prediction_data = (event.metadata or {}).get('prediction', {})
predicted_utilization = prediction_data.get('predicted_utilization_7d', 0)
confidence = prediction_data.get('confidence', 0)
actions_taken = []
if confidence > 0.8 and predicted_utilization > 80:
# High confidence prediction - take proactive action
proactive_result = await self.proactive_quota_adjustment(event, prediction_data)
actions_taken.append(proactive_result)
# Schedule capacity planning review
planning_result = await self.schedule_capacity_planning(event, prediction_data)
actions_taken.append(planning_result)
elif confidence > 0.6 and predicted_utilization > 70:
# Medium confidence - prepare for potential increase
preparation_result = await self.prepare_quota_increase(event, prediction_data)
actions_taken.append(preparation_result)
return {
'status': 'success',
'event_type': event.event_type,
'prediction_data': prediction_data,
'actions_taken': actions_taken
}
except Exception as e:
logging.error(f"Error handling prediction alert: {str(e)}")
return {'status': 'error', 'reason': str(e)}
async def handle_infrastructure_scaling(self, event: QuotaEvent, state: Dict) -> Dict:
"""Handle infrastructure scaling events"""
try:
scaling_metadata = (event.metadata or {}).get('scaling', {})
scaling_direction = scaling_metadata.get('direction', 'up')
scaling_factor = scaling_metadata.get('factor', 1.0)
actions_taken = []
if scaling_direction == 'up':
# Pre-emptively increase quotas for scaling up
quota_result = await self.preemptive_quota_increase(event, scaling_factor)
actions_taken.append(quota_result)
# Coordinate with other services that might be affected
coordination_result = await self.coordinate_cross_service_quotas(event, scaling_factor)
actions_taken.append(coordination_result)
elif scaling_direction == 'down':
# Optimize quotas for scaling down
optimization_result = await self.optimize_quotas_for_scale_down(event, scaling_factor)
actions_taken.append(optimization_result)
return {
'status': 'success',
'event_type': event.event_type,
'scaling_metadata': scaling_metadata,
'actions_taken': actions_taken
}
except Exception as e:
logging.error(f"Error handling infrastructure scaling: {str(e)}")
return {'status': 'error', 'reason': str(e)}
async def handle_disaster_recovery(self, event: QuotaEvent, state: Dict) -> Dict:
"""Handle disaster recovery triggered events"""
try:
dr_metadata = (event.metadata or {}).get('disaster_recovery', {})
dr_region = dr_metadata.get('target_region')
dr_type = dr_metadata.get('type', 'failover')
actions_taken = []
if dr_type == 'failover':
# Ensure DR region has adequate quotas
dr_quota_result = await self.ensure_dr_region_quotas(event, dr_region)
actions_taken.append(dr_quota_result)
# Coordinate quota increases across dependent services
coordination_result = await self.coordinate_dr_quota_increases(event, dr_region)
actions_taken.append(coordination_result)
elif dr_type == 'failback':
# Restore original region quotas
restore_result = await self.restore_original_region_quotas(event)
actions_taken.append(restore_result)
return {
'status': 'success',
'event_type': event.event_type,
'dr_metadata': dr_metadata,
'actions_taken': actions_taken
}
except Exception as e:
logging.error(f"Error handling disaster recovery: {str(e)}")
return {'status': 'error', 'reason': str(e)}
async def emergency_quota_increase(self, event: QuotaEvent) -> Dict:
"""Execute emergency quota increase"""
try:
current_quota = event.quota_value
emergency_quota = current_quota * 2.0 # Double the quota for emergency
# Try Service Quotas API first
try:
response = self.service_quotas.request_service_quota_increase(
ServiceCode=event.service_code,
QuotaCode=event.quota_code,
DesiredValue=emergency_quota
)
return {
'action': 'emergency_quota_increase',
'method': 'service_quotas_api',
'old_value': current_quota,
'new_value': emergency_quota,
'request_id': response.get('RequestedQuota', {}).get('Id'),
'status': 'submitted'
}
except Exception as api_error:
# Fall back to support case with high priority
support_case_id = await self.create_emergency_support_case(event, emergency_quota)
return {
'action': 'emergency_quota_increase',
'method': 'support_case',
'old_value': current_quota,
'new_value': emergency_quota,
'case_id': support_case_id,
'status': 'support_case_created',
'api_error': str(api_error)
}
except Exception as e:
logging.error(f"Error in emergency quota increase: {str(e)}")
return {'action': 'emergency_quota_increase', 'status': 'error', 'error': str(e)}
async def trigger_infrastructure_scaling(self, event: QuotaEvent) -> Optional[Dict]:
"""Trigger infrastructure scaling to reduce quota pressure"""
try:
# This would integrate with your infrastructure automation
# For example, triggering Auto Scaling Group scaling, Lambda concurrency adjustments, etc.
scaling_config = self.config.get('infrastructure_scaling', {})
if event.service_code == 'ec2':
# Trigger EC2 Auto Scaling
return await self.trigger_ec2_scaling(event, scaling_config)
elif event.service_code == 'lambda':
# Adjust Lambda concurrency or trigger additional functions
return await self.trigger_lambda_scaling(event, scaling_config)
elif event.service_code == 'rds':
# Consider read replica scaling or connection pooling
return await self.trigger_rds_scaling(event, scaling_config)
return None
except Exception as e:
logging.error(f"Error triggering infrastructure scaling: {str(e)}")
return {'action': 'infrastructure_scaling', 'status': 'error', 'error': str(e)}
async def analyze_usage_spike(self, event: QuotaEvent) -> Dict:
"""Analyze usage spike characteristics"""
try:
# Get recent usage history
history = await self.get_recent_usage_history(
event.service_code, event.quota_code, event.region, hours=24
)
if not history:
return {'severity': 'unknown', 'pattern': 'insufficient_data'}
# Calculate spike characteristics
recent_usage = [h['current_usage'] for h in history[-6:]] # Last 6 data points
baseline_usage = [h['current_usage'] for h in history[:-6]] # Earlier data points
if baseline_usage:
baseline_avg = sum(baseline_usage) / len(baseline_usage)
current_usage = event.current_usage
spike_ratio = current_usage / baseline_avg if baseline_avg > 0 else 1
# Determine severity
if spike_ratio >= 3.0:
severity = 'high'
elif spike_ratio >= 2.0:
severity = 'medium'
else:
severity = 'low'
return {
'severity': severity,
'spike_ratio': spike_ratio,
'baseline_avg': baseline_avg,
'current_usage': current_usage,
'pattern': 'analyzed'
}
return {'severity': 'unknown', 'pattern': 'insufficient_baseline'}
except Exception as e:
logging.error(f"Error analyzing usage spike: {str(e)}")
return {'severity': 'unknown', 'pattern': 'error', 'error': str(e)}
async def coordinate_cross_service_quotas(self, event: QuotaEvent, scaling_factor: float) -> Dict:
"""Coordinate quota increases across related services"""
try:
# Define service dependencies
service_dependencies = {
'ec2': ['vpc', 'ebs', 'elasticloadbalancing'],
'lambda': ['logs', 'iam'],
'rds': ['vpc', 'kms'],
'ecs': ['ec2', 'elasticloadbalancing', 'logs']
}
dependent_services = service_dependencies.get(event.service_code, [])
coordination_results = []
for dependent_service in dependent_services:
try:
# Get related quotas for dependent service
related_quotas = await self.get_related_quotas(dependent_service, event.region)
for quota in related_quotas:
# Check if quota needs adjustment
if quota['utilization_percentage'] > 60: # Proactive threshold
increase_result = await self.increase_related_quota(
dependent_service, quota, scaling_factor
)
coordination_results.append(increase_result)
except Exception as e:
logging.error(f"Error coordinating {dependent_service}: {str(e)}")
coordination_results.append({
'service': dependent_service,
'status': 'error',
'error': str(e)
})
return {
'action': 'cross_service_coordination',
'dependent_services': dependent_services,
'results': coordination_results,
'status': 'completed'
}
except Exception as e:
logging.error(f"Error in cross-service coordination: {str(e)}")
return {'action': 'cross_service_coordination', 'status': 'error', 'error': str(e)}
    async def log_quota_event(self, event: QuotaEvent):
        """Log quota event to DynamoDB"""
        # Local import keeps this method self-contained. boto3's DynamoDB resource
        # rejects Python floats, so numeric values are stored as Decimal.
        from decimal import Decimal
        try:
            event_item = {
                'event_id': f"{event.service_code}#{event.quota_code}#{event.region}#{int(event.timestamp.timestamp())}",
                'timestamp': int(event.timestamp.timestamp()),
                'event_type': event.event_type,
                'service_code': event.service_code,
                'quota_code': event.quota_code,
                'region': event.region,
                'account_id': event.account_id,
                'current_usage': Decimal(str(event.current_usage)),
                'quota_value': Decimal(str(event.quota_value)),
                'utilization_percentage': Decimal(str(event.utilization_percentage)),
                'metadata': json.dumps(event.metadata or {}),
                'ttl': int((datetime.utcnow() + timedelta(days=90)).timestamp())
            }
            self.events_table.put_item(Item=event_item)
        except Exception as e:
            logging.error(f"Error logging quota event: {str(e)}")
# Lambda function handler for EventBridge integration
def lambda_handler(event, context):
    """Lambda handler for processing EventBridge quota events"""
    try:
        config = {
            'events_table_name': os.environ['EVENTS_TABLE_NAME'],
            'automation_state_table_name': os.environ['AUTOMATION_STATE_TABLE_NAME'],
            'infrastructure_scaling': {
                'enabled': os.environ.get('INFRASTRUCTURE_SCALING_ENABLED', 'false').lower() == 'true'
            }
        }
        automation = EventDrivenQuotaAutomation(config)
        # Parse EventBridge event
        quota_event = QuotaEvent(
            event_type=event['detail']['event_type'],
            service_code=event['detail']['service_code'],
            quota_code=event['detail']['quota_code'],
            region=event['detail']['region'],
            account_id=event['detail']['account_id'],
            current_usage=float(event['detail']['current_usage']),
            quota_value=float(event['detail']['quota_value']),
            utilization_percentage=float(event['detail']['utilization_percentage']),
            timestamp=datetime.fromisoformat(event['detail']['timestamp']),
            metadata=event['detail'].get('metadata', {})
        )
        # Process event
        result = asyncio.run(automation.process_quota_event(quota_event))
        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
    except Exception as e:
        logging.error(f"Error in lambda handler: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
# Usage example
async def main():
    config = {
        'events_table_name': 'quota-events',
        'automation_state_table_name': 'quota-automation-state',
        'infrastructure_scaling': {
            'enabled': True,
            'ec2_asg_names': ['web-tier-asg', 'app-tier-asg'],
            'lambda_functions': ['data-processor', 'api-handler']
        }
    }
    automation = EventDrivenQuotaAutomation(config)
    # Example event
    test_event = QuotaEvent(
        event_type='quota_threshold_exceeded',
        service_code='ec2',
        quota_code='L-1216C47A',
        region='us-east-1',
        account_id='123456789012',
        current_usage=85.0,
        quota_value=100.0,
        utilization_percentage=85.0,
        timestamp=datetime.utcnow(),
        metadata={'alert_level': 'warning'}
    )
    result = await automation.process_quota_event(test_event)
    print(f"Event processing result: {json.dumps(result, indent=2)}")

if __name__ == "__main__":
    asyncio.run(main())
Example 3: Terraform Infrastructure for Automated Quota Management
# Terraform configuration for automated quota management infrastructure
terraform {
required_version = ">= 1.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
}
variable "environment" {
description = "Environment name"
type = string
default = "production"
}
variable "notification_email" {
description = "Email for quota notifications"
type = string
}
variable "automation_level" {
description = "Level of automation (monitor, alert, auto_request, auto_approve, full_auto)"
type = string
default = "auto_request"
validation {
condition = contains([
"monitor", "alert", "auto_request", "auto_approve", "full_auto"
], var.automation_level)
error_message = "Automation level must be one of: monitor, alert, auto_request, auto_approve, full_auto."
}
}
variable "cost_threshold" {
description = "Cost threshold for automated approvals"
type = number
default = 1000
}
# Data sources
data "aws_caller_identity" "current" {}
data "aws_region" "current" {}
# DynamoDB Tables
resource "aws_dynamodb_table" "quota_monitoring" {
name = "${var.environment}-quota-monitoring"
billing_mode = "PAY_PER_REQUEST"
hash_key = "quota_id"
range_key = "timestamp"
attribute {
name = "quota_id"
type = "S"
}
attribute {
name = "timestamp"
type = "N"
}
attribute {
name = "service_code"
type = "S"
}
attribute {
name = "utilization_percentage"
type = "N"
}
global_secondary_index {
name = "service-utilization-index"
hash_key = "service_code"
range_key = "utilization_percentage"
projection_type = "ALL"
}
ttl {
attribute_name = "ttl"
enabled = true
}
stream_enabled = true
stream_view_type = "NEW_AND_OLD_IMAGES"
point_in_time_recovery {
enabled = true
}
tags = {
Environment = var.environment
Purpose = "QuotaMonitoring"
}
}
resource "aws_dynamodb_table" "quota_automation_rules" {
name = "${var.environment}-quota-automation-rules"
billing_mode = "PAY_PER_REQUEST"
hash_key = "rule_id"
attribute {
name = "rule_id"
type = "S"
}
attribute {
name = "service_code"
type = "S"
}
attribute {
name = "automation_level"
type = "S"
}
global_secondary_index {
name = "service-automation-index"
hash_key = "service_code"
range_key = "automation_level"
projection_type = "ALL"
}
tags = {
Environment = var.environment
Purpose = "QuotaAutomationRules"
}
}
resource "aws_dynamodb_table" "quota_automation_log" {
name = "${var.environment}-quota-automation-log"
billing_mode = "PAY_PER_REQUEST"
hash_key = "log_id"
range_key = "timestamp"
attribute {
name = "log_id"
type = "S"
}
attribute {
name = "timestamp"
type = "N"
}
attribute {
name = "action_taken"
type = "S"
}
global_secondary_index {
name = "action-timestamp-index"
hash_key = "action_taken"
range_key = "timestamp"
projection_type = "ALL"
}
ttl {
attribute_name = "ttl"
enabled = true
}
tags = {
Environment = var.environment
Purpose = "QuotaAutomationLog"
}
}
resource "aws_dynamodb_table" "quota_events" {
name = "${var.environment}-quota-events"
billing_mode = "PAY_PER_REQUEST"
hash_key = "event_id"
range_key = "timestamp"
attribute {
name = "event_id"
type = "S"
}
attribute {
name = "timestamp"
type = "N"
}
attribute {
name = "event_type"
type = "S"
}
global_secondary_index {
name = "event-type-timestamp-index"
hash_key = "event_type"
range_key = "timestamp"
projection_type = "ALL"
}
ttl {
attribute_name = "ttl"
enabled = true
}
stream_enabled = true
stream_view_type = "NEW_AND_OLD_IMAGES"
tags = {
Environment = var.environment
Purpose = "QuotaEvents"
}
}
# SNS Topics
resource "aws_sns_topic" "quota_alerts" {
name = "${var.environment}-quota-alerts"
tags = {
Environment = var.environment
Purpose = "QuotaAlerts"
}
}
resource "aws_sns_topic_subscription" "quota_alerts_email" {
topic_arn = aws_sns_topic.quota_alerts.arn
protocol = "email"
endpoint = var.notification_email
}
resource "aws_sns_topic" "quota_automation_events" {
name = "${var.environment}-quota-automation-events"
tags = {
Environment = var.environment
Purpose = "QuotaAutomationEvents"
}
}
# EventBridge Custom Bus
resource "aws_cloudwatch_event_bus" "quota_automation" {
name = "${var.environment}-quota-automation"
tags = {
Environment = var.environment
Purpose = "QuotaAutomation"
}
}
# EventBridge Rules
resource "aws_cloudwatch_event_rule" "quota_threshold_exceeded" {
name = "${var.environment}-quota-threshold-exceeded"
event_bus_name = aws_cloudwatch_event_bus.quota_automation.name
event_pattern = jsonencode({
source = ["quota.automation"]
detail-type = ["Quota Threshold Exceeded"]
detail = {
utilization_percentage = [{
numeric = [">", 70]
}]
}
})
tags = {
Environment = var.environment
Purpose = "QuotaThresholdMonitoring"
}
}
resource "aws_cloudwatch_event_rule" "quota_usage_spike" {
name = "${var.environment}-quota-usage-spike"
event_bus_name = aws_cloudwatch_event_bus.quota_automation.name
event_pattern = jsonencode({
source = ["quota.automation"]
detail-type = ["Quota Usage Spike"]
})
tags = {
Environment = var.environment
Purpose = "QuotaUsageSpike"
}
}
# IAM Roles
resource "aws_iam_role" "quota_automation_role" {
name = "${var.environment}-quota-automation-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = [
"lambda.amazonaws.com",
"states.amazonaws.com",
"events.amazonaws.com"
]
}
}
]
})
tags = {
Environment = var.environment
Purpose = "QuotaAutomation"
}
}
resource "aws_iam_role_policy" "quota_automation_policy" {
name = "${var.environment}-quota-automation-policy"
role = aws_iam_role.quota_automation_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"servicequotas:*",
"support:*",
"cloudwatch:*",
"logs:*"
]
Resource = "*"
},
{
Effect = "Allow"
Action = [
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem",
"dynamodb:Query",
"dynamodb:Scan"
]
Resource = [
aws_dynamodb_table.quota_monitoring.arn,
aws_dynamodb_table.quota_automation_rules.arn,
aws_dynamodb_table.quota_automation_log.arn,
aws_dynamodb_table.quota_events.arn,
"${aws_dynamodb_table.quota_monitoring.arn}/index/*",
"${aws_dynamodb_table.quota_automation_rules.arn}/index/*",
"${aws_dynamodb_table.quota_automation_log.arn}/index/*",
"${aws_dynamodb_table.quota_events.arn}/index/*"
]
},
{
Effect = "Allow"
Action = [
"sns:Publish"
]
Resource = [
aws_sns_topic.quota_alerts.arn,
aws_sns_topic.quota_automation_events.arn
]
},
{
Effect = "Allow"
Action = [
"events:PutEvents"
]
Resource = aws_cloudwatch_event_bus.quota_automation.arn
},
{
Effect = "Allow"
Action = [
"states:StartExecution"
]
Resource = aws_sfn_state_machine.quota_approval_workflow.arn
},
{
Effect = "Allow"
Action = [
"lambda:InvokeFunction"
]
Resource = [
aws_lambda_function.quota_automation_engine.arn,
aws_lambda_function.quota_event_processor.arn
]
}
]
})
}
resource "aws_iam_role_policy_attachment" "quota_automation_basic" {
role = aws_iam_role.quota_automation_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
# Lambda Functions
resource "aws_lambda_function" "quota_automation_engine" {
filename = "quota_automation_engine.zip"
function_name = "${var.environment}-quota-automation-engine"
role = aws_iam_role.quota_automation_role.arn
handler = "index.lambda_handler"
runtime = "python3.9"
timeout = 900
memory_size = 1024
environment {
variables = {
ENVIRONMENT = var.environment
QUOTA_TABLE_NAME = aws_dynamodb_table.quota_monitoring.name
RULES_TABLE_NAME = aws_dynamodb_table.quota_automation_rules.name
AUTOMATION_LOG_TABLE_NAME = aws_dynamodb_table.quota_automation_log.name
ALERT_TOPIC_ARN = aws_sns_topic.quota_alerts.arn
AUTOMATION_EVENTS_TOPIC_ARN = aws_sns_topic.quota_automation_events.arn
EVENT_BUS_NAME = aws_cloudwatch_event_bus.quota_automation.name
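# Built from known parts rather than aws_sfn_state_machine.quota_approval_workflow.arn:
# the state machine definition references this function, so a direct reference would create a dependency cycle.
APPROVAL_WORKFLOW_ARN = "arn:aws:states:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:stateMachine:${var.environment}-quota-approval-workflow"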
AUTOMATION_LEVEL = var.automation_level
COST_THRESHOLD = var.cost_threshold
}
}
tags = {
Environment = var.environment
Purpose = "QuotaAutomationEngine"
}
}
resource "aws_lambda_function" "quota_event_processor" {
filename = "quota_event_processor.zip"
function_name = "${var.environment}-quota-event-processor"
role = aws_iam_role.quota_automation_role.arn
handler = "index.lambda_handler"
runtime = "python3.9"
timeout = 300
memory_size = 512
environment {
variables = {
ENVIRONMENT = var.environment
EVENTS_TABLE_NAME = aws_dynamodb_table.quota_events.name
AUTOMATION_STATE_TABLE_NAME = aws_dynamodb_table.quota_automation_rules.name
ALERT_TOPIC_ARN = aws_sns_topic.quota_alerts.arn
EVENT_BUS_NAME = aws_cloudwatch_event_bus.quota_automation.name
}
}
tags = {
Environment = var.environment
Purpose = "QuotaEventProcessor"
}
}
# EventBridge Targets
resource "aws_cloudwatch_event_target" "quota_threshold_target" {
rule = aws_cloudwatch_event_rule.quota_threshold_exceeded.name
event_bus_name = aws_cloudwatch_event_bus.quota_automation.name
target_id = "QuotaThresholdTarget"
arn = aws_lambda_function.quota_event_processor.arn
}
resource "aws_cloudwatch_event_target" "quota_spike_target" {
rule = aws_cloudwatch_event_rule.quota_usage_spike.name
event_bus_name = aws_cloudwatch_event_bus.quota_automation.name
target_id = "QuotaSpikeTarget"
arn = aws_lambda_function.quota_event_processor.arn
}
# Lambda Permissions
resource "aws_lambda_permission" "allow_eventbridge_threshold" {
statement_id = "AllowExecutionFromEventBridgeThreshold"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.quota_event_processor.function_name
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.quota_threshold_exceeded.arn
}
resource "aws_lambda_permission" "allow_eventbridge_spike" {
statement_id = "AllowExecutionFromEventBridgeSpike"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.quota_event_processor.function_name
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.quota_usage_spike.arn
}
# Step Functions State Machine for Approval Workflow
resource "aws_sfn_state_machine" "quota_approval_workflow" {
name = "${var.environment}-quota-approval-workflow"
role_arn = aws_iam_role.quota_automation_role.arn
definition = jsonencode({
Comment = "Quota increase approval workflow"
StartAt = "EvaluateRequest"
States = {
EvaluateRequest = {
Type = "Task"
Resource = aws_lambda_function.quota_automation_engine.arn
Parameters = {
"action": "evaluate_request",
"input.$": "$"
}
Next = "CheckApprovalRequired"
}
CheckApprovalRequired = {
Type = "Choice"
Choices = [
{
Variable = "$.approval_required"
BooleanEquals = true
Next = "SendApprovalRequest"
}
]
Default = "AutoApprove"
}
SendApprovalRequest = {
Type = "Task"
Resource = "arn:aws:states:::sns:publish"
Parameters = {
TopicArn = aws_sns_topic.quota_alerts.arn
Subject = "Quota Increase Approval Required"
"Message.$" = "$.approval_message"
}
Next = "WaitForApproval"
}
WaitForApproval = {
Type = "Wait"
Seconds = 3600
Next = "CheckApprovalStatus"
}
CheckApprovalStatus = {
Type = "Task"
Resource = aws_lambda_function.quota_automation_engine.arn
Parameters = {
"action": "check_approval_status",
"input.$": "$"
}
Next = "ApprovalDecision"
}
ApprovalDecision = {
Type = "Choice"
Choices = [
{
Variable = "$.approved"
BooleanEquals = true
Next = "ExecuteIncrease"
}
]
Default = "ApprovalDenied"
}
AutoApprove = {
Type = "Pass"
Result = {
"approved": true,
"approval_method": "automatic"
}
Next = "ExecuteIncrease"
}
ExecuteIncrease = {
Type = "Task"
Resource = aws_lambda_function.quota_automation_engine.arn
Parameters = {
"action": "execute_quota_increase",
"input.$": "$"
}
Next = "NotifySuccess"
}
NotifySuccess = {
Type = "Task"
Resource = "arn:aws:states:::sns:publish"
Parameters = {
TopicArn = aws_sns_topic.quota_automation_events.arn
Subject = "Quota Increase Completed"
"Message.$" = "$.success_message"
}
End = true
}
ApprovalDenied = {
Type = "Task"
Resource = "arn:aws:states:::sns:publish"
Parameters = {
TopicArn = aws_sns_topic.quota_automation_events.arn
Subject = "Quota Increase Denied"
"Message.$" = "$.denial_message"
}
End = true
}
}
})
tags = {
Environment = var.environment
Purpose = "QuotaApprovalWorkflow"
}
}
# EventBridge scheduled rule for regular automation runs
resource "aws_cloudwatch_event_rule" "quota_automation_schedule" {
name = "${var.environment}-quota-automation-schedule"
description = "Trigger quota automation engine"
schedule_expression = "rate(5 minutes)"
tags = {
Environment = var.environment
Purpose = "QuotaAutomationSchedule"
}
}
resource "aws_cloudwatch_event_target" "quota_automation_target" {
rule = aws_cloudwatch_event_rule.quota_automation_schedule.name
target_id = "QuotaAutomationTarget"
arn = aws_lambda_function.quota_automation_engine.arn
}
resource "aws_lambda_permission" "allow_eventbridge_schedule" {
statement_id = "AllowExecutionFromEventBridge"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.quota_automation_engine.function_name
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.quota_automation_schedule.arn
}
# CloudWatch Dashboard
resource "aws_cloudwatch_dashboard" "quota_automation" {
dashboard_name = "${var.environment}-quota-automation"
dashboard_body = jsonencode({
widgets = [
{
type = "metric"
x = 0
y = 0
width = 12
height = 6
properties = {
metrics = [
["AWS/Lambda", "Duration", "FunctionName", aws_lambda_function.quota_automation_engine.function_name],
[".", "Errors", ".", "."],
[".", "Invocations", ".", "."]
]
view = "timeSeries"
stacked = false
region = data.aws_region.current.name
title = "Quota Automation Engine Metrics"
period = 300
}
},
{
type = "metric"
x = 12
y = 0
width = 12
height = 6
properties = {
metrics = [
["AWS/Events", "MatchedEvents", "RuleName", aws_cloudwatch_event_rule.quota_threshold_exceeded.name],
[".", ".", ".", aws_cloudwatch_event_rule.quota_usage_spike.name]
]
view = "timeSeries"
stacked = false
region = data.aws_region.current.name
title = "Quota Events Processed"
period = 300
}
},
{
type = "log"
x = 0
y = 6
width = 24
height = 6
properties = {
query = "SOURCE '/aws/lambda/${aws_lambda_function.quota_automation_engine.function_name}' | fields @timestamp, @message | filter @message like /ERROR/ | sort @timestamp desc | limit 20"
region = data.aws_region.current.name
title = "Recent Automation Errors"
view = "table"
}
}
]
})
}
# CloudWatch Alarms
resource "aws_cloudwatch_metric_alarm" "quota_automation_errors" {
alarm_name = "${var.environment}-quota-automation-errors"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = "2"
metric_name = "Errors"
namespace = "AWS/Lambda"
period = "300"
statistic = "Sum"
threshold = "5"
alarm_description = "This metric monitors quota automation errors"
alarm_actions = [aws_sns_topic.quota_alerts.arn]
dimensions = {
FunctionName = aws_lambda_function.quota_automation_engine.function_name
}
tags = {
Environment = var.environment
Purpose = "QuotaAutomationMonitoring"
}
}
# Outputs
output "quota_monitoring_table_name" {
description = "Name of the quota monitoring DynamoDB table"
value = aws_dynamodb_table.quota_monitoring.name
}
output "quota_automation_rules_table_name" {
description = "Name of the quota automation rules DynamoDB table"
value = aws_dynamodb_table.quota_automation_rules.name
}
output "quota_alerts_topic_arn" {
description = "ARN of the quota alerts SNS topic"
value = aws_sns_topic.quota_alerts.arn
}
output "quota_automation_engine_function_name" {
description = "Name of the quota automation engine Lambda function"
value = aws_lambda_function.quota_automation_engine.function_name
}
output "quota_event_bus_name" {
description = "Name of the quota automation EventBridge bus"
value = aws_cloudwatch_event_bus.quota_automation.name
}
output "quota_approval_workflow_arn" {
description = "ARN of the quota approval Step Functions workflow"
value = aws_sfn_state_machine.quota_approval_workflow.arn
}
output "dashboard_url" {
description = "URL of the quota automation CloudWatch dashboard"
value = "https://${data.aws_region.current.name}.console.aws.amazon.com/cloudwatch/home?region=${data.aws_region.current.name}#dashboards:name=${aws_cloudwatch_dashboard.quota_automation.dashboard_name}"
}
Example 4: CI/CD Integration for Quota-Aware Deployments
{% raw %}
# GitHub Actions workflow for quota-aware deployments
name: Quota-Aware Deployment Pipeline
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
env:
  AWS_REGION: us-east-1
  ENVIRONMENT: ${{ github.ref == 'refs/heads/main' && 'production' || 'staging' }}
jobs:
  quota-validation:
    name: Validate Quota Requirements
    runs-on: ubuntu-latest
    outputs:
      quota-check-passed: ${{ steps.quota-validation.outputs.passed }}
      required-quotas: ${{ steps.quota-validation.outputs.required-quotas }}
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install boto3 pyyaml jinja2
      - name: Analyze infrastructure requirements
        id: quota-validation
        run: |
          python .github/scripts/quota_validator.py \
            --environment ${{ env.ENVIRONMENT }} \
            --infrastructure-config infrastructure/config.yaml \
            --output-format github-actions
      - name: Upload quota analysis
        uses: actions/upload-artifact@v3
        with:
          name: quota-analysis-${{ env.ENVIRONMENT }}
          path: quota-analysis.json
  quota-preemptive-increase:
    name: Preemptive Quota Increases
    runs-on: ubuntu-latest
    needs: quota-validation
    if: needs.quota-validation.outputs.quota-check-passed == 'false'
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      - name: Download quota analysis
        uses: actions/download-artifact@v3
        with:
          name: quota-analysis-${{ env.ENVIRONMENT }}
      - name: Request quota increases
        run: |
          python .github/scripts/quota_increaser.py \
            --analysis-file quota-analysis.json \
            --environment ${{ env.ENVIRONMENT }} \
            --auto-approve-threshold 1000
      - name: Wait for quota increases
        run: |
          python .github/scripts/quota_waiter.py \
            --analysis-file quota-analysis.json \
            --max-wait-time 1800 \
            --check-interval 60
  deploy:
    name: Deploy Infrastructure
    runs-on: ubuntu-latest
    needs: [quota-validation, quota-preemptive-increase]
    if: always() && (needs.quota-validation.outputs.quota-check-passed == 'true' || needs.quota-preemptive-increase.result == 'success')
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v2
        with:
          terraform_version: 1.5.0
      - name: Terraform Init
        run: terraform init
        working-directory: infrastructure
      - name: Terraform Plan with Quota Validation
        run: |
          terraform plan \
            -var="environment=${{ env.ENVIRONMENT }}" \
            -var="enable_quota_validation=true" \
            -out=tfplan
        working-directory: infrastructure
      - name: Terraform Apply
        run: terraform apply tfplan
        working-directory: infrastructure
      - name: Post-deployment quota monitoring
        run: |
          python .github/scripts/post_deployment_monitor.py \
            --environment ${{ env.ENVIRONMENT }} \
            --deployment-id ${{ github.run_id }}
  quota-optimization:
    name: Post-Deployment Quota Optimization
    runs-on: ubuntu-latest
    needs: deploy
    if: success()
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: ${{ env.AWS_REGION }}
      - name: Analyze actual resource usage
        run: |
          python .github/scripts/usage_analyzer.py \
            --environment ${{ env.ENVIRONMENT }} \
            --analysis-period 24h
      - name: Optimize quota allocations
        run: |
          python .github/scripts/quota_optimizer.py \
            --environment ${{ env.ENVIRONMENT }} \
            --optimization-strategy cost-aware
{% endraw %}
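The workflow calls `quota_waiter.py`, which is not shown in this example. A minimal sketch of what its polling loop might look like follows. `wait_for_requests` and `fetch_status` are hypothetical names, and the set of terminal statuses is an assumption based on the Service Quotas request status values. The timeout and interval mirror the `--max-wait-time` and `--check-interval` flags.

```python
import time
from typing import Callable, Dict, Iterable

# Statuses after which a quota request is not expected to change again
# (assumed set, based on Service Quotas request status values).
TERMINAL_STATUSES = {"APPROVED", "DENIED", "CASE_CLOSED", "NOT_APPROVED", "INVALID_REQUEST"}


def wait_for_requests(
    request_ids: Iterable[str],
    fetch_status: Callable[[str], str],
    max_wait_time: float = 1800,
    check_interval: float = 60,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, str]:
    """Poll until every request reaches a terminal status or time runs out.

    Returns the last observed status per request ID. Requests still pending
    at the deadline are reported as "TIMED_OUT".
    """
    statuses = {rid: "PENDING" for rid in request_ids}
    deadline = clock() + max_wait_time
    while True:
        # Refresh only the requests that have not settled yet.
        for rid, status in statuses.items():
            if status not in TERMINAL_STATUSES:
                statuses[rid] = fetch_status(rid)
        pending = [rid for rid, s in statuses.items() if s not in TERMINAL_STATUSES]
        if not pending:
            return statuses
        # Stop if another sleep would overshoot the deadline.
        if clock() + check_interval > deadline:
            for rid in pending:
                statuses[rid] = "TIMED_OUT"
            return statuses
        sleep(check_interval)
```

With boto3, `fetch_status` could wrap `get_requested_service_quota_change(RequestId=...)` on a `service-quotas` client and return `response["RequestedQuota"]["Status"]`. Injecting `sleep` and `clock` keeps the loop testable without real waiting.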
#!/usr/bin/env python3
# .github/scripts/quota_validator.py
import boto3
import yaml
import json
import argparse
import os
import sys
from datetime import datetime
from typing import Dict, List, Tuple
from dataclasses import dataclass


@dataclass
class QuotaRequirement:
    service_code: str
    quota_code: str
    required_value: float
    current_value: float
    buffer_percentage: float = 20.0


class QuotaValidator:
    def __init__(self, region: str):
        self.region = region
        self.service_quotas = boto3.client('service-quotas', region_name=region)
        self.cloudformation = boto3.client('cloudformation', region_name=region)

    def analyze_infrastructure_config(self, config_path: str) -> List[QuotaRequirement]:
        """Analyze infrastructure configuration to determine quota requirements"""
        with open(config_path, 'r') as f:
            config = yaml.safe_load(f)
        requirements = []
        # Analyze EC2 requirements
        if 'ec2' in config:
            ec2_config = config['ec2']
            total_instances = sum(
                asg.get('desired_capacity', 0) for asg in ec2_config.get('auto_scaling_groups', [])
            )
            if total_instances > 0:
                # Running On-Demand Standard instances. This quota is measured in vCPUs,
                # so an instance count understates the need for multi-vCPU instance types.
                current_quota = self.get_current_quota('ec2', 'L-1216C47A')
                requirements.append(QuotaRequirement(
                    service_code='ec2',
                    quota_code='L-1216C47A',
                    required_value=total_instances,
                    current_value=current_quota
                ))
        # Analyze Lambda requirements
        if 'lambda' in config:
            lambda_config = config['lambda']
            total_concurrent = sum(
                func.get('reserved_concurrency', 0) for func in lambda_config.get('functions', [])
            )
            if total_concurrent > 0:
                current_quota = self.get_current_quota('lambda', 'L-B99A9384')  # Concurrent executions
                requirements.append(QuotaRequirement(
                    service_code='lambda',
                    quota_code='L-B99A9384',
                    required_value=total_concurrent,
                    current_value=current_quota
                ))
        # Analyze RDS requirements
        if 'rds' in config:
            rds_config = config['rds']
            total_instances = len(rds_config.get('instances', []))
            if total_instances > 0:
                current_quota = self.get_current_quota('rds', 'L-7B6409FD')  # DB instances
                requirements.append(QuotaRequirement(
                    service_code='rds',
                    quota_code='L-7B6409FD',
                    required_value=total_instances,
                    current_value=current_quota
                ))
        return requirements

    def get_current_quota(self, service_code: str, quota_code: str) -> float:
        """Get current quota value"""
        try:
            response = self.service_quotas.get_service_quota(
                ServiceCode=service_code,
                QuotaCode=quota_code
            )
            return response['Quota']['Value']
        except Exception as e:
            print(f"Warning: Could not get quota for {service_code}/{quota_code}: {e}")
            return 0.0

    def validate_quotas(self, requirements: List[QuotaRequirement]) -> Tuple[bool, List[Dict]]:
        """Validate if current quotas are sufficient"""
        validation_results = []
        all_passed = True
        for req in requirements:
            required_with_buffer = req.required_value * (1 + req.buffer_percentage / 100)
            sufficient = req.current_value >= required_with_buffer
            if not sufficient:
                all_passed = False
            validation_results.append({
                'service_code': req.service_code,
                'quota_code': req.quota_code,
                'required_value': req.required_value,
                'required_with_buffer': required_with_buffer,
                'current_value': req.current_value,
                'sufficient': sufficient,
                'shortfall': max(0, required_with_buffer - req.current_value)
            })
        return all_passed, validation_results


def main():
    parser = argparse.ArgumentParser(description='Validate quota requirements for deployment')
    parser.add_argument('--environment', required=True, help='Environment name')
    parser.add_argument('--infrastructure-config', required=True, help='Infrastructure configuration file')
    parser.add_argument('--output-format', choices=['json', 'github-actions'], default='json')
    args = parser.parse_args()
    validator = QuotaValidator('us-east-1')  # Could be parameterized
    # Analyze requirements
    requirements = validator.analyze_infrastructure_config(args.infrastructure_config)
    # Validate quotas
    passed, results = validator.validate_quotas(requirements)
    # Output results
    analysis = {
        'environment': args.environment,
        'validation_passed': passed,
        'requirements': results,
        'timestamp': datetime.utcnow().isoformat()
    }
    if args.output_format == 'github-actions':
        # The ::set-output workflow command is deprecated. Step outputs are
        # appended to the file named by the GITHUB_OUTPUT environment variable.
        with open(os.environ['GITHUB_OUTPUT'], 'a') as output_file:
            output_file.write(f"passed={str(passed).lower()}\n")
            output_file.write(f"required-quotas={json.dumps(results)}\n")
        if not passed:
            print("::warning::Quota validation failed - some quotas need to be increased")
            for result in results:
                if not result['sufficient']:
                    print(f"::warning::Insufficient quota for {result['service_code']}/{result['quota_code']}: need {result['required_with_buffer']}, have {result['current_value']}")
    # Save analysis file
    with open('quota-analysis.json', 'w') as f:
        json.dump(analysis, f, indent=2)
    # In github-actions mode, exit 0 even when quotas fall short: a failed step would
    # skip the artifact upload and the dependent quota-preemptive-increase job.
    if not passed and args.output_format != 'github-actions':
        sys.exit(1)


if __name__ == '__main__':
    main()
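To make the buffer check in `validate_quotas` concrete, here is a small standalone sketch of the same arithmetic. `check_headroom` is a hypothetical helper name, not part of the script above. The 20% default mirrors `buffer_percentage`.

```python
def check_headroom(required: float, current: float, buffer_percentage: float = 20.0) -> dict:
    """Apply the same buffer arithmetic as validate_quotas to a single quota."""
    required_with_buffer = required * (1 + buffer_percentage / 100)
    return {
        "required_with_buffer": required_with_buffer,
        "sufficient": current >= required_with_buffer,
        "shortfall": max(0, required_with_buffer - current),
    }


# 100 units needed with a 20% buffer means the quota must be at least 120,
# so a quota of 110 falls short while a quota of 150 passes.
print(check_headroom(required=100, current=110))
print(check_headroom(required=100, current=150))
```

The buffer means a deployment that fits the current quota exactly is still flagged, which leaves room for scaling after the deployment completes.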
#!/usr/bin/env python3
# .github/scripts/quota_increaser.py
import boto3
import json
import argparse
import textwrap
from typing import Dict, List


class QuotaIncreaser:
    def __init__(self, region: str):
        self.region = region
        self.service_quotas = boto3.client('service-quotas', region_name=region)
        self.support = boto3.client('support', region_name=region)

    def process_quota_increases(self, analysis_file: str, auto_approve_threshold: float) -> List[Dict]:
        """Process quota increase requests based on analysis"""
        with open(analysis_file, 'r') as f:
            analysis = json.load(f)
        results = []
        for requirement in analysis['requirements']:
            if not requirement['sufficient']:
                result = self.request_quota_increase(
                    requirement, auto_approve_threshold
                )
                results.append(result)
        return results

    def request_quota_increase(self, requirement: Dict, auto_approve_threshold: float) -> Dict:
        """Request quota increase for a specific requirement"""
        service_code = requirement['service_code']
        quota_code = requirement['quota_code']
        new_value = requirement['required_with_buffer']
        try:
            # Try Service Quotas API first
            response = self.service_quotas.request_service_quota_increase(
                ServiceCode=service_code,
                QuotaCode=quota_code,
                DesiredValue=new_value
            )
            return {
                'service_code': service_code,
                'quota_code': quota_code,
                'requested_value': new_value,
                'method': 'service_quotas_api',
                'request_id': response['RequestedQuota']['Id'],
                'status': 'submitted',
                'estimated_cost': self.estimate_cost_impact(requirement)
            }
        except Exception as e:
            # Fall back to support case if cost is below threshold
            estimated_cost = self.estimate_cost_impact(requirement)
            if estimated_cost <= auto_approve_threshold:
                case_id = self.create_support_case(requirement)
                return {
                    'service_code': service_code,
                    'quota_code': quota_code,
                    'requested_value': new_value,
                    'method': 'support_case',
                    'case_id': case_id,
                    'status': 'support_case_created',
                    'estimated_cost': estimated_cost,
                    'api_error': str(e)
                }
            else:
                return {
                    'service_code': service_code,
                    'quota_code': quota_code,
                    'requested_value': new_value,
                    'method': 'manual_approval_required',
                    'status': 'requires_manual_approval',
                    'estimated_cost': estimated_cost,
                    'reason': f'Cost ${estimated_cost} exceeds auto-approval threshold ${auto_approve_threshold}'
                }

    def estimate_cost_impact(self, requirement: Dict) -> float:
        """Estimate cost impact of quota increase"""
        # Simplified cost estimation
        service_costs = {
            'ec2': 0.10,  # per instance hour
            'lambda': 0.0000002,  # per request
            'rds': 0.20,  # per instance hour
        }
        service_code = requirement['service_code']
        shortfall = requirement['shortfall']
        base_cost = service_costs.get(service_code, 0.01)
        monthly_cost = shortfall * base_cost * 24 * 30
        return monthly_cost

    def create_support_case(self, requirement: Dict) -> str:
        """Create support case for quota increase"""
        service_code = requirement['service_code']
        quota_code = requirement['quota_code']
        new_value = requirement['required_with_buffer']
        # dedent removes the source indentation so the case body is flush left
        case_body = textwrap.dedent(f"""
            Automated Quota Increase Request from CI/CD Pipeline
            Service: {service_code}
            Quota Code: {quota_code}
            Current Limit: {requirement['current_value']}
            Requested Limit: {new_value}
            Required for Deployment: {requirement['required_value']}
            This request was automatically generated during our deployment pipeline
            to ensure adequate capacity for infrastructure deployment.
            Business Justification:
            - Required for automated deployment pipeline
            - Prevents deployment failures due to quota constraints
            - Supports infrastructure scaling requirements
            - Part of automated quota management strategy
            Please process this request with high priority to avoid deployment delays.
        """).strip()
        response = self.support.create_case(
            subject=f"CI/CD Quota Increase: {service_code} - {quota_code}",
            serviceCode='service-limit-increase',
            severityCode='normal',
            categoryCode='service-limit-increase',
            communicationBody=case_body,
            language='en'
        )
        return response['caseId']


def main():
    parser = argparse.ArgumentParser(description='Request quota increases based on analysis')
    parser.add_argument('--analysis-file', required=True, help='Quota analysis JSON file')
    parser.add_argument('--environment', required=True, help='Environment name')
    parser.add_argument('--auto-approve-threshold', type=float, default=1000.0, help='Auto-approval cost threshold')
    args = parser.parse_args()
    increaser = QuotaIncreaser('us-east-1')
    results = increaser.process_quota_increases(args.analysis_file, args.auto_approve_threshold)
    print(f"Processed {len(results)} quota increase requests:")
    for result in results:
        print(f"- {result['service_code']}/{result['quota_code']}: {result['status']}")
    # Save results for next step
    with open('quota-increase-results.json', 'w') as f:
        json.dump(results, f, indent=2)


if __name__ == '__main__':
    main()
AWS Services Used
- AWS Service Quotas: Core service for automated quota monitoring and management
- Amazon EventBridge: Event-driven automation and workflow orchestration
- AWS Lambda: Serverless execution of automation logic and event processing
- AWS Step Functions: Complex workflow orchestration for approval processes
- Amazon DynamoDB: Storage for automation rules, events, and audit trails
- Amazon SNS: Notification system for alerts and automation events
- Amazon CloudWatch: Metrics, monitoring, and automated alerting
- AWS Support API: Automated support case creation for quota increases
- AWS Systems Manager: Parameter storage and configuration management
- Amazon S3: Storage for ML models and automation artifacts
- AWS IAM: Fine-grained access control for automation components
- AWS CloudFormation/Terraform: Infrastructure as code with quota awareness
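As one illustration of how the Service Quotas API in the list above is typically read, the sketch below looks up the applied value of a quota and falls back to the AWS default when no account-specific value exists. `resolve_quota_value` is a hypothetical helper. It assumes a boto3 `service-quotas` client (or any object exposing the same methods) and that a missing value surfaces as `NoSuchResourceException`.

```python
from typing import Any, Optional


def resolve_quota_value(client: Any, service_code: str, quota_code: str) -> Optional[float]:
    """Return the applied quota value, else the AWS default, else None."""
    not_found = client.exceptions.NoSuchResourceException
    try:
        response = client.get_service_quota(ServiceCode=service_code, QuotaCode=quota_code)
        return response["Quota"]["Value"]
    except not_found:
        pass  # No account-specific value; try the service default next.
    try:
        response = client.get_aws_default_service_quota(
            ServiceCode=service_code, QuotaCode=quota_code
        )
        return response["Quota"]["Value"]
    except not_found:
        return None
```

A call might look like `resolve_quota_value(boto3.client("service-quotas", region_name="us-east-1"), "ec2", "L-1216C47A")`. Returning `None` rather than `0.0` lets a caller tell an unknown quota apart from a quota of zero, which avoids reporting a false shortfall.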
Benefits
- Zero-Touch Operations: Fully automated quota management without manual intervention
- Predictive Management: ML-based prediction and proactive quota adjustments
- Event-Driven Response: Real-time response to quota events and threshold breaches
- Cost-Aware Automation: Intelligent cost consideration in automation decisions
- CI/CD Integration: Seamless integration with deployment pipelines and infrastructure automation
- Multi-Account Orchestration: Coordinated automation across complex AWS environments
- Audit and Compliance: Complete audit trails and governance for all automation actions
- Self-Healing Systems: Automatic recovery and optimization of quota allocations
- Business Hours Awareness: Configurable automation behavior based on business requirements
- Approval Workflows: Flexible approval processes for high-impact quota changes
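The event-driven response listed above depends on events reaching the custom bus in the shape that the EventBridge rule and `lambda_handler` expect. A minimal sketch of building such an entry follows. `build_threshold_event` is a hypothetical helper and the bus name is an assumed example value. The source, detail type, and detail field names are taken from the rule pattern and handler in the examples.

```python
import json
from datetime import datetime, timezone


def build_threshold_event(service_code, quota_code, region, account_id,
                          current_usage, quota_value, event_bus_name):
    """Build one PutEvents entry matching the quota_threshold_exceeded rule."""
    utilization = (current_usage / quota_value) * 100 if quota_value else 0.0
    detail = {
        "event_type": "quota_threshold_exceeded",
        "service_code": service_code,
        "quota_code": quota_code,
        "region": region,
        "account_id": account_id,
        "current_usage": current_usage,
        "quota_value": quota_value,
        "utilization_percentage": utilization,
        # ISO 8601 so the handler can parse it with datetime.fromisoformat
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "metadata": {},
    }
    return {
        "Source": "quota.automation",
        "DetailType": "Quota Threshold Exceeded",
        "Detail": json.dumps(detail),
        "EventBusName": event_bus_name,
    }


entry = build_threshold_event(
    "ec2", "L-1216C47A", "us-east-1", "123456789012",
    current_usage=85.0, quota_value=100.0,
    event_bus_name="production-quota-automation",  # assumed bus name
)
# With boto3, the entry could be sent with:
#   boto3.client("events").put_events(Entries=[entry])
```

Only events whose `utilization_percentage` exceeds 70 match the threshold rule, so a publisher can send every measurement and leave the filtering to EventBridge.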