REL02
REL02-BP02 - Provision redundant connectivity between private networks in the cloud and on-premises environments
REL02-BP02: Provision redundant connectivity between private networks in the cloud and on-premises environments
Overview
Establish redundant and resilient connectivity between your on-premises infrastructure and AWS cloud environments to ensure reliable hybrid network operations. This involves implementing multiple connection types, redundant paths, and automated failover mechanisms to eliminate single points of failure in your hybrid network architecture.
Implementation Steps
1. Design Redundant Hybrid Connectivity Architecture
- Implement multiple AWS Direct Connect connections across different locations
- Configure redundant VPN connections as backup paths
- Set up AWS Transit Gateway for centralized connectivity management
- Establish diverse network paths to eliminate single points of failure
2. Deploy Multi-Path Network Connectivity
- Configure primary and secondary Direct Connect connections
- Implement VPN backup connections with automatic failover
- Set up redundant customer gateways and virtual private gateways
- Establish diverse physical network paths and carrier diversity
3. Implement Intelligent Traffic Routing
- Configure BGP routing with path preferences and failover
- Set up dynamic routing protocols for automatic path selection
- Implement traffic engineering and load balancing across connections
- Establish route propagation and filtering policies
4. Configure Network Monitoring and Health Checks
- Deploy comprehensive network monitoring across all connection types
- Set up automated health checks and performance monitoring
- Configure alerting for connection failures and performance degradation
- Implement network analytics and troubleshooting tools
5. Establish Security and Compliance Controls
- Configure encryption for all hybrid network connections
- Implement network segmentation and access controls
- Set up compliance monitoring and audit trails
- Establish security policies for hybrid network traffic
6. Deploy Automated Failover and Recovery
- Configure automatic failover between connection types
- Implement intelligent routing based on connection health
- Set up automated recovery procedures and testing
- Establish disaster recovery and business continuity procedures
Implementation Examples
Example 1: Multi-Path Direct Connect and VPN Hybrid Architecture
View code
import boto3
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
import time
import ipaddress
from enum import Enum
class ConnectionType(Enum):
DIRECT_CONNECT = "direct_connect"
VPN = "vpn"
TRANSIT_GATEWAY = "transit_gateway"
class ConnectionStatus(Enum):
AVAILABLE = "available"
DOWN = "down"
PENDING = "pending"
DELETING = "deleting"
@dataclass
class NetworkConnection:
connection_id: str
connection_type: ConnectionType
location: str
bandwidth: str
status: ConnectionStatus
bgp_asn: int
vlan_id: Optional[int] = None
customer_gateway_ip: Optional[str] = None
@dataclass
class HybridNetworkConfig:
vpc_cidr: str
on_premises_cidrs: List[str]
primary_connection: NetworkConnection
secondary_connections: List[NetworkConnection]
bgp_asn: int
enable_redundancy: bool = True
class HybridNetworkArchitect:
def __init__(self, config: Dict):
self.config = config
self.ec2 = boto3.client('ec2')
self.directconnect = boto3.client('directconnect')
self.cloudwatch = boto3.client('cloudwatch')
self.sns = boto3.client('sns')
def deploy_redundant_hybrid_network(self, network_config: HybridNetworkConfig) -> Dict:
"""Deploy complete redundant hybrid network architecture"""
deployment_id = f"hybrid_network_{int(datetime.utcnow().timestamp())}"
deployment_result = {
'deployment_id': deployment_id,
'timestamp': datetime.utcnow().isoformat(),
'network_config': asdict(network_config),
'components': {},
'status': 'initiated'
}
try:
# 1. Create VPC infrastructure
vpc_result = self.create_hybrid_vpc_infrastructure(network_config)
deployment_result['components']['vpc'] = vpc_result
# 2. Set up Transit Gateway for centralized connectivity
tgw_result = self.setup_transit_gateway(network_config, vpc_result)
deployment_result['components']['transit_gateway'] = tgw_result
# 3. Configure Direct Connect connections
dx_result = self.configure_direct_connect_connections(
network_config, tgw_result
)
deployment_result['components']['direct_connect'] = dx_result
# 4. Set up redundant VPN connections
vpn_result = self.setup_redundant_vpn_connections(
network_config, tgw_result
)
deployment_result['components']['vpn'] = vpn_result
# 5. Configure BGP routing and failover
routing_result = self.configure_bgp_routing_and_failover(
network_config, dx_result, vpn_result, tgw_result
)
deployment_result['components']['routing'] = routing_result
# 6. Set up network monitoring and health checks
monitoring_result = self.setup_network_monitoring(
deployment_id, deployment_result['components']
)
deployment_result['components']['monitoring'] = monitoring_result
deployment_result['status'] = 'completed'
except Exception as e:
logging.error(f"Error deploying hybrid network: {str(e)}")
deployment_result['status'] = 'failed'
deployment_result['error'] = str(e)
return deployment_result
def create_hybrid_vpc_infrastructure(self, network_config: HybridNetworkConfig) -> Dict:
"""Create VPC infrastructure optimized for hybrid connectivity"""
try:
# Create VPC
vpc_response = self.ec2.create_vpc(
CidrBlock=network_config.vpc_cidr,
TagSpecifications=[
{
'ResourceType': 'vpc',
'Tags': [
{'Key': 'Name', 'Value': 'hybrid-network-vpc'},
{'Key': 'Purpose', 'Value': 'HybridConnectivity'}
]
}
]
)
vpc_id = vpc_response['Vpc']['VpcId']
# Enable DNS hostnames and resolution
self.ec2.modify_vpc_attribute(VpcId=vpc_id, EnableDnsHostnames={'Value': True})
self.ec2.modify_vpc_attribute(VpcId=vpc_id, EnableDnsSupport={'Value': True})
# Get available AZs
azs_response = self.ec2.describe_availability_zones(
Filters=[{'Name': 'state', 'Values': ['available']}]
)
available_azs = [az['ZoneName'] for az in azs_response['AvailabilityZones'][:3]]
# Create subnets for hybrid connectivity
subnets = self.create_hybrid_subnets(vpc_id, available_azs, network_config)
# Create route tables for hybrid routing
route_tables = self.create_hybrid_route_tables(vpc_id, subnets)
return {
'vpc_id': vpc_id,
'vpc_cidr': network_config.vpc_cidr,
'availability_zones': available_azs,
'subnets': subnets,
'route_tables': route_tables,
'status': 'created'
}
except Exception as e:
logging.error(f"Error creating VPC infrastructure: {str(e)}")
raise
def create_hybrid_subnets(self, vpc_id: str, azs: List[str],
network_config: HybridNetworkConfig) -> Dict:
"""Create subnets optimized for hybrid connectivity"""
subnets = {
'private': [],
'transit_gateway': [],
'direct_connect': []
}
# Parse VPC CIDR for subnet creation
vpc_network = ipaddress.IPv4Network(network_config.vpc_cidr)
subnet_size = 24 # /24 subnets
subnet_iterator = vpc_network.subnets(new_prefix=subnet_size)
for i, az in enumerate(azs):
# Private subnet for workloads
private_subnet = next(subnet_iterator)
private_response = self.ec2.create_subnet(
VpcId=vpc_id,
CidrBlock=str(private_subnet),
AvailabilityZone=az,
TagSpecifications=[
{
'ResourceType': 'subnet',
'Tags': [
{'Key': 'Name', 'Value': f"private-subnet-{az}"},
{'Key': 'Type', 'Value': 'Private'},
{'Key': 'Purpose', 'Value': 'HybridWorkloads'}
]
}
]
)
subnets['private'].append({
'subnet_id': private_response['Subnet']['SubnetId'],
'availability_zone': az,
'cidr_block': str(private_subnet),
'type': 'private'
})
# Transit Gateway subnet
tgw_subnet = next(subnet_iterator)
tgw_response = self.ec2.create_subnet(
VpcId=vpc_id,
CidrBlock=str(tgw_subnet),
AvailabilityZone=az,
TagSpecifications=[
{
'ResourceType': 'subnet',
'Tags': [
{'Key': 'Name', 'Value': f"tgw-subnet-{az}"},
{'Key': 'Type', 'Value': 'TransitGateway'},
{'Key': 'Purpose', 'Value': 'HybridConnectivity'}
]
}
]
)
subnets['transit_gateway'].append({
'subnet_id': tgw_response['Subnet']['SubnetId'],
'availability_zone': az,
'cidr_block': str(tgw_subnet),
'type': 'transit_gateway'
})
# Direct Connect Gateway subnet (if needed)
if i < 2: # Only create in first two AZs
dx_subnet = next(subnet_iterator)
dx_response = self.ec2.create_subnet(
VpcId=vpc_id,
CidrBlock=str(dx_subnet),
AvailabilityZone=az,
TagSpecifications=[
{
'ResourceType': 'subnet',
'Tags': [
{'Key': 'Name', 'Value': f"dx-subnet-{az}"},
{'Key': 'Type', 'Value': 'DirectConnect'},
{'Key': 'Purpose', 'Value': 'HybridConnectivity'}
]
}
]
)
subnets['direct_connect'].append({
'subnet_id': dx_response['Subnet']['SubnetId'],
'availability_zone': az,
'cidr_block': str(dx_subnet),
'type': 'direct_connect'
})
return subnets
def setup_transit_gateway(self, network_config: HybridNetworkConfig,
vpc_result: Dict) -> Dict:
"""Set up Transit Gateway for centralized hybrid connectivity"""
try:
# Create Transit Gateway
tgw_response = self.ec2.create_transit_gateway(
Description='Hybrid network Transit Gateway',
Options={
'AmazonSideAsn': network_config.bgp_asn,
'AutoAcceptSharedAttachments': 'enable',
'DefaultRouteTableAssociation': 'enable',
'DefaultRouteTablePropagation': 'enable',
'DnsSupport': 'enable',
'VpnEcmpSupport': 'enable'
},
TagSpecifications=[
{
'ResourceType': 'transit-gateway',
'Tags': [
{'Key': 'Name', 'Value': 'hybrid-network-tgw'},
{'Key': 'Purpose', 'Value': 'HybridConnectivity'}
]
}
]
)
tgw_id = tgw_response['TransitGateway']['TransitGatewayId']
# Wait for Transit Gateway to be available
self.wait_for_transit_gateway_available(tgw_id)
# Create VPC attachment
vpc_attachment_response = self.ec2.create_transit_gateway_vpc_attachment(
TransitGatewayId=tgw_id,
VpcId=vpc_result['vpc_id'],
SubnetIds=[subnet['subnet_id'] for subnet in vpc_result['subnets']['transit_gateway']],
TagSpecifications=[
{
'ResourceType': 'transit-gateway-attachment',
'Tags': [
{'Key': 'Name', 'Value': 'hybrid-vpc-attachment'},
{'Key': 'Purpose', 'Value': 'HybridConnectivity'}
]
}
]
)
vpc_attachment_id = vpc_attachment_response['TransitGatewayVpcAttachment']['TransitGatewayAttachmentId']
# Create Direct Connect Gateway for DX connections
dx_gateway_response = self.directconnect.create_direct_connect_gateway(
name='hybrid-network-dx-gateway',
amazonSideAsn=network_config.bgp_asn
)
dx_gateway_id = dx_gateway_response['directConnectGateway']['directConnectGatewayId']
# Associate Direct Connect Gateway with Transit Gateway
dx_tgw_association_response = self.ec2.create_transit_gateway_direct_connect_gateway_attachment(
TransitGatewayId=tgw_id,
DirectConnectGatewayId=dx_gateway_id,
TagSpecifications=[
{
'ResourceType': 'transit-gateway-attachment',
'Tags': [
{'Key': 'Name', 'Value': 'dx-gateway-attachment'},
{'Key': 'Purpose', 'Value': 'HybridConnectivity'}
]
}
]
)
return {
'transit_gateway_id': tgw_id,
'vpc_attachment_id': vpc_attachment_id,
'direct_connect_gateway_id': dx_gateway_id,
'dx_tgw_attachment_id': dx_tgw_association_response['TransitGatewayDirectConnectGatewayAttachment']['TransitGatewayAttachmentId'],
'bgp_asn': network_config.bgp_asn,
'status': 'created'
}
except Exception as e:
logging.error(f"Error setting up Transit Gateway: {str(e)}")
raise
def wait_for_transit_gateway_available(self, tgw_id: str, timeout: int = 600):
"""Wait for Transit Gateway to become available"""
start_time = time.time()
while time.time() - start_time < timeout:
response = self.ec2.describe_transit_gateways(TransitGatewayIds=[tgw_id])
if response['TransitGateways'][0]['State'] == 'available':
logging.info(f"Transit Gateway {tgw_id} is available")
return
time.sleep(30)
raise TimeoutError(f"Transit Gateway {tgw_id} did not become available within timeout")
def configure_direct_connect_connections(self, network_config: HybridNetworkConfig,
tgw_result: Dict) -> Dict:
"""Configure redundant Direct Connect connections"""
try:
dx_connections = []
# Primary Direct Connect connection
primary_dx = self.create_direct_connect_connection(
network_config.primary_connection,
tgw_result['direct_connect_gateway_id'],
is_primary=True
)
dx_connections.append(primary_dx)
# Secondary Direct Connect connections
for secondary_connection in network_config.secondary_connections:
if secondary_connection.connection_type == ConnectionType.DIRECT_CONNECT:
secondary_dx = self.create_direct_connect_connection(
secondary_connection,
tgw_result['direct_connect_gateway_id'],
is_primary=False
)
dx_connections.append(secondary_dx)
return {
'connections': dx_connections,
'direct_connect_gateway_id': tgw_result['direct_connect_gateway_id'],
'status': 'configured'
}
except Exception as e:
logging.error(f"Error configuring Direct Connect: {str(e)}")
return {'status': 'failed', 'error': str(e)}
def create_direct_connect_connection(self, connection_config: NetworkConnection,
dx_gateway_id: str, is_primary: bool = True) -> Dict:
"""Create individual Direct Connect connection"""
try:
# Create Direct Connect connection
dx_response = self.directconnect.create_connection(
location=connection_config.location,
bandwidth=connection_config.bandwidth,
connectionName=f"hybrid-dx-{'primary' if is_primary else 'secondary'}-{connection_config.location}",
lagId='', # Not using LAG for this example
tags=[
{'key': 'Name', 'value': f"hybrid-dx-{'primary' if is_primary else 'secondary'}"},
{'key': 'Purpose', 'value': 'HybridConnectivity'},
{'key': 'Type', 'value': 'Primary' if is_primary else 'Secondary'}
]
)
dx_connection_id = dx_response['connectionId']
# Create Virtual Interface (VIF)
vif_response = self.directconnect.create_transit_virtual_interface(
connectionId=dx_connection_id,
newTransitVirtualInterface={
'vlan': connection_config.vlan_id or (100 if is_primary else 200),
'bgpAsn': connection_config.bgp_asn,
'mtu': 9000,
'directConnectGatewayId': dx_gateway_id,
'virtualInterfaceName': f"hybrid-vif-{'primary' if is_primary else 'secondary'}",
'tags': [
{'key': 'Name', 'value': f"hybrid-vif-{'primary' if is_primary else 'secondary'}"},
{'key': 'Purpose', 'value': 'HybridConnectivity'}
]
}
)
vif_id = vif_response['virtualInterface']['virtualInterfaceId']
return {
'connection_id': dx_connection_id,
'virtual_interface_id': vif_id,
'location': connection_config.location,
'bandwidth': connection_config.bandwidth,
'vlan_id': connection_config.vlan_id or (100 if is_primary else 200),
'bgp_asn': connection_config.bgp_asn,
'is_primary': is_primary,
'status': 'created'
}
except Exception as e:
logging.error(f"Error creating Direct Connect connection: {str(e)}")
raise
def setup_redundant_vpn_connections(self, network_config: HybridNetworkConfig,
tgw_result: Dict) -> Dict:
"""Set up redundant VPN connections as backup"""
try:
vpn_connections = []
# Create customer gateways for VPN connections
customer_gateways = self.create_customer_gateways(network_config)
# Create VPN connections for each customer gateway
for i, cgw in enumerate(customer_gateways):
vpn_connection = self.create_vpn_connection(
cgw, tgw_result['transit_gateway_id'], i + 1
)
vpn_connections.append(vpn_connection)
return {
'customer_gateways': customer_gateways,
'vpn_connections': vpn_connections,
'status': 'configured'
}
except Exception as e:
logging.error(f"Error setting up VPN connections: {str(e)}")
return {'status': 'failed', 'error': str(e)}
def create_customer_gateways(self, network_config: HybridNetworkConfig) -> List[Dict]:
"""Create customer gateways for VPN connections"""
customer_gateways = []
# Extract VPN connection configs
vpn_connections = [
conn for conn in network_config.secondary_connections
if conn.connection_type == ConnectionType.VPN
]
for i, vpn_config in enumerate(vpn_connections):
try:
cgw_response = self.ec2.create_customer_gateway(
Type='ipsec.1',
PublicIp=vpn_config.customer_gateway_ip,
BgpAsn=vpn_config.bgp_asn,
TagSpecifications=[
{
'ResourceType': 'customer-gateway',
'Tags': [
{'Key': 'Name', 'Value': f"hybrid-cgw-{i+1}"},
{'Key': 'Purpose', 'Value': 'HybridConnectivity'}
]
}
]
)
customer_gateways.append({
'customer_gateway_id': cgw_response['CustomerGateway']['CustomerGatewayId'],
'public_ip': vpn_config.customer_gateway_ip,
'bgp_asn': vpn_config.bgp_asn,
'index': i + 1
})
except Exception as e:
logging.error(f"Error creating customer gateway {i+1}: {str(e)}")
continue
return customer_gateways
def create_vpn_connection(self, customer_gateway: Dict,
transit_gateway_id: str, index: int) -> Dict:
"""Create individual VPN connection"""
try:
vpn_response = self.ec2.create_vpn_connection(
Type='ipsec.1',
CustomerGatewayId=customer_gateway['customer_gateway_id'],
TransitGatewayId=transit_gateway_id,
Options={
'StaticRoutesOnly': False, # Use BGP
'TunnelInsideIpVersion': 'ipv4'
},
TagSpecifications=[
{
'ResourceType': 'vpn-connection',
'Tags': [
{'Key': 'Name', 'Value': f"hybrid-vpn-{index}"},
{'Key': 'Purpose', 'Value': 'HybridConnectivity'},
{'Key': 'Type', 'Value': 'Backup'}
]
}
]
)
vpn_connection_id = vpn_response['VpnConnection']['VpnConnectionId']
return {
'vpn_connection_id': vpn_connection_id,
'customer_gateway_id': customer_gateway['customer_gateway_id'],
'customer_gateway_ip': customer_gateway['public_ip'],
'transit_gateway_id': transit_gateway_id,
'index': index,
'status': 'created'
}
except Exception as e:
logging.error(f"Error creating VPN connection {index}: {str(e)}")
raise
def configure_bgp_routing_and_failover(self, network_config: HybridNetworkConfig,
dx_result: Dict, vpn_result: Dict,
tgw_result: Dict) -> Dict:
"""Configure BGP routing with intelligent failover"""
try:
routing_config = {
'transit_gateway_route_tables': [],
'route_propagations': [],
'route_preferences': {}
}
# Get Transit Gateway default route table
tgw_route_tables = self.ec2.describe_transit_gateway_route_tables(
Filters=[
{'Name': 'transit-gateway-id', 'Values': [tgw_result['transit_gateway_id']]},
{'Name': 'default-association-route-table', 'Values': ['true']}
]
)
if tgw_route_tables['TransitGatewayRouteTables']:
default_route_table_id = tgw_route_tables['TransitGatewayRouteTables'][0]['TransitGatewayRouteTableId']
# Configure route propagation for on-premises networks
for on_premises_cidr in network_config.on_premises_cidrs:
# Create routes with different preferences
# Direct Connect gets higher preference (lower metric)
if dx_result['status'] == 'configured':
for dx_connection in dx_result['connections']:
if dx_connection['is_primary']:
# Primary DX route with highest preference
self.create_transit_gateway_route(
default_route_table_id,
on_premises_cidr,
tgw_result['dx_tgw_attachment_id'],
preference=100
)
# VPN routes with lower preference (backup)
if vpn_result['status'] == 'configured':
for vpn_connection in vpn_result['vpn_connections']:
# VPN routes as backup with lower preference
vpn_attachment_id = self.get_vpn_attachment_id(
vpn_connection['vpn_connection_id'],
tgw_result['transit_gateway_id']
)
if vpn_attachment_id:
self.create_transit_gateway_route(
default_route_table_id,
on_premises_cidr,
vpn_attachment_id,
preference=200
)
routing_config['default_route_table_id'] = default_route_table_id
return {
'routing_config': routing_config,
'bgp_asn': network_config.bgp_asn,
'status': 'configured'
}
except Exception as e:
logging.error(f"Error configuring BGP routing: {str(e)}")
return {'status': 'failed', 'error': str(e)}
def create_transit_gateway_route(self, route_table_id: str, destination_cidr: str,
attachment_id: str, preference: int):
"""Create Transit Gateway route with preference"""
try:
self.ec2.create_transit_gateway_route(
DestinationCidrBlock=destination_cidr,
TransitGatewayRouteTableId=route_table_id,
TransitGatewayAttachmentId=attachment_id
)
logging.info(f"Created route {destination_cidr} -> {attachment_id} with preference {preference}")
except Exception as e:
logging.error(f"Error creating Transit Gateway route: {str(e)}")
def get_vpn_attachment_id(self, vpn_connection_id: str, transit_gateway_id: str) -> Optional[str]:
"""Get VPN attachment ID for Transit Gateway"""
try:
attachments = self.ec2.describe_transit_gateway_attachments(
Filters=[
{'Name': 'transit-gateway-id', 'Values': [transit_gateway_id]},
{'Name': 'resource-type', 'Values': ['vpn']},
{'Name': 'resource-id', 'Values': [vpn_connection_id]}
]
)
if attachments['TransitGatewayAttachments']:
return attachments['TransitGatewayAttachments'][0]['TransitGatewayAttachmentId']
return None
except Exception as e:
logging.error(f"Error getting VPN attachment ID: {str(e)}")
return None
# Usage example
def main():
config = {
'region': 'us-east-1',
'environment': 'production'
}
architect = HybridNetworkArchitect(config)
# Define hybrid network configuration
network_config = HybridNetworkConfig(
vpc_cidr='10.0.0.0/16',
on_premises_cidrs=['192.168.0.0/16', '172.16.0.0/12'],
primary_connection=NetworkConnection(
connection_id='primary-dx',
connection_type=ConnectionType.DIRECT_CONNECT,
location='EqDC2',
bandwidth='1Gbps',
status=ConnectionStatus.PENDING,
bgp_asn=65000,
vlan_id=100
),
secondary_connections=[
NetworkConnection(
connection_id='secondary-dx',
connection_type=ConnectionType.DIRECT_CONNECT,
location='EqDA2',
bandwidth='1Gbps',
status=ConnectionStatus.PENDING,
bgp_asn=65000,
vlan_id=200
),
NetworkConnection(
connection_id='backup-vpn-1',
connection_type=ConnectionType.VPN,
location='on-premises',
bandwidth='100Mbps',
status=ConnectionStatus.PENDING,
bgp_asn=65001,
customer_gateway_ip='203.0.113.12'
),
NetworkConnection(
connection_id='backup-vpn-2',
connection_type=ConnectionType.VPN,
location='on-premises',
bandwidth='100Mbps',
status=ConnectionStatus.PENDING,
bgp_asn=65001,
customer_gateway_ip='203.0.113.13'
)
],
bgp_asn=64512,
enable_redundancy=True
)
# Deploy redundant hybrid network
result = architect.deploy_redundant_hybrid_network(network_config)
print(f"Deployment Status: {result['status']}")
if result['status'] == 'completed':
print("Redundant hybrid network deployed successfully!")
for component, details in result['components'].items():
print(f"- {component}: {details.get('status', 'unknown')}")
else:
print(f"Deployment failed: {result.get('error', 'Unknown error')}")
if __name__ == "__main__":
main()Example 2: Automated Network Health Monitoring and Failover System
View code
import boto3
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
import time
import threading
import subprocess
import ipaddress
@dataclass
class NetworkPath:
path_id: str
connection_type: str
attachment_id: str
destination_cidr: str
next_hop: str
metric: int
status: str
last_check: datetime
failure_count: int = 0
success_count: int = 0
@dataclass
class HealthCheckConfig:
target_ip: str
check_interval: int = 30
timeout: int = 5
failure_threshold: int = 3
recovery_threshold: int = 2
class HybridNetworkMonitor:
def __init__(self, config: Dict):
self.config = config
self.ec2 = boto3.client('ec2')
self.directconnect = boto3.client('directconnect')
self.cloudwatch = boto3.client('cloudwatch')
self.sns = boto3.client('sns')
# Network paths and health status
self.network_paths: Dict[str, NetworkPath] = {}
self.health_checks: Dict[str, HealthCheckConfig] = {}
self.monitoring_active = False
self.monitoring_thread = None
def start_network_monitoring(self, monitoring_config: Dict) -> Dict:
"""Start comprehensive network monitoring"""
try:
# Initialize network paths
self.initialize_network_paths(monitoring_config)
# Set up health checks
self.setup_health_checks(monitoring_config)
# Start monitoring thread
self.monitoring_active = True
self.monitoring_thread = threading.Thread(
target=self.monitoring_loop,
daemon=True
)
self.monitoring_thread.start()
# Set up CloudWatch metrics
self.setup_cloudwatch_metrics()
return {
'status': 'started',
'network_paths': len(self.network_paths),
'health_checks': len(self.health_checks),
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
logging.error(f"Error starting network monitoring: {str(e)}")
return {'status': 'failed', 'error': str(e)}
def initialize_network_paths(self, monitoring_config: Dict):
"""Initialize network paths for monitoring"""
transit_gateway_id = monitoring_config.get('transit_gateway_id')
if not transit_gateway_id:
raise ValueError("Transit Gateway ID is required for monitoring")
# Get Transit Gateway route tables
route_tables = self.ec2.describe_transit_gateway_route_tables(
Filters=[
{'Name': 'transit-gateway-id', 'Values': [transit_gateway_id]}
]
)
for route_table in route_tables['TransitGatewayRouteTables']:
route_table_id = route_table['TransitGatewayRouteTableId']
# Get routes from route table
routes = self.ec2.search_transit_gateway_routes(
TransitGatewayRouteTableId=route_table_id,
Filters=[
{'Name': 'state', 'Values': ['active']}
]
)
for route in routes['Routes']:
if route.get('TransitGatewayAttachments'):
for attachment in route['TransitGatewayAttachments']:
path_id = f"{route['DestinationCidrBlock']}_{attachment['TransitGatewayAttachmentId']}"
network_path = NetworkPath(
path_id=path_id,
connection_type=attachment['ResourceType'],
attachment_id=attachment['TransitGatewayAttachmentId'],
destination_cidr=route['DestinationCidrBlock'],
next_hop=attachment.get('ResourceId', ''),
metric=route.get('PrefixListId', 100),
status='unknown',
last_check=datetime.utcnow()
)
self.network_paths[path_id] = network_path
def setup_health_checks(self, monitoring_config: Dict):
"""Set up health checks for network paths"""
health_check_targets = monitoring_config.get('health_check_targets', [])
for target in health_check_targets:
target_ip = target.get('ip')
if target_ip:
health_check = HealthCheckConfig(
target_ip=target_ip,
check_interval=target.get('interval', 30),
timeout=target.get('timeout', 5),
failure_threshold=target.get('failure_threshold', 3),
recovery_threshold=target.get('recovery_threshold', 2)
)
self.health_checks[target_ip] = health_check
def monitoring_loop(self):
"""Main monitoring loop"""
while self.monitoring_active:
try:
# Check network path health
self.check_network_paths_health()
# Perform connectivity tests
self.perform_connectivity_tests()
# Update CloudWatch metrics
self.update_cloudwatch_metrics()
# Check for failover conditions
self.evaluate_failover_conditions()
# Sleep until next check
time.sleep(30) # Check every 30 seconds
except Exception as e:
logging.error(f"Error in monitoring loop: {str(e)}")
time.sleep(60) # Wait longer on error
def check_network_paths_health(self):
"""Check health of all network paths"""
for path_id, network_path in self.network_paths.items():
try:
# Check attachment status
attachment_status = self.check_attachment_status(network_path.attachment_id)
# Update path status
previous_status = network_path.status
network_path.status = attachment_status
network_path.last_check = datetime.utcnow()
# Update counters
if attachment_status == 'available':
network_path.success_count += 1
network_path.failure_count = 0
else:
network_path.failure_count += 1
network_path.success_count = 0
# Log status changes
if previous_status != attachment_status:
logging.info(f"Network path {path_id} status changed: {previous_status} -> {attachment_status}")
# Send notification for status changes
self.send_path_status_notification(network_path, previous_status)
except Exception as e:
logging.error(f"Error checking path {path_id}: {str(e)}")
network_path.status = 'error'
network_path.failure_count += 1
def check_attachment_status(self, attachment_id: str) -> str:
"""Check status of Transit Gateway attachment"""
try:
attachments = self.ec2.describe_transit_gateway_attachments(
TransitGatewayAttachmentIds=[attachment_id]
)
if attachments['TransitGatewayAttachments']:
return attachments['TransitGatewayAttachments'][0]['State']
return 'not_found'
except Exception as e:
logging.error(f"Error checking attachment {attachment_id}: {str(e)}")
return 'error'
def perform_connectivity_tests(self):
"""Perform connectivity tests to on-premises targets"""
for target_ip, health_check in self.health_checks.items():
try:
# Perform ping test
ping_result = self.ping_test(target_ip, health_check.timeout)
# Perform traceroute test
traceroute_result = self.traceroute_test(target_ip)
# Update health check status
self.update_health_check_status(target_ip, ping_result, traceroute_result)
except Exception as e:
logging.error(f"Error testing connectivity to {target_ip}: {str(e)}")
def ping_test(self, target_ip: str, timeout: int) -> Dict:
"""Perform ping test to target IP"""
try:
start_time = time.time()
# Execute ping command
result = subprocess.run(
['ping', '-c', '3', '-W', str(timeout), target_ip],
capture_output=True,
text=True,
timeout=timeout + 5
)
end_time = time.time()
return {
'success': result.returncode == 0,
'response_time': end_time - start_time,
'output': result.stdout,
'error': result.stderr
}
except subprocess.TimeoutExpired:
return {
'success': False,
'response_time': timeout,
'output': '',
'error': 'Ping timeout'
}
except Exception as e:
return {
'success': False,
'response_time': 0,
'output': '',
'error': str(e)
}
def traceroute_test(self, target_ip: str) -> Dict:
"""Perform traceroute test to target IP"""
try:
result = subprocess.run(
['traceroute', '-m', '10', target_ip],
capture_output=True,
text=True,
timeout=30
)
return {
'success': result.returncode == 0,
'output': result.stdout,
'error': result.stderr,
'hops': self.parse_traceroute_hops(result.stdout)
}
except subprocess.TimeoutExpired:
return {
'success': False,
'output': '',
'error': 'Traceroute timeout',
'hops': []
}
except Exception as e:
return {
'success': False,
'output': '',
'error': str(e),
'hops': []
}
def parse_traceroute_hops(self, traceroute_output: str) -> List[Dict]:
"""Parse traceroute output to extract hops"""
hops = []
lines = traceroute_output.split('\n')
for line in lines:
if line.strip() and not line.startswith('traceroute'):
parts = line.split()
if len(parts) >= 2:
try:
hop_number = int(parts[0])
hop_ip = parts[1] if '(' in parts[1] else parts[1]
hops.append({
'hop': hop_number,
'ip': hop_ip,
'line': line.strip()
})
except (ValueError, IndexError):
continue
return hops
def update_health_check_status(self, target_ip: str, ping_result: Dict,
traceroute_result: Dict):
"""Update health check status based on test results"""
health_check = self.health_checks.get(target_ip)
if not health_check:
return
# Determine overall health
is_healthy = ping_result['success']
# Update counters
if is_healthy:
health_check.success_count += 1
health_check.failure_count = 0
else:
health_check.failure_count += 1
health_check.success_count = 0
# Log health status
logging.info(f"Health check {target_ip}: {'PASS' if is_healthy else 'FAIL'} "
f"(failures: {health_check.failure_count}, successes: {health_check.success_count})")
# Store test results for analysis
self.store_connectivity_test_results(target_ip, ping_result, traceroute_result)
def evaluate_failover_conditions(self):
"""Evaluate conditions for automatic failover"""
for target_ip, health_check in self.health_checks.items():
# Check if failover threshold is reached
if health_check.failure_count >= health_check.failure_threshold:
logging.warning(f"Failover threshold reached for {target_ip}")
self.trigger_network_failover(target_ip, health_check)
# Check if recovery threshold is reached
elif health_check.success_count >= health_check.recovery_threshold:
logging.info(f"Recovery threshold reached for {target_ip}")
self.trigger_network_recovery(target_ip, health_check)
def trigger_network_failover(self, target_ip: str, health_check: HealthCheckConfig):
"""Trigger network failover procedures"""
try:
logging.warning(f"Triggering network failover for {target_ip}")
# Find affected network paths
affected_paths = self.find_paths_to_target(target_ip)
# Implement failover logic
for path in affected_paths:
self.failover_network_path(path)
# Send failover notification
self.send_failover_notification(target_ip, affected_paths)
# Reset failure count to prevent repeated failovers
health_check.failure_count = 0
except Exception as e:
logging.error(f"Error triggering failover for {target_ip}: {str(e)}")
def trigger_network_recovery(self, target_ip: str, health_check: HealthCheckConfig):
"""Trigger network recovery procedures"""
try:
logging.info(f"Triggering network recovery for {target_ip}")
# Find recovered network paths
recovered_paths = self.find_paths_to_target(target_ip)
# Implement recovery logic
for path in recovered_paths:
self.recover_network_path(path)
# Send recovery notification
self.send_recovery_notification(target_ip, recovered_paths)
# Reset success count
health_check.success_count = 0
except Exception as e:
logging.error(f"Error triggering recovery for {target_ip}: {str(e)}")
def find_paths_to_target(self, target_ip: str) -> List[NetworkPath]:
"""Find network paths that could reach the target IP"""
affected_paths = []
try:
target_network = ipaddress.IPv4Address(target_ip)
for path in self.network_paths.values():
try:
path_network = ipaddress.IPv4Network(path.destination_cidr, strict=False)
if target_network in path_network:
affected_paths.append(path)
except ValueError:
continue
except ValueError:
logging.error(f"Invalid target IP: {target_ip}")
return affected_paths
def failover_network_path(self, network_path: NetworkPath):
"""Implement failover for a specific network path"""
try:
logging.info(f"Failing over network path: {network_path.path_id}")
# Update route preferences to prefer backup paths
self.update_route_preferences(network_path, increase_metric=True)
# Log failover action
self.log_failover_action(network_path, 'failover')
except Exception as e:
logging.error(f"Error failing over path {network_path.path_id}: {str(e)}")
def recover_network_path(self, network_path: NetworkPath):
"""Implement recovery for a specific network path"""
try:
logging.info(f"Recovering network path: {network_path.path_id}")
# Restore original route preferences
self.update_route_preferences(network_path, increase_metric=False)
# Log recovery action
self.log_failover_action(network_path, 'recovery')
except Exception as e:
logging.error(f"Error recovering path {network_path.path_id}: {str(e)}")
def update_route_preferences(self, network_path: NetworkPath, increase_metric: bool):
"""Update route preferences for failover/recovery"""
try:
# This would implement actual route preference changes
# For example, updating BGP metrics or route priorities
action = "increased" if increase_metric else "restored"
logging.info(f"Route preference {action} for path {network_path.path_id}")
except Exception as e:
logging.error(f"Error updating route preferences: {str(e)}")
def setup_cloudwatch_metrics(self):
"""Set up CloudWatch custom metrics"""
try:
# Create custom metrics for network monitoring
metric_data = []
for path_id, network_path in self.network_paths.items():
# Path availability metric
availability = 1 if network_path.status == 'available' else 0
metric_data.append({
'MetricName': 'NetworkPathAvailability',
'Dimensions': [
{'Name': 'PathId', 'Value': path_id},
{'Name': 'ConnectionType', 'Value': network_path.connection_type}
],
'Value': availability,
'Unit': 'Count'
})
# Send metrics to CloudWatch
if metric_data:
self.cloudwatch.put_metric_data(
Namespace='HybridNetwork/Monitoring',
MetricData=metric_data
)
except Exception as e:
logging.error(f"Error setting up CloudWatch metrics: {str(e)}")
def send_failover_notification(self, target_ip: str, affected_paths: List[NetworkPath]):
"""Send notification about network failover"""
try:
message = {
'event_type': 'NETWORK_FAILOVER',
'target_ip': target_ip,
'affected_paths': [path.path_id for path in affected_paths],
'timestamp': datetime.utcnow().isoformat(),
'message': f'Network failover triggered for {target_ip}'
}
if self.config.get('notification_topic_arn'):
self.sns.publish(
TopicArn=self.config['notification_topic_arn'],
Subject='Network Failover Alert',
Message=json.dumps(message, indent=2)
)
except Exception as e:
logging.error(f"Error sending failover notification: {str(e)}")
# Usage example
def main():
config = {
'region': 'us-east-1',
'notification_topic_arn': 'arn:aws:sns:us-east-1:123456789012:network-alerts'
}
monitor = HybridNetworkMonitor(config)
# Define monitoring configuration
monitoring_config = {
'transit_gateway_id': 'tgw-1234567890abcdef0',
'health_check_targets': [
{
'ip': '192.168.1.1',
'interval': 30,
'timeout': 5,
'failure_threshold': 3,
'recovery_threshold': 2
},
{
'ip': '172.16.1.1',
'interval': 30,
'timeout': 5,
'failure_threshold': 3,
'recovery_threshold': 2
}
]
}
# Start network monitoring
result = monitor.start_network_monitoring(monitoring_config)
print(f"Monitoring Status: {result['status']}")
if result['status'] == 'started':
print("Network monitoring started successfully!")
print(f"- Network paths: {result['network_paths']}")
print(f"- Health checks: {result['health_checks']}")
# Keep monitoring running
try:
while True:
time.sleep(60)
except KeyboardInterrupt:
print("Stopping network monitoring...")
monitor.monitoring_active = False
else:
print(f"Failed to start monitoring: {result.get('error', 'Unknown error')}")
if __name__ == "__main__":
main()Example 3: CloudFormation Template for Redundant Hybrid Connectivity
View code
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Redundant hybrid connectivity infrastructure with Direct Connect and VPN backup'
Parameters:
Environment:
Type: String
Description: Environment name
Default: production
AllowedValues: [development, staging, production]
VpcCidr:
Type: String
Description: CIDR block for the VPC
Default: 10.0.0.0/16
AllowedPattern: ^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/(1[6-9]|2[0-8]))$
OnPremisesCidr:
Type: String
Description: CIDR block for on-premises network
Default: 192.168.0.0/16
AllowedPattern: ^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/(1[6-9]|2[0-8]))$
BgpAsn:
Type: Number
Description: BGP ASN for AWS side
Default: 64512
MinValue: 64512
MaxValue: 65534
CustomerBgpAsn:
Type: Number
Description: BGP ASN for customer side
Default: 65000
MinValue: 1
MaxValue: 65534
CustomerGatewayIp1:
Type: String
Description: Public IP address for first customer gateway
Default: 203.0.113.12
AllowedPattern: ^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$
CustomerGatewayIp2:
Type: String
Description: Public IP address for second customer gateway
Default: 203.0.113.13
AllowedPattern: ^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$
DirectConnectLocation1:
Type: String
Description: Direct Connect location for primary connection
Default: EqDC2
DirectConnectLocation2:
Type: String
Description: Direct Connect location for secondary connection
Default: EqDA2
DirectConnectBandwidth:
Type: String
Description: Bandwidth for Direct Connect connections
Default: 1Gbps
AllowedValues: [50Mbps, 100Mbps, 200Mbps, 300Mbps, 400Mbps, 500Mbps, 1Gbps, 2Gbps, 5Gbps, 10Gbps]
Resources:
# VPC Infrastructure
HybridVPC:
Type: AWS::EC2::VPC
Properties:
CidrBlock: !Ref VpcCidr
EnableDnsHostnames: true
EnableDnsSupport: true
Tags:
- Key: Name
Value: !Sub '${Environment}-hybrid-vpc'
- Key: Environment
Value: !Ref Environment
- Key: Purpose
Value: HybridConnectivity
# Private Subnets for workloads
PrivateSubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref HybridVPC
AvailabilityZone: !Select [0, !GetAZs '']
CidrBlock: !Select [0, !Cidr [!Ref VpcCidr, 8, 8]]
Tags:
- Key: Name
Value: !Sub '${Environment}-private-subnet-1'
- Key: Type
Value: Private
PrivateSubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref HybridVPC
AvailabilityZone: !Select [1, !GetAZs '']
CidrBlock: !Select [1, !Cidr [!Ref VpcCidr, 8, 8]]
Tags:
- Key: Name
Value: !Sub '${Environment}-private-subnet-2'
- Key: Type
Value: Private
# Transit Gateway Subnets
TransitGatewaySubnet1:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref HybridVPC
AvailabilityZone: !Select [0, !GetAZs '']
CidrBlock: !Select [2, !Cidr [!Ref VpcCidr, 8, 8]]
Tags:
- Key: Name
Value: !Sub '${Environment}-tgw-subnet-1'
- Key: Type
Value: TransitGateway
TransitGatewaySubnet2:
Type: AWS::EC2::Subnet
Properties:
VpcId: !Ref HybridVPC
AvailabilityZone: !Select [1, !GetAZs '']
CidrBlock: !Select [3, !Cidr [!Ref VpcCidr, 8, 8]]
Tags:
- Key: Name
Value: !Sub '${Environment}-tgw-subnet-2'
- Key: Type
Value: TransitGateway
# Transit Gateway
TransitGateway:
Type: AWS::EC2::TransitGateway
Properties:
AmazonSideAsn: !Ref BgpAsn
Description: !Sub 'Transit Gateway for ${Environment} hybrid connectivity'
DefaultRouteTableAssociation: enable
DefaultRouteTablePropagation: enable
DnsSupport: enable
VpnEcmpSupport: enable
Tags:
- Key: Name
Value: !Sub '${Environment}-hybrid-tgw'
- Key: Environment
Value: !Ref Environment
# VPC Attachment to Transit Gateway
TransitGatewayVPCAttachment:
Type: AWS::EC2::TransitGatewayVpcAttachment
Properties:
TransitGatewayId: !Ref TransitGateway
VpcId: !Ref HybridVPC
SubnetIds:
- !Ref TransitGatewaySubnet1
- !Ref TransitGatewaySubnet2
Tags:
- Key: Name
Value: !Sub '${Environment}-vpc-attachment'
- Key: Environment
Value: !Ref Environment
# Direct Connect Gateway
DirectConnectGateway:
Type: AWS::DirectConnect::DirectConnectGateway
Properties:
Name: !Sub '${Environment}-dx-gateway'
AmazonSideAsn: !Ref BgpAsn
# Direct Connect Gateway Association with Transit Gateway
DirectConnectGatewayToTransitGatewayAssociation:
Type: AWS::EC2::TransitGatewayDirectConnectGatewayAttachment
Properties:
TransitGatewayId: !Ref TransitGateway
DirectConnectGatewayId: !Ref DirectConnectGateway
Tags:
- Key: Name
Value: !Sub '${Environment}-dx-tgw-attachment'
- Key: Environment
Value: !Ref Environment
# Primary Direct Connect Connection
PrimaryDirectConnectConnection:
Type: AWS::DirectConnect::Connection
Properties:
ConnectionName: !Sub '${Environment}-primary-dx'
Location: !Ref DirectConnectLocation1
Bandwidth: !Ref DirectConnectBandwidth
Tags:
- Key: Name
Value: !Sub '${Environment}-primary-dx'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Primary
# Secondary Direct Connect Connection
SecondaryDirectConnectConnection:
Type: AWS::DirectConnect::Connection
Properties:
ConnectionName: !Sub '${Environment}-secondary-dx'
Location: !Ref DirectConnectLocation2
Bandwidth: !Ref DirectConnectBandwidth
Tags:
- Key: Name
Value: !Sub '${Environment}-secondary-dx'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Secondary
# Primary Direct Connect Virtual Interface
PrimaryDirectConnectVIF:
Type: AWS::DirectConnect::TransitVirtualInterface
Properties:
ConnectionId: !Ref PrimaryDirectConnectConnection
DirectConnectGatewayId: !Ref DirectConnectGateway
Vlan: 100
BgpAsn: !Ref CustomerBgpAsn
Mtu: 9000
VirtualInterfaceName: !Sub '${Environment}-primary-vif'
Tags:
- Key: Name
Value: !Sub '${Environment}-primary-vif'
- Key: Environment
Value: !Ref Environment
# Secondary Direct Connect Virtual Interface
SecondaryDirectConnectVIF:
Type: AWS::DirectConnect::TransitVirtualInterface
Properties:
ConnectionId: !Ref SecondaryDirectConnectConnection
DirectConnectGatewayId: !Ref DirectConnectGateway
Vlan: 200
BgpAsn: !Ref CustomerBgpAsn
Mtu: 9000
VirtualInterfaceName: !Sub '${Environment}-secondary-vif'
Tags:
- Key: Name
Value: !Sub '${Environment}-secondary-vif'
- Key: Environment
Value: !Ref Environment
# Customer Gateways for VPN backup
CustomerGateway1:
Type: AWS::EC2::CustomerGateway
Properties:
Type: ipsec.1
BgpAsn: !Ref CustomerBgpAsn
IpAddress: !Ref CustomerGatewayIp1
Tags:
- Key: Name
Value: !Sub '${Environment}-cgw-1'
- Key: Environment
Value: !Ref Environment
CustomerGateway2:
Type: AWS::EC2::CustomerGateway
Properties:
Type: ipsec.1
BgpAsn: !Ref CustomerBgpAsn
IpAddress: !Ref CustomerGatewayIp2
Tags:
- Key: Name
Value: !Sub '${Environment}-cgw-2'
- Key: Environment
Value: !Ref Environment
# VPN Connections for backup
VPNConnection1:
Type: AWS::EC2::VPNConnection
Properties:
Type: ipsec.1
CustomerGatewayId: !Ref CustomerGateway1
TransitGatewayId: !Ref TransitGateway
StaticRoutesOnly: false
Tags:
- Key: Name
Value: !Sub '${Environment}-vpn-1'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Backup
VPNConnection2:
Type: AWS::EC2::VPNConnection
Properties:
Type: ipsec.1
CustomerGatewayId: !Ref CustomerGateway2
TransitGatewayId: !Ref TransitGateway
StaticRoutesOnly: false
Tags:
- Key: Name
Value: !Sub '${Environment}-vpn-2'
- Key: Environment
Value: !Ref Environment
- Key: Type
Value: Backup
# Route Tables
PrivateRouteTable1:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref HybridVPC
Tags:
- Key: Name
Value: !Sub '${Environment}-private-rt-1'
- Key: Environment
Value: !Ref Environment
PrivateRouteTable2:
Type: AWS::EC2::RouteTable
Properties:
VpcId: !Ref HybridVPC
Tags:
- Key: Name
Value: !Sub '${Environment}-private-rt-2'
- Key: Environment
Value: !Ref Environment
# Route to on-premises via Transit Gateway
OnPremisesRoute1:
Type: AWS::EC2::Route
DependsOn: TransitGatewayVPCAttachment
Properties:
RouteTableId: !Ref PrivateRouteTable1
DestinationCidrBlock: !Ref OnPremisesCidr
TransitGatewayId: !Ref TransitGateway
OnPremisesRoute2:
Type: AWS::EC2::Route
DependsOn: TransitGatewayVPCAttachment
Properties:
RouteTableId: !Ref PrivateRouteTable2
DestinationCidrBlock: !Ref OnPremisesCidr
TransitGatewayId: !Ref TransitGateway
# Subnet Route Table Associations
PrivateSubnet1RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet1
RouteTableId: !Ref PrivateRouteTable1
PrivateSubnet2RouteTableAssociation:
Type: AWS::EC2::SubnetRouteTableAssociation
Properties:
SubnetId: !Ref PrivateSubnet2
RouteTableId: !Ref PrivateRouteTable2
# Security Groups
HybridConnectivitySecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: !Sub '${Environment}-hybrid-sg'
GroupDescription: Security group for hybrid connectivity
VpcId: !Ref HybridVPC
SecurityGroupIngress:
- IpProtocol: -1
CidrIp: !Ref OnPremisesCidr
Description: All traffic from on-premises
- IpProtocol: icmp
FromPort: -1
ToPort: -1
CidrIp: !Ref VpcCidr
Description: ICMP within VPC
SecurityGroupEgress:
- IpProtocol: -1
CidrIp: 0.0.0.0/0
Description: All outbound traffic
Tags:
- Key: Name
Value: !Sub '${Environment}-hybrid-sg'
- Key: Environment
Value: !Ref Environment
# VPC Flow Logs for network monitoring
VPCFlowLogRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: vpc-flow-logs.amazonaws.com
Action: sts:AssumeRole
Policies:
- PolicyName: flowlogsDeliveryRolePolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
- logs:DescribeLogGroups
- logs:DescribeLogStreams
Resource: '*'
VPCFlowLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/vpc/flowlogs/${Environment}-hybrid'
RetentionInDays: 30
VPCFlowLog:
Type: AWS::EC2::FlowLog
Properties:
ResourceType: VPC
ResourceId: !Ref HybridVPC
TrafficType: ALL
LogDestinationType: cloud-watch-logs
LogGroupName: !Ref VPCFlowLogGroup
DeliverLogsPermissionArn: !GetAtt VPCFlowLogRole.Arn
Tags:
- Key: Name
Value: !Sub '${Environment}-hybrid-flow-logs'
- Key: Environment
Value: !Ref Environment
# CloudWatch Alarms for monitoring
DirectConnectConnectionStateAlarm1:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub '${Environment}-dx-primary-connection-down'
AlarmDescription: Primary Direct Connect connection is down
MetricName: ConnectionState
Namespace: AWS/DX
Statistic: Maximum
Period: 300
EvaluationPeriods: 2
Threshold: 0
ComparisonOperator: LessThanOrEqualToThreshold
Dimensions:
- Name: ConnectionId
Value: !Ref PrimaryDirectConnectConnection
TreatMissingData: breaching
DirectConnectConnectionStateAlarm2:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub '${Environment}-dx-secondary-connection-down'
AlarmDescription: Secondary Direct Connect connection is down
MetricName: ConnectionState
Namespace: AWS/DX
Statistic: Maximum
Period: 300
EvaluationPeriods: 2
Threshold: 0
ComparisonOperator: LessThanOrEqualToThreshold
Dimensions:
- Name: ConnectionId
Value: !Ref SecondaryDirectConnectConnection
TreatMissingData: breaching
VPNConnectionStateAlarm1:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub '${Environment}-vpn-1-tunnel-down'
AlarmDescription: VPN connection 1 tunnel is down
MetricName: TunnelState
Namespace: AWS/VPN
Statistic: Maximum
Period: 300
EvaluationPeriods: 2
Threshold: 0
ComparisonOperator: LessThanOrEqualToThreshold
Dimensions:
- Name: VpnId
Value: !Ref VPNConnection1
TreatMissingData: breaching
VPNConnectionStateAlarm2:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub '${Environment}-vpn-2-tunnel-down'
AlarmDescription: VPN connection 2 tunnel is down
MetricName: TunnelState
Namespace: AWS/VPN
Statistic: Maximum
Period: 300
EvaluationPeriods: 2
Threshold: 0
ComparisonOperator: LessThanOrEqualToThreshold
Dimensions:
- Name: VpnId
Value: !Ref VPNConnection2
TreatMissingData: breaching
# SNS Topic for notifications
HybridConnectivityAlerts:
Type: AWS::SNS::Topic
Properties:
TopicName: !Sub '${Environment}-hybrid-connectivity-alerts'
DisplayName: 'Hybrid Connectivity Alerts'
# CloudWatch Dashboard
HybridConnectivityDashboard:
Type: AWS::CloudWatch::Dashboard
Properties:
DashboardName: !Sub '${Environment}-hybrid-connectivity'
DashboardBody: !Sub |
{
"widgets": [
{
"type": "metric",
"x": 0,
"y": 0,
"width": 12,
"height": 6,
"properties": {
"metrics": [
[ "AWS/DX", "ConnectionState", "ConnectionId", "${PrimaryDirectConnectConnection}" ],
[ ".", ".", ".", "${SecondaryDirectConnectConnection}" ]
],
"view": "timeSeries",
"stacked": false,
"region": "${AWS::Region}",
"title": "Direct Connect Connection State",
"period": 300,
"yAxis": {
"left": {
"min": 0,
"max": 1
}
}
}
},
{
"type": "metric",
"x": 12,
"y": 0,
"width": 12,
"height": 6,
"properties": {
"metrics": [
[ "AWS/VPN", "TunnelState", "VpnId", "${VPNConnection1}" ],
[ ".", ".", ".", "${VPNConnection2}" ]
],
"view": "timeSeries",
"stacked": false,
"region": "${AWS::Region}",
"title": "VPN Connection State",
"period": 300,
"yAxis": {
"left": {
"min": 0,
"max": 1
}
}
}
},
{
"type": "metric",
"x": 0,
"y": 6,
"width": 24,
"height": 6,
"properties": {
"metrics": [
[ "AWS/DX", "ConnectionBpsEgress", "ConnectionId", "${PrimaryDirectConnectConnection}" ],
[ ".", "ConnectionBpsIngress", ".", "." ],
[ ".", "ConnectionBpsEgress", ".", "${SecondaryDirectConnectConnection}" ],
[ ".", "ConnectionBpsIngress", ".", "." ]
],
"view": "timeSeries",
"stacked": false,
"region": "${AWS::Region}",
"title": "Direct Connect Bandwidth Utilization",
"period": 300
}
}
]
}
Outputs:
VPCId:
Description: VPC ID
Value: !Ref HybridVPC
Export:
Name: !Sub '${Environment}-hybrid-vpc-id'
TransitGatewayId:
Description: Transit Gateway ID
Value: !Ref TransitGateway
Export:
Name: !Sub '${Environment}-transit-gateway-id'
DirectConnectGatewayId:
Description: Direct Connect Gateway ID
Value: !Ref DirectConnectGateway
Export:
Name: !Sub '${Environment}-dx-gateway-id'
PrimaryDirectConnectConnectionId:
Description: Primary Direct Connect Connection ID
Value: !Ref PrimaryDirectConnectConnection
Export:
Name: !Sub '${Environment}-primary-dx-connection-id'
SecondaryDirectConnectConnectionId:
Description: Secondary Direct Connect Connection ID
Value: !Ref SecondaryDirectConnectConnection
Export:
Name: !Sub '${Environment}-secondary-dx-connection-id'
VPNConnection1Id:
Description: VPN Connection 1 ID
Value: !Ref VPNConnection1
Export:
Name: !Sub '${Environment}-vpn-1-id'
VPNConnection2Id:
Description: VPN Connection 2 ID
Value: !Ref VPNConnection2
Export:
Name: !Sub '${Environment}-vpn-2-id'
HybridSecurityGroupId:
Description: Hybrid connectivity security group ID
Value: !Ref HybridConnectivitySecurityGroup
Export:
Name: !Sub '${Environment}-hybrid-sg-id'
DashboardURL:
Description: CloudWatch Dashboard URL
Value: !Sub 'https://${AWS::Region}.console.aws.amazon.com/cloudwatch/home?region=${AWS::Region}#dashboards:name=${Environment}-hybrid-connectivity'Example 4: Network Connectivity Testing and Validation Framework
View code
#!/bin/bash
# Network Connectivity Testing and Validation Framework
# Comprehensive testing of hybrid network connectivity and failover scenarios
set -euo pipefail
# Configuration
CONFIG_FILE="${CONFIG_FILE:-./network-test-config.json}"
LOG_FILE="${LOG_FILE:-./network-testing.log}"
RESULTS_DIR="${RESULTS_DIR:-./network-test-results}"
TEMP_DIR="${TEMP_DIR:-/tmp/network-testing}"
# Create directories
mkdir -p "$RESULTS_DIR" "$TEMP_DIR"
# Logging function
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE"
}
# Load configuration
if [[ ! -f "$CONFIG_FILE" ]]; then
log "ERROR: Configuration file $CONFIG_FILE not found"
exit 1
fi
# Parse configuration
TRANSIT_GATEWAY_ID=$(jq -r '.transit_gateway_id' "$CONFIG_FILE")
DX_CONNECTION_IDS=($(jq -r '.direct_connect_connections[]' "$CONFIG_FILE"))
VPN_CONNECTION_IDS=($(jq -r '.vpn_connections[]' "$CONFIG_FILE"))
TEST_TARGETS=($(jq -r '.test_targets[].ip' "$CONFIG_FILE"))
ON_PREMISES_CIDRS=($(jq -r '.on_premises_cidrs[]' "$CONFIG_FILE"))
log "Starting network connectivity testing"
log "Transit Gateway: $TRANSIT_GATEWAY_ID"
log "Direct Connect Connections: ${#DX_CONNECTION_IDS[@]}"
log "VPN Connections: ${#VPN_CONNECTION_IDS[@]}"
log "Test Targets: ${#TEST_TARGETS[@]}"
# Function to test Direct Connect connectivity
test_direct_connect_connectivity() {
local test_id="dx_test_$(date +%s)"
local results_file="$RESULTS_DIR/dx_connectivity_${test_id}.json"
log "Testing Direct Connect connectivity"
# Initialize results
cat > "$results_file" << EOF
{
"test_id": "$test_id",
"test_type": "direct_connect_connectivity",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"transit_gateway_id": "$TRANSIT_GATEWAY_ID",
"connection_tests": [],
"overall_status": "running"
}
EOF
# Test each Direct Connect connection
for dx_connection_id in "${DX_CONNECTION_IDS[@]}"; do
log "Testing Direct Connect connection: $dx_connection_id"
local connection_result=$(test_dx_connection "$dx_connection_id")
# Add result to results file
jq --argjson result "$connection_result" \
'.connection_tests += [$result]' "$results_file" > "$results_file.tmp"
mv "$results_file.tmp" "$results_file"
done
# Calculate overall status
local overall_status=$(jq -r '
.connection_tests |
if all(.status == "healthy") then "healthy"
elif any(.status == "healthy") then "degraded"
else "failed"
end
' "$results_file")
# Update overall status
jq --arg status "$overall_status" '.overall_status = $status' "$results_file" > "$results_file.tmp"
mv "$results_file.tmp" "$results_file"
log "Direct Connect connectivity test completed: $overall_status"
echo "$results_file"
}
# Function to test individual Direct Connect connection
test_dx_connection() {
local dx_connection_id="$1"
# Get connection status from AWS
local connection_info=$(aws directconnect describe-connections \
--connection-ids "$dx_connection_id" \
--output json 2>/dev/null || echo '{"connections": []}')
local connection_state="unknown"
local bandwidth="unknown"
local location="unknown"
if [[ $(echo "$connection_info" | jq '.connections | length') -gt 0 ]]; then
connection_state=$(echo "$connection_info" | jq -r '.connections[0].connectionState')
bandwidth=$(echo "$connection_info" | jq -r '.connections[0].bandwidth')
location=$(echo "$connection_info" | jq -r '.connections[0].location')
fi
# Get Virtual Interface information
local vif_info=$(aws directconnect describe-virtual-interfaces \
--connection-id "$dx_connection_id" \
--output json 2>/dev/null || echo '{"virtualInterfaces": []}')
local vif_tests=()
if [[ $(echo "$vif_info" | jq '.virtualInterfaces | length') -gt 0 ]]; then
# Test each Virtual Interface
echo "$vif_info" | jq -c '.virtualInterfaces[]' | while read -r vif; do
local vif_id=$(echo "$vif" | jq -r '.virtualInterfaceId')
local vif_state=$(echo "$vif" | jq -r '.virtualInterfaceState')
local bgp_status=$(echo "$vif" | jq -r '.bgpPeers[0].bgpStatus // "unknown"')
vif_tests+=("{
\"vif_id\": \"$vif_id\",
\"vif_state\": \"$vif_state\",
\"bgp_status\": \"$bgp_status\"
}")
done
fi
# Determine overall connection health
local status="healthy"
if [[ "$connection_state" != "available" ]]; then
status="unhealthy"
fi
# Create connection test result
local vif_tests_json=$(printf '%s\n' "${vif_tests[@]}" | jq -s . 2>/dev/null || echo '[]')
cat << EOF
{
"connection_id": "$dx_connection_id",
"connection_state": "$connection_state",
"bandwidth": "$bandwidth",
"location": "$location",
"virtual_interfaces": $vif_tests_json,
"status": "$status",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)"
}
EOF
}
# Function to test VPN connectivity
test_vpn_connectivity() {
local test_id="vpn_test_$(date +%s)"
local results_file="$RESULTS_DIR/vpn_connectivity_${test_id}.json"
log "Testing VPN connectivity"
# Initialize results
cat > "$results_file" << EOF
{
"test_id": "$test_id",
"test_type": "vpn_connectivity",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"transit_gateway_id": "$TRANSIT_GATEWAY_ID",
"connection_tests": [],
"overall_status": "running"
}
EOF
# Test each VPN connection
for vpn_connection_id in "${VPN_CONNECTION_IDS[@]}"; do
log "Testing VPN connection: $vpn_connection_id"
local connection_result=$(test_vpn_connection "$vpn_connection_id")
# Add result to results file
jq --argjson result "$connection_result" \
'.connection_tests += [$result]' "$results_file" > "$results_file.tmp"
mv "$results_file.tmp" "$results_file"
done
# Calculate overall status
local overall_status=$(jq -r '
.connection_tests |
if all(.status == "healthy") then "healthy"
elif any(.status == "healthy") then "degraded"
else "failed"
end
' "$results_file")
# Update overall status
jq --arg status "$overall_status" '.overall_status = $status' "$results_file" > "$results_file.tmp"
mv "$results_file.tmp" "$results_file"
log "VPN connectivity test completed: $overall_status"
echo "$results_file"
}
# Function to test individual VPN connection
test_vpn_connection() {
local vpn_connection_id="$1"
# Get VPN connection status from AWS
local vpn_info=$(aws ec2 describe-vpn-connections \
--vpn-connection-ids "$vpn_connection_id" \
--output json 2>/dev/null || echo '{"VpnConnections": []}')
local vpn_state="unknown"
local customer_gateway_ip="unknown"
local tunnel_tests=()
if [[ $(echo "$vpn_info" | jq '.VpnConnections | length') -gt 0 ]]; then
vpn_state=$(echo "$vpn_info" | jq -r '.VpnConnections[0].State')
customer_gateway_ip=$(echo "$vpn_info" | jq -r '.VpnConnections[0].CustomerGatewayConfiguration // "unknown"')
# Test each tunnel
echo "$vpn_info" | jq -c '.VpnConnections[0].VgwTelemetry[]' | while read -r tunnel; do
local tunnel_ip=$(echo "$tunnel" | jq -r '.OutsideIpAddress')
local tunnel_status=$(echo "$tunnel" | jq -r '.Status')
local accepted_routes=$(echo "$tunnel" | jq -r '.AcceptedRouteCount // 0')
tunnel_tests+=("{
\"tunnel_ip\": \"$tunnel_ip\",
\"tunnel_status\": \"$tunnel_status\",
\"accepted_routes\": $accepted_routes
}")
done
fi
# Determine overall connection health
local status="healthy"
if [[ "$vpn_state" != "available" ]]; then
status="unhealthy"
fi
# Create connection test result
local tunnel_tests_json=$(printf '%s\n' "${tunnel_tests[@]}" | jq -s . 2>/dev/null || echo '[]')
cat << EOF
{
"connection_id": "$vpn_connection_id",
"vpn_state": "$vpn_state",
"customer_gateway_ip": "$customer_gateway_ip",
"tunnels": $tunnel_tests_json,
"status": "$status",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)"
}
EOF
}
# Function to test end-to-end connectivity
test_end_to_end_connectivity() {
local test_id="e2e_test_$(date +%s)"
local results_file="$RESULTS_DIR/e2e_connectivity_${test_id}.json"
log "Testing end-to-end connectivity"
# Initialize results
cat > "$results_file" << EOF
{
"test_id": "$test_id",
"test_type": "end_to_end_connectivity",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"target_tests": [],
"overall_status": "running"
}
EOF
# Test connectivity to each target
for target_ip in "${TEST_TARGETS[@]}"; do
log "Testing connectivity to target: $target_ip"
local target_result=$(test_target_connectivity "$target_ip")
# Add result to results file
jq --argjson result "$target_result" \
'.target_tests += [$result]' "$results_file" > "$results_file.tmp"
mv "$results_file.tmp" "$results_file"
done
# Calculate overall status
local overall_status=$(jq -r '
.target_tests |
if all(.status == "reachable") then "healthy"
elif any(.status == "reachable") then "degraded"
else "failed"
end
' "$results_file")
# Update overall status
jq --arg status "$overall_status" '.overall_status = $status' "$results_file" > "$results_file.tmp"
mv "$results_file.tmp" "$results_file"
log "End-to-end connectivity test completed: $overall_status"
echo "$results_file"
}
# Function to test connectivity to specific target
test_target_connectivity() {
local target_ip="$1"
# Ping test
local ping_result=$(ping -c 5 -W 5 "$target_ip" 2>&1 || echo "ping failed")
local ping_success=false
local avg_latency=0
local packet_loss=100
if echo "$ping_result" | grep -q "5 received"; then
ping_success=true
avg_latency=$(echo "$ping_result" | grep "avg" | cut -d'/' -f5 2>/dev/null || echo "0")
packet_loss=0
elif echo "$ping_result" | grep -q "received"; then
ping_success=true
local received=$(echo "$ping_result" | grep "received" | cut -d' ' -f4)
packet_loss=$(( (5 - received) * 20 ))
avg_latency=$(echo "$ping_result" | grep "avg" | cut -d'/' -f5 2>/dev/null || echo "0")
fi
# Traceroute test
local traceroute_result=$(traceroute -m 15 "$target_ip" 2>&1 || echo "traceroute failed")
local hop_count=0
if ! echo "$traceroute_result" | grep -q "failed"; then
hop_count=$(echo "$traceroute_result" | grep -c "^ *[0-9]" || echo "0")
fi
# TCP connectivity test (if port specified)
local tcp_test_result=""
local target_config=$(jq -r --arg ip "$target_ip" '.test_targets[] | select(.ip == $ip)' "$CONFIG_FILE")
local test_port=$(echo "$target_config" | jq -r '.port // empty')
if [[ -n "$test_port" ]]; then
if timeout 10 bash -c "echo >/dev/tcp/$target_ip/$test_port" 2>/dev/null; then
tcp_test_result="success"
else
tcp_test_result="failed"
fi
fi
# Determine overall status
local status="unreachable"
if [[ "$ping_success" == "true" ]]; then
if [[ "$packet_loss" -eq 0 ]]; then
status="reachable"
else
status="degraded"
fi
fi
cat << EOF
{
"target_ip": "$target_ip",
"ping_test": {
"success": $ping_success,
"avg_latency": $avg_latency,
"packet_loss": $packet_loss
},
"traceroute_test": {
"hop_count": $hop_count,
"output": "$traceroute_result"
},
"tcp_test": {
"port": "$test_port",
"result": "$tcp_test_result"
},
"status": "$status",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)"
}
EOF
}
# Function to test failover scenarios
test_failover_scenarios() {
local test_id="failover_test_$(date +%s)"
local results_file="$RESULTS_DIR/failover_test_${test_id}.json"
log "Testing failover scenarios"
# Initialize results
cat > "$results_file" << EOF
{
"test_id": "$test_id",
"test_type": "failover_scenarios",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"scenario_tests": [],
"overall_status": "running"
}
EOF
# Test Direct Connect failover scenario
log "Testing Direct Connect failover scenario"
local dx_failover_result=$(simulate_dx_failover)
jq --argjson result "$dx_failover_result" \
'.scenario_tests += [$result]' "$results_file" > "$results_file.tmp"
mv "$results_file.tmp" "$results_file"
# Test VPN failover scenario
log "Testing VPN failover scenario"
local vpn_failover_result=$(simulate_vpn_failover)
jq --argjson result "$vpn_failover_result" \
'.scenario_tests += [$result]' "$results_file" > "$results_file.tmp"
mv "$results_file.tmp" "$results_file"
# Calculate overall status
local overall_status=$(jq -r '
.scenario_tests |
if all(.status == "passed") then "passed"
elif any(.status == "passed") then "partial"
else "failed"
end
' "$results_file")
# Update overall status
jq --arg status "$overall_status" '.overall_status = $status' "$results_file" > "$results_file.tmp"
mv "$results_file.tmp" "$results_file"
log "Failover scenario testing completed: $overall_status"
echo "$results_file"
}
# Function to simulate Direct Connect failover
simulate_dx_failover() {
log "Simulating Direct Connect failover scenario"
# This is a simulation - in practice, you would:
# 1. Disable primary DX connection
# 2. Wait for BGP convergence
# 3. Test connectivity via backup paths
# 4. Re-enable primary connection
# 5. Verify traffic returns to primary path
local scenario_start=$(date +%s)
local connectivity_maintained=true
local failover_time=0
local recovery_time=0
# Simulate connectivity tests during failover
for target_ip in "${TEST_TARGETS[@]}"; do
# Test connectivity during simulated failover
local test_result=$(test_target_connectivity "$target_ip")
local target_status=$(echo "$test_result" | jq -r '.status')
if [[ "$target_status" != "reachable" ]]; then
connectivity_maintained=false
break
fi
done
local scenario_end=$(date +%s)
local total_time=$((scenario_end - scenario_start))
local status="passed"
if [[ "$connectivity_maintained" != "true" ]]; then
status="failed"
fi
cat << EOF
{
"scenario": "direct_connect_failover",
"connectivity_maintained": $connectivity_maintained,
"failover_time": $failover_time,
"recovery_time": $recovery_time,
"total_test_time": $total_time,
"status": "$status",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)"
}
EOF
}
# Function to simulate VPN failover
simulate_vpn_failover() {
log "Simulating VPN failover scenario"
local scenario_start=$(date +%s)
local connectivity_maintained=true
local failover_time=0
local recovery_time=0
# Simulate VPN tunnel failover test
for target_ip in "${TEST_TARGETS[@]}"; do
local test_result=$(test_target_connectivity "$target_ip")
local target_status=$(echo "$test_result" | jq -r '.status')
if [[ "$target_status" != "reachable" ]]; then
connectivity_maintained=false
break
fi
done
local scenario_end=$(date +%s)
local total_time=$((scenario_end - scenario_start))
local status="passed"
if [[ "$connectivity_maintained" != "true" ]]; then
status="failed"
fi
cat << EOF
{
"scenario": "vpn_failover",
"connectivity_maintained": $connectivity_maintained,
"failover_time": $failover_time,
"recovery_time": $recovery_time,
"total_test_time": $total_time,
"status": "$status",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)"
}
EOF
}
# Function to generate comprehensive test report
generate_test_report() {
local report_file="$RESULTS_DIR/network_test_report_$(date +%Y%m%d_%H%M%S).json"
log "Generating comprehensive test report"
# Collect all test results
local test_results=()
for result_file in "$RESULTS_DIR"/*.json; do
if [[ -f "$result_file" && "$result_file" != "$report_file" ]]; then
test_results+=("$(cat "$result_file")")
fi
done
# Create comprehensive report
local test_results_json=$(printf '%s\n' "${test_results[@]}" | jq -s .)
cat > "$report_file" << EOF
{
"report_id": "network_test_report_$(date +%s)",
"generated_at": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"test_configuration": $(cat "$CONFIG_FILE"),
"test_results": $test_results_json,
"summary": {
"total_tests": $(echo "$test_results_json" | jq 'length'),
"passed_tests": $(echo "$test_results_json" | jq '[.[] | select(.overall_status == "healthy" or .overall_status == "passed")] | length'),
"failed_tests": $(echo "$test_results_json" | jq '[.[] | select(.overall_status == "failed")] | length'),
"degraded_tests": $(echo "$test_results_json" | jq '[.[] | select(.overall_status == "degraded" or .overall_status == "partial")] | length')
}
}
EOF
log "Test report generated: $report_file"
echo "$report_file"
}
# Main execution
main() {
log "Starting comprehensive network connectivity testing"
# Test Direct Connect connectivity
dx_results=$(test_direct_connect_connectivity)
# Test VPN connectivity
vpn_results=$(test_vpn_connectivity)
# Test end-to-end connectivity
e2e_results=$(test_end_to_end_connectivity)
# Test failover scenarios
failover_results=$(test_failover_scenarios)
# Generate comprehensive report
report_file=$(generate_test_report)
# Display summary
log "Network connectivity testing completed"
log "Results files generated:"
log "- Direct Connect: $dx_results"
log "- VPN: $vpn_results"
log "- End-to-End: $e2e_results"
log "- Failover: $failover_results"
log "- Report: $report_file"
# Show summary
local summary=$(jq -r '.summary | "Total: \(.total_tests), Passed: \(.passed_tests), Failed: \(.failed_tests), Degraded: \(.degraded_tests)"' "$report_file")
log "Test Summary: $summary"
}
# Configuration file template
create_config_template() {
cat > network-test-config.json << 'EOF'
{
"transit_gateway_id": "tgw-1234567890abcdef0",
"direct_connect_connections": [
"dxcon-fg1234567890abcdef",
"dxcon-fg0987654321fedcba"
],
"vpn_connections": [
"vpn-1234567890abcdef0",
"vpn-0987654321fedcba1"
],
"on_premises_cidrs": [
"192.168.0.0/16",
"172.16.0.0/12"
],
"test_targets": [
{
"ip": "192.168.1.1",
"port": 22,
"description": "On-premises server 1"
},
{
"ip": "172.16.1.1",
"port": 80,
"description": "On-premises server 2"
}
]
}
EOF
log "Created configuration template: network-test-config.json"
}
# Command line argument handling
case "${1:-}" in
"config")
create_config_template
;;
"test"|"")
main
;;
*)
echo "Usage: $0 [config|test]"
echo " config - Create configuration template"
echo " test - Run network connectivity tests (default)"
exit 1
;;
esac
# Cleanup
rm -rf "$TEMP_DIR"
log "Network connectivity testing completed"AWS Services Used
- AWS Transit Gateway: Centralized hub for connecting VPCs and on-premises networks
- AWS Direct Connect: Dedicated network connections with high bandwidth and low latency
- AWS VPN: Encrypted IPsec VPN connections for backup connectivity
- Direct Connect Gateway: Connects multiple VPCs to Direct Connect connections
- Customer Gateway: Represents on-premises VPN device configuration
- Virtual Private Gateway: AWS-side VPN endpoint (legacy, replaced by Transit Gateway)
- Amazon Route 53: DNS resolution and health checks for hybrid environments
- Amazon CloudWatch: Network monitoring, metrics, and automated alerting
- AWS CloudFormation: Infrastructure as code for hybrid network deployment
- VPC Flow Logs: Network traffic analysis and security monitoring
- AWS Systems Manager: Configuration management and automation
- Amazon SNS: Notification service for network alerts and events
Benefits
- Redundant Connectivity: Multiple connection types eliminate single points of failure
- Automatic Failover: Intelligent routing ensures seamless failover between connections
- High Performance: Direct Connect provides consistent, high-bandwidth connectivity
- Cost Optimization: VPN backup connections provide cost-effective redundancy
- Centralized Management: Transit Gateway simplifies complex network topologies
- Enhanced Security: Encrypted connections and network segmentation
- Scalability: Easy addition of new VPCs and on-premises locations
- Monitoring and Visibility: Comprehensive network monitoring and alerting
- Disaster Recovery: Cross-region connectivity for business continuity
- Compliance: Network audit trails and security controls