REL11
REL11-BP06 - Send notifications when events impact availability
REL11-BP06: Send notifications when events impact availability
Implement comprehensive notification systems that alert appropriate stakeholders when events impact or could impact system availability. This includes both technical teams for immediate response and business stakeholders for impact assessment and communication planning.
Implementation Steps
1. Define Notification Categories
Classify events by severity and impact to determine appropriate notification channels.
2. Configure Multi-Channel Delivery
Set up multiple notification channels to ensure message delivery during outages.
3. Implement Escalation Procedures
Design escalation workflows that increase notification scope based on event duration and impact.
4. Create Status Pages
Provide public and internal status pages for transparent communication.
5. Automate Incident Communication
Implement automated systems for consistent and timely incident communication.
Detailed Implementation
{% raw %}
View code
import boto3
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Set
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import slack_sdk
from twilio.rest import Client as TwilioClient
class NotificationSeverity(Enum):
    """Severity levels an availability event can carry, most severe first."""

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
class NotificationChannel(Enum):
    """Delivery channels a notification rule may list."""

    EMAIL = "email"
    SMS = "sms"
    SLACK = "slack"
    PAGERDUTY = "pagerduty"
    WEBHOOK = "webhook"
    STATUS_PAGE = "status_page"
    SNS = "sns"
class IncidentStatus(Enum):
    """Lifecycle states of an incident, in the order they are declared."""

    INVESTIGATING = "investigating"
    IDENTIFIED = "identified"
    MONITORING = "monitoring"
    RESOLVED = "resolved"
class AudienceType(Enum):
    """Groups of people a notification can be addressed to."""

    TECHNICAL = "technical"
    BUSINESS = "business"
    CUSTOMER = "customer"
    EXECUTIVE = "executive"
@dataclass
class NotificationRule:
    """Describes who is notified, over which channels, for one severity."""

    # Key under which the rule is registered.
    name: str
    # Severity this rule is declared for.
    severity: NotificationSeverity
    # Channels to deliver over.
    channels: List[NotificationChannel]
    # Audiences the rule addresses.
    audiences: List[AudienceType]
    # Free-form extra conditions (the example passes {'immediate': True}).
    conditions: Dict[str, Any]
    # Escalation delay; the example annotates 300 as "5 minutes", i.e. seconds.
    escalation_delay: int
    # Upper bound on the number of escalations.
    max_escalations: int
    # Whether the rule is switched on.
    enabled: bool
@dataclass
class NotificationRecipient:
    """Contact details for one person or group that can be notified."""

    # Key under which the recipient is registered.
    name: str
    # Audience this recipient belongs to.
    audience_type: AudienceType
    # Contact handles; a channel skips recipients whose handle is None/empty.
    email: Optional[str]
    phone: Optional[str]
    slack_user_id: Optional[str]
    # Escalation tier of this recipient (the example uses 0).
    escalation_level: int
@dataclass
class AvailabilityEvent:
    """An incident that affects, or may affect, service availability."""

    # Unique identifier; used as the key for active incidents.
    event_id: str
    # Short headline shown in every notification.
    title: str
    description: str
    severity: NotificationSeverity
    # Names of the services that are impacted.
    affected_services: List[str]
    # Templates render start_time with a literal "UTC" suffix.
    start_time: datetime
    # None while the incident is still open.
    end_time: Optional[datetime]
    status: IncidentStatus
    impact_description: str
    # None until a cause has been identified.
    root_cause: Optional[str]
    resolution_steps: List[str]
