REL06
REL06-BP01 - Monitor all components for the workload (Generation)
One-Click Remediation
Deploy CloudFormation stacks to implement this best practice with a single click.
Monitoring Dashboard and Alarms
Deploy a baseline CloudWatch dashboard and alarms for core workload health and availability signals
Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.
REL06-BP01: Monitor all components for the workload (Generation)
Overview
Implement comprehensive monitoring across all workload components to generate metrics, logs, and traces that provide visibility into system health, performance, and behavior. Effective monitoring generation ensures that all critical components are instrumented to collect the data needed for observability, troubleshooting, and optimization.
Implementation Steps
1. Identify All Workload Components
- Map all infrastructure components including compute, storage, and network resources
- Catalog application components, services, and dependencies
- Document third-party integrations and external dependencies
- Identify critical paths and high-risk components requiring enhanced monitoring
2. Implement Infrastructure Monitoring
- Deploy CloudWatch agents on all EC2 instances and containers
- Configure VPC Flow Logs for network monitoring
- Enable AWS service-specific monitoring and metrics
- Implement custom metrics for business-specific infrastructure components
3. Configure Application Performance Monitoring
- Instrument applications with metrics, logs, and traces
- Implement health checks and readiness probes
- Configure performance counters and business metrics
- Deploy application-specific monitoring agents and libraries
4. Establish Database and Storage Monitoring
- Enable database performance insights and query monitoring
- Configure storage metrics for IOPS, throughput, and capacity
- Implement backup and replication monitoring
- Monitor data consistency and integrity checks
5. Deploy Network and Security Monitoring
- Configure network performance and connectivity monitoring
- Implement security event logging and monitoring
- Deploy intrusion detection and anomaly monitoring
- Monitor SSL/TLS certificate expiration and security configurations
6. Implement Synthetic and User Experience Monitoring
- Deploy synthetic monitoring for critical user journeys
- Implement real user monitoring (RUM) for actual user experience
- Configure uptime monitoring for external endpoints
- Monitor API response times and availability from multiple locations
Implementation Examples
Example 1: Comprehensive Workload Monitoring System
View code
import boto3
import json
import logging
import time
import psutil
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import asyncio
from concurrent.futures import ThreadPoolExecutor
class ComponentType(Enum):
COMPUTE = "compute"
DATABASE = "database"
STORAGE = "storage"
NETWORK = "network"
APPLICATION = "application"
EXTERNAL_SERVICE = "external_service"
class MetricType(Enum):
COUNTER = "counter"
GAUGE = "gauge"
HISTOGRAM = "histogram"
TIMER = "timer"
@dataclass
class MonitoringComponent:
component_id: str
name: str
component_type: ComponentType
monitoring_enabled: bool
metrics_config: Dict[str, Any]
health_check_config: Dict[str, Any]
alert_thresholds: Dict[str, float]
tags: Dict[str, str]
@dataclass
class MetricData:
metric_name: str
metric_type: MetricType
value: float
unit: str
timestamp: datetime
dimensions: Dict[str, str]
component_id: str
class WorkloadMonitoringManager:
"""Comprehensive workload monitoring system"""
def __init__(self, config: Dict[str, Any]):
self.config = config
# AWS clients
self.cloudwatch = boto3.client('cloudwatch')
self.ec2 = boto3.client('ec2')
self.rds = boto3.client('rds')
self.elbv2 = boto3.client('elbv2')
self.logs = boto3.client('logs')
self.xray = boto3.client('xray')
# Monitoring components registry
self.components = {}
self.metric_collectors = {}
self.health_checkers = {}
# Monitoring state
self.monitoring_active = False
self.collection_interval = config.get('collection_interval_seconds', 60)
self.thread_pool = ThreadPoolExecutor(max_workers=10)
# Metrics buffer
self.metrics_buffer = []
self.buffer_lock = threading.Lock()
def register_component(self, component: MonitoringComponent):
"""Register a component for monitoring"""
self.components[component.component_id] = component
# Initialize metric collectors based on component type
if component.component_type == ComponentType.COMPUTE:
self.metric_collectors[component.component_id] = ComputeMetricCollector(component)
elif component.component_type == ComponentType.DATABASE:
self.metric_collectors[component.component_id] = DatabaseMetricCollector(component)
elif component.component_type == ComponentType.APPLICATION:
self.metric_collectors[component.component_id] = ApplicationMetricCollector(component)
elif component.component_type == ComponentType.EXTERNAL_SERVICE:
self.metric_collectors[component.component_id] = ExternalServiceMetricCollector(component)
# Initialize health checkers
if component.health_check_config.get('enabled', False):
self.health_checkers[component.component_id] = HealthChecker(component)
logging.info(f"Registered monitoring component: {component.name}")
async def start_monitoring(self):
"""Start comprehensive monitoring for all components"""
self.monitoring_active = True
logging.info("Starting workload monitoring system")
# Start monitoring tasks
tasks = [
asyncio.create_task(self._metric_collection_loop()),
asyncio.create_task(self._health_check_loop()),
asyncio.create_task(self._metric_publishing_loop()),
asyncio.create_task(self._synthetic_monitoring_loop())
]
try:
await asyncio.gather(*tasks)
except Exception as e:
logging.error(f"Monitoring system error: {str(e)}")
self.monitoring_active = False
async def _metric_collection_loop(self):
"""Main metric collection loop"""
while self.monitoring_active:
try:
collection_tasks = []
for component_id, collector in self.metric_collectors.items():
if self.components[component_id].monitoring_enabled:
task = asyncio.create_task(self._collect_component_metrics(component_id, collector))
collection_tasks.append(task)
# Collect metrics from all components
await asyncio.gather(*collection_tasks, return_exceptions=True)
# Wait for next collection interval
await asyncio.sleep(self.collection_interval)
except Exception as e:
logging.error(f"Metric collection loop error: {str(e)}")
await asyncio.sleep(self.collection_interval)
async def _collect_component_metrics(self, component_id: str, collector):
"""Collect metrics from a specific component"""
try:
metrics = await collector.collect_metrics()
with self.buffer_lock:
self.metrics_buffer.extend(metrics)
logging.debug(f"Collected {len(metrics)} metrics from {component_id}")
except Exception as e:
logging.error(f"Failed to collect metrics from {component_id}: {str(e)}")
async def _health_check_loop(self):
"""Health check monitoring loop"""
while self.monitoring_active:
try:
health_tasks = []
for component_id, health_checker in self.health_checkers.items():
task = asyncio.create_task(self._check_component_health(component_id, health_checker))
health_tasks.append(task)
# Perform health checks
await asyncio.gather(*health_tasks, return_exceptions=True)
# Health checks run more frequently
await asyncio.sleep(30)
except Exception as e:
logging.error(f"Health check loop error: {str(e)}")
await asyncio.sleep(30)
async def _check_component_health(self, component_id: str, health_checker):
"""Check health of a specific component"""
try:
health_status = await health_checker.check_health()
# Create health metric
health_metric = MetricData(
metric_name="ComponentHealth",
metric_type=MetricType.GAUGE,
value=1.0 if health_status['healthy'] else 0.0,
unit="Count",
timestamp=datetime.utcnow(),
dimensions={
"ComponentId": component_id,
"ComponentType": self.components[component_id].component_type.value
},
component_id=component_id
)
with self.buffer_lock:
self.metrics_buffer.append(health_metric)
if not health_status['healthy']:
logging.warning(f"Component {component_id} health check failed: {health_status.get('error')}")
except Exception as e:
logging.error(f"Health check failed for {component_id}: {str(e)}")
async def _metric_publishing_loop(self):
"""Publish metrics to CloudWatch"""
while self.monitoring_active:
try:
# Get metrics from buffer
metrics_to_publish = []
with self.buffer_lock:
if self.metrics_buffer:
metrics_to_publish = self.metrics_buffer.copy()
self.metrics_buffer.clear()
if metrics_to_publish:
await self._publish_metrics_to_cloudwatch(metrics_to_publish)
# Publish every 60 seconds
await asyncio.sleep(60)
except Exception as e:
logging.error(f"Metric publishing loop error: {str(e)}")
await asyncio.sleep(60)
async def _publish_metrics_to_cloudwatch(self, metrics: List[MetricData]):
"""Publish metrics to CloudWatch"""
try:
# Group metrics by namespace
namespace_metrics = {}
for metric in metrics:
component = self.components[metric.component_id]
namespace = f"Workload/{component.component_type.value.title()}"
if namespace not in namespace_metrics:
namespace_metrics[namespace] = []
metric_data = {
'MetricName': metric.metric_name,
'Value': metric.value,
'Unit': metric.unit,
'Timestamp': metric.timestamp,
'Dimensions': [
{'Name': key, 'Value': value}
for key, value in metric.dimensions.items()
]
}
namespace_metrics[namespace].append(metric_data)
# Publish metrics in batches (CloudWatch limit is 20 metrics per call)
for namespace, metric_list in namespace_metrics.items():
for i in range(0, len(metric_list), 20):
batch = metric_list[i:i+20]
self.cloudwatch.put_metric_data(
Namespace=namespace,
MetricData=batch
)
logging.info(f"Published {len(metrics)} metrics to CloudWatch")
except Exception as e:
logging.error(f"Failed to publish metrics to CloudWatch: {str(e)}")
async def _synthetic_monitoring_loop(self):
"""Synthetic monitoring for critical endpoints"""
synthetic_config = self.config.get('synthetic_monitoring', {})
if not synthetic_config.get('enabled', False):
return
endpoints = synthetic_config.get('endpoints', [])
check_interval = synthetic_config.get('check_interval_seconds', 300)
while self.monitoring_active:
try:
for endpoint_config in endpoints:
await self._perform_synthetic_check(endpoint_config)
await asyncio.sleep(check_interval)
except Exception as e:
logging.error(f"Synthetic monitoring loop error: {str(e)}")
await asyncio.sleep(check_interval)
async def _perform_synthetic_check(self, endpoint_config: Dict[str, Any]):
"""Perform synthetic monitoring check"""
try:
url = endpoint_config['url']
timeout = endpoint_config.get('timeout_seconds', 30)
expected_status = endpoint_config.get('expected_status_code', 200)
start_time = time.time()
async with asyncio.timeout(timeout):
# Use requests in thread pool to avoid blocking
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
self.thread_pool,
lambda: requests.get(url, timeout=timeout)
)
response_time = (time.time() - start_time) * 1000 # Convert to milliseconds
# Create synthetic monitoring metrics
availability_metric = MetricData(
metric_name="SyntheticAvailability",
metric_type=MetricType.GAUGE,
value=1.0 if response.status_code == expected_status else 0.0,
unit="Count",
timestamp=datetime.utcnow(),
dimensions={
"Endpoint": url,
"StatusCode": str(response.status_code)
},
component_id="synthetic_monitoring"
)
response_time_metric = MetricData(
metric_name="SyntheticResponseTime",
metric_type=MetricType.TIMER,
value=response_time,
unit="Milliseconds",
timestamp=datetime.utcnow(),
dimensions={
"Endpoint": url
},
component_id="synthetic_monitoring"
)
with self.buffer_lock:
self.metrics_buffer.extend([availability_metric, response_time_metric])
logging.debug(f"Synthetic check for {url}: {response.status_code} ({response_time:.2f}ms)")
except Exception as e:
logging.error(f"Synthetic check failed for {endpoint_config.get('url')}: {str(e)}")
# Create failure metric
failure_metric = MetricData(
metric_name="SyntheticAvailability",
metric_type=MetricType.GAUGE,
value=0.0,
unit="Count",
timestamp=datetime.utcnow(),
dimensions={
"Endpoint": endpoint_config.get('url', 'unknown'),
"Error": str(e)[:50] # Truncate error message
},
component_id="synthetic_monitoring"
)
with self.buffer_lock:
self.metrics_buffer.append(failure_metric)
class ComputeMetricCollector:
"""Metric collector for compute resources"""
def __init__(self, component: MonitoringComponent):
self.component = component
self.instance_id = component.metrics_config.get('instance_id')
async def collect_metrics(self) -> List[MetricData]:
"""Collect compute metrics"""
metrics = []
timestamp = datetime.utcnow()
try:
# CPU metrics
cpu_percent = psutil.cpu_percent(interval=1)
metrics.append(MetricData(
metric_name="CPUUtilization",
metric_type=MetricType.GAUGE,
value=cpu_percent,
unit="Percent",
timestamp=timestamp,
dimensions={"InstanceId": self.instance_id or "unknown"},
component_id=self.component.component_id
))
# Memory metrics
memory = psutil.virtual_memory()
metrics.append(MetricData(
metric_name="MemoryUtilization",
metric_type=MetricType.GAUGE,
value=memory.percent,
unit="Percent",
timestamp=timestamp,
dimensions={"InstanceId": self.instance_id or "unknown"},
component_id=self.component.component_id
))
# Disk metrics
disk = psutil.disk_usage('/')
disk_percent = (disk.used / disk.total) * 100
metrics.append(MetricData(
metric_name="DiskUtilization",
metric_type=MetricType.GAUGE,
value=disk_percent,
unit="Percent",
timestamp=timestamp,
dimensions={"InstanceId": self.instance_id or "unknown"},
component_id=self.component.component_id
))
# Network metrics
network = psutil.net_io_counters()
metrics.extend([
MetricData(
metric_name="NetworkBytesIn",
metric_type=MetricType.COUNTER,
value=network.bytes_recv,
unit="Bytes",
timestamp=timestamp,
dimensions={"InstanceId": self.instance_id or "unknown"},
component_id=self.component.component_id
),
MetricData(
metric_name="NetworkBytesOut",
metric_type=MetricType.COUNTER,
value=network.bytes_sent,
unit="Bytes",
timestamp=timestamp,
dimensions={"InstanceId": self.instance_id or "unknown"},
component_id=self.component.component_id
)
])
except Exception as e:
logging.error(f"Failed to collect compute metrics: {str(e)}")
return metrics
class ApplicationMetricCollector:
"""Metric collector for application components"""
def __init__(self, component: MonitoringComponent):
self.component = component
self.app_name = component.metrics_config.get('app_name', 'unknown')
async def collect_metrics(self) -> List[MetricData]:
"""Collect application metrics"""
metrics = []
timestamp = datetime.utcnow()
try:
# Simulate application metrics collection
# In real implementation, this would integrate with application monitoring libraries
# Request rate metric
metrics.append(MetricData(
metric_name="RequestRate",
metric_type=MetricType.GAUGE,
value=100.0, # Simulated value
unit="Count/Second",
timestamp=timestamp,
dimensions={"Application": self.app_name},
component_id=self.component.component_id
))
# Error rate metric
metrics.append(MetricData(
metric_name="ErrorRate",
metric_type=MetricType.GAUGE,
value=0.5, # Simulated value
unit="Percent",
timestamp=timestamp,
dimensions={"Application": self.app_name},
component_id=self.component.component_id
))
# Response time metric
metrics.append(MetricData(
metric_name="ResponseTime",
metric_type=MetricType.TIMER,
value=250.0, # Simulated value
unit="Milliseconds",
timestamp=timestamp,
dimensions={"Application": self.app_name},
component_id=self.component.component_id
))
except Exception as e:
logging.error(f"Failed to collect application metrics: {str(e)}")
return metrics
class HealthChecker:
"""Health checker for components"""
def __init__(self, component: MonitoringComponent):
self.component = component
self.health_config = component.health_check_config
async def check_health(self) -> Dict[str, Any]:
"""Perform health check"""
try:
check_type = self.health_config.get('type', 'http')
if check_type == 'http':
return await self._http_health_check()
elif check_type == 'tcp':
return await self._tcp_health_check()
elif check_type == 'custom':
return await self._custom_health_check()
else:
return {'healthy': False, 'error': f'Unknown health check type: {check_type}'}
except Exception as e:
return {'healthy': False, 'error': str(e)}
async def _http_health_check(self) -> Dict[str, Any]:
"""HTTP health check"""
url = self.health_config.get('url')
timeout = self.health_config.get('timeout_seconds', 10)
expected_status = self.health_config.get('expected_status_code', 200)
try:
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
None,
lambda: requests.get(url, timeout=timeout)
)
healthy = response.status_code == expected_status
return {
'healthy': healthy,
'status_code': response.status_code,
'response_time_ms': response.elapsed.total_seconds() * 1000
}
except Exception as e:
return {'healthy': False, 'error': str(e)}
async def _tcp_health_check(self) -> Dict[str, Any]:
"""TCP health check"""
host = self.health_config.get('host')
port = self.health_config.get('port')
timeout = self.health_config.get('timeout_seconds', 10)
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host, port),
timeout=timeout
)
writer.close()
await writer.wait_closed()
return {'healthy': True}
except Exception as e:
return {'healthy': False, 'error': str(e)}
async def _custom_health_check(self) -> Dict[str, Any]:
"""Custom health check"""
# Implement custom health check logic based on component type
return {'healthy': True, 'message': 'Custom health check passed'}
# Usage example
async def main():
config = {
'collection_interval_seconds': 60,
'synthetic_monitoring': {
'enabled': True,
'check_interval_seconds': 300,
'endpoints': [
{
'url': 'https://example.com/health',
'timeout_seconds': 30,
'expected_status_code': 200
}
]
}
}
# Initialize monitoring manager
monitoring_manager = WorkloadMonitoringManager(config)
# Register components for monitoring
web_server_component = MonitoringComponent(
component_id="web_server_01",
name="Web Server Instance",
component_type=ComponentType.COMPUTE,
monitoring_enabled=True,
metrics_config={
'instance_id': 'i-1234567890abcdef0'
},
health_check_config={
'enabled': True,
'type': 'http',
'url': 'http://localhost:8080/health',
'timeout_seconds': 10,
'expected_status_code': 200
},
alert_thresholds={
'cpu_utilization': 80.0,
'memory_utilization': 85.0,
'disk_utilization': 90.0
},
tags={
'Environment': 'production',
'Application': 'web-app'
}
)
app_component = MonitoringComponent(
component_id="web_application",
name="Web Application",
component_type=ComponentType.APPLICATION,
monitoring_enabled=True,
metrics_config={
'app_name': 'web-app'
},
health_check_config={
'enabled': True,
'type': 'http',
'url': 'http://localhost:8080/api/health',
'timeout_seconds': 5
},
alert_thresholds={
'error_rate': 5.0,
'response_time': 1000.0
},
tags={
'Environment': 'production',
'Component': 'application'
}
)
# Register components
monitoring_manager.register_component(web_server_component)
monitoring_manager.register_component(app_component)
# Start monitoring
print("Starting comprehensive workload monitoring...")
await monitoring_manager.start_monitoring()
if __name__ == "__main__":
asyncio.run(main())AWS Services Used
- Amazon CloudWatch: Central metrics collection, storage, and visualization platform
- AWS X-Ray: Distributed tracing for application performance monitoring
- Amazon CloudWatch Logs: Centralized log collection and analysis
- AWS Systems Manager: Infrastructure monitoring and patch management
- Amazon EventBridge: Event-driven monitoring and automated responses
- AWS Config: Configuration monitoring and compliance tracking
- Amazon GuardDuty: Security monitoring and threat detection
- AWS CloudTrail: API call monitoring and audit logging
- Amazon VPC Flow Logs: Network traffic monitoring and analysis
- AWS Health Dashboard: AWS service health monitoring
- Amazon Route 53 Health Checks: DNS and endpoint monitoring
- Elastic Load Balancing: Load balancer health and performance monitoring
- Amazon RDS Performance Insights: Database performance monitoring
- Amazon ElastiCache: Cache performance and health monitoring
- AWS Lambda: Serverless function monitoring and error tracking
- Amazon ECS/EKS: Container orchestration monitoring and logging
Benefits
- Complete Visibility: Comprehensive monitoring across all workload components
- Proactive Issue Detection: Early identification of performance and reliability issues
- Improved Troubleshooting: Rich data for faster problem diagnosis and resolution
- Performance Optimization: Data-driven insights for system optimization
- Capacity Planning: Historical data for informed scaling decisions
- Compliance Monitoring: Automated tracking of configuration and security compliance
- Cost Optimization: Resource utilization monitoring for cost management
- Business Intelligence: Application and business metrics for decision making
- Automated Response: Foundation for automated incident response and remediation
- Enhanced Reliability: Continuous monitoring improves overall system reliability
Related Resources
- AWS Well-Architected Reliability Pillar
- Monitor All Components
- Amazon CloudWatch User Guide
- AWS X-Ray Developer Guide
- Amazon CloudWatch Logs User Guide
- AWS Systems Manager User Guide
- Monitoring Best Practices
- Application Performance Monitoring
- Infrastructure Monitoring
- AWS Config User Guide
- Amazon GuardDuty User Guide
- Building Observability