REL01
REL01-BP04 - Monitor and manage quotas
One-Click Remediation
Deploy CloudFormation stacks to implement this best practice with a single click.
Service Quota Monitoring
CloudWatch alarms for critical service quotas with SNS notifications
Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.
REL01-BP04: Monitor and manage quotas
Overview
Continuously monitor service quotas and usage patterns to proactively manage capacity and prevent service disruptions. Implement automated monitoring, alerting, and quota management processes to maintain optimal resource availability across your AWS environment.
Implementation Steps
1. Implement Comprehensive Quota Monitoring
- Deploy automated quota monitoring across all AWS services and regions
- Set up real-time usage tracking with configurable alert thresholds
- Create centralized quota dashboards for visibility and management
- Establish baseline usage patterns and growth trend analysis
2. Configure Proactive Alerting Systems
- Set up multi-tier alerting at 70%, 80%, and 90% quota utilization
- Implement escalation procedures for critical quota breaches
- Configure automated notifications to relevant teams and stakeholders
- Create runbooks for quota management response procedures
3. Automate Quota Increase Requests
- Implement automated quota increase request workflows
- Set up approval processes for quota modifications
- Create trend-based predictive quota management
- Establish emergency quota increase procedures
4. Establish Cross-Account and Cross-Region Coordination
- Implement centralized quota management for multi-account environments
- Set up cross-region quota monitoring and coordination
- Create quota sharing and pooling strategies where applicable
- Establish disaster recovery quota pre-warming procedures
5. Integrate with Infrastructure Automation
- Embed quota checks in CI/CD pipelines and infrastructure deployment
- Implement quota-aware resource provisioning
- Create automated quota validation for infrastructure changes
- Set up quota impact assessment for new deployments
6. Maintain Quota Governance and Optimization
- Establish quota review and optimization processes
- Implement cost-aware quota management strategies
- Create quota utilization reporting and analytics
- Maintain quota documentation and change management
Implementation Examples
Example 1: Advanced Quota Monitoring and Management System
View code
import boto3
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
import asyncio
import aiohttp
@dataclass
class QuotaAlert:
service_code: str
quota_code: str
region: str
current_usage: float
quota_value: float
utilization_percentage: float
alert_level: str
timestamp: datetime
class AdvancedQuotaMonitor:
def __init__(self, config: Dict):
self.config = config
self.service_quotas = boto3.client('service-quotas')
self.cloudwatch = boto3.client('cloudwatch')
self.sns = boto3.client('sns')
self.dynamodb = boto3.resource('dynamodb')
self.quota_table = self.dynamodb.Table(config['quota_table_name'])
self.alert_thresholds = config.get('alert_thresholds', [70, 80, 90])
async def monitor_all_quotas(self) -> List[QuotaAlert]:
"""Monitor quotas across all services and regions"""
alerts = []
# Get all AWS regions
ec2 = boto3.client('ec2')
regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']]
# Monitor quotas in parallel across regions
tasks = []
for region in regions:
task = self.monitor_region_quotas(region)
tasks.append(task)
region_results = await asyncio.gather(*tasks, return_exceptions=True)
for result in region_results:
if isinstance(result, list):
alerts.extend(result)
return alerts
async def monitor_region_quotas(self, region: str) -> List[QuotaAlert]:
"""Monitor quotas for a specific region"""
alerts = []
try:
# Create region-specific clients
regional_quotas = boto3.client('service-quotas', region_name=region)
regional_cloudwatch = boto3.client('cloudwatch', region_name=region)
# Get all services with quotas
services = regional_quotas.list_services()['Services']
for service in services:
service_code = service['ServiceCode']
service_alerts = await self.monitor_service_quotas(
regional_quotas, regional_cloudwatch, service_code, region
)
alerts.extend(service_alerts)
except Exception as e:
logging.error(f"Error monitoring region {region}: {str(e)}")
return alerts
async def monitor_service_quotas(self, quotas_client, cloudwatch_client,
service_code: str, region: str) -> List[QuotaAlert]:
"""Monitor quotas for a specific service"""
alerts = []
try:
# Get all quotas for the service
paginator = quotas_client.get_paginator('list_service_quotas')
for page in paginator.paginate(ServiceCode=service_code):
for quota in page['Quotas']:
quota_code = quota['QuotaCode']
quota_value = quota['Value']
# Get current usage
current_usage = await self.get_quota_usage(
cloudwatch_client, service_code, quota_code, region
)
if current_usage is not None:
utilization = (current_usage / quota_value) * 100
# Check if alert threshold is exceeded
for threshold in self.alert_thresholds:
if utilization >= threshold:
alert = QuotaAlert(
service_code=service_code,
quota_code=quota_code,
region=region,
current_usage=current_usage,
quota_value=quota_value,
utilization_percentage=utilization,
alert_level=self.get_alert_level(utilization),
timestamp=datetime.utcnow()
)
alerts.append(alert)
break
# Store quota data for trend analysis
await self.store_quota_data(quota, current_usage, region)
except Exception as e:
logging.error(f"Error monitoring service {service_code} in {region}: {str(e)}")
return alerts
async def get_quota_usage(self, cloudwatch_client, service_code: str,
quota_code: str, region: str) -> Optional[float]:
"""Get current usage for a quota using CloudWatch metrics"""
try:
# Map service codes to CloudWatch metrics
metric_mapping = self.get_metric_mapping(service_code, quota_code)
if not metric_mapping:
return None
response = cloudwatch_client.get_metric_statistics(
Namespace=metric_mapping['namespace'],
MetricName=metric_mapping['metric_name'],
Dimensions=metric_mapping.get('dimensions', []),
StartTime=datetime.utcnow() - timedelta(minutes=5),
EndTime=datetime.utcnow(),
Period=300,
Statistics=['Maximum']
)
if response['Datapoints']:
return max(dp['Maximum'] for dp in response['Datapoints'])
except Exception as e:
logging.error(f"Error getting usage for {service_code}/{quota_code}: {str(e)}")
return None
def get_metric_mapping(self, service_code: str, quota_code: str) -> Optional[Dict]:
"""Map service quotas to CloudWatch metrics"""
mappings = {
'ec2': {
'L-1216C47A': { # Running On-Demand instances
'namespace': 'AWS/EC2',
'metric_name': 'RunningInstances',
'dimensions': []
}
},
'lambda': {
'L-B99A9384': { # Concurrent executions
'namespace': 'AWS/Lambda',
'metric_name': 'ConcurrentExecutions',
'dimensions': []
}
},
'rds': {
'L-7B6409FD': { # DB instances
'namespace': 'AWS/RDS',
'metric_name': 'DatabaseConnections',
'dimensions': []
}
}
}
return mappings.get(service_code, {}).get(quota_code)
async def store_quota_data(self, quota: Dict, current_usage: float, region: str):
"""Store quota data for trend analysis"""
try:
item = {
'quota_id': f"{quota['ServiceCode']}#{quota['QuotaCode']}#{region}",
'timestamp': int(datetime.utcnow().timestamp()),
'service_code': quota['ServiceCode'],
'quota_code': quota['QuotaCode'],
'region': region,
'quota_name': quota['QuotaName'],
'quota_value': quota['Value'],
'current_usage': current_usage,
'utilization_percentage': (current_usage / quota['Value']) * 100,
'ttl': int((datetime.utcnow() + timedelta(days=90)).timestamp())
}
self.quota_table.put_item(Item=item)
except Exception as e:
logging.error(f"Error storing quota data: {str(e)}")
def get_alert_level(self, utilization: float) -> str:
"""Determine alert level based on utilization"""
if utilization >= 90:
return 'CRITICAL'
elif utilization >= 80:
return 'WARNING'
elif utilization >= 70:
return 'INFO'
return 'OK'
async def send_alerts(self, alerts: List[QuotaAlert]):
"""Send alerts for quota violations"""
for alert in alerts:
await self.send_alert(alert)
async def send_alert(self, alert: QuotaAlert):
"""Send individual alert"""
try:
message = {
'alert_level': alert.alert_level,
'service': alert.service_code,
'quota': alert.quota_code,
'region': alert.region,
'utilization': alert.utilization_percentage,
'current_usage': alert.current_usage,
'quota_value': alert.quota_value,
'timestamp': alert.timestamp.isoformat()
}
# Send SNS notification
self.sns.publish(
TopicArn=self.config['alert_topic_arn'],
Message=json.dumps(message),
Subject=f"Quota Alert: {alert.service_code} {alert.alert_level}"
)
# Send to CloudWatch as custom metric
self.cloudwatch.put_metric_data(
Namespace='QuotaMonitoring',
MetricData=[
{
'MetricName': 'QuotaUtilization',
'Dimensions': [
{'Name': 'Service', 'Value': alert.service_code},
{'Name': 'Region', 'Value': alert.region},
{'Name': 'AlertLevel', 'Value': alert.alert_level}
],
'Value': alert.utilization_percentage,
'Unit': 'Percent'
}
]
)
except Exception as e:
logging.error(f"Error sending alert: {str(e)}")
# Usage example
async def main():
config = {
'quota_table_name': 'quota-monitoring',
'alert_topic_arn': 'arn:aws:sns:us-east-1:123456789012:quota-alerts',
'alert_thresholds': [70, 80, 90]
}
monitor = AdvancedQuotaMonitor(config)
alerts = await monitor.monitor_all_quotas()
await monitor.send_alerts(alerts)
print(f"Processed {len(alerts)} quota alerts")
if __name__ == "__main__":
asyncio.run(main())Example 2: Automated Quota Increase Management System
View code
import boto3
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import pandas as pd
import numpy as np
class RequestStatus(Enum):
PENDING = "PENDING"
APPROVED = "APPROVED"
DENIED = "DENIED"
CASE_OPENED = "CASE_OPENED"
CASE_RESOLVED = "CASE_RESOLVED"
@dataclass
class QuotaIncreaseRequest:
service_code: str
quota_code: str
region: str
current_value: float
requested_value: float
justification: str
priority: str
status: RequestStatus
case_id: Optional[str] = None
created_at: Optional[datetime] = None
class AutomatedQuotaManager:
def __init__(self, config: Dict):
self.config = config
self.service_quotas = boto3.client('service-quotas')
self.support = boto3.client('support')
self.dynamodb = boto3.resource('dynamodb')
self.requests_table = self.dynamodb.Table(config['requests_table_name'])
self.quota_table = self.dynamodb.Table(config['quota_table_name'])
def analyze_quota_trends(self, service_code: str, quota_code: str,
region: str, days: int = 30) -> Dict:
"""Analyze quota usage trends to predict future needs"""
try:
# Query historical data
response = self.quota_table.query(
KeyConditionExpression='quota_id = :quota_id',
FilterExpression='#ts >= :start_time',
ExpressionAttributeNames={'#ts': 'timestamp'},
ExpressionAttributeValues={
':quota_id': f"{service_code}#{quota_code}#{region}",
':start_time': int((datetime.utcnow() - timedelta(days=days)).timestamp())
}
)
if not response['Items']:
return {'trend': 'insufficient_data', 'prediction': None}
# Convert to DataFrame for analysis
df = pd.DataFrame(response['Items'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
df = df.sort_values('timestamp')
# Calculate trend
usage_values = df['current_usage'].values
time_values = np.arange(len(usage_values))
# Linear regression for trend
coefficients = np.polyfit(time_values, usage_values, 1)
trend_slope = coefficients[0]
# Predict usage in 30 days
future_usage = usage_values[-1] + (trend_slope * 30)
current_quota = df['quota_value'].iloc[-1]
# Calculate growth rate
if len(usage_values) > 1:
growth_rate = (usage_values[-1] - usage_values[0]) / usage_values[0] * 100
else:
growth_rate = 0
return {
'trend': 'increasing' if trend_slope > 0 else 'stable',
'growth_rate': growth_rate,
'predicted_usage_30d': future_usage,
'current_quota': current_quota,
'predicted_utilization_30d': (future_usage / current_quota) * 100,
'recommendation': self.get_quota_recommendation(
future_usage, current_quota, growth_rate
)
}
except Exception as e:
logging.error(f"Error analyzing trends: {str(e)}")
return {'trend': 'error', 'prediction': None}
def get_quota_recommendation(self, predicted_usage: float,
current_quota: float, growth_rate: float) -> Dict:
"""Generate quota increase recommendation"""
predicted_utilization = (predicted_usage / current_quota) * 100
if predicted_utilization > 80:
# Calculate recommended new quota with buffer
buffer_multiplier = 1.5 if growth_rate > 50 else 1.3
recommended_quota = predicted_usage * buffer_multiplier
return {
'action': 'increase_recommended',
'recommended_value': recommended_quota,
'priority': 'high' if predicted_utilization > 90 else 'medium',
'justification': f"Predicted utilization: {predicted_utilization:.1f}%, "
f"Growth rate: {growth_rate:.1f}%"
}
return {'action': 'no_action_needed'}
async def process_quota_recommendations(self) -> List[QuotaIncreaseRequest]:
"""Process quota recommendations and create increase requests"""
requests = []
try:
# Get all monitored quotas
response = self.quota_table.scan()
quota_items = response['Items']
# Group by quota for trend analysis
quota_groups = {}
for item in quota_items:
key = f"{item['service_code']}#{item['quota_code']}#{item['region']}"
if key not in quota_groups:
quota_groups[key] = []
quota_groups[key].append(item)
# Analyze each quota group
for quota_key, items in quota_groups.items():
service_code, quota_code, region = quota_key.split('#')
# Analyze trends
trend_analysis = self.analyze_quota_trends(
service_code, quota_code, region
)
recommendation = trend_analysis.get('recommendation', {})
if recommendation.get('action') == 'increase_recommended':
# Check if request already exists
existing_request = self.get_existing_request(
service_code, quota_code, region
)
if not existing_request:
request = QuotaIncreaseRequest(
service_code=service_code,
quota_code=quota_code,
region=region,
current_value=trend_analysis['current_quota'],
requested_value=recommendation['recommended_value'],
justification=recommendation['justification'],
priority=recommendation['priority'],
status=RequestStatus.PENDING,
created_at=datetime.utcnow()
)
requests.append(request)
except Exception as e:
logging.error(f"Error processing recommendations: {str(e)}")
return requests
def get_existing_request(self, service_code: str, quota_code: str,
region: str) -> Optional[Dict]:
"""Check if quota increase request already exists"""
try:
response = self.requests_table.get_item(
Key={
'request_id': f"{service_code}#{quota_code}#{region}",
'status': 'PENDING'
}
)
return response.get('Item')
except:
return None
async def submit_quota_increase_request(self, request: QuotaIncreaseRequest) -> bool:
"""Submit quota increase request to AWS"""
try:
# Try Service Quotas API first
try:
response = self.service_quotas.request_service_quota_increase(
ServiceCode=request.service_code,
QuotaCode=request.quota_code,
DesiredValue=request.requested_value
)
request.status = RequestStatus.APPROVED
request.case_id = response.get('RequestedQuota', {}).get('Id')
except self.service_quotas.exceptions.InvalidParameterValueException:
# Fall back to Support API for quotas that require support cases
case_response = self.support.create_case(
subject=f"Service Quota Increase Request: {request.service_code}",
serviceCode='service-limit-increase',
severityCode='low',
categoryCode='service-limit-increase',
communicationBody=self.generate_support_case_body(request),
ccEmailAddresses=self.config.get('notification_emails', []),
language='en'
)
request.status = RequestStatus.CASE_OPENED
request.case_id = case_response['caseId']
# Store request in DynamoDB
await self.store_quota_request(request)
return True
except Exception as e:
logging.error(f"Error submitting quota request: {str(e)}")
request.status = RequestStatus.DENIED
await self.store_quota_request(request)
return False
def generate_support_case_body(self, request: QuotaIncreaseRequest) -> str:
"""Generate support case body for quota increase request"""
return f"""
Service Quota Increase Request
Service: {request.service_code}
Quota Code: {request.quota_code}
Region: {request.region}
Current Limit: {request.current_value}
Requested Limit: {request.requested_value}
Priority: {request.priority}
Justification:
{request.justification}
This request was automatically generated based on usage trend analysis.
Please process this quota increase to prevent service disruptions.
Business Impact:
- Prevents service availability issues
- Supports planned capacity growth
- Maintains application performance standards
Technical Details:
- Usage trends indicate approaching quota limits
- Automated monitoring detected the need for increase
- Request includes appropriate buffer for future growth
""".strip()
async def store_quota_request(self, request: QuotaIncreaseRequest):
"""Store quota request in DynamoDB"""
try:
item = {
'request_id': f"{request.service_code}#{request.quota_code}#{request.region}",
'timestamp': int(request.created_at.timestamp()),
'service_code': request.service_code,
'quota_code': request.quota_code,
'region': request.region,
'current_value': request.current_value,
'requested_value': request.requested_value,
'justification': request.justification,
'priority': request.priority,
'status': request.status.value,
'case_id': request.case_id,
'ttl': int((datetime.utcnow() + timedelta(days=365)).timestamp())
}
self.requests_table.put_item(Item=item)
except Exception as e:
logging.error(f"Error storing quota request: {str(e)}")
async def check_request_status(self) -> List[Dict]:
"""Check status of pending quota requests"""
updates = []
try:
# Get pending requests
response = self.requests_table.scan(
FilterExpression='#status IN (:pending, :case_opened)',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={
':pending': 'PENDING',
':case_opened': 'CASE_OPENED'
}
)
for item in response['Items']:
if item.get('case_id'):
# Check support case status
try:
case_response = self.support.describe_cases(
caseIdList=[item['case_id']],
includeResolvedCases=True
)
if case_response['cases']:
case = case_response['cases'][0]
if case['status'] == 'resolved':
# Update request status
self.requests_table.update_item(
Key={
'request_id': item['request_id'],
'timestamp': item['timestamp']
},
UpdateExpression='SET #status = :status',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={':status': 'CASE_RESOLVED'}
)
updates.append({
'request_id': item['request_id'],
'old_status': item['status'],
'new_status': 'CASE_RESOLVED'
})
except Exception as e:
logging.error(f"Error checking case status: {str(e)}")
except Exception as e:
logging.error(f"Error checking request status: {str(e)}")
return updates
# Usage example
async def main():
config = {
'requests_table_name': 'quota-requests',
'quota_table_name': 'quota-monitoring',
'notification_emails': ['admin@company.com']
}
manager = AutomatedQuotaManager(config)
# Process recommendations and submit requests
requests = await manager.process_quota_recommendations()
for request in requests:
success = await manager.submit_quota_increase_request(request)
print(f"Request for {request.service_code}/{request.quota_code}: {'Success' if success else 'Failed'}")
# Check status of existing requests
updates = await manager.check_request_status()
print(f"Status updates: {len(updates)}")
if __name__ == "__main__":
asyncio.run(main())Example 3: CloudFormation Template for Quota Management Infrastructure
View code
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Comprehensive quota monitoring and management infrastructure'
Parameters:
AlertEmail:
Type: String
Description: Email address for quota alerts
Default: admin@company.com
MonitoringSchedule:
Type: String
Description: CloudWatch Events schedule for quota monitoring
Default: 'rate(5 minutes)'
Environment:
Type: String
Description: Environment name
Default: production
AllowedValues: [development, staging, production]
Resources:
# DynamoDB Tables
QuotaMonitoringTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub '${Environment}-quota-monitoring'
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: quota_id
AttributeType: S
- AttributeName: timestamp
AttributeType: N
KeySchema:
- AttributeName: quota_id
KeyType: HASH
- AttributeName: timestamp
KeyType: RANGE
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
PointInTimeRecoverySpecification:
PointInTimeRecoveryEnabled: true
QuotaRequestsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub '${Environment}-quota-requests'
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: request_id
AttributeType: S
- AttributeName: timestamp
AttributeType: N
- AttributeName: status
AttributeType: S
KeySchema:
- AttributeName: request_id
KeyType: HASH
- AttributeName: timestamp
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: status-index
KeySchema:
- AttributeName: status
KeyType: HASH
- AttributeName: timestamp
KeyType: RANGE
Projection:
ProjectionType: ALL
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
# SNS Topics
QuotaAlertTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: !Sub '${Environment}-quota-alerts'
DisplayName: 'AWS Quota Monitoring Alerts'
QuotaAlertSubscription:
Type: AWS::SNS::Subscription
Properties:
Protocol: email
TopicArn: !Ref QuotaAlertTopic
Endpoint: !Ref AlertEmail
# IAM Roles
QuotaMonitoringRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub '${Environment}-quota-monitoring-role'
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: QuotaMonitoringPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- service-quotas:*
- cloudwatch:*
- dynamodb:*
- sns:Publish
- support:*
- ec2:DescribeRegions
- organizations:ListAccounts
- sts:AssumeRole
Resource: '*'
# Lambda Functions
QuotaMonitoringFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${Environment}-quota-monitoring'
Runtime: python3.9
Handler: index.lambda_handler
Role: !GetAtt QuotaMonitoringRole.Arn
Timeout: 900
MemorySize: 1024
Environment:
Variables:
QUOTA_TABLE_NAME: !Ref QuotaMonitoringTable
ALERT_TOPIC_ARN: !Ref QuotaAlertTopic
ENVIRONMENT: !Ref Environment
Code:
ZipFile: |
import json
import boto3
import os
import asyncio
from datetime import datetime, timedelta
def lambda_handler(event, context):
# Import the monitoring class (would be in a layer in practice)
# This is a simplified version for the template
config = {
'quota_table_name': os.environ['QUOTA_TABLE_NAME'],
'alert_topic_arn': os.environ['ALERT_TOPIC_ARN'],
'alert_thresholds': [70, 80, 90]
}
# Initialize monitoring
service_quotas = boto3.client('service-quotas')
cloudwatch = boto3.client('cloudwatch')
dynamodb = boto3.resource('dynamodb')
sns = boto3.client('sns')
# Basic quota monitoring logic
try:
# Get current region quotas
services = service_quotas.list_services()['Services']
alerts_sent = 0
for service in services[:5]: # Limit for demo
service_code = service['ServiceCode']
try:
quotas = service_quotas.list_service_quotas(
ServiceCode=service_code
)['Quotas']
for quota in quotas[:3]: # Limit for demo
# Store quota data
table = dynamodb.Table(config['quota_table_name'])
table.put_item(
Item={
'quota_id': f"{service_code}#{quota['QuotaCode']}#{context.invoked_function_arn.split(':')[3]}",
'timestamp': int(datetime.utcnow().timestamp()),
'service_code': service_code,
'quota_code': quota['QuotaCode'],
'quota_name': quota['QuotaName'],
'quota_value': quota['Value'],
'current_usage': 0, # Would get from CloudWatch
'utilization_percentage': 0,
'ttl': int((datetime.utcnow() + timedelta(days=90)).timestamp())
}
)
except Exception as e:
print(f"Error processing service {service_code}: {str(e)}")
continue
return {
'statusCode': 200,
'body': json.dumps({
'message': f'Quota monitoring completed',
'alerts_sent': alerts_sent
})
}
except Exception as e:
print(f"Error in quota monitoring: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
QuotaManagerFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${Environment}-quota-manager'
Runtime: python3.9
Handler: index.lambda_handler
Role: !GetAtt QuotaMonitoringRole.Arn
Timeout: 900
MemorySize: 1024
Environment:
Variables:
QUOTA_TABLE_NAME: !Ref QuotaMonitoringTable
REQUESTS_TABLE_NAME: !Ref QuotaRequestsTable
ALERT_TOPIC_ARN: !Ref QuotaAlertTopic
ENVIRONMENT: !Ref Environment
Code:
ZipFile: |
import json
import boto3
import os
from datetime import datetime, timedelta
def lambda_handler(event, context):
# Quota management logic
config = {
'quota_table_name': os.environ['QUOTA_TABLE_NAME'],
'requests_table_name': os.environ['REQUESTS_TABLE_NAME'],
'alert_topic_arn': os.environ['ALERT_TOPIC_ARN']
}
service_quotas = boto3.client('service-quotas')
support = boto3.client('support')
dynamodb = boto3.resource('dynamodb')
try:
# Check for quota increase opportunities
quota_table = dynamodb.Table(config['quota_table_name'])
requests_table = dynamodb.Table(config['requests_table_name'])
# Scan for high utilization quotas
response = quota_table.scan(
FilterExpression='utilization_percentage > :threshold',
ExpressionAttributeValues={':threshold': 80}
)
requests_created = 0
for item in response['Items']:
# Check if request already exists
request_id = f"{item['service_code']}#{item['quota_code']}#{item.get('region', 'us-east-1')}"
try:
existing = requests_table.get_item(
Key={'request_id': request_id, 'timestamp': int(datetime.utcnow().timestamp())}
)
if 'Item' not in existing:
# Create new request
requests_table.put_item(
Item={
'request_id': request_id,
'timestamp': int(datetime.utcnow().timestamp()),
'service_code': item['service_code'],
'quota_code': item['quota_code'],
'current_value': item['quota_value'],
'requested_value': item['quota_value'] * 2,
'justification': f"High utilization: {item['utilization_percentage']}%",
'priority': 'high' if item['utilization_percentage'] > 90 else 'medium',
'status': 'PENDING',
'ttl': int((datetime.utcnow() + timedelta(days=365)).timestamp())
}
)
requests_created += 1
except Exception as e:
print(f"Error processing quota request: {str(e)}")
continue
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Quota management completed',
'requests_created': requests_created
})
}
except Exception as e:
print(f"Error in quota management: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
# EventBridge Rules
QuotaMonitoringSchedule:
Type: AWS::Events::Rule
Properties:
Name: !Sub '${Environment}-quota-monitoring-schedule'
Description: 'Schedule for quota monitoring'
ScheduleExpression: !Ref MonitoringSchedule
State: ENABLED
Targets:
- Arn: !GetAtt QuotaMonitoringFunction.Arn
Id: QuotaMonitoringTarget
QuotaManagerSchedule:
Type: AWS::Events::Rule
Properties:
Name: !Sub '${Environment}-quota-manager-schedule'
Description: 'Schedule for quota management'
ScheduleExpression: 'rate(1 hour)'
State: ENABLED
Targets:
- Arn: !GetAtt QuotaManagerFunction.Arn
Id: QuotaManagerTarget
# Lambda Permissions
QuotaMonitoringPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref QuotaMonitoringFunction
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceArn: !GetAtt QuotaMonitoringSchedule.Arn
QuotaManagerPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref QuotaManagerFunction
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceArn: !GetAtt QuotaManagerSchedule.Arn
# CloudWatch Dashboard
QuotaDashboard:
Type: AWS::CloudWatch::Dashboard
Properties:
DashboardName: !Sub '${Environment}-quota-monitoring'
DashboardBody: !Sub |
{
"widgets": [
{
"type": "metric",
"x": 0,
"y": 0,
"width": 12,
"height": 6,
"properties": {
"metrics": [
[ "QuotaMonitoring", "QuotaUtilization", "AlertLevel", "CRITICAL" ],
[ ".", ".", ".", "WARNING" ],
[ ".", ".", ".", "INFO" ]
],
"view": "timeSeries",
"stacked": false,
"region": "${AWS::Region}",
"title": "Quota Utilization by Alert Level",
"period": 300
}
},
{
"type": "metric",
"x": 12,
"y": 0,
"width": 12,
"height": 6,
"properties": {
"metrics": [
[ "AWS/Lambda", "Duration", "FunctionName", "${QuotaMonitoringFunction}" ],
[ ".", "Errors", ".", "." ],
[ ".", "Invocations", ".", "." ]
],
"view": "timeSeries",
"stacked": false,
"region": "${AWS::Region}",
"title": "Quota Monitoring Function Metrics",
"period": 300
}
}
]
}
# CloudWatch Alarms
HighQuotaUtilizationAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub '${Environment}-high-quota-utilization'
AlarmDescription: 'Alert when quota utilization is high'
MetricName: QuotaUtilization
Namespace: QuotaMonitoring
Statistic: Maximum
Period: 300
EvaluationPeriods: 2
Threshold: 90
ComparisonOperator: GreaterThanThreshold
AlarmActions:
- !Ref QuotaAlertTopic
Dimensions:
- Name: AlertLevel
Value: CRITICAL
Outputs:
QuotaMonitoringTableName:
Description: 'Name of the quota monitoring DynamoDB table'
Value: !Ref QuotaMonitoringTable
Export:
Name: !Sub '${Environment}-quota-monitoring-table'
QuotaRequestsTableName:
Description: 'Name of the quota requests DynamoDB table'
Value: !Ref QuotaRequestsTable
Export:
Name: !Sub '${Environment}-quota-requests-table'
AlertTopicArn:
Description: 'ARN of the quota alert SNS topic'
Value: !Ref QuotaAlertTopic
Export:
Name: !Sub '${Environment}-quota-alert-topic'
DashboardURL:
Description: 'URL of the quota monitoring dashboard'
Value: !Sub 'https://${AWS::Region}.console.aws.amazon.com/cloudwatch/home?region=${AWS::Region}#dashboards:name=${Environment}-quota-monitoring'Example 4: Multi-Account Quota Coordination System
View code
#!/bin/bash
# Multi-Account Quota Coordination Script
# Coordinates quota monitoring and management across AWS Organizations
set -euo pipefail
# Configuration
CONFIG_FILE="${CONFIG_FILE:-./quota-config.json}"
LOG_FILE="${LOG_FILE:-./quota-coordination.log}"
TEMP_DIR="${TEMP_DIR:-/tmp/quota-coordination}"
# Logging function
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE"
}
# Create temporary directory
mkdir -p "$TEMP_DIR"
# Load configuration
if [[ ! -f "$CONFIG_FILE" ]]; then
log "ERROR: Configuration file $CONFIG_FILE not found"
exit 1
fi
# Parse configuration
MASTER_ACCOUNT=$(jq -r '.master_account' "$CONFIG_FILE")
REGIONS=($(jq -r '.regions[]' "$CONFIG_FILE"))
SERVICES=($(jq -r '.services[]' "$CONFIG_FILE"))
ALERT_THRESHOLD=$(jq -r '.alert_threshold' "$CONFIG_FILE")
log "Starting multi-account quota coordination"
log "Master Account: $MASTER_ACCOUNT"
log "Regions: ${REGIONS[*]}"
log "Services: ${SERVICES[*]}"
# Function to assume role in target account
assume_role() {
local account_id="$1"
local role_name="$2"
local session_name="quota-coordination-$(date +%s)"
aws sts assume-role \
--role-arn "arn:aws:iam::${account_id}:role/${role_name}" \
--role-session-name "$session_name" \
--output json > "$TEMP_DIR/credentials-${account_id}.json"
if [[ $? -eq 0 ]]; then
log "Successfully assumed role in account $account_id"
return 0
else
log "ERROR: Failed to assume role in account $account_id"
return 1
fi
}
# Function to set credentials from assumed role
set_credentials() {
local account_id="$1"
local creds_file="$TEMP_DIR/credentials-${account_id}.json"
if [[ -f "$creds_file" ]]; then
export AWS_ACCESS_KEY_ID=$(jq -r '.Credentials.AccessKeyId' "$creds_file")
export AWS_SECRET_ACCESS_KEY=$(jq -r '.Credentials.SecretAccessKey' "$creds_file")
export AWS_SESSION_TOKEN=$(jq -r '.Credentials.SessionToken' "$creds_file")
log "Set credentials for account $account_id"
else
log "ERROR: Credentials file not found for account $account_id"
return 1
fi
}
# Function to clear credentials
clear_credentials() {
unset AWS_ACCESS_KEY_ID
unset AWS_SECRET_ACCESS_KEY
unset AWS_SESSION_TOKEN
}
# Function to get quota utilization for an account
get_account_quotas() {
local account_id="$1"
local region="$2"
local output_file="$TEMP_DIR/quotas-${account_id}-${region}.json"
log "Getting quotas for account $account_id in region $region"
# Initialize output file
echo '{"account_id": "'$account_id'", "region": "'$region'", "quotas": []}' > "$output_file"
for service in "${SERVICES[@]}"; do
log "Processing service $service in account $account_id, region $region"
# Get service quotas
aws service-quotas list-service-quotas \
--service-code "$service" \
--region "$region" \
--output json > "$TEMP_DIR/service-quotas-${service}.json" 2>/dev/null || {
log "WARNING: Could not get quotas for service $service in region $region"
continue
}
# Process each quota
jq -r '.Quotas[] | @base64' "$TEMP_DIR/service-quotas-${service}.json" | while read -r quota_data; do
quota=$(echo "$quota_data" | base64 -d)
quota_code=$(echo "$quota" | jq -r '.QuotaCode')
quota_name=$(echo "$quota" | jq -r '.QuotaName')
quota_value=$(echo "$quota" | jq -r '.Value')
# Get current usage (simplified - would use CloudWatch in practice)
current_usage=0
utilization=0
# Create quota record
quota_record=$(jq -n \
--arg service "$service" \
--arg quota_code "$quota_code" \
--arg quota_name "$quota_name" \
--argjson quota_value "$quota_value" \
--argjson current_usage "$current_usage" \
--argjson utilization "$utilization" \
'{
service_code: $service,
quota_code: $quota_code,
quota_name: $quota_name,
quota_value: $quota_value,
current_usage: $current_usage,
utilization_percentage: $utilization
}')
# Add to output file
jq --argjson quota "$quota_record" '.quotas += [$quota]' "$output_file" > "$output_file.tmp"
mv "$output_file.tmp" "$output_file"
done
done
log "Completed quota collection for account $account_id in region $region"
}
# Function to analyze cross-account quota utilization
analyze_cross_account_quotas() {
local analysis_file="$TEMP_DIR/quota-analysis.json"
log "Analyzing cross-account quota utilization"
# Initialize analysis file
echo '{"timestamp": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'", "analysis": {}}' > "$analysis_file"
# Combine all quota files
for quota_file in "$TEMP_DIR"/quotas-*.json; do
if [[ -f "$quota_file" ]]; then
account_id=$(jq -r '.account_id' "$quota_file")
region=$(jq -r '.region' "$quota_file")
log "Processing quota data for account $account_id, region $region"
# Process each quota in the file
jq -r '.quotas[] | @base64' "$quota_file" | while read -r quota_data; do
quota=$(echo "$quota_data" | base64 -d)
service_code=$(echo "$quota" | jq -r '.service_code')
quota_code=$(echo "$quota" | jq -r '.quota_code')
utilization=$(echo "$quota" | jq -r '.utilization_percentage')
# Check if utilization exceeds threshold
if (( $(echo "$utilization >= $ALERT_THRESHOLD" | bc -l) )); then
log "ALERT: High utilization in account $account_id, region $region, service $service_code, quota $quota_code: ${utilization}%"
# Add to analysis
alert_record=$(jq -n \
--arg account_id "$account_id" \
--arg region "$region" \
--arg service_code "$service_code" \
--arg quota_code "$quota_code" \
--argjson utilization "$utilization" \
'{
account_id: $account_id,
region: $region,
service_code: $service_code,
quota_code: $quota_code,
utilization_percentage: $utilization,
alert_level: (if $utilization >= 90 then "CRITICAL" elif $utilization >= 80 then "WARNING" else "INFO" end)
}')
# Add alert to analysis file
key="${service_code}_${quota_code}"
jq --arg key "$key" --argjson alert "$alert_record" \
'.analysis[$key] += [$alert]' "$analysis_file" > "$analysis_file.tmp"
mv "$analysis_file.tmp" "$analysis_file"
fi
done
fi
done
log "Cross-account quota analysis completed"
}
# Function to generate quota coordination recommendations
generate_recommendations() {
local analysis_file="$TEMP_DIR/quota-analysis.json"
local recommendations_file="$TEMP_DIR/recommendations.json"
log "Generating quota coordination recommendations"
# Initialize recommendations file
echo '{"timestamp": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'", "recommendations": []}' > "$recommendations_file"
# Process analysis results
if [[ -f "$analysis_file" ]]; then
jq -r '.analysis | keys[]' "$analysis_file" | while read -r quota_key; do
alerts=$(jq -r ".analysis[\"$quota_key\"]" "$analysis_file")
alert_count=$(echo "$alerts" | jq 'length')
if [[ "$alert_count" -gt 1 ]]; then
# Multiple accounts affected - recommend coordination
recommendation=$(jq -n \
--arg quota_key "$quota_key" \
--argjson alert_count "$alert_count" \
--argjson alerts "$alerts" \
'{
type: "cross_account_coordination",
quota_key: $quota_key,
affected_accounts: $alert_count,
recommendation: "Consider workload redistribution or coordinated quota increases",
priority: "high",
affected_resources: $alerts
}')
jq --argjson rec "$recommendation" '.recommendations += [$rec]' "$recommendations_file" > "$recommendations_file.tmp"
mv "$recommendations_file.tmp" "$recommendations_file"
log "RECOMMENDATION: Cross-account coordination needed for $quota_key ($alert_count accounts affected)"
fi
done
fi
log "Recommendations generated"
}
# Function to send notifications
send_notifications() {
local analysis_file="$TEMP_DIR/quota-analysis.json"
local recommendations_file="$TEMP_DIR/recommendations.json"
log "Sending notifications"
# Check if SNS topic is configured
SNS_TOPIC=$(jq -r '.sns_topic_arn // empty' "$CONFIG_FILE")
if [[ -n "$SNS_TOPIC" ]]; then
# Send analysis results
if [[ -f "$analysis_file" ]]; then
aws sns publish \
--topic-arn "$SNS_TOPIC" \
--subject "Multi-Account Quota Analysis Results" \
--message file://"$analysis_file" \
--region us-east-1
log "Sent analysis results to SNS topic"
fi
# Send recommendations
if [[ -f "$recommendations_file" ]]; then
aws sns publish \
--topic-arn "$SNS_TOPIC" \
--subject "Multi-Account Quota Recommendations" \
--message file://"$recommendations_file" \
--region us-east-1
log "Sent recommendations to SNS topic"
fi
else
log "No SNS topic configured, skipping notifications"
fi
}
# Main execution
main() {
log "Starting multi-account quota coordination process"
# Get list of accounts from AWS Organizations
ACCOUNTS=($(aws organizations list-accounts --query 'Accounts[?Status==`ACTIVE`].Id' --output text))
log "Found ${#ACCOUNTS[@]} active accounts"
# Process each account
for account_id in "${ACCOUNTS[@]}"; do
log "Processing account $account_id"
# Skip master account for role assumption
if [[ "$account_id" == "$MASTER_ACCOUNT" ]]; then
log "Skipping role assumption for master account"
else
# Assume role in target account
if ! assume_role "$account_id" "QuotaMonitoringRole"; then
log "Skipping account $account_id due to role assumption failure"
continue
fi
set_credentials "$account_id"
fi
# Process each region
for region in "${REGIONS[@]}"; do
get_account_quotas "$account_id" "$region"
done
# Clear credentials if not master account
if [[ "$account_id" != "$MASTER_ACCOUNT" ]]; then
clear_credentials
fi
done
# Analyze results
analyze_cross_account_quotas
generate_recommendations
send_notifications
log "Multi-account quota coordination completed"
}
# Configuration file template
create_config_template() {
cat > quota-config.json << 'EOF'
{
"master_account": "123456789012",
"regions": ["us-east-1", "us-west-2", "eu-west-1"],
"services": ["ec2", "lambda", "rds", "s3"],
"alert_threshold": 80,
"sns_topic_arn": "arn:aws:sns:us-east-1:123456789012:quota-alerts"
}
EOF
log "Created configuration template: quota-config.json"
}
# Command line argument handling
case "${1:-}" in
"config")
create_config_template
;;
"run"|"")
main
;;
*)
echo "Usage: $0 [config|run]"
echo " config - Create configuration template"
echo " run - Run quota coordination (default)"
exit 1
;;
esac
# Cleanup
rm -rf "$TEMP_DIR"
log "Cleanup completed"AWS Services Used
- AWS Service Quotas: Core service for quota monitoring and management
- Amazon CloudWatch: Metrics collection and alerting for quota utilization
- Amazon DynamoDB: Storage for quota data, trends, and request tracking
- Amazon SNS: Notification system for quota alerts and updates
- AWS Lambda: Serverless execution of monitoring and management functions
- Amazon EventBridge: Scheduling and event-driven quota management
- AWS Support API: Automated support case creation for quota increases
- AWS Organizations: Multi-account quota coordination and governance
- AWS Step Functions: Orchestration of complex quota management workflows
- Amazon CloudFormation: Infrastructure as code for quota management systems
Benefits
- Proactive Management: Prevents service disruptions through early quota monitoring
- Automated Operations: Reduces manual overhead with automated monitoring and requests
- Cross-Account Visibility: Provides centralized view of quota utilization across accounts
- Trend Analysis: Enables predictive quota management based on usage patterns
- Cost Optimization: Prevents over-provisioning while ensuring adequate capacity
- Compliance Support: Maintains audit trails and governance for quota changes
- Scalable Architecture: Handles monitoring across multiple accounts and regions
- Integration Ready: Works with existing CI/CD and infrastructure automation