class AvailabilityNotificationSystem:
    """Route availability incidents to people and systems over several channels.

    Rules (``NotificationRule``) decide which events are announced, to which
    audiences and over which channels; recipients (``NotificationRecipient``)
    carry the contact details. Incidents live in ``active_incidents`` until an
    update sets their status to ``IncidentStatus.RESOLVED``; they are then
    moved to ``incident_history``.

    Delivery is best-effort: every channel logs its own failures and never
    raises, so one broken integration cannot block the others.

    Locking: ``notification_lock`` is a plain (non-reentrant) lock that guards
    the bookkeeping dicts and lists only. It is never held while network calls
    are in flight, and no method that takes it is called with it already held.
    """

    # Upper bound, in seconds, for every outbound HTTP request so a stalled
    # endpoint cannot hang incident processing.
    _HTTP_TIMEOUT_SECONDS = 10

    # PagerDuty Events API v2 accepts only critical/error/warning/info.
    _PAGERDUTY_SEVERITY = {
        'critical': 'critical',
        'high': 'error',
        'medium': 'warning',
        'low': 'info',
        'info': 'info',
    }

    # Header colour of the HTML email, keyed by severity value.
    _SEVERITY_COLORS = {
        'critical': '#dc3545',
        'high': '#fd7e14',
        'medium': '#ffc107',
        'low': '#28a745',
        'info': '#17a2b8',
    }

    def __init__(self, region: str = 'us-east-1'):
        """Create AWS clients and empty notification state.

        Args:
            region: AWS region used for the SNS, SES, CloudWatch and Lambda
                clients and for the default SNS topic ARN.
        """
        self.region = region
        # AWS clients
        self.sns = boto3.client('sns', region_name=region)
        self.ses = boto3.client('ses', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        self.lambda_client = boto3.client('lambda', region_name=region)
        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        # Notification state
        self.notification_rules: Dict[str, NotificationRule] = {}
        self.recipients: Dict[str, NotificationRecipient] = {}
        self.active_incidents: Dict[str, AvailabilityEvent] = {}
        self.notification_history: List[Dict[str, Any]] = []
        # Resolved incidents, kept apart from notification_history so the
        # statistics count only real notifications.
        self.incident_history: List[AvailabilityEvent] = []
        # Pending escalation timers per event id; cancelled on resolution.
        self._escalation_timers: Dict[str, List[threading.Timer]] = {}
        # External service clients
        self.slack_client = None
        self.twilio_client = None
        self.pagerduty_api_key = None
        self.pagerduty_routing_key = None
        # Delivery settings; the defaults are the example's placeholder values
        # and can be overridden through configure_external_services().
        self.email_source = 'noreply@example.com'
        self.sms_from_number = '+1234567890'
        self.slack_incident_channel = '#incidents'
        self.sns_topic_arn = f'arn:aws:sns:{region}:123456789012:availability-alerts'
        self.webhook_urls: List[str] = []
        # Status page configuration
        self.status_page_config = {}
        # Thread safety
        self.notification_lock = threading.Lock()

    def configure_external_services(self, config: Dict[str, Any]) -> None:
        """Configure third-party integrations and delivery settings.

        Recognised keys (all optional): ``slack_token``,
        ``twilio_account_sid`` + ``twilio_auth_token``, ``pagerduty_api_key``,
        ``pagerduty_routing_key``, ``status_page`` (dict with ``api_endpoint``
        and ``api_key``), ``email_source``, ``sms_from_number``,
        ``slack_incident_channel``, ``sns_topic_arn`` and ``webhook_urls``.

        Failures are logged, not raised.
        """
        try:
            # Plain settings first: they cannot fail, so a broken client
            # constructor further down does not prevent them being applied.
            for key in ('pagerduty_routing_key', 'email_source', 'sms_from_number',
                        'slack_incident_channel', 'sns_topic_arn'):
                if key in config:
                    setattr(self, key, config[key])
            if 'webhook_urls' in config:
                self.webhook_urls = list(config['webhook_urls'])
            # Slack configuration
            if 'slack_token' in config:
                self.slack_client = slack_sdk.WebClient(token=config['slack_token'])
                self.logger.info("Slack client configured")
            # Twilio configuration
            if 'twilio_account_sid' in config and 'twilio_auth_token' in config:
                self.twilio_client = TwilioClient(
                    config['twilio_account_sid'],
                    config['twilio_auth_token']
                )
                self.logger.info("Twilio client configured")
            # PagerDuty configuration
            if 'pagerduty_api_key' in config:
                self.pagerduty_api_key = config['pagerduty_api_key']
                self.logger.info("PagerDuty API key configured")
            # Status page configuration
            if 'status_page' in config:
                self.status_page_config = config['status_page']
                self.logger.info("Status page configured")
        except Exception as e:
            self.logger.error("External service configuration failed: %s", e)

    def register_notification_rule(self, rule: NotificationRule) -> bool:
        """Register ``rule`` under its name, replacing any rule of that name.

        Returns:
            True on success, False if the rule could not be stored.
        """
        try:
            self.notification_rules[rule.name] = rule
            self.logger.info("Registered notification rule: %s", rule.name)
            return True
        except Exception as e:
            self.logger.error(
                "Failed to register notification rule %s: %s",
                getattr(rule, 'name', rule), e,
            )
            return False

    def register_recipient(self, recipient: NotificationRecipient) -> bool:
        """Register ``recipient`` under its name, replacing any previous entry.

        Returns:
            True on success, False if the recipient could not be stored.
        """
        try:
            self.recipients[recipient.name] = recipient
            self.logger.info("Registered notification recipient: %s", recipient.name)
            return True
        except Exception as e:
            self.logger.error(
                "Failed to register recipient %s: %s",
                getattr(recipient, 'name', recipient), e,
            )
            return False

    def create_availability_event(self, event: AvailabilityEvent) -> str:
        """Track a new incident and send the initial notifications.

        Returns:
            The event id on success, or an empty string if processing failed
            (the failure is logged).
        """
        try:
            # Only the bookkeeping is done under the lock; delivery happens
            # outside it so slow endpoints do not block other incidents.
            with self.notification_lock:
                self.active_incidents[event.event_id] = event
            # Determine applicable notification rules
            applicable_rules = self._get_applicable_rules(event)
            # Send initial notifications
            for rule in applicable_rules:
                self._send_notifications(event, rule, escalation_level=0)
            # Update status page
            self._update_status_page(event)
            # Schedule escalations if needed
            self._schedule_escalations(event, applicable_rules)
            self.logger.info("Created availability event: %s", event.event_id)
            return event.event_id
        except Exception as e:
            self.logger.error("Failed to create availability event: %s", e)
            return ""

    def update_availability_event(self, event_id: str, updates: Dict[str, Any]) -> bool:
        """Apply ``updates`` to an active incident and announce the change.

        Args:
            event_id: Id of an incident in ``active_incidents``.
            updates: Field name -> new value. Keys that are not attributes of
                the event are ignored. The special key ``send_notification``
                (default True) controls whether notifications go out.

        Returns:
            True on success, False if the event is unknown or processing
            failed (the failure is logged).
        """
        try:
            with self.notification_lock:
                if event_id not in self.active_incidents:
                    raise ValueError(f"Event {event_id} not found")
                event = self.active_incidents[event_id]
                # Update event fields
                for field, value in updates.items():
                    # event_id is the dict key; changing it here would leave
                    # the incident stored under a stale key.
                    if field != 'event_id' and hasattr(event, field):
                        setattr(event, field, value)
                resolved = event.status == IncidentStatus.RESOLVED
                if resolved:
                    # Removing it now also stops pending escalations from
                    # firing for an incident that is already over.
                    del self.active_incidents[event_id]
            # Send update notifications
            if updates.get('send_notification', True):
                for rule in self._get_applicable_rules(event):
                    self._send_update_notifications(event, rule, updates)
            # Update status page
            self._update_status_page(event)
            # Move to history if resolved
            if resolved:
                self._archive_incident(event)
            self.logger.info("Updated availability event: %s", event_id)
            return True
        except Exception as e:
            self.logger.error("Failed to update availability event %s: %s", event_id, e)
            return False

    def _get_applicable_rules(self, event: AvailabilityEvent) -> List[NotificationRule]:
        """Return the enabled rules whose severity equals the event's.

        ``rule.conditions`` is not interpreted here; its schema is not defined
        by this module.
        """
        return [
            rule for rule in list(self.notification_rules.values())
            if rule.enabled and rule.severity == event.severity
        ]

    def _get_recipients_for_rule(self, rule: NotificationRule, escalation_level: int) -> List[NotificationRecipient]:
        """Return recipients in the rule's audiences at or below the level.

        Escalating widens the audience: level N reaches every recipient whose
        ``escalation_level`` is <= N.
        """
        return [
            recipient for recipient in list(self.recipients.values())
            if recipient.audience_type in rule.audiences
            and recipient.escalation_level <= escalation_level
        ]

    def _schedule_escalations(self, event: AvailabilityEvent, rules: List[NotificationRule]) -> None:
        """Start one timer per escalation level for every rule that escalates.

        Level N fires ``N * rule.escalation_delay`` seconds from now. Timers
        are daemon threads so they never keep the process alive.
        """
        for rule in rules:
            if rule.max_escalations <= 0 or rule.escalation_delay <= 0:
                continue
            for level in range(1, rule.max_escalations + 1):
                timer = threading.Timer(
                    rule.escalation_delay * level,
                    self._escalate,
                    args=(event.event_id, rule, level),
                )
                timer.daemon = True
                with self.notification_lock:
                    self._escalation_timers.setdefault(event.event_id, []).append(timer)
                timer.start()

    def _escalate(self, event_id: str, rule: NotificationRule, level: int) -> None:
        """Timer callback: re-notify at ``level`` if the incident is still active."""
        with self.notification_lock:
            event = self.active_incidents.get(event_id)
        if event is None:
            return
        self.logger.info("Escalating event %s to level %d (rule %s)", event_id, level, rule.name)
        self._send_notifications(event, rule, escalation_level=level)

    def _send_update_notifications(self, event: AvailabilityEvent, rule: NotificationRule, updates: Dict[str, Any]) -> None:
        """Re-send the event through ``rule`` after it has been updated.

        The templates render the current status, root cause and resolution
        steps, so the same delivery path is used for updates.
        """
        changed = sorted(key for key in updates if key != 'send_notification')
        self.logger.info("Sending update for event %s (changed: %s)", event.event_id, ', '.join(changed))
        self._send_notifications(event, rule, escalation_level=0)

    def _archive_incident(self, event: AvailabilityEvent) -> None:
        """Move a resolved incident to ``incident_history`` and stop escalations."""
        with self.notification_lock:
            timers = self._escalation_timers.pop(event.event_id, [])
            self.incident_history.append(event)
        for timer in timers:
            timer.cancel()
        if event.end_time is None:
            # Naive UTC, matching how timestamps are produced elsewhere here.
            event.end_time = datetime.utcnow()
        self.logger.info("Archived incident: %s", event.event_id)

    def _record_notification(self, event: AvailabilityEvent, rule: NotificationRule,
                             recipients: List[NotificationRecipient], escalation_level: int) -> None:
        """Append one JSON-serialisable entry to ``notification_history``."""
        record = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_id': event.event_id,
            'rule': rule.name,
            'severity': event.severity.value,
            'channels': [channel.value for channel in rule.channels],
            'recipients': [recipient.name for recipient in recipients],
            'escalation_level': escalation_level,
        }
        with self.notification_lock:
            self.notification_history.append(record)

    def _send_notifications(self, event: AvailabilityEvent, rule: NotificationRule, escalation_level: int = 0) -> None:
        """Deliver ``event`` over every channel of ``rule`` and record it.

        Must not be called with ``notification_lock`` held. Failures are
        logged, not raised.
        """
        try:
            # Get recipients for this rule and escalation level
            recipients = self._get_recipients_for_rule(rule, escalation_level)
            # STATUS_PAGE is deliberately absent: the status page is updated
            # once per event by the caller, not once per rule.
            handlers = {
                NotificationChannel.EMAIL: self._send_email_notifications,
                NotificationChannel.SMS: self._send_sms_notifications,
                NotificationChannel.SLACK: self._send_slack_notifications,
                NotificationChannel.PAGERDUTY: self._send_pagerduty_notifications,
                NotificationChannel.SNS: self._send_sns_notifications,
                NotificationChannel.WEBHOOK: self._send_webhook_notifications,
            }
            # Send notifications through each configured channel
            for channel in rule.channels:
                handler = handlers.get(channel)
                if handler is not None:
                    handler(event, recipients)
            # Record notification in history
            self._record_notification(event, rule, recipients, escalation_level)
        except Exception as e:
            self.logger.error("Failed to send notifications: %s", e)

    def _send_email_notifications(self, event: AvailabilityEvent, recipients: List[NotificationRecipient]) -> None:
        """Send one SES email (HTML + plain text) to each recipient with an address."""
        try:
            email_recipients = [r for r in recipients if r.email]
            if not email_recipients:
                return
            # Create email content
            subject = f"[{event.severity.value.upper()}] {event.title}"
            html_body = self._generate_email_template(event)
            text_body = self._generate_text_template(event)
            # Send via SES; one message per recipient so a single bad address
            # does not stop the others.
            for recipient in email_recipients:
                try:
                    response = self.ses.send_email(
                        Source=self.email_source,
                        Destination={'ToAddresses': [recipient.email]},
                        Message={
                            'Subject': {'Data': subject},
                            'Body': {
                                'Html': {'Data': html_body},
                                'Text': {'Data': text_body}
                            }
                        }
                    )
                    self.logger.info("Email sent to %s: %s", recipient.email, response['MessageId'])
                except Exception as e:
                    self.logger.error("Failed to send email to %s: %s", recipient.email, e)
        except Exception as e:
            self.logger.error("Email notification failed: %s", e)

    def _send_sms_notifications(self, event: AvailabilityEvent, recipients: List[NotificationRecipient]) -> None:
        """Send an SMS via Twilio to each recipient with a phone number."""
        try:
            if not self.twilio_client:
                self.logger.warning("Twilio client not configured, skipping SMS notifications")
                return
            sms_recipients = [r for r in recipients if r.phone]
            if not sms_recipients:
                return
            # Create SMS content
            message = f"[{event.severity.value.upper()}] {event.title}\n\n{event.description}\n\nAffected: {', '.join(event.affected_services)}\n\nStatus: {event.status.value}"
            # Truncate if too long
            if len(message) > 1600:
                message = message[:1597] + "..."
            # Send SMS
            for recipient in sms_recipients:
                try:
                    message_obj = self.twilio_client.messages.create(
                        body=message,
                        from_=self.sms_from_number,
                        to=recipient.phone
                    )
                    self.logger.info("SMS sent to %s: %s", recipient.phone, message_obj.sid)
                except Exception as e:
                    self.logger.error("Failed to send SMS to %s: %s", recipient.phone, e)
        except Exception as e:
            self.logger.error("SMS notification failed: %s", e)

    def _send_slack_notifications(self, event: AvailabilityEvent, recipients: List[NotificationRecipient]) -> None:
        """Post to the incident channel, then DM each recipient with a Slack id."""
        try:
            if not self.slack_client:
                self.logger.warning("Slack client not configured, skipping Slack notifications")
                return
            # Create Slack message
            blocks = self._generate_slack_blocks(event)
            fallback_text = f"[{event.severity.value.upper()}] {event.title}"
            # Channel post first, then direct messages
            targets = [(self.slack_incident_channel, self.slack_incident_channel, "message")]
            targets.extend(
                (r.slack_user_id, r.name, "DM") for r in recipients if r.slack_user_id
            )
            for channel, label, kind in targets:
                try:
                    response = self.slack_client.chat_postMessage(
                        channel=channel,
                        blocks=blocks,
                        text=fallback_text
                    )
                    self.logger.info("Slack %s sent to %s: %s", kind, label, response['ts'])
                except Exception as e:
                    self.logger.error("Failed to send Slack %s to %s: %s", kind, label, e)
        except Exception as e:
            self.logger.error("Slack notification failed: %s", e)

    def _send_pagerduty_notifications(self, event: AvailabilityEvent, recipients: List[NotificationRecipient]) -> None:
        """Trigger (or resolve) a PagerDuty alert through the Events API v2.

        The Events API authenticates with the integration routing key in the
        request body. ``pagerduty_routing_key`` is used when configured,
        otherwise the value given as ``pagerduty_api_key``.
        """
        try:
            routing_key = self.pagerduty_routing_key or self.pagerduty_api_key
            if not routing_key:
                self.logger.warning("PagerDuty API key not configured, skipping PagerDuty notifications")
                return
            # A resolved incident closes the alert that shares its dedup_key.
            action = "resolve" if event.status == IncidentStatus.RESOLVED else "trigger"
            severity = event.severity.value
            # Create PagerDuty event
            payload = {
                "routing_key": routing_key,
                "event_action": action,
                "dedup_key": event.event_id,
                "payload": {
                    "summary": event.title,
                    "source": "availability-monitoring",
                    "severity": self._PAGERDUTY_SEVERITY.get(severity, 'error'),
                    "component": "system",
                    "group": "infrastructure",
                    "class": "availability",
                    "custom_details": {
                        "description": event.description,
                        "affected_services": event.affected_services,
                        "impact": event.impact_description
                    }
                }
            }
            # Send to PagerDuty
            response = requests.post(
                'https://events.pagerduty.com/v2/enqueue',
                json=payload,
                timeout=self._HTTP_TIMEOUT_SECONDS,
            )
            if response.status_code == 202:
                verb = "resolved" if action == "resolve" else "created"
                self.logger.info("PagerDuty alert %s for event %s", verb, event.event_id)
            else:
                self.logger.error("PagerDuty alert failed: %s - %s", response.status_code, response.text)
        except Exception as e:
            self.logger.error("PagerDuty notification failed: %s", e)

    def _send_sns_notifications(self, event: AvailabilityEvent, recipients: List[NotificationRecipient]) -> None:
        """Publish the event to the configured SNS topic.

        ``recipients`` is unused: the topic's subscriptions decide who
        receives the message.
        """
        try:
            headline = f"[{event.severity.value.upper()}] {event.title}"
            # Create SNS message
            message = {
                'default': headline,
                'email': self._generate_text_template(event),
                'sms': f"{headline}\n{event.description}"
            }
            # SNS rejects subjects containing line breaks or reaching 100 chars.
            subject = headline.replace('\r', ' ').replace('\n', ' ')[:99]
            # Publish to SNS topic
            response = self.sns.publish(
                TopicArn=self.sns_topic_arn,
                Message=json.dumps(message),
                MessageStructure='json',
                Subject=subject
            )
            self.logger.info("SNS notification sent: %s", response['MessageId'])
        except Exception as e:
            self.logger.error("SNS notification failed: %s", e)

    def _send_webhook_notifications(self, event: AvailabilityEvent, recipients: List[NotificationRecipient]) -> None:
        """POST the event as JSON to every URL in ``webhook_urls``.

        ``recipients`` is unused; it is accepted so all channel senders share
        one signature.
        """
        try:
            if not self.webhook_urls:
                self.logger.warning("No webhook URLs configured, skipping webhook notifications")
                return
            payload = {
                'event_id': event.event_id,
                'title': event.title,
                'description': event.description,
                'severity': event.severity.value,
                'status': event.status.value,
                'affected_services': event.affected_services,
                'impact': event.impact_description,
                'started_at': event.start_time.isoformat(),
            }
            for url in self.webhook_urls:
                try:
                    response = requests.post(url, json=payload, timeout=self._HTTP_TIMEOUT_SECONDS)
                    if response.ok:
                        self.logger.info("Webhook delivered to %s", url)
                    else:
                        self.logger.error("Webhook to %s failed: %s", url, response.status_code)
                except Exception as e:
                    self.logger.error("Failed to call webhook %s: %s", url, e)
        except Exception as e:
            self.logger.error("Webhook notification failed: %s", e)

    def _update_status_page(self, event: AvailabilityEvent) -> None:
        """Push the event's current state to the status page API, if configured."""
        try:
            if not self.status_page_config:
                return
            # Create status page update
            status_update = {
                'incident_id': event.event_id,
                'title': event.title,
                'description': event.description,
                'status': event.status.value,
                'affected_services': event.affected_services,
                'created_at': event.start_time.isoformat(),
                'updated_at': datetime.utcnow().isoformat(),
                'severity': event.severity.value
            }
            # Update status page via API
            # NOTE(review): creation and later updates both POST to /incidents;
            # confirm the target API treats a repeated incident_id as an update.
            endpoint = self.status_page_config.get('api_endpoint')
            if endpoint:
                response = requests.post(
                    f"{endpoint}/incidents",
                    json=status_update,
                    headers={
                        'Authorization': f"Bearer {self.status_page_config.get('api_key')}",
                        'Content-Type': 'application/json'
                    },
                    timeout=self._HTTP_TIMEOUT_SECONDS,
                )
                if response.status_code in (200, 201):
                    self.logger.info("Status page updated for incident %s", event.event_id)
                else:
                    self.logger.error("Status page update failed: %s", response.status_code)
        except Exception as e:
            self.logger.error("Status page update failed: %s", e)

    def _generate_text_template(self, event: AvailabilityEvent) -> str:
        """Render the event as plain text (email text part, SNS email body)."""
        lines = [
            f"[{event.severity.value.upper()}] {event.title}",
            "",
            f"Status: {event.status.value.title()}",
            f"Started: {event.start_time.strftime('%Y-%m-%d %H:%M:%S UTC')}",
            f"Affected Services: {', '.join(event.affected_services)}",
            "",
            "Description:",
            event.description,
            "",
            "Impact:",
            event.impact_description,
        ]
        if event.root_cause:
            lines += ["", "Root Cause:", event.root_cause]
        if event.resolution_steps:
            lines += ["", "Resolution Steps:"]
            lines += [f"- {step}" for step in event.resolution_steps]
        return "\n".join(lines)

    def _generate_email_template(self, event: AvailabilityEvent) -> str:
        """Render the event as an HTML email body.

        All event text is HTML-escaped so characters such as ``<`` or ``&`` in
        titles and descriptions are shown literally and cannot inject markup.
        """
        from html import escape

        color = self._SEVERITY_COLORS.get(event.severity.value, '#6c757d')
        root_cause_html = (
            f"<h3>Root Cause</h3><p>{escape(event.root_cause)}</p>"
            if event.root_cause else ''
        )
        steps_html = ''
        if event.resolution_steps:
            items = "".join(f"<li>{escape(step)}</li>" for step in event.resolution_steps)
            steps_html = f"<h3>Resolution Steps</h3><ul>{items}</ul>"
        services = escape(', '.join(event.affected_services))
        html = f"""
<html>
<body style="font-family: Arial, sans-serif; margin: 0; padding: 20px;">
<div style="max-width: 600px; margin: 0 auto;">
<div style="background-color: {color}; color: white; padding: 20px; border-radius: 5px 5px 0 0;">
<h1 style="margin: 0; font-size: 24px;">[{event.severity.value.upper()}] {escape(event.title)}</h1>
</div>
<div style="background-color: #f8f9fa; padding: 20px; border: 1px solid #dee2e6; border-top: none; border-radius: 0 0 5px 5px;">
<p><strong>Status:</strong> {event.status.value.title()}</p>
<p><strong>Started:</strong> {event.start_time.strftime('%Y-%m-%d %H:%M:%S UTC')}</p>
<p><strong>Affected Services:</strong> {services}</p>
<h3>Description</h3>
<p>{escape(event.description)}</p>
<h3>Impact</h3>
<p>{escape(event.impact_description)}</p>
{root_cause_html}
{steps_html}
</div>
</div>
</body>
</html>
"""
        return html

    def _generate_slack_blocks(self, event: AvailabilityEvent) -> List[Dict[str, Any]]:
        """Build the Block Kit payload: headline, key facts, description, impact."""

        def section(text: str) -> Dict[str, Any]:
            return {"type": "section", "text": {"type": "mrkdwn", "text": text}}

        def field(text: str) -> Dict[str, Any]:
            return {"type": "mrkdwn", "text": text}

        blocks = [
            section(f"*[{event.severity.value.upper()}] {event.title}*"),
            {
                "type": "section",
                "fields": [
                    field(f"*Status:*\n{event.status.value.title()}"),
                    field(f"*Started:*\n{event.start_time.strftime('%Y-%m-%d %H:%M:%S UTC')}"),
                    field(f"*Affected Services:*\n{', '.join(event.affected_services)}"),
                ]
            },
            section(f"*Description:*\n{event.description}"),
        ]
        if event.impact_description:
            blocks.append(section(f"*Impact:*\n{event.impact_description}"))
        return blocks

    def get_notification_statistics(self) -> Dict[str, Any]:
        """Summarise current state and the last 24 hours of notifications.

        Returns:
            A JSON-serialisable dict with the keys ``active_incidents``,
            ``notification_rules``, ``registered_recipients``,
            ``notifications_sent_24h``, ``incidents_by_severity`` and
            ``notifications_by_channel``; an empty dict if the calculation
            fails (the failure is logged).
        """
        try:
            # Snapshot under the lock, then compute without holding it.
            with self.notification_lock:
                incidents = list(self.active_incidents.values())
                history = list(self.notification_history)
            stats = {
                'active_incidents': len(incidents),
                'notification_rules': len(self.notification_rules),
                'registered_recipients': len(self.recipients),
                'notifications_sent_24h': 0,
                'incidents_by_severity': {},
                'notifications_by_channel': {}
            }
            # Count incidents by severity
            by_severity = stats['incidents_by_severity']
            for incident in incidents:
                severity = incident.severity.value
                by_severity[severity] = by_severity.get(severity, 0) + 1
            # Count recent notifications
            cutoff_time = datetime.utcnow() - timedelta(hours=24)
            recent_notifications = [
                n for n in history
                if datetime.fromisoformat(n['timestamp']) > cutoff_time
            ]
            stats['notifications_sent_24h'] = len(recent_notifications)
            # Count by channel
            by_channel = stats['notifications_by_channel']
            for notification in recent_notifications:
                for channel in notification.get('channels', []):
                    by_channel[channel] = by_channel.get(channel, 0) + 1
            return stats
        except Exception as e:
            self.logger.error("Statistics calculation failed: %s", e)
            return {}
