REL06-BP07: Monitor end-to-end tracing of requests through your system
Overview
Implement comprehensive distributed tracing to monitor requests as they flow through your entire system architecture. End-to-end tracing provides visibility into request paths, performance bottlenecks, error propagation, and service dependencies, enabling rapid troubleshooting and optimization of complex distributed systems.
Implementation Steps
1. Design Distributed Tracing Architecture
- Implement trace context propagation across all services
- Design trace sampling strategies for performance and cost optimization
- Establish trace correlation and span relationship modeling
- Configure trace data retention and storage policies
2. Instrument Applications and Services
- Add tracing instrumentation to application code
- Configure automatic instrumentation for frameworks and libraries
- Implement custom spans for business logic and critical operations
- Establish trace metadata and tagging strategies
3. Configure Service Mesh and Infrastructure Tracing
- Implement service mesh tracing for network-level visibility
- Configure load balancer and API gateway tracing
- Enable database and cache operation tracing
- Establish infrastructure component trace integration
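Load balancers and other AWS infrastructure components carry trace identifiers in the `X-Amzn-Trace-Id` header as semicolon-separated `Key=Value` fields such as `Root`, `Parent`, and `Sampled`. A service behind them can read that header to join the same trace. The parser below is a minimal sketch; the function name and the shape of the returned dictionary are assumptions made for illustration.

```python
from typing import Dict, Optional


def parse_xray_header(value: str) -> Dict[str, Optional[object]]:
    """Split an X-Amzn-Trace-Id header value into its main fields."""
    fields: Dict[str, str] = {}
    for part in value.split(";"):
        key, sep, val = part.strip().partition("=")
        if sep:  # ignore fragments that are not Key=Value
            fields[key] = val
    # Sampled is "1" or "0"; anything else is treated as undecided (None)
    sampled = {"1": True, "0": False}.get(fields.get("Sampled"))
    return {
        "root": fields.get("Root"),
        "parent": fields.get("Parent"),
        "sampled": sampled,
    }


header = "Root=1-5759e988-bd862e3fe1be46a994272793;Parent=53995c3f42cd8ad8;Sampled=1"
context = parse_xray_header(header)
```

Fields the sketch does not know about are ignored, so extra entries added by intermediate components do not break parsing.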
4. Set Up Trace Collection and Processing
- Configure trace collectors and aggregation pipelines
- Implement trace data enrichment and correlation
- Design trace data processing and analysis workflows
- Establish real-time trace streaming and batch processing
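Collectors usually buffer spans and forward them in batches rather than making one network call per span. The sketch below shows that idea in its simplest form. `SpanBatcher` and its `sink` callable are hypothetical; in practice the sink might wrap a bulk API such as Kinesis `PutRecords`, which accepts up to 500 records per call.

```python
from typing import Any, Callable, Dict, List

SpanRecord = Dict[str, Any]


class SpanBatcher:
    """Buffer finished spans and hand them to a sink in fixed-size batches."""

    def __init__(self, sink: Callable[[List[SpanRecord]], None], max_batch_size: int = 25):
        if max_batch_size < 1:
            raise ValueError("max_batch_size must be at least 1")
        self._sink = sink
        self._max_batch_size = max_batch_size
        self._buffer: List[SpanRecord] = []

    def add(self, span: SpanRecord) -> None:
        """Queue a span and flush automatically once the batch is full."""
        self._buffer.append(span)
        if len(self._buffer) >= self._max_batch_size:
            self.flush()

    def flush(self) -> None:
        """Send whatever is buffered; does nothing when the buffer is empty."""
        if not self._buffer:
            return
        batch, self._buffer = self._buffer, []
        self._sink(batch)


# Usage: collect batches in a list instead of calling a real backend
received: List[List[SpanRecord]] = []
batcher = SpanBatcher(received.append, max_batch_size=2)
for span_id in ("a", "b", "c"):
    batcher.add({"span_id": span_id})
batcher.flush()  # send the final partial batch, for example at shutdown
```

A production collector would typically also flush on a timer so that spans are not held indefinitely during quiet periods.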
5. Create Trace Analysis and Visualization
- Implement trace search and filtering capabilities
- Configure service dependency mapping and topology visualization
- Design performance analysis and bottleneck identification
- Establish error tracking and root cause analysis
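Service dependency mapping can be derived directly from span parent/child links: whenever a child span belongs to a different service than its parent, one service called another. The following sketch assumes spans are plain dictionaries with `span_id`, `parent_span_id`, and `service_name` keys; the function name is hypothetical.

```python
from collections import Counter
from typing import Any, Dict, List, Tuple


def build_dependency_edges(spans: List[Dict[str, Any]]) -> Counter:
    """Count caller -> callee service pairs found in one or more traces."""
    by_id = {span["span_id"]: span for span in spans}
    edges: Counter = Counter()
    for span in spans:
        parent = by_id.get(span.get("parent_span_id"))
        # Only cross-service links are dependencies; skip in-process spans
        if parent and parent["service_name"] != span["service_name"]:
            edge: Tuple[str, str] = (parent["service_name"], span["service_name"])
            edges[edge] += 1
    return edges


spans = [
    {"span_id": "1", "parent_span_id": None, "service_name": "gateway"},
    {"span_id": "2", "parent_span_id": "1", "service_name": "user-service"},
    {"span_id": "3", "parent_span_id": "2", "service_name": "user-service"},
    {"span_id": "4", "parent_span_id": "2", "service_name": "users-db"},
    {"span_id": "5", "parent_span_id": "1", "service_name": "user-service"},
]
edges = build_dependency_edges(spans)
```

The resulting counts can feed a topology view, with edge weights indicating how often one service calls another.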
6. Monitor and Optimize Tracing Performance
- Track tracing overhead and system performance impact
- Optimize sampling rates and trace data volume
- Monitor trace collection completeness and accuracy
- Implement continuous improvement based on trace insights
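When tuning sampling rates, it helps if every service reaches the same decision for a given trace, so that sampled traces are complete. One way to get that property is to derive the decision from a hash of the trace ID instead of a per-service random number. This is a sketch of that technique with a hypothetical function name, not a description of how any particular tracing backend samples.

```python
import hashlib


def should_sample(trace_id: str, rate: float) -> bool:
    """Deterministically decide whether to sample a trace at the given rate."""
    if rate <= 0:
        return False
    if rate >= 1:
        return True
    digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
    # Map the first 8 bytes of the hash to a number in [0, 1)
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < rate


# The same trace ID always yields the same decision, on any service
decision_a = should_sample("example-trace-id", 0.1)
decision_b = should_sample("example-trace-id", 0.1)

# Roughly 10% of distinct trace IDs are selected at rate 0.1
sampled_fraction = sum(should_sample(f"trace-{i}", 0.1) for i in range(10_000)) / 10_000
```

Because the bucket for a trace ID is fixed, raising the rate only adds traces to the sampled set; traces that were sampled at a lower rate stay sampled.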
Implementation Examples
Example 1: Comprehensive Distributed Tracing System
import boto3
import json
import logging
import asyncio
import time
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import threading
from contextlib import contextmanager
class SpanKind(Enum):
    SERVER = "server"
    CLIENT = "client"
    PRODUCER = "producer"
    CONSUMER = "consumer"
    INTERNAL = "internal"


class SpanStatus(Enum):
    OK = "ok"
    ERROR = "error"
    TIMEOUT = "timeout"
    CANCELLED = "cancelled"


@dataclass
class TraceContext:
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    trace_flags: int
    trace_state: str


@dataclass
class Span:
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    operation_name: str
    service_name: str
    span_kind: SpanKind
    start_time: datetime
    end_time: Optional[datetime]
    duration_ms: Optional[float]
    status: SpanStatus
    tags: Dict[str, Any]
    logs: List[Dict[str, Any]]
    baggage: Dict[str, str]


@dataclass
class Trace:
    trace_id: str
    spans: List[Span]
    root_span: Optional[Span]
    start_time: datetime
    end_time: Optional[datetime]
    duration_ms: Optional[float]
    service_count: int
    span_count: int
    error_count: int

class DistributedTracer:
    """Comprehensive distributed tracing system"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.service_name = config.get('service_name', 'unknown-service')
        # AWS clients
        self.xray = boto3.client('xray')
        self.cloudwatch = boto3.client('cloudwatch')
        self.dynamodb = boto3.resource('dynamodb')
        self.kinesis = boto3.client('kinesis')
        # Storage
        self.traces_table = self.dynamodb.Table(config.get('traces_table', 'distributed-traces'))
        self.spans_table = self.dynamodb.Table(config.get('spans_table', 'trace-spans'))
        # Tracing configuration
        self.sampling_rate = config.get('sampling_rate', 0.1)  # 10% sampling
        self.max_span_duration = config.get('max_span_duration', 300000)  # 5 minutes, in milliseconds
        # Thread-local storage for trace context
        self.local = threading.local()
        # Active spans and traces
        self.active_spans = {}
        self.completed_traces = {}

    def start_trace(self, operation_name: str, **kwargs) -> str:
        """Start a new distributed trace"""
        try:
            trace_id = self._generate_trace_id()
            span_id = self._generate_span_id()
            # Create root span
            root_span = Span(
                trace_id=trace_id,
                span_id=span_id,
                parent_span_id=None,
                operation_name=operation_name,
                service_name=self.service_name,
                span_kind=SpanKind.SERVER,
                start_time=datetime.utcnow(),
                end_time=None,
                duration_ms=None,
                status=SpanStatus.OK,
                tags=kwargs,
                logs=[],
                baggage={}
            )
            # Set trace context
            trace_context = TraceContext(
                trace_id=trace_id,
                span_id=span_id,
                parent_span_id=None,
                trace_flags=1 if self._should_sample() else 0,
                trace_state=""
            )
            self._set_trace_context(trace_context)
            self.active_spans[span_id] = root_span
            logging.info(f"Started trace {trace_id} with root span {span_id}")
            return trace_id
        except Exception as e:
            logging.error(f"Failed to start trace: {str(e)}")
            return ""

    @contextmanager
    def start_span(self, operation_name: str, span_kind: SpanKind = SpanKind.INTERNAL, **kwargs):
        """Context manager for creating spans"""
        span = None
        try:
            span = self._create_span(operation_name, span_kind, **kwargs)
            yield span
        except Exception as e:
            # Mark the span as failed, then let the exception propagate
            if span:
                span.status = SpanStatus.ERROR
                span.tags['error'] = str(e)
                self._add_span_log(span, 'error', {'message': str(e)})
            raise
        finally:
            if span:
                self._finish_span(span)

    def _create_span(self, operation_name: str, span_kind: SpanKind, **kwargs) -> Span:
        """Create a new span"""
        try:
            current_context = self._get_trace_context()
            if not current_context:
                # Start new trace if no context exists
                self.start_trace(operation_name, **kwargs)
                current_context = self._get_trace_context()
            span_id = self._generate_span_id()
            span = Span(
                trace_id=current_context.trace_id,
                span_id=span_id,
                parent_span_id=current_context.span_id,
                operation_name=operation_name,
                service_name=self.service_name,
                span_kind=span_kind,
                start_time=datetime.utcnow(),
                end_time=None,
                duration_ms=None,
                status=SpanStatus.OK,
                tags=kwargs,
                logs=[],
                baggage={}
            )
            # Update trace context so the new span becomes the current one
            new_context = TraceContext(
                trace_id=current_context.trace_id,
                span_id=span_id,
                parent_span_id=current_context.span_id,
                trace_flags=current_context.trace_flags,
                trace_state=current_context.trace_state
            )
            self._set_trace_context(new_context)
            self.active_spans[span_id] = span
            return span
        except Exception as e:
            logging.error(f"Failed to create span: {str(e)}")
            raise

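    def _finish_span(self, span: Span):
        """Finish and record a span"""
        try:
            span.end_time = datetime.utcnow()
            span.duration_ms = (span.end_time - span.start_time).total_seconds() * 1000
            # Remove from active spans
            self.active_spans.pop(span.span_id, None)
            # Restore the parent as the current context so that the next span
            # becomes a sibling of this one rather than a child of a finished span
            current = self._get_trace_context()
            if current and current.span_id == span.span_id and span.parent_span_id:
                parent = self.active_spans.get(span.parent_span_id)
                self._set_trace_context(TraceContext(
                    trace_id=span.trace_id,
                    span_id=span.parent_span_id,
                    parent_span_id=parent.parent_span_id if parent else None,
                    trace_flags=current.trace_flags,
                    trace_state=current.trace_state
                ))
            # Store the span and check trace completion in the background
            # (asyncio.create_task requires a running event loop)
            asyncio.create_task(self._store_span(span))
            asyncio.create_task(self._check_trace_completion(span.trace_id))
            logging.debug(f"Finished span {span.span_id} in {span.duration_ms:.2f}ms")
        except Exception as e:
            logging.error(f"Failed to finish span: {str(e)}")
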
    def add_span_tag(self, key: str, value: Any):
        """Add tag to current span"""
        try:
            context = self._get_trace_context()
            if context and context.span_id in self.active_spans:
                span = self.active_spans[context.span_id]
                span.tags[key] = value
        except Exception as e:
            logging.error(f"Failed to add span tag: {str(e)}")

    def add_span_log(self, level: str, message: str, **kwargs):
        """Add log entry to current span"""
        try:
            context = self._get_trace_context()
            if context and context.span_id in self.active_spans:
                span = self.active_spans[context.span_id]
                self._add_span_log(span, level, {'message': message, **kwargs})
        except Exception as e:
            logging.error(f"Failed to add span log: {str(e)}")

    def _add_span_log(self, span: Span, level: str, fields: Dict[str, Any]):
        """Add log entry to span"""
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': level,
            'fields': fields
        }
        span.logs.append(log_entry)

    def inject_trace_context(self, headers: Dict[str, str]) -> Dict[str, str]:
        """Inject trace context into HTTP headers"""
        try:
            context = self._get_trace_context()
            if context:
                headers['X-Trace-Id'] = context.trace_id
                headers['X-Span-Id'] = context.span_id
                if context.parent_span_id:
                    headers['X-Parent-Span-Id'] = context.parent_span_id
                headers['X-Trace-Flags'] = str(context.trace_flags)
                if context.trace_state:
                    headers['X-Trace-State'] = context.trace_state
            return headers
        except Exception as e:
            logging.error(f"Failed to inject trace context: {str(e)}")
            return headers

    def extract_trace_context(self, headers: Dict[str, str]) -> Optional[TraceContext]:
        """Extract trace context from HTTP headers"""
        try:
            trace_id = headers.get('X-Trace-Id')
            span_id = headers.get('X-Span-Id')
            if trace_id and span_id:
                return TraceContext(
                    trace_id=trace_id,
                    span_id=span_id,
                    parent_span_id=headers.get('X-Parent-Span-Id'),
                    trace_flags=int(headers.get('X-Trace-Flags', '0')),
                    trace_state=headers.get('X-Trace-State', '')
                )
            return None
        except Exception as e:
            logging.error(f"Failed to extract trace context: {str(e)}")
            return None

    def set_extracted_context(self, context: TraceContext):
        """Set extracted trace context as current context"""
        self._set_trace_context(context)

    def _generate_trace_id(self) -> str:
        """Generate unique trace ID"""
        return f"{int(time.time())}-{uuid.uuid4().hex[:16]}"

    def _generate_span_id(self) -> str:
        """Generate unique span ID"""
        return uuid.uuid4().hex[:16]

    def _should_sample(self) -> bool:
        """Determine if trace should be sampled"""
        import random
        return random.random() < self.sampling_rate

    def _get_trace_context(self) -> Optional[TraceContext]:
        """Get current trace context from thread-local storage"""
        return getattr(self.local, 'trace_context', None)

    def _set_trace_context(self, context: TraceContext):
        """Set trace context in thread-local storage"""
        self.local.trace_context = context

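    async def _store_span(self, span: Span):
        """Store span in DynamoDB"""
        try:
            from decimal import Decimal  # boto3's DynamoDB serializer rejects Python floats
            span_dict = asdict(span)
            # Enums and datetimes are not DynamoDB types; store their string forms
            span_dict['span_kind'] = span.span_kind.value
            span_dict['status'] = span.status.value
            span_dict['start_time'] = span.start_time.isoformat()
            if span.end_time:
                span_dict['end_time'] = span.end_time.isoformat()
            # Round-trip through JSON to convert floats (duration, tag values) to Decimal
            item = json.loads(json.dumps(span_dict, default=str), parse_float=Decimal)
            # Note: put_item is a blocking call and holds the event loop while it runs
            self.spans_table.put_item(Item=item)
            # Also send to Kinesis for real-time processing
            await self._send_span_to_kinesis(span)
        except Exception as e:
            logging.error(f"Failed to store span: {str(e)}")

    async def _send_span_to_kinesis(self, span: Span):
        """Send span to Kinesis for real-time processing"""
        try:
            span_data = {
                'trace_id': span.trace_id,
                'span_id': span.span_id,
                'parent_span_id': span.parent_span_id,
                'operation_name': span.operation_name,
                'service_name': span.service_name,
                'duration_ms': span.duration_ms,
                'status': span.status.value,
                'tags': span.tags,
                'timestamp': span.start_time.isoformat()
            }
            self.kinesis.put_record(
                StreamName=self.config.get('kinesis_stream', 'trace-spans'),
                # default=str keeps non-JSON tag values from failing serialization
                Data=json.dumps(span_data, default=str),
                PartitionKey=span.trace_id
            )
        except Exception as e:
            logging.error(f"Failed to send span to Kinesis: {str(e)}")
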
    async def _check_trace_completion(self, trace_id: str):
        """Check if trace is complete and process it"""
        try:
            # Get all spans for this trace (first page of results only)
            response = self.spans_table.query(
                IndexName='trace-id-index',
                KeyConditionExpression='trace_id = :trace_id',
                ExpressionAttributeValues={':trace_id': trace_id}
            )
            spans = []
            for item in response['Items']:
                # Convert stored strings back into the types Span expects
                # (assumes enum values and ISO 8601 timestamps were stored)
                item['span_kind'] = SpanKind(item['span_kind'])
                item['status'] = SpanStatus(item['status'])
                item['start_time'] = datetime.fromisoformat(item['start_time'])
                if item.get('end_time'):
                    item['end_time'] = datetime.fromisoformat(item['end_time'])
                spans.append(Span(**item))
            # Check if all spans are complete
            incomplete_spans = [s for s in spans if s.end_time is None]
            if incomplete_spans:
                return  # Trace not yet complete
            # Create trace summary
            trace = self._create_trace_summary(trace_id, spans)
            # Store trace summary
            await self._store_trace(trace)
            # Analyze trace for insights
            await self._analyze_trace(trace)
        except Exception as e:
            logging.error(f"Failed to check trace completion: {str(e)}")

    def _create_trace_summary(self, trace_id: str, spans: List[Span]) -> Trace:
        """Create trace summary from spans"""
        try:
            # Find root span
            root_span = next((s for s in spans if s.parent_span_id is None), None)
            # Calculate trace metrics
            start_time = min(s.start_time for s in spans)
            # default=None avoids a ValueError when no span has finished yet
            end_time = max((s.end_time for s in spans if s.end_time), default=None)
            duration_ms = (end_time - start_time).total_seconds() * 1000 if end_time else None
            # Count services and errors
            services = set(s.service_name for s in spans)
            error_spans = [s for s in spans if s.status == SpanStatus.ERROR]
            return Trace(
                trace_id=trace_id,
                spans=spans,
                root_span=root_span,
                start_time=start_time,
                end_time=end_time,
                duration_ms=duration_ms,
                service_count=len(services),
                span_count=len(spans),
                error_count=len(error_spans)
            )
        except Exception as e:
            logging.error(f"Failed to create trace summary: {str(e)}")
            raise

    async def _store_trace(self, trace: Trace):
        """Store trace summary in DynamoDB"""
        try:
            from decimal import Decimal  # boto3's DynamoDB serializer rejects Python floats
            trace_dict = {
                'trace_id': trace.trace_id,
                'start_time': trace.start_time.isoformat(),
                'end_time': trace.end_time.isoformat() if trace.end_time else None,
                'duration_ms': Decimal(str(trace.duration_ms)) if trace.duration_ms is not None else None,
                'service_count': trace.service_count,
                'span_count': trace.span_count,
                'error_count': trace.error_count,
                'root_operation': trace.root_span.operation_name if trace.root_span else None,
                'services': list(set(s.service_name for s in trace.spans))
            }
            self.traces_table.put_item(Item=trace_dict)
        except Exception as e:
            logging.error(f"Failed to store trace: {str(e)}")

    async def _analyze_trace(self, trace: Trace):
        """Analyze trace for performance insights"""
        try:
            insights = []
            # Check for slow operations
            if trace.duration_ms and trace.duration_ms > 5000:  # 5 seconds
                insights.append({
                    'type': 'slow_trace',
                    'message': f'Trace duration {trace.duration_ms:.0f}ms exceeds threshold',
                    'severity': 'warning'
                })
            # Check for errors
            if trace.error_count > 0:
                insights.append({
                    'type': 'trace_errors',
                    'message': f'Trace contains {trace.error_count} error(s)',
                    'severity': 'error'
                })
            # Check for service dependencies
            if trace.service_count > 5:
                insights.append({
                    'type': 'high_service_count',
                    'message': f'Trace spans {trace.service_count} services',
                    'severity': 'info'
                })
            # Send insights to CloudWatch
            for insight in insights:
                await self._send_trace_insight(trace.trace_id, insight)
        except Exception as e:
            logging.error(f"Failed to analyze trace: {str(e)}")

    async def _send_trace_insight(self, trace_id: str, insight: Dict[str, Any]):
        """Send trace insight to CloudWatch"""
        try:
            self.cloudwatch.put_metric_data(
                Namespace='DistributedTracing/Insights',
                MetricData=[
                    {
                        'MetricName': insight['type'],
                        'Value': 1,
                        'Unit': 'Count',
                        'Dimensions': [
                            {
                                'Name': 'Severity',
                                'Value': insight['severity']
                            },
                            {
                                'Name': 'ServiceName',
                                'Value': self.service_name
                            }
                        ]
                    }
                ]
            )
        except Exception as e:
            logging.error(f"Failed to send trace insight: {str(e)}")

# Usage example (needs AWS credentials plus the DynamoDB tables and Kinesis stream named below)
async def main():
    config = {
        'service_name': 'user-service',
        'traces_table': 'distributed-traces',
        'spans_table': 'trace-spans',
        'kinesis_stream': 'trace-spans',
        'sampling_rate': 0.1
    }
    # Initialize tracer
    tracer = DistributedTracer(config)
    # Start a trace
    trace_id = tracer.start_trace('process_user_request', user_id='12345')
    # Create spans for different operations
    with tracer.start_span('validate_user', SpanKind.INTERNAL) as span:
        tracer.add_span_tag('user_id', '12345')
        tracer.add_span_log('info', 'Validating user credentials')
        # Simulate work
        await asyncio.sleep(0.1)
    with tracer.start_span('fetch_user_data', SpanKind.CLIENT) as span:
        tracer.add_span_tag('database', 'users_db')
        tracer.add_span_log('info', 'Fetching user data from database')
        # Simulate database call
        await asyncio.sleep(0.2)
    with tracer.start_span('process_business_logic', SpanKind.INTERNAL) as span:
        tracer.add_span_tag('operation', 'calculate_recommendations')
        # Simulate processing
        await asyncio.sleep(0.15)
    # Give background storage tasks for the last span a chance to run
    # before the event loop closes
    await asyncio.sleep(0)
    print(f"Completed trace: {trace_id}")


if __name__ == "__main__":
    asyncio.run(main())

AWS Services Used
- AWS X-Ray: Distributed tracing service for end-to-end request tracking
- Amazon CloudWatch: Metrics and logs integration for trace analysis
- Amazon Kinesis: Real-time trace data streaming and processing
- Amazon DynamoDB: Storage for trace data, spans, and metadata
- AWS Lambda: Serverless functions for trace processing and analysis
- Amazon API Gateway: API-level tracing and request correlation
- Elastic Load Balancing: Load balancer tracing and request routing visibility
- Amazon ECS/EKS: Container-based service tracing and orchestration
- Amazon RDS: Database query tracing and performance monitoring
- Amazon ElastiCache: Cache operation tracing and hit/miss analysis
- AWS Step Functions: Workflow tracing and state machine visibility
- Amazon SQS/SNS: Message queue and notification tracing
- AWS AppSync: GraphQL API tracing and resolver performance
- Amazon Timestream: Time-series storage for trace metrics and analytics
- Amazon OpenSearch Service: Trace search, analysis, and visualization
Benefits
- End-to-End Visibility: Complete request flow visibility across distributed systems
- Performance Optimization: Identify bottlenecks and optimize critical paths
- Error Tracking: Trace error propagation and identify root causes
- Service Dependencies: Understand service interactions and dependencies
- Latency Analysis: Measure and optimize request latency across services
- Capacity Planning: Understand resource utilization patterns
- Troubleshooting: Rapid issue identification and resolution
- Business Intelligence: Correlate technical metrics with business outcomes
- Compliance: Audit trails for regulatory and security requirements
- Continuous Improvement: Data-driven optimization and enhancement