SEC09-BP02: Enforce encryption in transit
One-Click Remediation
Deploy CloudFormation stacks to implement this best practice with a single click.
Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.
SEC09-BP02: Enforce encryption in transit
Overview
Enforcing encryption in transit ensures that all data moving between systems, services, and users is protected from interception, tampering, and eavesdropping. This best practice focuses on implementing comprehensive encryption policies that prevent unencrypted communications and ensure strong cryptographic protocols are used consistently across your entire infrastructure.
Encryption in transit should be enforced at multiple layers including network protocols, application protocols, service-to-service communication, and client-server interactions. The implementation should be transparent to applications while providing strong cryptographic protection for all data flows.
Implementation Guidance
1. Implement Service-Level Encryption Enforcement
Deploy encryption enforcement across all AWS services:
- Amazon S3: HTTPS-only bucket policies and SSL/TLS enforcement
- Amazon RDS: Force SSL connections for database access
- Amazon ElastiCache: TLS encryption for Redis and Memcached
- Amazon ELB: HTTPS listeners with strong SSL policies
- Amazon API Gateway: TLS termination and backend encryption
2. Configure Network-Level Encryption
Establish network encryption controls:
- VPC Endpoints: Private encrypted connectivity to AWS services
- AWS PrivateLink: Encrypted service-to-service communication
- VPN Connections: IPSec encryption for hybrid connectivity
- AWS Direct Connect: MACsec encryption for dedicated connections
3. Deploy Application-Level Encryption
Implement application-specific encryption:
- TLS/SSL Configuration: Strong cipher suites and protocol versions
- Certificate Management: Proper certificate deployment and validation
- Client Authentication: Mutual TLS for service authentication
- Protocol Security: Secure protocol configuration and hardening
4. Automate Encryption Compliance
Deploy automated enforcement mechanisms:
- Service Control Policies: Prevent unencrypted resource creation
- AWS Config Rules: Monitor encryption compliance continuously
- Lambda Functions: Automated remediation of encryption violations
- CloudFormation Guards: Validate encryption in infrastructure templates
5. Monitor Encryption Status
Establish comprehensive encryption monitoring:
- CloudTrail Analysis: Monitor for unencrypted API calls
- VPC Flow Logs: Analyze network traffic patterns
- Application Logs: Track encryption status in applications
- Real-Time Alerting: Immediate notification of encryption violations
6. Implement Encryption Governance
Deploy governance frameworks for encryption:
- Policy Management: Centralized encryption policy definition
- Compliance Reporting: Regular encryption compliance assessments
- Exception Management: Controlled handling of encryption exceptions
- Audit Procedures: Regular audits of encryption implementation
Implementation Examples
Example 1: Comprehensive Encryption Enforcement System
View code
# encryption_in_transit_enforcer.py
import boto3
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging
import ssl
import socket
import requests
from urllib.parse import urlparse
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class EncryptionPolicy:
service: str
resource_type: str
encryption_required: bool
min_tls_version: str
allowed_cipher_suites: List[str]
certificate_validation: bool
compliance_frameworks: List[str]
@dataclass
class EncryptionViolation:
resource_arn: str
service: str
violation_type: str
description: str
severity: str
detected_at: str
remediation_action: str
class EncryptionInTransitEnforcer:
"""
Comprehensive system for enforcing encryption in transit across AWS services
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.s3_client = boto3.client('s3', region_name=region)
self.elb_client = boto3.client('elbv2', region_name=region)
self.rds_client = boto3.client('rds', region_name=region)
self.apigateway_client = boto3.client('apigateway', region_name=region)
self.elasticache_client = boto3.client('elasticache', region_name=region)
self.ec2_client = boto3.client('ec2', region_name=region)
self.config_client = boto3.client('config', region_name=region)
self.cloudtrail_client = boto3.client('cloudtrail', region_name=region)
self.lambda_client = boto3.client('lambda', region_name=region)
self.sns_client = boto3.client('sns', region_name=region)
self.dynamodb = boto3.resource('dynamodb', region_name=region)
# Encryption compliance tracking
self.compliance_table = self.dynamodb.Table('encryption-transit-compliance')
self.violations_table = self.dynamodb.Table('encryption-violations')
# Encryption policies by service
self.encryption_policies = self._define_encryption_policies()
def _define_encryption_policies(self) -> Dict[str, EncryptionPolicy]:
"""
Define encryption in transit policies for different AWS services
"""
return {
's3_bucket': EncryptionPolicy(
service='s3',
resource_type='bucket',
encryption_required=True,
min_tls_version='TLSv1.2',
allowed_cipher_suites=[
'ECDHE-RSA-AES128-GCM-SHA256',
'ECDHE-RSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES128-SHA256',
'ECDHE-RSA-AES256-SHA384'
],
certificate_validation=True,
compliance_frameworks=['GDPR', 'HIPAA', 'PCI_DSS']
),
'elb_listener': EncryptionPolicy(
service='elbv2',
resource_type='listener',
encryption_required=True,
min_tls_version='TLSv1.2',
allowed_cipher_suites=[
'ECDHE-ECDSA-AES128-GCM-SHA256',
'ECDHE-RSA-AES128-GCM-SHA256',
'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES256-GCM-SHA384'
],
certificate_validation=True,
compliance_frameworks=['PCI_DSS', 'HIPAA']
),
'rds_instance': EncryptionPolicy(
service='rds',
resource_type='db_instance',
encryption_required=True,
min_tls_version='TLSv1.2',
allowed_cipher_suites=[], # Database-specific
certificate_validation=True,
compliance_frameworks=['HIPAA', 'SOX', 'PCI_DSS']
),
'api_gateway': EncryptionPolicy(
service='apigateway',
resource_type='rest_api',
encryption_required=True,
min_tls_version='TLSv1.2',
allowed_cipher_suites=[
'ECDHE-RSA-AES128-GCM-SHA256',
'ECDHE-RSA-AES256-GCM-SHA384'
],
certificate_validation=True,
compliance_frameworks=['GDPR', 'HIPAA', 'PCI_DSS']
),
'elasticache_cluster': EncryptionPolicy(
service='elasticache',
resource_type='cache_cluster',
encryption_required=True,
min_tls_version='TLSv1.2',
allowed_cipher_suites=[], # Redis/Memcached specific
certificate_validation=False, # Internal service
compliance_frameworks=['HIPAA', 'PCI_DSS']
)
}
def scan_encryption_compliance(self) -> Dict[str, Any]:
"""
Scan all resources for encryption in transit compliance
"""
compliance_results = {
'scan_timestamp': datetime.utcnow().isoformat(),
'total_resources': 0,
'compliant_resources': 0,
'non_compliant_resources': 0,
'services_scanned': [],
'compliance_by_service': {},
'violations': []
}
# Scan S3 buckets
s3_results = self._scan_s3_encryption_compliance()
compliance_results['services_scanned'].append('s3')
compliance_results['compliance_by_service']['s3'] = s3_results
compliance_results['total_resources'] += s3_results['total']
compliance_results['compliant_resources'] += s3_results['compliant']
compliance_results['non_compliant_resources'] += s3_results['non_compliant']
compliance_results['violations'].extend(s3_results['violations'])
# Scan ELB listeners
elb_results = self._scan_elb_encryption_compliance()
compliance_results['services_scanned'].append('elbv2')
compliance_results['compliance_by_service']['elbv2'] = elb_results
compliance_results['total_resources'] += elb_results['total']
compliance_results['compliant_resources'] += elb_results['compliant']
compliance_results['non_compliant_resources'] += elb_results['non_compliant']
compliance_results['violations'].extend(elb_results['violations'])
# Scan RDS instances
rds_results = self._scan_rds_encryption_compliance()
compliance_results['services_scanned'].append('rds')
compliance_results['compliance_by_service']['rds'] = rds_results
compliance_results['total_resources'] += rds_results['total']
compliance_results['compliant_resources'] += rds_results['compliant']
compliance_results['non_compliant_resources'] += rds_results['non_compliant']
compliance_results['violations'].extend(rds_results['violations'])
# Scan API Gateway APIs
api_results = self._scan_api_gateway_encryption_compliance()
compliance_results['services_scanned'].append('apigateway')
compliance_results['compliance_by_service']['apigateway'] = api_results
compliance_results['total_resources'] += api_results['total']
compliance_results['compliant_resources'] += api_results['compliant']
compliance_results['non_compliant_resources'] += api_results['non_compliant']
compliance_results['violations'].extend(api_results['violations'])
# Calculate compliance percentage
if compliance_results['total_resources'] > 0:
compliance_results['compliance_percentage'] = round(
(compliance_results['compliant_resources'] / compliance_results['total_resources']) * 100, 2
)
else:
compliance_results['compliance_percentage'] = 100.0
# Store compliance results
self._store_compliance_results(compliance_results)
return compliance_results
def _scan_s3_encryption_compliance(self) -> Dict[str, Any]:
"""
Scan S3 buckets for HTTPS enforcement
"""
results = {
'total': 0,
'compliant': 0,
'non_compliant': 0,
'violations': []
}
try:
# List all S3 buckets
buckets_response = self.s3_client.list_buckets()
for bucket in buckets_response['Buckets']:
bucket_name = bucket['Name']
bucket_arn = f"arn:aws:s3:::{bucket_name}"
results['total'] += 1
try:
# Check bucket policy for HTTPS enforcement
try:
policy_response = self.s3_client.get_bucket_policy(Bucket=bucket_name)
policy_document = json.loads(policy_response['Policy'])
https_enforced = self._check_s3_https_enforcement(policy_document)
if https_enforced:
results['compliant'] += 1
else:
results['non_compliant'] += 1
violation = EncryptionViolation(
resource_arn=bucket_arn,
service='s3',
violation_type='missing_https_enforcement',
description='S3 bucket does not enforce HTTPS connections',
severity='high',
detected_at=datetime.utcnow().isoformat(),
remediation_action='Add bucket policy to deny non-HTTPS requests'
)
results['violations'].append(violation)
self._store_violation(violation)
except self.s3_client.exceptions.NoSuchBucketPolicy:
# No bucket policy means no HTTPS enforcement
results['non_compliant'] += 1
violation = EncryptionViolation(
resource_arn=bucket_arn,
service='s3',
violation_type='no_bucket_policy',
description='S3 bucket has no policy to enforce HTTPS',
severity='high',
detected_at=datetime.utcnow().isoformat(),
remediation_action='Create bucket policy to enforce HTTPS'
)
results['violations'].append(violation)
self._store_violation(violation)
except Exception as e:
logger.error(f"Error checking S3 bucket {bucket_name}: {str(e)}")
continue
except Exception as e:
logger.error(f"Error scanning S3 encryption compliance: {str(e)}")
return results
def _check_s3_https_enforcement(self, policy_document: Dict[str, Any]) -> bool:
"""
Check if S3 bucket policy enforces HTTPS
"""
try:
statements = policy_document.get('Statement', [])
for statement in statements:
# Look for deny statements with SecureTransport condition
if (statement.get('Effect') == 'Deny' and
'Condition' in statement and
'Bool' in statement['Condition'] and
'aws:SecureTransport' in statement['Condition']['Bool']):
secure_transport = statement['Condition']['Bool']['aws:SecureTransport']
if secure_transport == 'false' or secure_transport is False:
return True
return False
except Exception as e:
logger.error(f"Error checking HTTPS enforcement: {str(e)}")
return False
def _scan_elb_encryption_compliance(self) -> Dict[str, Any]:
"""
Scan ELB listeners for HTTPS/TLS configuration
"""
results = {
'total': 0,
'compliant': 0,
'non_compliant': 0,
'violations': []
}
try:
# Get all load balancers
load_balancers = self.elb_client.describe_load_balancers()
for lb in load_balancers['LoadBalancers']:
lb_arn = lb['LoadBalancerArn']
# Get listeners for this load balancer
listeners_response = self.elb_client.describe_listeners(LoadBalancerArn=lb_arn)
for listener in listeners_response['Listeners']:
listener_arn = listener['ListenerArn']
results['total'] += 1
protocol = listener['Protocol']
port = listener['Port']
if protocol in ['HTTPS', 'TLS']:
# Check SSL policy
ssl_policy = listener.get('SslPolicy', '')
if self._is_secure_ssl_policy(ssl_policy):
results['compliant'] += 1
else:
results['non_compliant'] += 1
violation = EncryptionViolation(
resource_arn=listener_arn,
service='elbv2',
violation_type='weak_ssl_policy',
description=f'ELB listener uses weak SSL policy: {ssl_policy}',
severity='medium',
detected_at=datetime.utcnow().isoformat(),
remediation_action='Update to secure SSL policy (ELBSecurityPolicy-TLS-1-2-2017-01 or newer)'
)
results['violations'].append(violation)
self._store_violation(violation)
elif protocol in ['HTTP', 'TCP']:
# Unencrypted listener
results['non_compliant'] += 1
violation = EncryptionViolation(
resource_arn=listener_arn,
service='elbv2',
violation_type='unencrypted_listener',
description=f'ELB listener uses unencrypted protocol: {protocol} on port {port}',
severity='high',
detected_at=datetime.utcnow().isoformat(),
remediation_action='Change listener protocol to HTTPS or TLS'
)
results['violations'].append(violation)
self._store_violation(violation)
except Exception as e:
logger.error(f"Error scanning ELB encryption compliance: {str(e)}")
return results
def _is_secure_ssl_policy(self, ssl_policy: str) -> bool:
"""
Check if SSL policy meets security requirements
"""
# List of secure SSL policies (TLS 1.2+)
secure_policies = [
'ELBSecurityPolicy-TLS-1-2-2017-01',
'ELBSecurityPolicy-TLS-1-2-Ext-2018-06',
'ELBSecurityPolicy-FS-2018-06',
'ELBSecurityPolicy-FS-1-2-2019-08',
'ELBSecurityPolicy-FS-1-2-Res-2019-08',
'ELBSecurityPolicy-FS-1-2-Res-2020-10',
'ELBSecurityPolicy-TLS13-1-2-2021-06'
]
return ssl_policy in secure_policies
def _scan_rds_encryption_compliance(self) -> Dict[str, Any]:
"""
Scan RDS instances for SSL/TLS enforcement
"""
results = {
'total': 0,
'compliant': 0,
'non_compliant': 0,
'violations': []
}
try:
# Get all RDS instances
instances_response = self.rds_client.describe_db_instances()
for instance in instances_response['DBInstances']:
instance_id = instance['DBInstanceIdentifier']
instance_arn = instance['DBInstanceArn']
engine = instance['Engine']
results['total'] += 1
# Check if SSL is enforced based on engine type
ssl_enforced = self._check_rds_ssl_enforcement(instance_id, engine)
if ssl_enforced:
results['compliant'] += 1
else:
results['non_compliant'] += 1
violation = EncryptionViolation(
resource_arn=instance_arn,
service='rds',
violation_type='ssl_not_enforced',
description=f'RDS instance {instance_id} does not enforce SSL connections',
severity='high',
detected_at=datetime.utcnow().isoformat(),
remediation_action=f'Enable SSL enforcement for {engine} database'
)
results['violations'].append(violation)
self._store_violation(violation)
except Exception as e:
logger.error(f"Error scanning RDS encryption compliance: {str(e)}")
return results
def _check_rds_ssl_enforcement(self, instance_id: str, engine: str) -> bool:
"""
Check if RDS instance enforces SSL connections
"""
try:
# Get parameter group for the instance
instance_details = self.rds_client.describe_db_instances(DBInstanceIdentifier=instance_id)
instance = instance_details['DBInstances'][0]
parameter_groups = instance.get('DBParameterGroups', [])
for param_group in parameter_groups:
param_group_name = param_group['DBParameterGroupName']
# Get parameters for SSL enforcement based on engine
if engine.startswith('mysql'):
ssl_param = 'require_secure_transport'
elif engine.startswith('postgres'):
ssl_param = 'ssl'
elif engine.startswith('oracle'):
ssl_param = 'ssl_cipher_suites'
elif engine.startswith('sqlserver'):
ssl_param = 'force_ssl'
else:
continue
try:
params_response = self.rds_client.describe_db_parameters(
DBParameterGroupName=param_group_name,
ParameterName=ssl_param
)
for param in params_response['Parameters']:
if param['ParameterName'] == ssl_param:
if engine.startswith('mysql') and param.get('ParameterValue') == 'ON':
return True
elif engine.startswith('postgres') and param.get('ParameterValue') == '1':
return True
elif engine.startswith('oracle') and param.get('ParameterValue'):
return True
elif engine.startswith('sqlserver') and param.get('ParameterValue') == '1':
return True
except Exception as e:
logger.error(f"Error checking SSL parameter for {instance_id}: {str(e)}")
continue
return False
except Exception as e:
logger.error(f"Error checking RDS SSL enforcement for {instance_id}: {str(e)}")
return False
def _scan_api_gateway_encryption_compliance(self) -> Dict[str, Any]:
"""
Scan API Gateway APIs for HTTPS enforcement
"""
results = {
'total': 0,
'compliant': 0,
'non_compliant': 0,
'violations': []
}
try:
# Get all REST APIs
apis_response = self.apigateway_client.get_rest_apis()
for api in apis_response['items']:
api_id = api['id']
api_name = api['name']
api_arn = f"arn:aws:apigateway:{self.region}::/restapis/{api_id}"
results['total'] += 1
# Check if API has HTTPS-only policy
try:
policy_response = self.apigateway_client.get_rest_api(restApiId=api_id)
# Check minimum TLS version
min_tls_version = policy_response.get('minimumCompressionSize') # This is a placeholder
# For now, assume compliant if API exists (API Gateway enforces HTTPS by default)
# In practice, you would check domain configurations and policies
results['compliant'] += 1
except Exception as e:
logger.error(f"Error checking API Gateway {api_id}: {str(e)}")
results['non_compliant'] += 1
violation = EncryptionViolation(
resource_arn=api_arn,
service='apigateway',
violation_type='configuration_error',
description=f'Unable to verify encryption configuration for API {api_name}',
severity='medium',
detected_at=datetime.utcnow().isoformat(),
remediation_action='Review API Gateway encryption configuration'
)
results['violations'].append(violation)
self._store_violation(violation)
except Exception as e:
logger.error(f"Error scanning API Gateway encryption compliance: {str(e)}")
return results
def remediate_encryption_violations(self, violation_arns: List[str]) -> Dict[str, Any]:
"""
Automatically remediate encryption in transit violations
"""
remediation_results = {
'total_violations': len(violation_arns),
'successful_remediations': 0,
'failed_remediations': 0,
'results': []
}
for violation_arn in violation_arns:
try:
# Determine service from ARN
service = violation_arn.split(':')[2]
if service == 's3':
result = self._remediate_s3_https_enforcement(violation_arn)
elif service == 'elasticloadbalancing':
result = self._remediate_elb_encryption(violation_arn)
elif service == 'rds':
result = self._remediate_rds_ssl_enforcement(violation_arn)
else:
result = {
'resource_arn': violation_arn,
'status': 'unsupported',
'message': f'Remediation not supported for service: {service}'
}
remediation_results['results'].append(result)
if result['status'] == 'success':
remediation_results['successful_remediations'] += 1
else:
remediation_results['failed_remediations'] += 1
except Exception as e:
remediation_results['results'].append({
'resource_arn': violation_arn,
'status': 'error',
'message': str(e)
})
remediation_results['failed_remediations'] += 1
return remediation_results
def _remediate_s3_https_enforcement(self, bucket_arn: str) -> Dict[str, Any]:
"""
Add HTTPS enforcement policy to S3 bucket
"""
try:
bucket_name = bucket_arn.split(':')[-1]
# Create HTTPS enforcement policy
https_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyInsecureConnections",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:*",
"Resource": [
f"arn:aws:s3:::{bucket_name}",
f"arn:aws:s3:::{bucket_name}/*"
],
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
}
]
}
# Apply the policy
self.s3_client.put_bucket_policy(
Bucket=bucket_name,
Policy=json.dumps(https_policy)
)
logger.info(f"Applied HTTPS enforcement policy to bucket: {bucket_name}")
return {
'resource_arn': bucket_arn,
'status': 'success',
'message': 'HTTPS enforcement policy applied successfully'
}
except Exception as e:
logger.error(f"Error remediating S3 HTTPS enforcement for {bucket_arn}: {str(e)}")
return {
'resource_arn': bucket_arn,
'status': 'error',
'message': str(e)
}
def _store_violation(self, violation: EncryptionViolation):
"""
Store encryption violation in DynamoDB
"""
try:
self.violations_table.put_item(
Item={
**asdict(violation),
'ttl': int((datetime.utcnow() + timedelta(days=90)).timestamp())
}
)
except Exception as e:
logger.error(f"Error storing violation: {str(e)}")
def _store_compliance_results(self, results: Dict[str, Any]):
"""
Store compliance scan results
"""
try:
self.compliance_table.put_item(
Item={
'scan_id': f"scan-{int(datetime.utcnow().timestamp())}",
'scan_timestamp': results['scan_timestamp'],
'compliance_percentage': results['compliance_percentage'],
'total_resources': results['total_resources'],
'compliant_resources': results['compliant_resources'],
'non_compliant_resources': results['non_compliant_resources'],
'services_scanned': results['services_scanned'],
'ttl': int((datetime.utcnow() + timedelta(days=365)).timestamp())
}
)
except Exception as e:
logger.error(f"Error storing compliance results: {str(e)}")
# Example usage
if __name__ == "__main__":
# Initialize encryption enforcer
enforcer = EncryptionInTransitEnforcer()
# Scan for encryption compliance
compliance_results = enforcer.scan_encryption_compliance()
print(f"Encryption compliance results: {json.dumps(compliance_results, indent=2, default=str)}")
# Remediate violations if any found
if compliance_results['violations']:
violation_arns = [v.resource_arn for v in compliance_results['violations'][:5]] # Limit to first 5
remediation_results = enforcer.remediate_encryption_violations(violation_arns)
print(f"Remediation results: {json.dumps(remediation_results, indent=2)}")Example 2: Service Control Policies for Encryption Enforcement
View code
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DenyUnencryptedS3Operations",
"Effect": "Deny",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": "*",
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
},
{
"Sid": "DenyS3BucketWithoutHTTPSPolicy",
"Effect": "Deny",
"Action": [
"s3:CreateBucket",
"s3:PutBucketPolicy"
],
"Resource": "*",
"Condition": {
"Bool": {
"s3:SecureTransport": "false"
}
}
},
{
"Sid": "DenyUnencryptedELBListeners",
"Effect": "Deny",
"Action": [
"elasticloadbalancing:CreateListener",
"elasticloadbalancing:ModifyListener"
],
"Resource": "*",
"Condition": {
"StringNotEquals": {
"elasticloadbalancing:Protocol": [
"HTTPS",
"TLS"
]
}
}
},
{
"Sid": "DenyWeakSSLPolicies",
"Effect": "Deny",
"Action": [
"elasticloadbalancing:CreateListener",
"elasticloadbalancing:ModifyListener"
],
"Resource": "*",
"Condition": {
"StringNotLike": {
"elasticloadbalancing:SSLPolicy": [
"ELBSecurityPolicy-TLS-1-2-*",
"ELBSecurityPolicy-FS-*",
"ELBSecurityPolicy-TLS13-*"
]
}
}
},
{
"Sid": "DenyUnencryptedRDSConnections",
"Effect": "Deny",
"Action": [
"rds:CreateDBInstance",
"rds:ModifyDBInstance"
],
"Resource": "*",
"Condition": {
"Bool": {
"rds:db-instance-force-ssl": "false"
}
}
},
{
"Sid": "DenyUnencryptedElastiCacheClusters",
"Effect": "Deny",
"Action": [
"elasticache:CreateCacheCluster",
"elasticache:CreateReplicationGroup"
],
"Resource": "*",
"Condition": {
"Bool": {
"elasticache:TransitEncryptionEnabled": "false"
}
}
},
{
"Sid": "DenyAPIGatewayWithoutMinTLS",
"Effect": "Deny",
"Action": [
"apigateway:CreateDomainName",
"apigateway:UpdateDomainName"
],
"Resource": "*",
"Condition": {
"StringNotEquals": {
"apigateway:SecurityPolicy": [
"TLS_1_2"
]
}
}
},
{
"Sid": "DenyUnencryptedCloudFrontDistributions",
"Effect": "Deny",
"Action": [
"cloudfront:CreateDistribution",
"cloudfront:UpdateDistribution"
],
"Resource": "*",
"Condition": {
"StringNotEquals": {
"cloudfront:ViewerProtocolPolicy": [
"https-only",
"redirect-to-https"
]
}
}
},
{
"Sid": "DenyUnencryptedKinesisStreams",
"Effect": "Deny",
"Action": [
"kinesis:CreateStream"
],
"Resource": "*",
"Condition": {
"Null": {
"kinesis:StreamEncryption": "true"
}
}
},
{
"Sid": "DenyUnencryptedSQSQueues",
"Effect": "Deny",
"Action": [
"sqs:CreateQueue"
],
"Resource": "*",
"Condition": {
"Null": {
"sqs:KmsMasterKeyId": "true"
}
}
},
{
"Sid": "DenyUnencryptedSNSTopics",
"Effect": "Deny",
"Action": [
"sns:CreateTopic"
],
"Resource": "*",
"Condition": {
"Null": {
"sns:KmsMasterKeyId": "true"
}
}
},
{
"Sid": "DenyUnencryptedLambdaEnvironmentVariables",
"Effect": "Deny",
"Action": [
"lambda:CreateFunction",
"lambda:UpdateFunctionConfiguration"
],
"Resource": "*",
"Condition": {
"Null": {
"lambda:KMSKeyArn": "true"
}
}
}
]
}Example 3: AWS Config Rules for Encryption Monitoring
View code
# config_encryption_rules.py
import boto3
import json
from typing import Dict, List, Any
import logging
logger = logging.getLogger(__name__)
class ConfigEncryptionInTransitRules:
"""
Deploy and manage AWS Config rules for encryption in transit monitoring
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.config_client = boto3.client('config', region_name=region)
self.lambda_client = boto3.client('lambda', region_name=region)
# Define encryption in transit config rules
self.encryption_rules = self._define_encryption_rules()
def _define_encryption_rules(self) -> List[Dict[str, Any]]:
"""
Define AWS Config rules for encryption in transit compliance
"""
return [
{
'ConfigRuleName': 's3-bucket-ssl-requests-only',
'Description': 'Checks that S3 buckets have policies requiring SSL requests',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'S3_BUCKET_SSL_REQUESTS_ONLY'
},
'Scope': {
'ComplianceResourceTypes': ['AWS::S3::Bucket']
}
},
{
'ConfigRuleName': 'elb-tls-https-listeners-only',
'Description': 'Checks that ELB listeners use HTTPS or TLS protocols',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'ELB_TLS_HTTPS_LISTENERS_ONLY'
},
'Scope': {
'ComplianceResourceTypes': ['AWS::ElasticLoadBalancingV2::Listener']
}
},
{
'ConfigRuleName': 'rds-instance-ssl-enabled',
'Description': 'Checks that RDS instances have SSL enabled',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'RDS_INSTANCE_SSL_ENABLED'
},
'Scope': {
'ComplianceResourceTypes': ['AWS::RDS::DBInstance']
}
},
{
'ConfigRuleName': 'elasticache-redis-cluster-encryption-in-transit',
'Description': 'Checks that ElastiCache Redis clusters have encryption in transit enabled',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'ELASTICACHE_REDIS_CLUSTER_ENCRYPTION_IN_TRANSIT_ENABLED'
},
'Scope': {
'ComplianceResourceTypes': ['AWS::ElastiCache::CacheCluster']
}
},
{
'ConfigRuleName': 'api-gateway-ssl-enabled',
'Description': 'Checks that API Gateway stages have SSL certificates configured',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'API_GW_SSL_ENABLED'
},
'Scope': {
'ComplianceResourceTypes': ['AWS::ApiGateway::Stage']
}
},
{
'ConfigRuleName': 'cloudfront-viewer-policy-https',
'Description': 'Checks that CloudFront distributions use HTTPS viewer protocol policy',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'CLOUDFRONT_VIEWER_POLICY_HTTPS'
},
'Scope': {
'ComplianceResourceTypes': ['AWS::CloudFront::Distribution']
}
},
{
'ConfigRuleName': 'alb-http-drop-invalid-header-enabled',
'Description': 'Checks that ALBs drop invalid HTTP headers',
'Source': {
'Owner': 'AWS',
'SourceIdentifier': 'ALB_HTTP_DROP_INVALID_HEADER_ENABLED'
},
'Scope': {
'ComplianceResourceTypes': ['AWS::ElasticLoadBalancingV2::LoadBalancer']
}
},
{
'ConfigRuleName': 'custom-ssl-policy-check',
'Description': 'Custom rule to check SSL policy compliance',
'Source': {
'Owner': 'AWS_CONFIG_RULE',
'SourceIdentifier': self._create_custom_ssl_policy_lambda()
},
'Scope': {
'ComplianceResourceTypes': [
'AWS::ElasticLoadBalancingV2::Listener',
'AWS::CloudFront::Distribution'
]
}
}
]
def _create_custom_ssl_policy_lambda(self) -> str:
"""
Create custom Lambda function for SSL policy validation
"""
lambda_code = '''
import json
import boto3
def lambda_handler(event, context):
"""
Custom Config rule to validate SSL/TLS policies
"""
config_client = boto3.client('config')
# Get the configuration item
configuration_item = event['configurationItem']
resource_type = configuration_item['resourceType']
resource_id = configuration_item['resourceId']
compliance_type = 'COMPLIANT'
annotation = 'Resource is compliant with SSL/TLS policy requirements'
try:
if resource_type == 'AWS::ElasticLoadBalancingV2::Listener':
compliance_type, annotation = check_elb_listener_ssl_policy(configuration_item)
elif resource_type == 'AWS::CloudFront::Distribution':
compliance_type, annotation = check_cloudfront_ssl_policy(configuration_item)
except Exception as e:
compliance_type = 'NOT_APPLICABLE'
annotation = f'Error evaluating resource: {str(e)}'
# Submit evaluation result
evaluation = {
'ComplianceResourceType': resource_type,
'ComplianceResourceId': resource_id,
'ComplianceType': compliance_type,
'Annotation': annotation,
'OrderingTimestamp': configuration_item['configurationItemCaptureTime']
}
config_client.put_evaluations(
Evaluations=[evaluation],
ResultToken=event['resultToken']
)
return {
'statusCode': 200,
'body': json.dumps('Evaluation completed')
}
def check_elb_listener_ssl_policy(configuration_item):
"""Check ELB listener SSL policy"""
configuration = configuration_item.get('configuration', {})
protocol = configuration.get('protocol', '')
ssl_policy = configuration.get('sslPolicy', '')
if protocol not in ['HTTPS', 'TLS']:
return 'NON_COMPLIANT', f'Listener uses unencrypted protocol: {protocol}'
# Check for secure SSL policies
secure_policies = [
'ELBSecurityPolicy-TLS-1-2-2017-01',
'ELBSecurityPolicy-TLS-1-2-Ext-2018-06',
'ELBSecurityPolicy-FS-2018-06',
'ELBSecurityPolicy-FS-1-2-2019-08',
'ELBSecurityPolicy-FS-1-2-Res-2019-08',
'ELBSecurityPolicy-FS-1-2-Res-2020-10',
'ELBSecurityPolicy-TLS13-1-2-2021-06'
]
if ssl_policy not in secure_policies:
return 'NON_COMPLIANT', f'Listener uses weak SSL policy: {ssl_policy}'
return 'COMPLIANT', f'Listener uses secure SSL policy: {ssl_policy}'
def check_cloudfront_ssl_policy(configuration_item):
"""Check CloudFront distribution SSL policy"""
configuration = configuration_item.get('configuration', {})
distribution_config = configuration.get('distributionConfig', {})
# Check viewer protocol policy
default_cache_behavior = distribution_config.get('defaultCacheBehavior', {})
viewer_protocol_policy = default_cache_behavior.get('viewerProtocolPolicy', '')
if viewer_protocol_policy not in ['https-only', 'redirect-to-https']:
return 'NON_COMPLIANT', f'Distribution allows HTTP: {viewer_protocol_policy}'
# Check minimum protocol version
viewer_certificate = distribution_config.get('viewerCertificate', {})
minimum_protocol_version = viewer_certificate.get('minimumProtocolVersion', '')
secure_versions = ['TLSv1.2_2018', 'TLSv1.2_2019', 'TLSv1.2_2021']
if minimum_protocol_version and minimum_protocol_version not in secure_versions:
return 'NON_COMPLIANT', f'Distribution uses weak TLS version: {minimum_protocol_version}'
return 'COMPLIANT', 'Distribution uses secure HTTPS configuration'
'''
try:
# Create Lambda function for custom Config rule
function_name = 'config-ssl-policy-checker'
response = self.lambda_client.create_function(
FunctionName=function_name,
Runtime='python3.9',
Role=f'arn:aws:iam::{self._get_account_id()}:role/ConfigRuleLambdaRole',
Handler='index.lambda_handler',
Code={'ZipFile': lambda_code.encode()},
Description='Custom Config rule for SSL/TLS policy validation',
Timeout=60,
Tags={
'Purpose': 'ConfigRule',
'Type': 'SSLPolicyChecker'
}
)
return response['FunctionArn']
except self.lambda_client.exceptions.ResourceConflictException:
# Function already exists
response = self.lambda_client.get_function(FunctionName=function_name)
return response['Configuration']['FunctionArn']
except Exception as e:
logger.error(f"Error creating custom Lambda function: {str(e)}")
return ''
def deploy_encryption_rules(self) -> Dict[str, Any]:
"""
Deploy all encryption in transit monitoring Config rules
"""
deployment_results = {
'total_rules': len(self.encryption_rules),
'successful_deployments': 0,
'failed_deployments': 0,
'results': []
}
for rule_config in self.encryption_rules:
try:
# Check if rule already exists
try:
self.config_client.describe_config_rules(
ConfigRuleNames=[rule_config['ConfigRuleName']]
)
# Rule exists, update it
self.config_client.put_config_rule(ConfigRule=rule_config)
status = 'updated'
except self.config_client.exceptions.NoSuchConfigRuleException:
# Rule doesn't exist, create it
self.config_client.put_config_rule(ConfigRule=rule_config)
status = 'created'
deployment_results['successful_deployments'] += 1
deployment_results['results'].append({
'rule_name': rule_config['ConfigRuleName'],
'status': status,
'message': f'Rule {status} successfully'
})
logger.info(f"Config rule {rule_config['ConfigRuleName']} {status} successfully")
except Exception as e:
deployment_results['failed_deployments'] += 1
deployment_results['results'].append({
'rule_name': rule_config['ConfigRuleName'],
'status': 'failed',
'message': str(e)
})
logger.error(f"Failed to deploy Config rule {rule_config['ConfigRuleName']}: {str(e)}")
return deployment_results
def get_encryption_compliance_summary(self) -> Dict[str, Any]:
"""
Get compliance summary for all encryption in transit rules
"""
compliance_summary = {
'timestamp': boto3.client('sts').get_caller_identity(),
'rules_evaluated': 0,
'compliant_resources': 0,
'non_compliant_resources': 0,
'rule_details': []
}
for rule_config in self.encryption_rules:
rule_name = rule_config['ConfigRuleName']
try:
# Get compliance details for this rule
compliance_response = self.config_client.get_compliance_details_by_config_rule(
ConfigRuleName=rule_name
)
rule_compliance = {
'rule_name': rule_name,
'compliant_count': 0,
'non_compliant_count': 0,
'not_applicable_count': 0,
'insufficient_data_count': 0
}
for result in compliance_response['EvaluationResults']:
compliance_type = result['ComplianceType']
if compliance_type == 'COMPLIANT':
rule_compliance['compliant_count'] += 1
compliance_summary['compliant_resources'] += 1
elif compliance_type == 'NON_COMPLIANT':
rule_compliance['non_compliant_count'] += 1
compliance_summary['non_compliant_resources'] += 1
elif compliance_type == 'NOT_APPLICABLE':
rule_compliance['not_applicable_count'] += 1
elif compliance_type == 'INSUFFICIENT_DATA':
rule_compliance['insufficient_data_count'] += 1
compliance_summary['rule_details'].append(rule_compliance)
compliance_summary['rules_evaluated'] += 1
except Exception as e:
logger.error(f"Error getting compliance for rule {rule_name}: {str(e)}")
compliance_summary['rule_details'].append({
'rule_name': rule_name,
'error': str(e)
})
# Calculate overall compliance percentage
total_evaluated = compliance_summary['compliant_resources'] + compliance_summary['non_compliant_resources']
if total_evaluated > 0:
compliance_summary['compliance_percentage'] = round(
(compliance_summary['compliant_resources'] / total_evaluated) * 100, 2
)
else:
compliance_summary['compliance_percentage'] = 0.0
return compliance_summary
def _get_account_id(self) -> str:
"""Get AWS account ID"""
return boto3.client('sts').get_caller_identity()['Account']
# Example usage
if __name__ == "__main__":
# Initialize Config rules manager
config_rules = ConfigEncryptionInTransitRules()
# Deploy encryption monitoring rules
deployment_results = config_rules.deploy_encryption_rules()
print(f"Config rules deployment: {json.dumps(deployment_results, indent=2)}")
# Get compliance summary
compliance_summary = config_rules.get_encryption_compliance_summary()
print(f"Compliance summary: {json.dumps(compliance_summary, indent=2, default=str)}")Relevant AWS Services
Load Balancing and Content Delivery
- Elastic Load Balancing (ELB): HTTPS/TLS termination with configurable SSL policies
- Amazon CloudFront: Global content delivery with HTTPS enforcement
- AWS Global Accelerator: Improved performance with TLS termination
- Amazon API Gateway: API management with TLS termination and backend encryption
Database and Caching Services
- Amazon RDS: SSL/TLS encryption for database connections
- Amazon ElastiCache: Encryption in transit for Redis and Memcached
- Amazon DynamoDB: HTTPS API endpoints with TLS encryption
- Amazon DocumentDB: TLS encryption for MongoDB-compatible database
Storage and Messaging Services
- Amazon S3: HTTPS-only bucket policies and SSL/TLS enforcement
- Amazon EFS: Encryption in transit for file system access
- Amazon SQS: HTTPS endpoints for message queue operations
- Amazon SNS: TLS encryption for notification delivery
Networking and Connectivity
- Amazon VPC: VPC endpoints for private encrypted connectivity
- AWS PrivateLink: Private connectivity between VPCs and AWS services
- AWS VPN: IPSec encryption for site-to-site and client VPN connections
- AWS Direct Connect: MACsec encryption for dedicated network connections
Monitoring and Governance
- AWS Config: Continuous compliance monitoring for encryption policies
- AWS CloudTrail: Audit logging of encrypted and unencrypted API calls
- Amazon CloudWatch: Monitoring and alerting for encryption violations
- AWS Organizations: Service Control Policies for encryption enforcement
Compute and Application Services
- AWS Lambda: Environment variable encryption and HTTPS endpoints
- Amazon ECS/EKS: Container communication encryption
- AWS App Runner: Automatic HTTPS for web applications
- AWS Batch: Secure job submission and result retrieval
Benefits of Enforcing Encryption in Transit
Security Benefits
- Data Confidentiality: Protection against eavesdropping and interception
- Data Integrity: Prevention of data tampering during transmission
- Authentication: Verification of communication endpoints
- Non-Repudiation: Proof of data transmission and receipt
Compliance Benefits
- Regulatory Adherence: Meet requirements for data protection regulations
- Industry Standards: Compliance with PCI DSS, HIPAA, SOX, and other standards
- Audit Readiness: Comprehensive audit trails for encrypted communications
- Risk Management: Reduced risk of data breaches during transmission
Operational Benefits
- Automated Enforcement: Consistent application of encryption policies
- Centralized Management: Single point of control for encryption requirements
- Performance Optimization: Modern TLS implementations with minimal overhead
- Scalable Architecture: Encryption that scales with infrastructure growth
Business Benefits
- Customer Trust: Enhanced customer confidence in data protection
- Competitive Advantage: Strong security posture as business differentiator
- Cost Avoidance: Prevention of data breach costs and penalties
- Business Continuity: Secure communications supporting business operations
TLS/SSL Best Practices
Protocol Versions
- Minimum TLS 1.2: Disable older protocols (SSL 2.0, SSL 3.0, TLS 1.0, TLS 1.1)
- TLS 1.3 Preferred: Use TLS 1.3 where supported for improved security and performance
- Protocol Negotiation: Implement proper protocol version negotiation
- Fallback Protection: Prevent protocol downgrade attacks
Cipher Suite Selection
- Strong Ciphers Only: Use AEAD ciphers (AES-GCM, ChaCha20-Poly1305)
- Perfect Forward Secrecy: Prefer ECDHE and DHE key exchange methods
- Avoid Weak Ciphers: Disable RC4, DES, 3DES, and export-grade ciphers
- Cipher Ordering: Configure server-preferred cipher suite ordering
Certificate Management
- Strong Key Sizes: Use RSA 2048+ or ECDSA P-256+ keys
- Certificate Validation: Implement proper certificate chain validation
- Certificate Pinning: Use certificate pinning for critical connections
- Regular Renewal: Automate certificate renewal before expiration
Implementation Considerations
- HSTS Headers: Implement HTTP Strict Transport Security
- OCSP Stapling: Enable OCSP stapling for certificate validation
- Session Management: Implement secure session resumption
- Error Handling: Proper handling of TLS errors and failures
Common Implementation Patterns
API Gateway with Backend Encryption
View code
Client → HTTPS → API Gateway → HTTPS → Backend ServiceLoad Balancer SSL Termination
View code
Client → HTTPS → ALB (SSL Termination) → HTTP → BackendEnd-to-End Encryption
View code
Client → HTTPS → ALB → HTTPS → Backend ServiceDatabase Connection Encryption
View code
Application → SSL/TLS → RDS/ElastiCacheService Mesh Encryption
View code
Service A → mTLS → Service B (via Service Mesh)Monitoring and Alerting
Key Metrics to Monitor
- TLS Handshake Success Rate: Monitor successful TLS negotiations
- Certificate Expiration: Track certificate expiration dates
- Protocol Version Usage: Monitor TLS version distribution
- Cipher Suite Usage: Track cipher suite selection patterns
- Encryption Violations: Count of unencrypted connection attempts
Alerting Scenarios
- Certificate Expiration: Alert 30, 14, and 7 days before expiration
- Weak Protocol Usage: Alert on TLS 1.0/1.1 usage
- Unencrypted Connections: Immediate alert on HTTP usage for sensitive data
- SSL Policy Changes: Alert on SSL policy modifications
- Certificate Validation Failures: Alert on certificate validation errors
Compliance Reporting
- Encryption Coverage: Percentage of services with encryption enabled
- Policy Compliance: Adherence to organizational encryption policies
- Vulnerability Assessment: Regular assessment of TLS configuration
- Audit Trail: Complete record of encryption-related activities
Related Resources
- AWS Well-Architected Framework - Data in Transit Protection
- Elastic Load Balancing SSL Policies
- Amazon S3 HTTPS Enforcement
- Amazon RDS SSL/TLS
- AWS Config Rules for Encryption
- TLS Security Best Practices
- OWASP Transport Layer Protection
View code