# Example usage
def main():
    """Run the example end to end: configure, register, raise one incident, print stats."""
    # Initialize notification system
    system = AvailabilityNotificationSystem(region='us-east-1')

    # Configure external services (placeholders must be replaced with real values)
    system.configure_external_services({
        'slack_token': '<YOUR_SLACK_BOT_TOKEN>',
        'twilio_account_sid': '<YOUR_TWILIO_ACCOUNT_SID>',
        'twilio_auth_token': '<YOUR_TWILIO_AUTH_TOKEN>',
        'pagerduty_api_key': '<YOUR_PAGERDUTY_API_KEY>',
        'status_page': {
            'api_endpoint': 'https://api.statuspage.io/v1/pages/<YOUR_PAGE_ID>',
            'api_key': '<YOUR_STATUSPAGE_API_KEY>'
        }
    })

    # Register notification rules
    system.register_notification_rule(
        NotificationRule(
            name='critical_incidents',
            severity=NotificationSeverity.CRITICAL,
            channels=[
                NotificationChannel.EMAIL,
                NotificationChannel.SMS,
                NotificationChannel.SLACK,
                NotificationChannel.PAGERDUTY,
            ],
            audiences=[AudienceType.TECHNICAL, AudienceType.BUSINESS],
            conditions={'immediate': True},
            escalation_delay=300,  # 5 minutes
            max_escalations=3,
            enabled=True,
        )
    )

    # Register recipients
    system.register_recipient(
        NotificationRecipient(
            name='tech_lead',
            audience_type=AudienceType.TECHNICAL,
            email='tech.lead@example.com',
            phone='+1234567890',
            slack_user_id='U1234567890',
            escalation_level=0,
        )
    )

    # Create availability event
    incident = AvailabilityEvent(
        event_id='incident-2024-001',
        title='Database Connection Failures',
        description='Primary database is experiencing connection timeouts affecting user authentication',
        severity=NotificationSeverity.CRITICAL,
        affected_services=['user-service', 'auth-service'],
        start_time=datetime.utcnow(),
        end_time=None,
        status=IncidentStatus.INVESTIGATING,
        impact_description='Users unable to log in, existing sessions may be affected',
        root_cause=None,
        resolution_steps=['Investigating database connection pool', 'Checking network connectivity'],
    )

    # Process the event
    created_id = system.create_availability_event(incident)
    print(f"Created availability event: {created_id}")

    # Get statistics
    summary = system.get_notification_statistics()
    print(f"Notification statistics: {json.dumps(summary, indent=2)}")
if __name__ == "__main__":
main(){% endraw %}
AWS Services
Primary Services
- Amazon SNS: Multi-channel notification delivery
- Amazon SES: Email notification service
- AWS Lambda: Event-driven notification processing
- Amazon CloudWatch: Monitoring and alerting integration
Supporting Services
- Amazon EventBridge: Event routing for notifications
- AWS Systems Manager: Parameter management for notification configuration
- Amazon S3: Storage for notification templates and history
- AWS Step Functions: Orchestration of complex notification workflows
Benefits
- Rapid Response: Immediate notification enables faster incident response
- Stakeholder Awareness: Keep all relevant parties informed of availability impacts
- Escalation Management: Automatic escalation ensures critical issues get attention
- Communication Transparency: Status pages provide public visibility
- Audit Trail: Complete history of notifications for compliance and analysis