REL02
REL02-BP03 - Ensure IP subnet allocation accounts for expansion and availability
REL02-BP03: Ensure IP subnet allocation accounts for expansion and availability
Overview
Design and implement IP subnet allocation strategies that accommodate future growth, multi-AZ deployment requirements, and service expansion while maintaining network isolation and security. This involves careful planning of CIDR blocks, subnet sizing, and address space management to prevent IP exhaustion and enable seamless scaling.
Implementation Steps
1. Design Comprehensive IP Address Strategy
- Plan hierarchical IP addressing scheme for multi-region and multi-account architectures
- Allocate sufficient address space for current and future requirements
- Implement standardized subnet sizing and naming conventions
- Establish IP address management (IPAM) processes and governance
2. Implement Multi-AZ Subnet Architecture
- Deploy subnets across multiple Availability Zones for high availability
- Size subnets appropriately for expected workload growth
- Maintain consistent subnet patterns across environments
- Plan for disaster recovery and cross-region expansion
3. Establish Network Segmentation Strategy
- Create separate subnets for different tiers (web, application, database)
- Implement security zones with appropriate isolation
- Plan for microservices and container networking requirements
- Design subnets for shared services and infrastructure components
4. Configure Dynamic IP Management
- Implement automated subnet creation and management
- Set up IP address monitoring and utilization tracking
- Configure automatic subnet expansion capabilities
- Establish IP address reclamation and optimization processes
5. Plan for Service Integration and Expansion
- Reserve address space for AWS managed services
- Plan for VPC peering and Transit Gateway connectivity
- Allocate space for load balancers, NAT gateways, and endpoints
- Design for container orchestration and serverless architectures
6. Implement IP Address Governance and Monitoring
- Establish IP address allocation policies and procedures
- Set up monitoring and alerting for subnet utilization
- Implement automated compliance checking and reporting
- Create documentation and change management processes
Implementation Examples
Example 1: Intelligent IP Address Planning and Management System
View code
import boto3
import json
import logging
import ipaddress
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import math
class SubnetType(Enum):
PUBLIC = "public"
PRIVATE = "private"
DATABASE = "database"
TRANSIT_GATEWAY = "transit_gateway"
LOAD_BALANCER = "load_balancer"
CONTAINER = "container"
LAMBDA = "lambda"
RESERVED = "reserved"
class EnvironmentType(Enum):
PRODUCTION = "production"
STAGING = "staging"
DEVELOPMENT = "development"
SHARED_SERVICES = "shared_services"
@dataclass
class SubnetPlan:
subnet_type: SubnetType
environment: EnvironmentType
availability_zone: str
cidr_block: str
expected_hosts: int
growth_factor: float
utilization_threshold: float = 0.8
@dataclass
class IPAllocationStrategy:
region: str
vpc_cidr: str
environment_allocations: Dict[str, str]
subnet_size_defaults: Dict[str, int]
growth_projections: Dict[str, float]
availability_zones: List[str]
class IntelligentIPAddressManager:
def __init__(self, config: Dict):
self.config = config
self.ec2 = boto3.client('ec2')
self.cloudwatch = boto3.client('cloudwatch')
self.dynamodb = boto3.resource('dynamodb')
self.sns = boto3.client('sns')
# Initialize IPAM table
self.ipam_table = self.dynamodb.Table(config.get('ipam_table_name', 'ip-address-management'))
def design_comprehensive_ip_strategy(self, strategy_config: Dict) -> Dict:
"""Design comprehensive IP address allocation strategy"""
strategy_id = f"ip_strategy_{int(datetime.utcnow().timestamp())}"
strategy_result = {
'strategy_id': strategy_id,
'timestamp': datetime.utcnow().isoformat(),
'strategy_config': strategy_config,
'ip_allocations': {},
'subnet_plans': {},
'utilization_projections': {},
'status': 'initiated'
}
try:
# 1. Analyze current IP utilization
current_utilization = self.analyze_current_ip_utilization(
strategy_config.get('existing_vpcs', [])
)
strategy_result['current_utilization'] = current_utilization
# 2. Calculate future requirements
future_requirements = self.calculate_future_ip_requirements(
strategy_config, current_utilization
)
strategy_result['future_requirements'] = future_requirements
# 3. Design hierarchical IP allocation
ip_allocations = self.design_hierarchical_ip_allocation(
strategy_config, future_requirements
)
strategy_result['ip_allocations'] = ip_allocations
# 4. Create detailed subnet plans
subnet_plans = self.create_detailed_subnet_plans(
ip_allocations, strategy_config
)
strategy_result['subnet_plans'] = subnet_plans
# 5. Generate utilization projections
utilization_projections = self.generate_utilization_projections(
subnet_plans, strategy_config
)
strategy_result['utilization_projections'] = utilization_projections
# 6. Validate and optimize allocation
validation_result = self.validate_and_optimize_allocation(
strategy_result
)
strategy_result['validation'] = validation_result
strategy_result['status'] = 'completed'
except Exception as e:
logging.error(f"Error designing IP strategy: {str(e)}")
strategy_result['status'] = 'failed'
strategy_result['error'] = str(e)
return strategy_result
def analyze_current_ip_utilization(self, existing_vpcs: List[str]) -> Dict:
"""Analyze current IP address utilization across existing VPCs"""
utilization_analysis = {
'vpcs': {},
'total_allocated_ips': 0,
'total_used_ips': 0,
'overall_utilization': 0.0,
'subnet_utilization': []
}
try:
for vpc_id in existing_vpcs:
vpc_analysis = self.analyze_vpc_utilization(vpc_id)
utilization_analysis['vpcs'][vpc_id] = vpc_analysis
utilization_analysis['total_allocated_ips'] += vpc_analysis['allocated_ips']
utilization_analysis['total_used_ips'] += vpc_analysis['used_ips']
# Calculate overall utilization
if utilization_analysis['total_allocated_ips'] > 0:
utilization_analysis['overall_utilization'] = (
utilization_analysis['total_used_ips'] /
utilization_analysis['total_allocated_ips']
)
except Exception as e:
logging.error(f"Error analyzing current utilization: {str(e)}")
return utilization_analysis
def analyze_vpc_utilization(self, vpc_id: str) -> Dict:
"""Analyze IP utilization for a specific VPC"""
try:
# Get VPC information
vpc_response = self.ec2.describe_vpcs(VpcIds=[vpc_id])
if not vpc_response['Vpcs']:
return {'error': 'VPC not found'}
vpc = vpc_response['Vpcs'][0]
vpc_cidr = vpc['CidrBlock']
vpc_network = ipaddress.IPv4Network(vpc_cidr)
# Get subnets
subnets_response = self.ec2.describe_subnets(
Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
)
subnet_analysis = []
total_allocated = 0
total_used = 0
for subnet in subnets_response['Subnets']:
subnet_network = ipaddress.IPv4Network(subnet['CidrBlock'])
available_ips = subnet_network.num_addresses - 5 # AWS reserves 5 IPs
used_ips = available_ips - subnet['AvailableIpAddressCount']
utilization = (used_ips / available_ips) * 100 if available_ips > 0 else 0
subnet_info = {
'subnet_id': subnet['SubnetId'],
'cidr_block': subnet['CidrBlock'],
'availability_zone': subnet['AvailabilityZone'],
'available_ips': available_ips,
'used_ips': used_ips,
'utilization_percentage': utilization,
'tags': subnet.get('Tags', [])
}
subnet_analysis.append(subnet_info)
total_allocated += available_ips
total_used += used_ips
return {
'vpc_id': vpc_id,
'vpc_cidr': vpc_cidr,
'total_vpc_ips': vpc_network.num_addresses,
'allocated_ips': total_allocated,
'used_ips': total_used,
'utilization_percentage': (total_used / total_allocated * 100) if total_allocated > 0 else 0,
'subnets': subnet_analysis
}
except Exception as e:
logging.error(f"Error analyzing VPC {vpc_id}: {str(e)}")
return {'error': str(e)}
def calculate_future_ip_requirements(self, strategy_config: Dict,
current_utilization: Dict) -> Dict:
"""Calculate future IP address requirements based on growth projections"""
requirements = {
'environments': {},
'services': {},
'total_requirements': 0,
'growth_timeline': {}
}
try:
# Get growth projections
growth_projections = strategy_config.get('growth_projections', {})
planning_horizon = strategy_config.get('planning_horizon_years', 3)
# Calculate requirements by environment
for env_name, env_config in strategy_config.get('environments', {}).items():
env_requirements = self.calculate_environment_requirements(
env_config, growth_projections, planning_horizon
)
requirements['environments'][env_name] = env_requirements
requirements['total_requirements'] += env_requirements['total_ips']
# Calculate requirements by service type
for service_name, service_config in strategy_config.get('services', {}).items():
service_requirements = self.calculate_service_requirements(
service_config, growth_projections, planning_horizon
)
requirements['services'][service_name] = service_requirements
# Generate growth timeline
requirements['growth_timeline'] = self.generate_growth_timeline(
requirements, planning_horizon
)
except Exception as e:
logging.error(f"Error calculating future requirements: {str(e)}")
return requirements
def calculate_environment_requirements(self, env_config: Dict,
growth_projections: Dict,
planning_horizon: int) -> Dict:
"""Calculate IP requirements for a specific environment"""
base_requirements = env_config.get('base_ip_requirements', 1000)
growth_rate = growth_projections.get(env_config.get('name', 'default'), 0.2)
# Calculate compound growth
future_requirements = base_requirements * ((1 + growth_rate) ** planning_horizon)
# Add buffer for unexpected growth
buffer_percentage = env_config.get('buffer_percentage', 0.5)
total_requirements = future_requirements * (1 + buffer_percentage)
return {
'base_requirements': base_requirements,
'growth_rate': growth_rate,
'future_requirements': future_requirements,
'buffer_percentage': buffer_percentage,
'total_ips': int(total_requirements),
'recommended_cidr_size': self.calculate_recommended_cidr_size(total_requirements)
}
def calculate_service_requirements(self, service_config: Dict,
growth_projections: Dict,
planning_horizon: int) -> Dict:
"""Calculate IP requirements for a specific service"""
service_type = service_config.get('type', 'general')
base_instances = service_config.get('base_instances', 10)
# Service-specific multipliers
service_multipliers = {
'web': 2, # Load balancers, auto-scaling
'app': 3, # Application servers, middleware
'database': 1, # Typically fewer instances
'container': 5, # Container orchestration overhead
'lambda': 0.1, # Serverless, minimal IP requirements
'analytics': 4 # Big data, processing clusters
}
multiplier = service_multipliers.get(service_type, 2)
growth_rate = growth_projections.get(service_type, 0.3)
# Calculate future requirements
future_instances = base_instances * ((1 + growth_rate) ** planning_horizon)
total_ips = future_instances * multiplier
return {
'service_type': service_type,
'base_instances': base_instances,
'multiplier': multiplier,
'growth_rate': growth_rate,
'future_instances': int(future_instances),
'total_ips': int(total_ips)
}
def calculate_recommended_cidr_size(self, required_ips: float) -> int:
"""Calculate recommended CIDR block size for required IPs"""
# Find the smallest CIDR that can accommodate required IPs
# Account for AWS reserved IPs and growth buffer
required_ips_with_aws_reserved = required_ips + (required_ips * 0.1) # 10% for AWS overhead
# Find the power of 2 that accommodates the requirement
cidr_size = 32 - math.ceil(math.log2(required_ips_with_aws_reserved))
# Ensure minimum and maximum bounds
cidr_size = max(16, min(28, cidr_size)) # Between /16 and /28
return cidr_size
def design_hierarchical_ip_allocation(self, strategy_config: Dict,
future_requirements: Dict) -> Dict:
"""Design hierarchical IP address allocation"""
allocation_design = {
'master_cidr': strategy_config.get('master_cidr', '10.0.0.0/8'),
'regional_allocations': {},
'environment_allocations': {},
'service_allocations': {},
'reserved_blocks': {}
}
try:
master_network = ipaddress.IPv4Network(allocation_design['master_cidr'])
# Allocate by region
regions = strategy_config.get('regions', ['us-east-1'])
regional_subnets = list(master_network.subnets(new_prefix=12)) # /12 per region
for i, region in enumerate(regions):
if i < len(regional_subnets):
allocation_design['regional_allocations'][region] = str(regional_subnets[i])
# Allocate by environment within each region
for region, region_cidr in allocation_design['regional_allocations'].items():
region_network = ipaddress.IPv4Network(region_cidr)
env_allocations = {}
environments = strategy_config.get('environments', {})
env_subnets = list(region_network.subnets(new_prefix=14)) # /14 per environment
for i, (env_name, env_config) in enumerate(environments.items()):
if i < len(env_subnets):
env_allocations[env_name] = str(env_subnets[i])
allocation_design['environment_allocations'][region] = env_allocations
# Reserve blocks for special purposes
allocation_design['reserved_blocks'] = {
'transit_gateway': '10.255.0.0/16',
'direct_connect': '10.254.0.0/16',
'vpn_connections': '10.253.0.0/16',
'future_expansion': '10.252.0.0/16'
}
except Exception as e:
logging.error(f"Error designing hierarchical allocation: {str(e)}")
return allocation_design
def create_detailed_subnet_plans(self, ip_allocations: Dict,
strategy_config: Dict) -> Dict:
"""Create detailed subnet plans for each environment and service"""
subnet_plans = {}
try:
for region, env_allocations in ip_allocations.get('environment_allocations', {}).items():
region_plans = {}
for env_name, env_cidr in env_allocations.items():
env_network = ipaddress.IPv4Network(env_cidr)
env_config = strategy_config.get('environments', {}).get(env_name, {})
# Get availability zones for the region
azs = self.get_availability_zones(region)
# Create subnet plans for this environment
env_subnet_plans = self.create_environment_subnet_plans(
env_network, env_config, azs, region
)
region_plans[env_name] = env_subnet_plans
subnet_plans[region] = region_plans
except Exception as e:
logging.error(f"Error creating subnet plans: {str(e)}")
return subnet_plans
def create_environment_subnet_plans(self, env_network: ipaddress.IPv4Network,
env_config: Dict, azs: List[str],
region: str) -> Dict:
"""Create subnet plans for a specific environment"""
subnet_plans = {
'public_subnets': [],
'private_subnets': [],
'database_subnets': [],
'container_subnets': [],
'reserved_subnets': []
}
try:
# Define subnet types and their allocation percentages
subnet_allocations = {
'public': 0.1, # 10% for public subnets
'private': 0.6, # 60% for private subnets
'database': 0.1, # 10% for database subnets
'container': 0.15, # 15% for container subnets
'reserved': 0.05 # 5% for future use
}
# Calculate subnet sizes
total_ips = env_network.num_addresses
current_network = env_network
for subnet_type, percentage in subnet_allocations.items():
required_ips = int(total_ips * percentage)
subnet_size = 32 - math.ceil(math.log2(required_ips / len(azs)))
subnet_size = max(20, min(28, subnet_size)) # Between /20 and /28
# Create subnets across AZs
type_subnets = []
subnets_iter = current_network.subnets(new_prefix=subnet_size)
for i, az in enumerate(azs):
try:
subnet_cidr = next(subnets_iter)
subnet_plan = SubnetPlan(
subnet_type=SubnetType(subnet_type.lower()),
environment=EnvironmentType(env_config.get('type', 'development')),
availability_zone=az,
cidr_block=str(subnet_cidr),
expected_hosts=required_ips // len(azs),
growth_factor=env_config.get('growth_factor', 1.5),
utilization_threshold=env_config.get('utilization_threshold', 0.8)
)
type_subnets.append(asdict(subnet_plan))
except StopIteration:
break
subnet_plans[f"{subnet_type}_subnets"] = type_subnets
# Update current network for next allocation
try:
remaining_subnets = list(subnets_iter)
if remaining_subnets:
current_network = remaining_subnets[0].supernet()
except:
pass
except Exception as e:
logging.error(f"Error creating environment subnet plans: {str(e)}")
return subnet_plans
def get_availability_zones(self, region: str) -> List[str]:
"""Get availability zones for a region"""
try:
# Create regional EC2 client
regional_ec2 = boto3.client('ec2', region_name=region)
azs_response = regional_ec2.describe_availability_zones(
Filters=[{'Name': 'state', 'Values': ['available']}]
)
return [az['ZoneName'] for az in azs_response['AvailabilityZones'][:3]]
except Exception as e:
logging.error(f"Error getting AZs for region {region}: {str(e)}")
return [f"{region}a", f"{region}b", f"{region}c"] # Fallback
def generate_utilization_projections(self, subnet_plans: Dict,
strategy_config: Dict) -> Dict:
"""Generate utilization projections for subnet plans"""
projections = {
'timeline': [],
'utilization_by_region': {},
'capacity_warnings': [],
'expansion_recommendations': []
}
try:
planning_horizon = strategy_config.get('planning_horizon_years', 3)
# Generate monthly projections
for month in range(planning_horizon * 12):
month_projection = {
'month': month,
'date': (datetime.utcnow() + timedelta(days=month * 30)).strftime('%Y-%m'),
'regional_utilization': {}
}
for region, region_plans in subnet_plans.items():
region_utilization = self.calculate_monthly_utilization(
region_plans, month, strategy_config
)
month_projection['regional_utilization'][region] = region_utilization
projections['timeline'].append(month_projection)
# Identify capacity warnings
projections['capacity_warnings'] = self.identify_capacity_warnings(
projections['timeline']
)
# Generate expansion recommendations
projections['expansion_recommendations'] = self.generate_expansion_recommendations(
subnet_plans, projections['capacity_warnings']
)
except Exception as e:
logging.error(f"Error generating utilization projections: {str(e)}")
return projections
def calculate_monthly_utilization(self, region_plans: Dict, month: int,
strategy_config: Dict) -> Dict:
"""Calculate utilization for a specific month"""
utilization = {
'total_capacity': 0,
'projected_usage': 0,
'utilization_percentage': 0,
'environment_breakdown': {}
}
try:
growth_rate = strategy_config.get('monthly_growth_rate', 0.02) # 2% per month
for env_name, env_plans in region_plans.items():
env_capacity = 0
env_usage = 0
for subnet_type, subnets in env_plans.items():
for subnet in subnets:
subnet_network = ipaddress.IPv4Network(subnet['cidr_block'])
capacity = subnet_network.num_addresses - 5 # AWS reserved
# Calculate projected usage with growth
base_usage = subnet['expected_hosts']
projected_usage = base_usage * ((1 + growth_rate) ** month)
env_capacity += capacity
env_usage += min(projected_usage, capacity) # Cap at capacity
utilization['environment_breakdown'][env_name] = {
'capacity': env_capacity,
'usage': env_usage,
'utilization_percentage': (env_usage / env_capacity * 100) if env_capacity > 0 else 0
}
utilization['total_capacity'] += env_capacity
utilization['projected_usage'] += env_usage
# Calculate overall utilization
if utilization['total_capacity'] > 0:
utilization['utilization_percentage'] = (
utilization['projected_usage'] / utilization['total_capacity'] * 100
)
except Exception as e:
logging.error(f"Error calculating monthly utilization: {str(e)}")
return utilization
def identify_capacity_warnings(self, timeline: List[Dict]) -> List[Dict]:
"""Identify potential capacity issues from projections"""
warnings = []
try:
for projection in timeline:
for region, utilization in projection['regional_utilization'].items():
if utilization['utilization_percentage'] > 80:
warnings.append({
'region': region,
'month': projection['month'],
'date': projection['date'],
'utilization_percentage': utilization['utilization_percentage'],
'severity': 'critical' if utilization['utilization_percentage'] > 90 else 'warning',
'message': f"High utilization projected for {region} in {projection['date']}"
})
except Exception as e:
logging.error(f"Error identifying capacity warnings: {str(e)}")
return warnings
def validate_and_optimize_allocation(self, strategy_result: Dict) -> Dict:
"""Validate and optimize the IP allocation strategy"""
validation = {
'validation_checks': [],
'optimization_recommendations': [],
'compliance_status': 'compliant',
'efficiency_score': 0
}
try:
# Check for IP conflicts
conflict_check = self.check_ip_conflicts(strategy_result['ip_allocations'])
validation['validation_checks'].append(conflict_check)
# Check subnet sizing efficiency
sizing_check = self.check_subnet_sizing_efficiency(strategy_result['subnet_plans'])
validation['validation_checks'].append(sizing_check)
# Check growth accommodation
growth_check = self.check_growth_accommodation(strategy_result['utilization_projections'])
validation['validation_checks'].append(growth_check)
# Generate optimization recommendations
validation['optimization_recommendations'] = self.generate_optimization_recommendations(
validation['validation_checks']
)
# Calculate efficiency score
validation['efficiency_score'] = self.calculate_efficiency_score(
validation['validation_checks']
)
except Exception as e:
logging.error(f"Error validating allocation: {str(e)}")
return validation
# Usage example
def main():
config = {
'region': 'us-east-1',
'ipam_table_name': 'ip-address-management'
}
ip_manager = IntelligentIPAddressManager(config)
# Define IP strategy configuration
strategy_config = {
'master_cidr': '10.0.0.0/8',
'regions': ['us-east-1', 'us-west-2', 'eu-west-1'],
'planning_horizon_years': 5,
'monthly_growth_rate': 0.03,
'environments': {
'production': {
'type': 'production',
'base_ip_requirements': 5000,
'buffer_percentage': 1.0,
'growth_factor': 2.0,
'utilization_threshold': 0.7
},
'staging': {
'type': 'staging',
'base_ip_requirements': 1000,
'buffer_percentage': 0.5,
'growth_factor': 1.5,
'utilization_threshold': 0.8
},
'development': {
'type': 'development',
'base_ip_requirements': 500,
'buffer_percentage': 0.3,
'growth_factor': 1.2,
'utilization_threshold': 0.9
}
},
'services': {
'web_tier': {
'type': 'web',
'base_instances': 20
},
'app_tier': {
'type': 'app',
'base_instances': 50
},
'database_tier': {
'type': 'database',
'base_instances': 10
},
'container_platform': {
'type': 'container',
'base_instances': 100
}
},
'growth_projections': {
'production': 0.4,
'staging': 0.2,
'development': 0.1,
'web': 0.3,
'app': 0.4,
'database': 0.2,
'container': 0.6
}
}
# Design comprehensive IP strategy
result = ip_manager.design_comprehensive_ip_strategy(strategy_config)
print(f"IP Strategy Status: {result['status']}")
if result['status'] == 'completed':
print("IP address strategy designed successfully!")
print(f"Total IP requirements: {result['future_requirements']['total_requirements']}")
print(f"Efficiency score: {result['validation']['efficiency_score']}")
# Display warnings
warnings = result['utilization_projections']['capacity_warnings']
if warnings:
print(f"Capacity warnings: {len(warnings)}")
for warning in warnings[:3]: # Show first 3 warnings
print(f"- {warning['message']}")
else:
print(f"Strategy design failed: {result.get('error', 'Unknown error')}")
if __name__ == "__main__":
main()Example 2: Automated Subnet Management and Expansion System
View code
import boto3
import json
import logging
import ipaddress
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
import threading
import time
@dataclass
class SubnetMonitoringConfig:
subnet_id: str
utilization_threshold: float
expansion_trigger: float
max_expansion_size: int
notification_topic: str
@dataclass
class SubnetExpansionPlan:
current_subnet: str
current_utilization: float
recommended_action: str
new_subnet_cidr: Optional[str]
expansion_timeline: str
estimated_cost: float
class AutomatedSubnetManager:
def __init__(self, config: Dict):
self.config = config
self.ec2 = boto3.client('ec2')
self.cloudwatch = boto3.client('cloudwatch')
self.sns = boto3.client('sns')
self.dynamodb = boto3.resource('dynamodb')
# Initialize monitoring table
self.monitoring_table = self.dynamodb.Table(
config.get('monitoring_table_name', 'subnet-monitoring')
)
# Monitoring state
self.monitoring_active = False
self.monitoring_thread = None
def start_automated_subnet_management(self, management_config: Dict) -> Dict:
"""Start automated subnet monitoring and management"""
try:
# Initialize subnet monitoring
self.initialize_subnet_monitoring(management_config)
# Start monitoring thread
self.monitoring_active = True
self.monitoring_thread = threading.Thread(
target=self.monitoring_loop,
daemon=True
)
self.monitoring_thread.start()
# Set up CloudWatch alarms
self.setup_subnet_utilization_alarms(management_config)
return {
'status': 'started',
'monitored_subnets': len(management_config.get('subnets', [])),
'monitoring_interval': management_config.get('monitoring_interval', 300),
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
logging.error(f"Error starting subnet management: {str(e)}")
return {'status': 'failed', 'error': str(e)}
def initialize_subnet_monitoring(self, management_config: Dict):
"""Initialize monitoring for configured subnets"""
for subnet_config in management_config.get('subnets', []):
try:
# Store monitoring configuration
monitoring_item = {
'subnet_id': subnet_config['subnet_id'],
'vpc_id': subnet_config.get('vpc_id', ''),
'utilization_threshold': subnet_config.get('utilization_threshold', 0.8),
'expansion_trigger': subnet_config.get('expansion_trigger', 0.9),
'max_expansion_size': subnet_config.get('max_expansion_size', 1024),
'notification_topic': subnet_config.get('notification_topic', ''),
'last_check': int(datetime.utcnow().timestamp()),
'status': 'monitoring'
}
self.monitoring_table.put_item(Item=monitoring_item)
except Exception as e:
logging.error(f"Error initializing monitoring for subnet {subnet_config['subnet_id']}: {str(e)}")
def monitoring_loop(self):
"""Main monitoring loop for subnet utilization"""
while self.monitoring_active:
try:
# Get all monitored subnets
monitored_subnets = self.get_monitored_subnets()
# Check each subnet
for subnet_config in monitored_subnets:
self.check_subnet_utilization(subnet_config)
# Sleep until next check
monitoring_interval = self.config.get('monitoring_interval', 300)
time.sleep(monitoring_interval)
except Exception as e:
logging.error(f"Error in monitoring loop: {str(e)}")
time.sleep(60) # Wait longer on error
def get_monitored_subnets(self) -> List[Dict]:
"""Get list of monitored subnets from DynamoDB"""
try:
response = self.monitoring_table.scan(
FilterExpression='#status = :status',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={':status': 'monitoring'}
)
return response.get('Items', [])
except Exception as e:
logging.error(f"Error getting monitored subnets: {str(e)}")
return []
def check_subnet_utilization(self, subnet_config: Dict):
"""Check utilization for a specific subnet"""
subnet_id = subnet_config['subnet_id']
try:
# Get current subnet information
subnet_info = self.get_subnet_info(subnet_id)
if not subnet_info:
return
# Calculate utilization
utilization = self.calculate_subnet_utilization(subnet_info)
# Update monitoring record
self.update_monitoring_record(subnet_id, utilization)
# Check if action is needed
utilization_threshold = float(subnet_config.get('utilization_threshold', 0.8))
expansion_trigger = float(subnet_config.get('expansion_trigger', 0.9))
if utilization >= expansion_trigger:
# Trigger subnet expansion
self.trigger_subnet_expansion(subnet_config, subnet_info, utilization)
elif utilization >= utilization_threshold:
# Send warning notification
self.send_utilization_warning(subnet_config, subnet_info, utilization)
# Send metrics to CloudWatch
self.send_utilization_metrics(subnet_id, utilization)
except Exception as e:
logging.error(f"Error checking subnet {subnet_id}: {str(e)}")
def get_subnet_info(self, subnet_id: str) -> Optional[Dict]:
"""Get detailed subnet information"""
try:
response = self.ec2.describe_subnets(SubnetIds=[subnet_id])
if response['Subnets']:
subnet = response['Subnets'][0]
return {
'subnet_id': subnet['SubnetId'],
'vpc_id': subnet['VpcId'],
'cidr_block': subnet['CidrBlock'],
'availability_zone': subnet['AvailabilityZone'],
'available_ip_count': subnet['AvailableIpAddressCount'],
'tags': subnet.get('Tags', [])
}
return None
except Exception as e:
logging.error(f"Error getting subnet info for {subnet_id}: {str(e)}")
return None
def calculate_subnet_utilization(self, subnet_info: Dict) -> float:
"""Calculate subnet IP utilization percentage"""
try:
cidr_block = subnet_info['cidr_block']
available_ips = subnet_info['available_ip_count']
# Calculate total usable IPs (subtract AWS reserved IPs)
network = ipaddress.IPv4Network(cidr_block)
total_ips = network.num_addresses - 5 # AWS reserves 5 IPs
# Calculate used IPs
used_ips = total_ips - available_ips
# Calculate utilization percentage
utilization = (used_ips / total_ips) * 100 if total_ips > 0 else 0
return utilization
except Exception as e:
logging.error(f"Error calculating utilization: {str(e)}")
return 0.0
def trigger_subnet_expansion(self, subnet_config: Dict, subnet_info: Dict, utilization: float):
"""Trigger automated subnet expansion"""
subnet_id = subnet_config['subnet_id']
try:
logging.warning(f"Triggering expansion for subnet {subnet_id} (utilization: {utilization:.1f}%)")
# Create expansion plan
expansion_plan = self.create_subnet_expansion_plan(subnet_config, subnet_info, utilization)
# Execute expansion if auto-expansion is enabled
if subnet_config.get('auto_expansion_enabled', False):
expansion_result = self.execute_subnet_expansion(expansion_plan)
# Send success notification
self.send_expansion_notification(subnet_config, expansion_plan, expansion_result)
else:
# Send expansion recommendation
self.send_expansion_recommendation(subnet_config, expansion_plan)
except Exception as e:
logging.error(f"Error triggering expansion for subnet {subnet_id}: {str(e)}")
def create_subnet_expansion_plan(self, subnet_config: Dict, subnet_info: Dict,
utilization: float) -> SubnetExpansionPlan:
"""Create a plan for subnet expansion"""
try:
current_cidr = subnet_info['cidr_block']
vpc_id = subnet_info['vpc_id']
# Analyze VPC for available address space
available_space = self.analyze_vpc_address_space(vpc_id)
# Determine expansion strategy
expansion_strategy = self.determine_expansion_strategy(
current_cidr, available_space, subnet_config
)
# Calculate estimated cost
estimated_cost = self.estimate_expansion_cost(expansion_strategy)
return SubnetExpansionPlan(
current_subnet=current_cidr,
current_utilization=utilization,
recommended_action=expansion_strategy['action'],
new_subnet_cidr=expansion_strategy.get('new_cidr'),
expansion_timeline=expansion_strategy.get('timeline', 'immediate'),
estimated_cost=estimated_cost
)
except Exception as e:
logging.error(f"Error creating expansion plan: {str(e)}")
return SubnetExpansionPlan(
current_subnet=subnet_info['cidr_block'],
current_utilization=utilization,
recommended_action='manual_review_required',
new_subnet_cidr=None,
expansion_timeline='unknown',
estimated_cost=0.0
)
def analyze_vpc_address_space(self, vpc_id: str) -> Dict:
"""Analyze available address space in VPC"""
try:
# Get VPC information
vpc_response = self.ec2.describe_vpcs(VpcIds=[vpc_id])
if not vpc_response['Vpcs']:
return {'error': 'VPC not found'}
vpc_cidr = vpc_response['Vpcs'][0]['CidrBlock']
vpc_network = ipaddress.IPv4Network(vpc_cidr)
# Get all subnets in VPC
subnets_response = self.ec2.describe_subnets(
Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]
)
# Calculate used address space
used_networks = []
for subnet in subnets_response['Subnets']:
used_networks.append(ipaddress.IPv4Network(subnet['CidrBlock']))
# Find available address space
available_space = self.find_available_address_space(vpc_network, used_networks)
return {
'vpc_cidr': vpc_cidr,
'total_ips': vpc_network.num_addresses,
'used_networks': [str(net) for net in used_networks],
'available_space': available_space
}
except Exception as e:
logging.error(f"Error analyzing VPC address space: {str(e)}")
return {'error': str(e)}
def find_available_address_space(self, vpc_network: ipaddress.IPv4Network,
used_networks: List[ipaddress.IPv4Network]) -> List[str]:
"""Find available address space in VPC"""
try:
# Sort used networks by network address
used_networks.sort(key=lambda x: x.network_address)
available_spaces = []
current_address = vpc_network.network_address
for used_network in used_networks:
# Check if there's space before this used network
if current_address < used_network.network_address:
# Calculate available space
available_size = int(used_network.network_address) - int(current_address)
if available_size >= 256: # At least /24
# Find the largest possible subnet
available_prefix = 32 - (available_size - 1).bit_length()
available_cidr = f"{current_address}/{available_prefix}"
available_spaces.append(available_cidr)
# Move current address past this used network
current_address = used_network.broadcast_address + 1
# Check for space after the last used network
if current_address <= vpc_network.broadcast_address:
remaining_size = int(vpc_network.broadcast_address) - int(current_address) + 1
if remaining_size >= 256: # At least /24
available_prefix = 32 - (remaining_size - 1).bit_length()
available_cidr = f"{current_address}/{available_prefix}"
available_spaces.append(available_cidr)
return available_spaces
except Exception as e:
logging.error(f"Error finding available address space: {str(e)}")
return []
def determine_expansion_strategy(self, current_cidr: str, available_space: Dict,
subnet_config: Dict) -> Dict:
"""Determine the best expansion strategy"""
try:
current_network = ipaddress.IPv4Network(current_cidr)
max_expansion_size = subnet_config.get('max_expansion_size', 1024)
# Strategy 1: Create additional subnet in same AZ
if available_space.get('available_space'):
for available_cidr in available_space['available_space']:
available_network = ipaddress.IPv4Network(available_cidr)
# Check if this space can accommodate our expansion needs
if available_network.num_addresses >= max_expansion_size:
# Calculate optimal subnet size
optimal_size = min(max_expansion_size, available_network.num_addresses // 2)
optimal_prefix = 32 - (optimal_size - 1).bit_length()
new_subnet_cidr = f"{available_network.network_address}/{optimal_prefix}"
return {
'action': 'create_additional_subnet',
'new_cidr': new_subnet_cidr,
'timeline': 'immediate',
'method': 'additional_subnet'
}
# Strategy 2: Recommend VPC expansion
return {
'action': 'expand_vpc_cidr',
'timeline': 'manual_intervention_required',
'method': 'vpc_expansion',
'reason': 'Insufficient address space in current VPC'
}
except Exception as e:
logging.error(f"Error determining expansion strategy: {str(e)}")
return {
'action': 'manual_review_required',
'timeline': 'unknown',
'method': 'manual',
'error': str(e)
}
def execute_subnet_expansion(self, expansion_plan: SubnetExpansionPlan) -> Dict:
"""Execute the subnet expansion plan"""
try:
if expansion_plan.recommended_action == 'create_additional_subnet':
return self.create_additional_subnet(expansion_plan)
elif expansion_plan.recommended_action == 'expand_vpc_cidr':
return self.request_vpc_expansion(expansion_plan)
else:
return {
'status': 'skipped',
'reason': 'Manual intervention required',
'action': expansion_plan.recommended_action
}
except Exception as e:
logging.error(f"Error executing expansion: {str(e)}")
return {'status': 'failed', 'error': str(e)}
def create_additional_subnet(self, expansion_plan: SubnetExpansionPlan) -> Dict:
"""Create an additional subnet for expansion"""
try:
# This would create a new subnet in the same VPC
# For demonstration, we'll return a success response
logging.info(f"Creating additional subnet: {expansion_plan.new_subnet_cidr}")
# In practice, you would:
# 1. Create the new subnet
# 2. Configure route tables
# 3. Update security groups
# 4. Configure load balancer targets
# 5. Update auto-scaling groups
return {
'status': 'success',
'action': 'additional_subnet_created',
'new_subnet_cidr': expansion_plan.new_subnet_cidr,
'estimated_completion': (datetime.utcnow() + timedelta(minutes=30)).isoformat()
}
except Exception as e:
logging.error(f"Error creating additional subnet: {str(e)}")
return {'status': 'failed', 'error': str(e)}
def send_utilization_warning(self, subnet_config: Dict, subnet_info: Dict, utilization: float):
"""Send warning notification for high utilization"""
try:
notification_topic = subnet_config.get('notification_topic')
if not notification_topic:
return
message = {
'alert_type': 'subnet_utilization_warning',
'subnet_id': subnet_info['subnet_id'],
'vpc_id': subnet_info['vpc_id'],
'cidr_block': subnet_info['cidr_block'],
'availability_zone': subnet_info['availability_zone'],
'utilization_percentage': utilization,
'threshold': subnet_config.get('utilization_threshold', 0.8) * 100,
'available_ips': subnet_info['available_ip_count'],
'timestamp': datetime.utcnow().isoformat(),
'message': f'Subnet {subnet_info["subnet_id"]} utilization is {utilization:.1f}%'
}
self.sns.publish(
TopicArn=notification_topic,
Subject=f'Subnet Utilization Warning: {subnet_info["subnet_id"]}',
Message=json.dumps(message, indent=2)
)
except Exception as e:
logging.error(f"Error sending utilization warning: {str(e)}")
def send_expansion_notification(self, subnet_config: Dict, expansion_plan: SubnetExpansionPlan,
expansion_result: Dict):
"""Send notification about subnet expansion"""
try:
notification_topic = subnet_config.get('notification_topic')
if not notification_topic:
return
message = {
'alert_type': 'subnet_expansion_executed',
'expansion_plan': {
'current_subnet': expansion_plan.current_subnet,
'current_utilization': expansion_plan.current_utilization,
'recommended_action': expansion_plan.recommended_action,
'new_subnet_cidr': expansion_plan.new_subnet_cidr,
'estimated_cost': expansion_plan.estimated_cost
},
'execution_result': expansion_result,
'timestamp': datetime.utcnow().isoformat()
}
self.sns.publish(
TopicArn=notification_topic,
Subject='Subnet Expansion Executed',
Message=json.dumps(message, indent=2)
)
except Exception as e:
logging.error(f"Error sending expansion notification: {str(e)}")
def setup_subnet_utilization_alarms(self, management_config: Dict):
"""Set up CloudWatch alarms for subnet utilization"""
try:
for subnet_config in management_config.get('subnets', []):
subnet_id = subnet_config['subnet_id']
threshold = subnet_config.get('utilization_threshold', 0.8) * 100
# Create CloudWatch alarm
self.cloudwatch.put_metric_alarm(
AlarmName=f'subnet-utilization-{subnet_id}',
ComparisonOperator='GreaterThanThreshold',
EvaluationPeriods=2,
MetricName='SubnetUtilization',
Namespace='Custom/Networking',
Period=300,
Statistic='Average',
Threshold=threshold,
ActionsEnabled=True,
AlarmActions=[
subnet_config.get('notification_topic', '')
] if subnet_config.get('notification_topic') else [],
AlarmDescription=f'Subnet utilization alarm for {subnet_id}',
Dimensions=[
{
'Name': 'SubnetId',
'Value': subnet_id
}
],
Unit='Percent'
)
except Exception as e:
logging.error(f"Error setting up utilization alarms: {str(e)}")
def send_utilization_metrics(self, subnet_id: str, utilization: float):
"""Send utilization metrics to CloudWatch"""
try:
self.cloudwatch.put_metric_data(
Namespace='Custom/Networking',
MetricData=[
{
'MetricName': 'SubnetUtilization',
'Dimensions': [
{
'Name': 'SubnetId',
'Value': subnet_id
}
],
'Value': utilization,
'Unit': 'Percent',
'Timestamp': datetime.utcnow()
}
]
)
except Exception as e:
logging.error(f"Error sending utilization metrics: {str(e)}")
# Usage example
def main():
config = {
'region': 'us-east-1',
'monitoring_table_name': 'subnet-monitoring',
'monitoring_interval': 300
}
subnet_manager = AutomatedSubnetManager(config)
# Define subnet management configuration
management_config = {
'monitoring_interval': 300, # 5 minutes
'subnets': [
{
'subnet_id': 'subnet-1234567890abcdef0',
'vpc_id': 'vpc-1234567890abcdef0',
'utilization_threshold': 0.8,
'expansion_trigger': 0.9,
'max_expansion_size': 2048,
'auto_expansion_enabled': True,
'notification_topic': 'arn:aws:sns:us-east-1:123456789012:subnet-alerts'
},
{
'subnet_id': 'subnet-0987654321fedcba1',
'vpc_id': 'vpc-1234567890abcdef0',
'utilization_threshold': 0.75,
'expansion_trigger': 0.85,
'max_expansion_size': 1024,
'auto_expansion_enabled': False,
'notification_topic': 'arn:aws:sns:us-east-1:123456789012:subnet-alerts'
}
]
}
# Start automated subnet management
result = subnet_manager.start_automated_subnet_management(management_config)
print(f"Subnet Management Status: {result['status']}")
if result['status'] == 'started':
print("Automated subnet management started successfully!")
print(f"- Monitored subnets: {result['monitored_subnets']}")
print(f"- Monitoring interval: {result['monitoring_interval']} seconds")
# Keep monitoring running
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
print("Stopping subnet management...")
subnet_manager.monitoring_active = False
else:
print(f"Failed to start management: {result.get('error', 'Unknown error')}")
if __name__ == "__main__":
main()Example 3: CloudFormation Template for Scalable Multi-AZ Subnet Architecture
View code
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Scalable multi-AZ subnet architecture with expansion capabilities'
Parameters:
Environment:
Type: String
Description: Environment name
Default: production
AllowedValues: [development, staging, production]
VpcCidr:
Type: String
Description: CIDR block for the VPC
Default: 10.0.0.0/16
AllowedPattern: ^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/(1[6-9]|2[0-8]))$
PublicSubnetSize:
Type: Number
Description: Subnet size for public subnets (CIDR suffix)
Default: 24
MinValue: 20
MaxValue: 28
PrivateSubnetSize:
Type: Number
Description: Subnet size for private subnets (CIDR suffix)
Default: 22
MinValue: 20
MaxValue: 28
DatabaseSubnetSize:
Type: Number
Description: Subnet size for database subnets (CIDR suffix)
Default: 24
MinValue: 20
MaxValue: 28
ContainerSubnetSize:
Type: Number
Description: Subnet size for container subnets (CIDR suffix)
Default: 20
MinValue: 18
MaxValue: 24
EnableExpansionReserve:
Type: String
Description: Reserve address space for future expansion
Default: 'true'
AllowedValues: ['true', 'false']
ExpansionReservePercentage:
Type: Number
Description: Percentage of VPC CIDR to reserve for expansion
Default: 25
MinValue: 10
MaxValue: 50
Conditions:
CreateExpansionReserve: !Equals [!Ref EnableExpansionReserve, 'true']
Resources:
# VPC
ScalableVPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: !Ref VpcCidr
EnableDnsHostnames: true
EnableDnsSupport: true
Tags:
- Key: Name
Value: !Sub '${Environment}-scalable-vpc'
- Key: Environment
Value: !Ref Environment
- Key: Purpose
Value: ScalableNetworking
# Internet Gateway
InternetGateway:
Type: AWS::EC2::InternetGateway
Properties:
Tags:
- Key: Name
Value: !Sub '${Environment}-scalable-igw'
- Key: Environment
Value: !Ref Environment
InternetGatewayAttachment:
Type: AWS::EC2::VPCGatewayAttachment
Properties:
InternetGatewayId: !Ref InternetGateway
VpcId: !Ref ScalableVPC
# Public Subnets (Multi-AZ)
PublicSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [0, !GetAZs '']
CidrBlock: !Select [0, !Cidr [!Ref VpcCidr, 32, !Ref PublicSubnetSize]]
MapPublicIpOnLaunch: true
Tags:
- Key: Name
Value: !Sub '${Environment}-public-subnet-1'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Public
- Key: Tier
Value: Web
PublicSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [1, !GetAZs '']
CidrBlock: !Select [1, !Cidr [!Ref VpcCidr, 32, !Ref PublicSubnetSize]]
MapPublicIpOnLaunch: true
Tags:
- Key: Name
Value: !Sub '${Environment}-public-subnet-2'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Public
- Key: Tier
Value: Web
PublicSubnet3:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [2, !GetAZs '']
CidrBlock: !Select [2, !Cidr [!Ref VpcCidr, 32, !Ref PublicSubnetSize]]
MapPublicIpOnLaunch: true
Tags:
- Key: Name
Value: !Sub '${Environment}-public-subnet-3'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Public
- Key: Tier
Value: Web
# Private Subnets (Multi-AZ) - Larger for application workloads
PrivateSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [0, !GetAZs '']
CidrBlock: !Select [3, !Cidr [!Ref VpcCidr, 32, !Ref PrivateSubnetSize]]
Tags:
- Key: Name
Value: !Sub '${Environment}-private-subnet-1'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Private
- Key: Tier
Value: Application
PrivateSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [1, !GetAZs '']
CidrBlock: !Select [4, !Cidr [!Ref VpcCidr, 32, !Ref PrivateSubnetSize]]
Tags:
- Key: Name
Value: !Sub '${Environment}-private-subnet-2'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Private
- Key: Tier
Value: Application
PrivateSubnet3:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [2, !GetAZs '']
CidrBlock: !Select [5, !Cidr [!Ref VpcCidr, 32, !Ref PrivateSubnetSize]]
Tags:
- Key: Name
Value: !Sub '${Environment}-private-subnet-3'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Private
- Key: Tier
Value: Application
# Database Subnets (Multi-AZ)
DatabaseSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [0, !GetAZs '']
CidrBlock: !Select [6, !Cidr [!Ref VpcCidr, 32, !Ref DatabaseSubnetSize]]
Tags:
- Key: Name
Value: !Sub '${Environment}-database-subnet-1'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Database
- Key: Tier
Value: Data
DatabaseSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [1, !GetAZs '']
CidrBlock: !Select [7, !Cidr [!Ref VpcCidr, 32, !Ref DatabaseSubnetSize]]
Tags:
- Key: Name
Value: !Sub '${Environment}-database-subnet-2'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Database
- Key: Tier
Value: Data
DatabaseSubnet3:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [2, !GetAZs '']
CidrBlock: !Select [8, !Cidr [!Ref VpcCidr, 32, !Ref DatabaseSubnetSize]]
Tags:
- Key: Name
Value: !Sub '${Environment}-database-subnet-3'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Database
- Key: Tier
Value: Data
# Container/EKS Subnets (Multi-AZ) - Larger for pod networking
ContainerSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [0, !GetAZs '']
CidrBlock: !Select [9, !Cidr [!Ref VpcCidr, 32, !Ref ContainerSubnetSize]]
Tags:
- Key: Name
Value: !Sub '${Environment}-container-subnet-1'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Container
- Key: Tier
Value: Container
- Key: kubernetes.io/role/elb
Value: '1'
ContainerSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [1, !GetAZs '']
CidrBlock: !Select [10, !Cidr [!Ref VpcCidr, 32, !Ref ContainerSubnetSize]]
Tags:
- Key: Name
Value: !Sub '${Environment}-container-subnet-2'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Container
- Key: Tier
Value: Container
- Key: kubernetes.io/role/elb
Value: '1'
ContainerSubnet3:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [2, !GetAZs '']
CidrBlock: !Select [11, !Cidr [!Ref VpcCidr, 32, !Ref ContainerSubnetSize]]
Tags:
- Key: Name
Value: !Sub '${Environment}-container-subnet-3'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Container
- Key: Tier
Value: Container
- Key: kubernetes.io/role/elb
Value: '1'
# Reserved Subnets for Future Expansion
ReservedSubnet1:
Type: AWS::EC2::Subnet
Condition: CreateExpansionReserve
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [0, !GetAZs '']
CidrBlock: !Select [12, !Cidr [!Ref VpcCidr, 32, 22]]
Tags:
- Key: Name
Value: !Sub '${Environment}-reserved-subnet-1'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Reserved
- Key: Purpose
Value: FutureExpansion
ReservedSubnet2:
Type: AWS::EC2::Subnet
Condition: CreateExpansionReserve
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [1, !GetAZs '']
CidrBlock: !Select [13, !Cidr [!Ref VpcCidr, 32, 22]]
Tags:
- Key: Name
Value: !Sub '${Environment}-reserved-subnet-2'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Reserved
- Key: Purpose
Value: FutureExpansion
ReservedSubnet3:
Type: AWS::EC2::Subnet
Condition: CreateExpansionReserve
Properties:
VpcId: !Ref ScalableVPC
AvailabilityZone: !Select [2, !GetAZs '']
CidrBlock: !Select [14, !Cidr [!Ref VpcCidr, 32, 22]]
Tags:
- Key: Name
Value: !Sub '${Environment}-reserved-subnet-3'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Reserved
- Key: Purpose
Value: FutureExpansion
# NAT Gateways for high availability
NatGateway1EIP:
Type: AWS::EC2::EIP
DependsOn: InternetGatewayAttachment
Properties:
Domain: vpc
Tags:
- Key: Name
Value: !Sub '${Environment}-nat-eip-1'
NatGateway2EIP:
Type: AWS::EC2::EIP
DependsOn: InternetGatewayAttachment
Properties:
Domain: vpc
Tags:
- Key: Name
Value: !Sub '${Environment}-nat-eip-2'
NatGateway3EIP:
Type: AWS::EC2::EIP
DependsOn: InternetGatewayAttachment
Properties:
Domain: vpc
Tags:
- Key: Name
Value: !Sub '${Environment}-nat-eip-3'
NatGateway1:
Type: AWS::EC2::NatGateway
Properties:
AllocationId: !GetAtt NatGateway1EIP.AllocationId
SubnetId: !Ref PublicSubnet1
Tags:
- Key: Name
Value: !Sub '${Environment}-nat-gateway-1'
NatGateway2:
Type: AWS::EC2::NatGateway
Properties:
AllocationId: !GetAtt NatGateway2EIP.AllocationId
SubnetId: !Ref PublicSubnet2
Tags:
- Key: Name
Value: !Sub '${Environment}-nat-gateway-2'
NatGateway3:
Type: AWS::EC2::NatGateway
Properties:
AllocationId: !GetAtt NatGateway3EIP.AllocationId
SubnetId: !Ref PublicSubnet3
Tags:
- Key: Name
Value: !Sub '${Environment}-nat-gateway-3'
# Route Tables
PublicRouteTable:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref ScalableVPC
Tags:
- Key: Name
Value: !Sub '${Environment}-public-routes'
- Key: Environment
Value: !Ref Environment
DefaultPublicRoute:
Type: AWS::EC2::Route
DependsOn: InternetGatewayAttachment
Properties:
RouteTableId: !Ref PublicRouteTable
DestinationCidrBlock: 0.0.0.0/0
GatewayId: !Ref InternetGateway
# Private Route Tables (one per AZ for high availability)
PrivateRouteTable1:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref ScalableVPC
Tags:
- Key: Name
Value: !Sub '${Environment}-private-routes-1'
- Key: Environment
Value: !Ref Environment
DefaultPrivateRoute1:
Type: AWS::EC2::Route
Properties:
RouteTableId: !Ref PrivateRouteTable1
DestinationCidrBlock: 0.0.0.0/0
NatGatewayId: !Ref NatGateway1
PrivateRouteTable2:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref ScalableVPC
Tags:
- Key: Name
Value: !Sub '${Environment}-private-routes-2'
- Key: Environment
Value: !Ref Environment
DefaultPrivateRoute2:
Type: AWS::EC2::Route
Properties:
RouteTableId: !Ref PrivateRouteTable2
DestinationCidrBlock: 0.0.0.0/0
NatGatewayId: !Ref NatGateway2
PrivateRouteTable3:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref ScalableVPC
Tags:
- Key: Name
Value: !Sub '${Environment}-private-routes-3'
- Key: Environment
Value: !Ref Environment
DefaultPrivateRoute3:
Type: AWS::EC2::Route
Properties:
RouteTableId: !Ref PrivateRouteTable3
DestinationCidrBlock: 0.0.0.0/0
NatGatewayId: !Ref NatGateway3
# Database Route Tables (isolated)
DatabaseRouteTable1:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref ScalableVPC
Tags:
- Key: Name
Value: !Sub '${Environment}-database-routes-1'
- Key: Environment
Value: !Ref Environment
DatabaseRouteTable2:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref ScalableVPC
Tags:
- Key: Name
Value: !Sub '${Environment}-database-routes-2'
- Key: Environment
Value: !Ref Environment
DatabaseRouteTable3:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref ScalableVPC
Tags:
- Key: Name
Value: !Sub '${Environment}-database-routes-3'
- Key: Environment
Value: !Ref Environment
# Subnet Route Table Associations
PublicSubnet1RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref PublicRouteTable
SubnetId: !Ref PublicSubnet1
PublicSubnet2RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref PublicRouteTable
SubnetId: !Ref PublicSubnet2
PublicSubnet3RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref PublicRouteTable
SubnetId: !Ref PublicSubnet3
PrivateSubnet1RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref PrivateRouteTable1
SubnetId: !Ref PrivateSubnet1
PrivateSubnet2RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref PrivateRouteTable2
SubnetId: !Ref PrivateSubnet2
PrivateSubnet3RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref PrivateRouteTable3
SubnetId: !Ref PrivateSubnet3
DatabaseSubnet1RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref DatabaseRouteTable1
SubnetId: !Ref DatabaseSubnet1
DatabaseSubnet2RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref DatabaseRouteTable2
SubnetId: !Ref DatabaseSubnet2
DatabaseSubnet3RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref DatabaseRouteTable3
SubnetId: !Ref DatabaseSubnet3
ContainerSubnet1RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref PrivateRouteTable1
SubnetId: !Ref ContainerSubnet1
ContainerSubnet2RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref PrivateRouteTable2
SubnetId: !Ref ContainerSubnet2
ContainerSubnet3RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
RouteTableId: !Ref PrivateRouteTable3
SubnetId: !Ref ContainerSubnet3
# VPC Endpoints for AWS Services (to reduce NAT Gateway costs)
S3VPCEndpoint:
Type: AWS::EC2::VPCEndpoint
Properties:
VpcId: !Ref ScalableVPC
ServiceName: !Sub 'com.amazonaws.${AWS::Region}.s3'
VpcEndpointType: Gateway
RouteTableIds:
- !Ref PrivateRouteTable1
- !Ref PrivateRouteTable2
- !Ref PrivateRouteTable3
- !Ref DatabaseRouteTable1
- !Ref DatabaseRouteTable2
- !Ref DatabaseRouteTable3
DynamoDBVPCEndpoint:
Type: AWS::EC2::VPCEndpoint
Properties:
VpcId: !Ref ScalableVPC
ServiceName: !Sub 'com.amazonaws.${AWS::Region}.dynamodb'
VpcEndpointType: Gateway
RouteTableIds:
- !Ref PrivateRouteTable1
- !Ref PrivateRouteTable2
- !Ref PrivateRouteTable3
# Security Groups
WebTierSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: !Sub '${Environment}-web-tier-sg'
GroupDescription: Security group for web tier
VpcId: !Ref ScalableVPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 80
ToPort: 80
CidrIp: 0.0.0.0/0
Description: HTTP from anywhere
- IpProtocol: tcp
FromPort: 443
ToPort: 443
CidrIp: 0.0.0.0/0
Description: HTTPS from anywhere
Tags:
- Key: Name
Value: !Sub '${Environment}-web-tier-sg'
- Key: Environment
Value: !Ref Environment
AppTierSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: !Sub '${Environment}-app-tier-sg'
GroupDescription: Security group for application tier
VpcId: !Ref ScalableVPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 8080
ToPort: 8080
SourceSecurityGroupId: !Ref WebTierSecurityGroup
Description: Application port from web tier
Tags:
- Key: Name
Value: !Sub '${Environment}-app-tier-sg'
- Key: Environment
Value: !Ref Environment
DatabaseTierSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: !Sub '${Environment}-database-tier-sg'
GroupDescription: Security group for database tier
VpcId: !Ref ScalableVPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 3306
ToPort: 3306
SourceSecurityGroupId: !Ref AppTierSecurityGroup
Description: MySQL from application tier
- IpProtocol: tcp
FromPort: 5432
ToPort: 5432
SourceSecurityGroupId: !Ref AppTierSecurityGroup
Description: PostgreSQL from application tier
Tags:
- Key: Name
Value: !Sub '${Environment}-database-tier-sg'
- Key: Environment
Value: !Ref Environment
ContainerSecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: !Sub '${Environment}-container-sg'
GroupDescription: Security group for container workloads
VpcId: !Ref ScalableVPC
SecurityGroupIngress:
- IpProtocol: -1
SourceSecurityGroupId: !Ref ContainerSecurityGroup
Description: All traffic from same security group
Tags:
- Key: Name
Value: !Sub '${Environment}-container-sg'
- Key: Environment
Value: !Ref Environment
# DB Subnet Group
DatabaseSubnetGroup:
Type: AWS::RDS::DBSubnetGroup
Properties:
DBSubnetGroupName: !Sub '${Environment}-database-subnet-group'
DBSubnetGroupDescription: Subnet group for RDS databases
SubnetIds:
- !Ref DatabaseSubnet1
- !Ref DatabaseSubnet2
- !Ref DatabaseSubnet3
Tags:
- Key: Name
Value: !Sub '${Environment}-database-subnet-group'
- Key: Environment
Value: !Ref Environment
Outputs:
VPCId:
Description: VPC ID
Value: !Ref ScalableVPC
Export:
Name: !Sub '${Environment}-scalable-vpc-id'
VPCCidr:
Description: VPC CIDR block
Value: !Ref VpcCidr
Export:
Name: !Sub '${Environment}-vpc-cidr'
PublicSubnets:
Description: Public subnet IDs
Value: !Join
- ','
- - !Ref PublicSubnet1
- !Ref PublicSubnet2
- !Ref PublicSubnet3
Export:
Name: !Sub '${Environment}-public-subnets'
PrivateSubnets:
Description: Private subnet IDs
Value: !Join
- ','
- - !Ref PrivateSubnet1
- !Ref PrivateSubnet2
- !Ref PrivateSubnet3
Export:
Name: !Sub '${Environment}-private-subnets'
DatabaseSubnets:
Description: Database subnet IDs
Value: !Join
- ','
- - !Ref DatabaseSubnet1
- !Ref DatabaseSubnet2
- !Ref DatabaseSubnet3
Export:
Name: !Sub '${Environment}-database-subnets'
ContainerSubnets:
Description: Container subnet IDs
Value: !Join
- ','
- - !Ref ContainerSubnet1
- !Ref ContainerSubnet2
- !Ref ContainerSubnet3
Export:
Name: !Sub '${Environment}-container-subnets'
ReservedSubnets:
Condition: CreateExpansionReserve
Description: Reserved subnet IDs for future expansion
Value: !Join
- ','
- - !Ref ReservedSubnet1
- !Ref ReservedSubnet2
- !Ref ReservedSubnet3
Export:
Name: !Sub '${Environment}-reserved-subnets'
DatabaseSubnetGroup:
Description: Database subnet group name
Value: !Ref DatabaseSubnetGroup
Export:
Name: !Sub '${Environment}-database-subnet-group'
WebTierSecurityGroup:
Description: Web tier security group ID
Value: !Ref WebTierSecurityGroup
Export:
Name: !Sub '${Environment}-web-tier-sg'
AppTierSecurityGroup:
Description: Application tier security group ID
Value: !Ref AppTierSecurityGroup
Export:
Name: !Sub '${Environment}-app-tier-sg'
DatabaseTierSecurityGroup:
Description: Database tier security group ID
Value: !Ref DatabaseTierSecurityGroup
Export:
Name: !Sub '${Environment}-database-tier-sg'
ContainerSecurityGroup:
Description: Container security group ID
Value: !Ref ContainerSecurityGroup
Export:
Name: !Sub '${Environment}-container-sg'
AvailabilityZones:
Description: Availability zones used
Value: !Join
- ','
- - !Select [0, !GetAZs '']
- !Select [1, !GetAZs '']
- !Select [2, !GetAZs '']
Export:
Name: !Sub '${Environment}-availability-zones'Example 4: IP Address Management and Utilization Analysis Tool
View code
#!/bin/bash
# IP Address Management and Utilization Analysis Tool
# Comprehensive analysis and management of IP address allocation and utilization
set -euo pipefail
# Configuration
CONFIG_FILE="${CONFIG_FILE:-./ipam-config.json}"
LOG_FILE="${LOG_FILE:-./ipam-analysis.log}"
RESULTS_DIR="${RESULTS_DIR:-./ipam-results}"
TEMP_DIR="${TEMP_DIR:-/tmp/ipam-analysis}"
# Create directories
mkdir -p "$RESULTS_DIR" "$TEMP_DIR"
# Logging function
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE"
}
# Load configuration
if [[ ! -f "$CONFIG_FILE" ]]; then
log "ERROR: Configuration file $CONFIG_FILE not found"
exit 1
fi
# Parse configuration
REGIONS=($(jq -r '.regions[]' "$CONFIG_FILE"))
ENVIRONMENTS=($(jq -r '.environments[]' "$CONFIG_FILE"))
UTILIZATION_THRESHOLD=$(jq -r '.utilization_threshold // 80' "$CONFIG_FILE")
EXPANSION_THRESHOLD=$(jq -r '.expansion_threshold // 90' "$CONFIG_FILE")
log "Starting IP address management analysis"
log "Regions: ${#REGIONS[@]}"
log "Environments: ${#ENVIRONMENTS[@]}"
log "Utilization threshold: ${UTILIZATION_THRESHOLD}%"
# Function to analyze VPC IP utilization
analyze_vpc_utilization() {
local region="$1"
local analysis_id="vpc_analysis_$(date +%s)"
local results_file="$RESULTS_DIR/vpc_utilization_${region}_${analysis_id}.json"
log "Analyzing VPC utilization in region: $region"
# Initialize results
cat > "$results_file" << EOF
{
"analysis_id": "$analysis_id",
"region": "$region",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"vpcs": [],
"summary": {
"total_vpcs": 0,
"total_allocated_ips": 0,
"total_used_ips": 0,
"overall_utilization": 0,
"high_utilization_vpcs": 0
}
}
EOF
# Get all VPCs in the region
local vpcs=$(aws ec2 describe-vpcs \
--region "$region" \
--query 'Vpcs[*].{VpcId:VpcId,CidrBlock:CidrBlock,Tags:Tags}' \
--output json 2>/dev/null || echo '[]')
local total_allocated=0
local total_used=0
local high_util_count=0
# Analyze each VPC
echo "$vpcs" | jq -c '.[]' | while read -r vpc; do
local vpc_id=$(echo "$vpc" | jq -r '.VpcId')
local vpc_cidr=$(echo "$vpc" | jq -r '.CidrBlock')
log "Analyzing VPC: $vpc_id ($vpc_cidr)"
# Analyze VPC subnets
local vpc_analysis=$(analyze_vpc_subnets "$region" "$vpc_id" "$vpc_cidr")
# Add VPC analysis to results
jq --argjson vpc_analysis "$vpc_analysis" \
'.vpcs += [$vpc_analysis]' "$results_file" > "$results_file.tmp"
mv "$results_file.tmp" "$results_file"
# Update totals
local vpc_allocated=$(echo "$vpc_analysis" | jq -r '.total_allocated_ips')
local vpc_used=$(echo "$vpc_analysis" | jq -r '.total_used_ips')
local vpc_utilization=$(echo "$vpc_analysis" | jq -r '.utilization_percentage')
total_allocated=$((total_allocated + vpc_allocated))
total_used=$((total_used + vpc_used))
if (( $(echo "$vpc_utilization >= $UTILIZATION_THRESHOLD" | bc -l) )); then
high_util_count=$((high_util_count + 1))
fi
done
# Calculate overall utilization
local overall_utilization=0
if [[ $total_allocated -gt 0 ]]; then
overall_utilization=$(echo "scale=2; $total_used * 100 / $total_allocated" | bc -l)
fi
# Update summary
local vpc_count=$(echo "$vpcs" | jq 'length')
jq --argjson total_vpcs "$vpc_count" \
--argjson total_allocated "$total_allocated" \
--argjson total_used "$total_used" \
--argjson overall_util "$overall_utilization" \
--argjson high_util_count "$high_util_count" \
'.summary.total_vpcs = $total_vpcs |
.summary.total_allocated_ips = $total_allocated |
.summary.total_used_ips = $total_used |
.summary.overall_utilization = $overall_util |
.summary.high_utilization_vpcs = $high_util_count' \
"$results_file" > "$results_file.tmp"
mv "$results_file.tmp" "$results_file"
log "VPC utilization analysis completed for $region"
echo "$results_file"
}
# Function to analyze subnets within a VPC
analyze_vpc_subnets() {
local region="$1"
local vpc_id="$2"
local vpc_cidr="$3"
# Get all subnets in the VPC
local subnets=$(aws ec2 describe-subnets \
--region "$region" \
--filters "Name=vpc-id,Values=$vpc_id" \
--query 'Subnets[*].{SubnetId:SubnetId,CidrBlock:CidrBlock,AvailabilityZone:AvailabilityZone,AvailableIpAddressCount:AvailableIpAddressCount,Tags:Tags}' \
--output json 2>/dev/null || echo '[]')
local subnet_analysis=()
local total_allocated=0
local total_used=0
# Analyze each subnet
echo "$subnets" | jq -c '.[]' | while read -r subnet; do
local subnet_id=$(echo "$subnet" | jq -r '.SubnetId')
local subnet_cidr=$(echo "$subnet" | jq -r '.CidrBlock')
local az=$(echo "$subnet" | jq -r '.AvailabilityZone')
local available_ips=$(echo "$subnet" | jq -r '.AvailableIpAddressCount')
# Calculate subnet utilization
local subnet_network_size=$(calculate_network_size "$subnet_cidr")
local usable_ips=$((subnet_network_size - 5)) # AWS reserves 5 IPs
local used_ips=$((usable_ips - available_ips))
local utilization=0
if [[ $usable_ips -gt 0 ]]; then
utilization=$(echo "scale=2; $used_ips * 100 / $usable_ips" | bc -l)
fi
# Get subnet tags for categorization
local subnet_tags=$(echo "$subnet" | jq -r '.Tags // []')
local subnet_type=$(echo "$subnet_tags" | jq -r '.[] | select(.Key == "Type") | .Value // "unknown"')
local subnet_tier=$(echo "$subnet_tags" | jq -r '.[] | select(.Key == "Tier") | .Value // "unknown"')
subnet_analysis+=("{
\"subnet_id\": \"$subnet_id\",
\"cidr_block\": \"$subnet_cidr\",
\"availability_zone\": \"$az\",
\"subnet_type\": \"$subnet_type\",
\"subnet_tier\": \"$subnet_tier\",
\"total_ips\": $subnet_network_size,
\"usable_ips\": $usable_ips,
\"used_ips\": $used_ips,
\"available_ips\": $available_ips,
\"utilization_percentage\": $utilization,
\"status\": \"$(get_utilization_status "$utilization")\"
}")
total_allocated=$((total_allocated + usable_ips))
total_used=$((total_used + used_ips))
done
# Calculate VPC utilization
local vpc_utilization=0
if [[ $total_allocated -gt 0 ]]; then
vpc_utilization=$(echo "scale=2; $total_used * 100 / $total_allocated" | bc -l)
fi
# Get VPC tags
local vpc_tags=$(aws ec2 describe-vpcs \
--region "$region" \
--vpc-ids "$vpc_id" \
--query 'Vpcs[0].Tags // []' \
--output json 2>/dev/null || echo '[]')
local vpc_name=$(echo "$vpc_tags" | jq -r '.[] | select(.Key == "Name") | .Value // "unnamed"')
local vpc_environment=$(echo "$vpc_tags" | jq -r '.[] | select(.Key == "Environment") | .Value // "unknown"')
# Create subnet analysis JSON
local subnet_analysis_json=$(printf '%s\n' "${subnet_analysis[@]}" | jq -s .)
cat << EOF
{
"vpc_id": "$vpc_id",
"vpc_name": "$vpc_name",
"vpc_environment": "$vpc_environment",
"vpc_cidr": "$vpc_cidr",
"region": "$region",
"total_allocated_ips": $total_allocated,
"total_used_ips": $total_used,
"utilization_percentage": $vpc_utilization,
"status": "$(get_utilization_status "$vpc_utilization")",
"subnet_count": $(echo "$subnets" | jq 'length'),
"subnets": $subnet_analysis_json
}
EOF
}
# Function to calculate network size from CIDR
calculate_network_size() {
local cidr="$1"
local prefix=$(echo "$cidr" | cut -d'/' -f2)
local network_size=$((2 ** (32 - prefix)))
echo "$network_size"
}
# Function to get utilization status
get_utilization_status() {
local utilization="$1"
if (( $(echo "$utilization >= $EXPANSION_THRESHOLD" | bc -l) )); then
echo "critical"
elif (( $(echo "$utilization >= $UTILIZATION_THRESHOLD" | bc -l) )); then
echo "warning"
elif (( $(echo "$utilization >= 50" | bc -l) )); then
echo "normal"
else
echo "low"
fi
}
# Function to generate expansion recommendations
generate_expansion_recommendations() {
local analysis_files=("$@")
local recommendations_file="$RESULTS_DIR/expansion_recommendations_$(date +%Y%m%d_%H%M%S).json"
log "Generating expansion recommendations"
# Initialize recommendations
cat > "$recommendations_file" << EOF
{
"generated_at": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"analysis_files": $(printf '%s\n' "${analysis_files[@]}" | jq -R . | jq -s .),
"recommendations": [],
"summary": {
"total_recommendations": 0,
"critical_recommendations": 0,
"warning_recommendations": 0
}
}
EOF
local recommendations=()
local critical_count=0
local warning_count=0
# Analyze each file for recommendations
for analysis_file in "${analysis_files[@]}"; do
if [[ -f "$analysis_file" ]]; then
# Extract high utilization VPCs and subnets
local high_util_vpcs=$(jq -r '.vpcs[] | select(.utilization_percentage >= '$UTILIZATION_THRESHOLD')' "$analysis_file" 2>/dev/null || echo '{}')
if [[ "$high_util_vpcs" != "{}" ]]; then
echo "$high_util_vpcs" | jq -c '.' | while read -r vpc; do
local vpc_id=$(echo "$vpc" | jq -r '.vpc_id')
local vpc_utilization=$(echo "$vpc" | jq -r '.utilization_percentage')
local region=$(echo "$vpc" | jq -r '.region')
# Generate VPC-level recommendation
local priority="warning"
if (( $(echo "$vpc_utilization >= $EXPANSION_THRESHOLD" | bc -l) )); then
priority="critical"
critical_count=$((critical_count + 1))
else
warning_count=$((warning_count + 1))
fi
local recommendation=$(generate_vpc_recommendation "$vpc" "$priority")
recommendations+=("$recommendation")
# Generate subnet-level recommendations
echo "$vpc" | jq -c '.subnets[] | select(.utilization_percentage >= '$UTILIZATION_THRESHOLD')' | while read -r subnet; do
local subnet_recommendation=$(generate_subnet_recommendation "$subnet" "$vpc_id" "$region")
recommendations+=("$subnet_recommendation")
done
done
fi
fi
done
# Add recommendations to file
local recommendations_json=$(printf '%s\n' "${recommendations[@]}" | jq -s .)
local total_recommendations=$(echo "$recommendations_json" | jq 'length')
jq --argjson recs "$recommendations_json" \
--argjson total "$total_recommendations" \
--argjson critical "$critical_count" \
--argjson warning "$warning_count" \
'.recommendations = $recs |
.summary.total_recommendations = $total |
.summary.critical_recommendations = $critical |
.summary.warning_recommendations = $warning' \
"$recommendations_file" > "$recommendations_file.tmp"
mv "$recommendations_file.tmp" "$recommendations_file"
log "Expansion recommendations generated: $recommendations_file"
echo "$recommendations_file"
}
# Function to generate VPC recommendation
generate_vpc_recommendation() {
local vpc="$1"
local priority="$2"
local vpc_id=$(echo "$vpc" | jq -r '.vpc_id')
local vpc_cidr=$(echo "$vpc" | jq -r '.vpc_cidr')
local utilization=$(echo "$vpc" | jq -r '.utilization_percentage')
local region=$(echo "$vpc" | jq -r '.region')
cat << EOF
{
"type": "vpc_expansion",
"priority": "$priority",
"vpc_id": "$vpc_id",
"region": "$region",
"current_cidr": "$vpc_cidr",
"current_utilization": $utilization,
"recommendation": "Consider adding secondary CIDR block to VPC",
"suggested_actions": [
"Add secondary CIDR block to VPC",
"Create additional subnets in new CIDR range",
"Update route tables and security groups",
"Plan migration strategy for high-utilization subnets"
],
"estimated_timeline": "2-4 weeks",
"business_impact": "$(get_business_impact "$priority")"
}
EOF
}
# Function to generate subnet recommendation
generate_subnet_recommendation() {
local subnet="$1"
local vpc_id="$2"
local region="$3"
local subnet_id=$(echo "$subnet" | jq -r '.subnet_id')
local subnet_cidr=$(echo "$subnet" | jq -r '.cidr_block')
local utilization=$(echo "$subnet" | jq -r '.utilization_percentage')
local subnet_type=$(echo "$subnet" | jq -r '.subnet_type')
local az=$(echo "$subnet" | jq -r '.availability_zone')
local priority="warning"
if (( $(echo "$utilization >= $EXPANSION_THRESHOLD" | bc -l) )); then
priority="critical"
fi
cat << EOF
{
"type": "subnet_expansion",
"priority": "$priority",
"subnet_id": "$subnet_id",
"vpc_id": "$vpc_id",
"region": "$region",
"availability_zone": "$az",
"subnet_type": "$subnet_type",
"current_cidr": "$subnet_cidr",
"current_utilization": $utilization,
"recommendation": "Create additional subnet or migrate workloads",
"suggested_actions": [
"Create additional subnet in same AZ",
"Migrate some workloads to new subnet",
"Update load balancer target groups",
"Review auto-scaling group configurations"
],
"estimated_timeline": "1-2 weeks",
"business_impact": "$(get_business_impact "$priority")"
}
EOF
}
# Function to get business impact
get_business_impact() {
local priority="$1"
case "$priority" in
"critical")
echo "High - Risk of service disruption due to IP exhaustion"
;;
"warning")
echo "Medium - Potential capacity constraints for scaling"
;;
*)
echo "Low - Proactive capacity planning recommended"
;;
esac
}
# Function to generate comprehensive IPAM report
generate_ipam_report() {
local analysis_files=("$@")
local report_file="$RESULTS_DIR/ipam_comprehensive_report_$(date +%Y%m%d_%H%M%S).json"
log "Generating comprehensive IPAM report"
# Collect all analysis data
local all_analysis=()
for analysis_file in "${analysis_files[@]}"; do
if [[ -f "$analysis_file" ]]; then
all_analysis+=("$(cat "$analysis_file")")
fi
done
# Create comprehensive report
local all_analysis_json=$(printf '%s\n' "${all_analysis[@]}" | jq -s .)
# Calculate global statistics
local global_stats=$(echo "$all_analysis_json" | jq '
{
"total_regions": length,
"total_vpcs": [.[].summary.total_vpcs] | add,
"total_allocated_ips": [.[].summary.total_allocated_ips] | add,
"total_used_ips": [.[].summary.total_used_ips] | add,
"total_high_utilization_vpcs": [.[].summary.high_utilization_vpcs] | add,
"global_utilization": (([.[].summary.total_used_ips] | add) / ([.[].summary.total_allocated_ips] | add) * 100)
}
')
cat > "$report_file" << EOF
{
"report_id": "ipam_report_$(date +%s)",
"generated_at": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"configuration": $(cat "$CONFIG_FILE"),
"global_statistics": $global_stats,
"regional_analysis": $all_analysis_json,
"utilization_thresholds": {
"warning_threshold": $UTILIZATION_THRESHOLD,
"critical_threshold": $EXPANSION_THRESHOLD
}
}
EOF
log "Comprehensive IPAM report generated: $report_file"
echo "$report_file"
}
# Main execution
main() {
log "Starting comprehensive IPAM analysis"
local analysis_files=()
# Analyze each region
for region in "${REGIONS[@]}"; do
log "Analyzing region: $region"
analysis_file=$(analyze_vpc_utilization "$region")
analysis_files+=("$analysis_file")
done
# Generate expansion recommendations
recommendations_file=$(generate_expansion_recommendations "${analysis_files[@]}")
# Generate comprehensive report
report_file=$(generate_ipam_report "${analysis_files[@]}")
# Display summary
log "IPAM analysis completed"
log "Analysis files: ${#analysis_files[@]}"
log "Recommendations file: $recommendations_file"
log "Comprehensive report: $report_file"
# Show summary statistics
if [[ -f "$report_file" ]]; then
local summary=$(jq -r '.global_statistics | "Total VPCs: \(.total_vpcs), Global Utilization: \(.global_utilization | round)%, High Utilization VPCs: \(.total_high_utilization_vpcs)"' "$report_file")
log "Global Summary: $summary"
fi
# Show critical recommendations
if [[ -f "$recommendations_file" ]]; then
local critical_recs=$(jq -r '.summary.critical_recommendations' "$recommendations_file")
if [[ "$critical_recs" -gt 0 ]]; then
log "WARNING: $critical_recs critical recommendations require immediate attention"
fi
fi
}
# Configuration file template
create_config_template() {
cat > ipam-config.json << 'EOF'
{
"regions": [
"us-east-1",
"us-west-2",
"eu-west-1"
],
"environments": [
"production",
"staging",
"development"
],
"utilization_threshold": 80,
"expansion_threshold": 90,
"analysis_settings": {
"include_reserved_subnets": true,
"calculate_growth_projections": true,
"generate_cost_estimates": false
},
"notification_settings": {
"sns_topic_arn": "arn:aws:sns:us-east-1:123456789012:ipam-alerts",
"email_recipients": ["admin@company.com"]
}
}
EOF
log "Created configuration template: ipam-config.json"
}
# Command line argument handling
case "${1:-}" in
"config")
create_config_template
;;
"analyze"|"")
main
;;
*)
echo "Usage: $0 [config|analyze]"
echo " config - Create configuration template"
echo " analyze - Run IPAM analysis (default)"
exit 1
;;
esac
# Cleanup
rm -rf "$TEMP_DIR"
log "IPAM analysis process completed"AWS Services Used
- Amazon VPC: Virtual private cloud with flexible IP address allocation and CIDR management
- Amazon EC2: Subnet creation and management across multiple Availability Zones
- AWS Transit Gateway: Centralized connectivity hub with IP address coordination
- Amazon Route 53: DNS resolution and private hosted zones for internal networking
- AWS Direct Connect: Dedicated network connections with IP address coordination
- Amazon CloudWatch: Network monitoring, metrics, and automated alerting for IP utilization
- AWS Lambda: Serverless functions for automated IP management and monitoring
- Amazon DynamoDB: Storage for IP address management data and utilization tracking
- Amazon SNS: Notification service for IP utilization alerts and expansion recommendations
- AWS Systems Manager: Configuration management and automation for IP address policies
- AWS CloudFormation: Infrastructure as code for consistent subnet deployment
- VPC Flow Logs: Network traffic analysis and IP address usage monitoring
Benefits
- Scalable Architecture: Accommodates future growth through intelligent IP address planning
- High Availability: Multi-AZ subnet deployment ensures resilience and fault tolerance
- Automated Management: Reduces manual overhead through automated monitoring and expansion
- Cost Optimization: Efficient IP address utilization and right-sized subnet allocation
- Security Segmentation: Proper network isolation through tiered subnet architecture
- Compliance: Standardized IP address allocation following best practices
- Operational Efficiency: Centralized IP address management and monitoring
- Disaster Recovery: Cross-region IP address coordination for business continuity
- Container Ready: Optimized subnet sizing for container orchestration platforms
- Future-Proof: Reserved address space and expansion capabilities for growth