Skip to content
SEC09

SEC09-BP02: Enforce encryption in transit

One-Click Remediation

Deploy CloudFormation stacks to implement this best practice with a single click.

Encryption in Transit Baseline
Enforce transport encryption with managed certificates, TLS policies, and secure listener defaults
Launch Stack

Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.

SEC09-BP02: Enforce encryption in transit

Overview

Enforcing encryption in transit ensures that all data moving between systems, services, and users is protected from interception, tampering, and eavesdropping. This best practice focuses on implementing comprehensive encryption policies that prevent unencrypted communications and ensure strong cryptographic protocols are used consistently across your entire infrastructure.

Encryption in transit should be enforced at multiple layers including network protocols, application protocols, service-to-service communication, and client-server interactions. The implementation should be transparent to applications while providing strong cryptographic protection for all data flows.

Implementation Guidance

1. Implement Service-Level Encryption Enforcement

Deploy encryption enforcement across all AWS services:

  • Amazon S3: HTTPS-only bucket policies and SSL/TLS enforcement
  • Amazon RDS: Force SSL connections for database access
  • Amazon ElastiCache: TLS encryption for Redis and Memcached
  • Amazon ELB: HTTPS listeners with strong SSL policies
  • Amazon API Gateway: TLS termination and backend encryption

2. Configure Network-Level Encryption

Establish network encryption controls:

  • VPC Endpoints: Private encrypted connectivity to AWS services
  • AWS PrivateLink: Encrypted service-to-service communication
  • VPN Connections: IPSec encryption for hybrid connectivity
  • AWS Direct Connect: MACsec encryption for dedicated connections

3. Deploy Application-Level Encryption

Implement application-specific encryption:

  • TLS/SSL Configuration: Strong cipher suites and protocol versions
  • Certificate Management: Proper certificate deployment and validation
  • Client Authentication: Mutual TLS for service authentication
  • Protocol Security: Secure protocol configuration and hardening

4. Automate Encryption Compliance

Deploy automated enforcement mechanisms:

  • Service Control Policies: Prevent unencrypted resource creation
  • AWS Config Rules: Monitor encryption compliance continuously
  • Lambda Functions: Automated remediation of encryption violations
  • CloudFormation Guards: Validate encryption in infrastructure templates

5. Monitor Encryption Status

Establish comprehensive encryption monitoring:

  • CloudTrail Analysis: Monitor for unencrypted API calls
  • VPC Flow Logs: Analyze network traffic patterns
  • Application Logs: Track encryption status in applications
  • Real-Time Alerting: Immediate notification of encryption violations

6. Implement Encryption Governance

Deploy governance frameworks for encryption:

  • Policy Management: Centralized encryption policy definition
  • Compliance Reporting: Regular encryption compliance assessments
  • Exception Management: Controlled handling of encryption exceptions
  • Audit Procedures: Regular audits of encryption implementation

Implementation Examples

Example 1: Comprehensive Encryption Enforcement System

View code
# encryption_in_transit_enforcer.py
import boto3
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging
import ssl
import socket
import requests
from urllib.parse import urlparse

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class EncryptionPolicy:
    service: str
    resource_type: str
    encryption_required: bool
    min_tls_version: str
    allowed_cipher_suites: List[str]
    certificate_validation: bool
    compliance_frameworks: List[str]

@dataclass
class EncryptionViolation:
    resource_arn: str
    service: str
    violation_type: str
    description: str
    severity: str
    detected_at: str
    remediation_action: str

class EncryptionInTransitEnforcer:
    """
    Comprehensive system for enforcing encryption in transit across AWS services
    """
    
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.s3_client = boto3.client('s3', region_name=region)
        self.elb_client = boto3.client('elbv2', region_name=region)
        self.rds_client = boto3.client('rds', region_name=region)
        self.apigateway_client = boto3.client('apigateway', region_name=region)
        self.elasticache_client = boto3.client('elasticache', region_name=region)
        self.ec2_client = boto3.client('ec2', region_name=region)
        self.config_client = boto3.client('config', region_name=region)
        self.cloudtrail_client = boto3.client('cloudtrail', region_name=region)
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.sns_client = boto3.client('sns', region_name=region)
        self.dynamodb = boto3.resource('dynamodb', region_name=region)
        
        # Encryption compliance tracking
        self.compliance_table = self.dynamodb.Table('encryption-transit-compliance')
        self.violations_table = self.dynamodb.Table('encryption-violations')
        
        # Encryption policies by service
        self.encryption_policies = self._define_encryption_policies()
    
    def _define_encryption_policies(self) -> Dict[str, EncryptionPolicy]:
        """
        Define encryption in transit policies for different AWS services
        """
        return {
            's3_bucket': EncryptionPolicy(
                service='s3',
                resource_type='bucket',
                encryption_required=True,
                min_tls_version='TLSv1.2',
                allowed_cipher_suites=[
                    'ECDHE-RSA-AES128-GCM-SHA256',
                    'ECDHE-RSA-AES256-GCM-SHA384',
                    'ECDHE-RSA-AES128-SHA256',
                    'ECDHE-RSA-AES256-SHA384'
                ],
                certificate_validation=True,
                compliance_frameworks=['GDPR', 'HIPAA', 'PCI_DSS']
            ),
            'elb_listener': EncryptionPolicy(
                service='elbv2',
                resource_type='listener',
                encryption_required=True,
                min_tls_version='TLSv1.2',
                allowed_cipher_suites=[
                    'ECDHE-ECDSA-AES128-GCM-SHA256',
                    'ECDHE-RSA-AES128-GCM-SHA256',
                    'ECDHE-ECDSA-AES256-GCM-SHA384',
                    'ECDHE-RSA-AES256-GCM-SHA384'
                ],
                certificate_validation=True,
                compliance_frameworks=['PCI_DSS', 'HIPAA']
            ),
            'rds_instance': EncryptionPolicy(
                service='rds',
                resource_type='db_instance',
                encryption_required=True,
                min_tls_version='TLSv1.2',
                allowed_cipher_suites=[],  # Database-specific
                certificate_validation=True,
                compliance_frameworks=['HIPAA', 'SOX', 'PCI_DSS']
            ),
            'api_gateway': EncryptionPolicy(
                service='apigateway',
                resource_type='rest_api',
                encryption_required=True,
                min_tls_version='TLSv1.2',
                allowed_cipher_suites=[
                    'ECDHE-RSA-AES128-GCM-SHA256',
                    'ECDHE-RSA-AES256-GCM-SHA384'
                ],
                certificate_validation=True,
                compliance_frameworks=['GDPR', 'HIPAA', 'PCI_DSS']
            ),
            'elasticache_cluster': EncryptionPolicy(
                service='elasticache',
                resource_type='cache_cluster',
                encryption_required=True,
                min_tls_version='TLSv1.2',
                allowed_cipher_suites=[],  # Redis/Memcached specific
                certificate_validation=False,  # Internal service
                compliance_frameworks=['HIPAA', 'PCI_DSS']
            )
        }
    
    def scan_encryption_compliance(self) -> Dict[str, Any]:
        """
        Scan all resources for encryption in transit compliance
        """
        compliance_results = {
            'scan_timestamp': datetime.utcnow().isoformat(),
            'total_resources': 0,
            'compliant_resources': 0,
            'non_compliant_resources': 0,
            'services_scanned': [],
            'compliance_by_service': {},
            'violations': []
        }
        
        # Scan S3 buckets
        s3_results = self._scan_s3_encryption_compliance()
        compliance_results['services_scanned'].append('s3')
        compliance_results['compliance_by_service']['s3'] = s3_results
        compliance_results['total_resources'] += s3_results['total']
        compliance_results['compliant_resources'] += s3_results['compliant']
        compliance_results['non_compliant_resources'] += s3_results['non_compliant']
        compliance_results['violations'].extend(s3_results['violations'])
        
        # Scan ELB listeners
        elb_results = self._scan_elb_encryption_compliance()
        compliance_results['services_scanned'].append('elbv2')
        compliance_results['compliance_by_service']['elbv2'] = elb_results
        compliance_results['total_resources'] += elb_results['total']
        compliance_results['compliant_resources'] += elb_results['compliant']
        compliance_results['non_compliant_resources'] += elb_results['non_compliant']
        compliance_results['violations'].extend(elb_results['violations'])
        
        # Scan RDS instances
        rds_results = self._scan_rds_encryption_compliance()
        compliance_results['services_scanned'].append('rds')
        compliance_results['compliance_by_service']['rds'] = rds_results
        compliance_results['total_resources'] += rds_results['total']
        compliance_results['compliant_resources'] += rds_results['compliant']
        compliance_results['non_compliant_resources'] += rds_results['non_compliant']
        compliance_results['violations'].extend(rds_results['violations'])
        
        # Scan API Gateway APIs
        api_results = self._scan_api_gateway_encryption_compliance()
        compliance_results['services_scanned'].append('apigateway')
        compliance_results['compliance_by_service']['apigateway'] = api_results
        compliance_results['total_resources'] += api_results['total']
        compliance_results['compliant_resources'] += api_results['compliant']
        compliance_results['non_compliant_resources'] += api_results['non_compliant']
        compliance_results['violations'].extend(api_results['violations'])
        
        # Calculate compliance percentage
        if compliance_results['total_resources'] > 0:
            compliance_results['compliance_percentage'] = round(
                (compliance_results['compliant_resources'] / compliance_results['total_resources']) * 100, 2
            )
        else:
            compliance_results['compliance_percentage'] = 100.0
        
        # Store compliance results
        self._store_compliance_results(compliance_results)
        
        return compliance_results
    
    def _scan_s3_encryption_compliance(self) -> Dict[str, Any]:
        """
        Scan S3 buckets for HTTPS enforcement
        """
        results = {
            'total': 0,
            'compliant': 0,
            'non_compliant': 0,
            'violations': []
        }
        
        try:
            # List all S3 buckets
            buckets_response = self.s3_client.list_buckets()
            
            for bucket in buckets_response['Buckets']:
                bucket_name = bucket['Name']
                bucket_arn = f"arn:aws:s3:::{bucket_name}"
                results['total'] += 1
                
                try:
                    # Check bucket policy for HTTPS enforcement
                    try:
                        policy_response = self.s3_client.get_bucket_policy(Bucket=bucket_name)
                        policy_document = json.loads(policy_response['Policy'])
                        
                        https_enforced = self._check_s3_https_enforcement(policy_document)
                        
                        if https_enforced:
                            results['compliant'] += 1
                        else:
                            results['non_compliant'] += 1
                            violation = EncryptionViolation(
                                resource_arn=bucket_arn,
                                service='s3',
                                violation_type='missing_https_enforcement',
                                description='S3 bucket does not enforce HTTPS connections',
                                severity='high',
                                detected_at=datetime.utcnow().isoformat(),
                                remediation_action='Add bucket policy to deny non-HTTPS requests'
                            )
                            results['violations'].append(violation)
                            self._store_violation(violation)
                    
                    except self.s3_client.exceptions.NoSuchBucketPolicy:
                        # No bucket policy means no HTTPS enforcement
                        results['non_compliant'] += 1
                        violation = EncryptionViolation(
                            resource_arn=bucket_arn,
                            service='s3',
                            violation_type='no_bucket_policy',
                            description='S3 bucket has no policy to enforce HTTPS',
                            severity='high',
                            detected_at=datetime.utcnow().isoformat(),
                            remediation_action='Create bucket policy to enforce HTTPS'
                        )
                        results['violations'].append(violation)
                        self._store_violation(violation)
                
                except Exception as e:
                    logger.error(f"Error checking S3 bucket {bucket_name}: {str(e)}")
                    continue
        
        except Exception as e:
            logger.error(f"Error scanning S3 encryption compliance: {str(e)}")
        
        return results
    
    def _check_s3_https_enforcement(self, policy_document: Dict[str, Any]) -> bool:
        """
        Check if S3 bucket policy enforces HTTPS
        """
        try:
            statements = policy_document.get('Statement', [])
            
            for statement in statements:
                # Look for deny statements with SecureTransport condition
                if (statement.get('Effect') == 'Deny' and 
                    'Condition' in statement and
                    'Bool' in statement['Condition'] and
                    'aws:SecureTransport' in statement['Condition']['Bool']):
                    
                    secure_transport = statement['Condition']['Bool']['aws:SecureTransport']
                    if secure_transport == 'false' or secure_transport is False:
                        return True
            
            return False
            
        except Exception as e:
            logger.error(f"Error checking HTTPS enforcement: {str(e)}")
            return False
    
    def _scan_elb_encryption_compliance(self) -> Dict[str, Any]:
        """
        Scan ELB listeners for HTTPS/TLS configuration
        """
        results = {
            'total': 0,
            'compliant': 0,
            'non_compliant': 0,
            'violations': []
        }
        
        try:
            # Get all load balancers
            load_balancers = self.elb_client.describe_load_balancers()
            
            for lb in load_balancers['LoadBalancers']:
                lb_arn = lb['LoadBalancerArn']
                
                # Get listeners for this load balancer
                listeners_response = self.elb_client.describe_listeners(LoadBalancerArn=lb_arn)
                
                for listener in listeners_response['Listeners']:
                    listener_arn = listener['ListenerArn']
                    results['total'] += 1
                    
                    protocol = listener['Protocol']
                    port = listener['Port']
                    
                    if protocol in ['HTTPS', 'TLS']:
                        # Check SSL policy
                        ssl_policy = listener.get('SslPolicy', '')
                        
                        if self._is_secure_ssl_policy(ssl_policy):
                            results['compliant'] += 1
                        else:
                            results['non_compliant'] += 1
                            violation = EncryptionViolation(
                                resource_arn=listener_arn,
                                service='elbv2',
                                violation_type='weak_ssl_policy',
                                description=f'ELB listener uses weak SSL policy: {ssl_policy}',
                                severity='medium',
                                detected_at=datetime.utcnow().isoformat(),
                                remediation_action='Update to secure SSL policy (ELBSecurityPolicy-TLS-1-2-2017-01 or newer)'
                            )
                            results['violations'].append(violation)
                            self._store_violation(violation)
                    
                    elif protocol in ['HTTP', 'TCP']:
                        # Unencrypted listener
                        results['non_compliant'] += 1
                        violation = EncryptionViolation(
                            resource_arn=listener_arn,
                            service='elbv2',
                            violation_type='unencrypted_listener',
                            description=f'ELB listener uses unencrypted protocol: {protocol} on port {port}',
                            severity='high',
                            detected_at=datetime.utcnow().isoformat(),
                            remediation_action='Change listener protocol to HTTPS or TLS'
                        )
                        results['violations'].append(violation)
                        self._store_violation(violation)
        
        except Exception as e:
            logger.error(f"Error scanning ELB encryption compliance: {str(e)}")
        
        return results
    
    def _is_secure_ssl_policy(self, ssl_policy: str) -> bool:
        """
        Check if SSL policy meets security requirements
        """
        # List of secure SSL policies (TLS 1.2+)
        secure_policies = [
            'ELBSecurityPolicy-TLS-1-2-2017-01',
            'ELBSecurityPolicy-TLS-1-2-Ext-2018-06',
            'ELBSecurityPolicy-FS-2018-06',
            'ELBSecurityPolicy-FS-1-2-2019-08',
            'ELBSecurityPolicy-FS-1-2-Res-2019-08',
            'ELBSecurityPolicy-FS-1-2-Res-2020-10',
            'ELBSecurityPolicy-TLS13-1-2-2021-06'
        ]
        
        return ssl_policy in secure_policies
    
    def _scan_rds_encryption_compliance(self) -> Dict[str, Any]:
        """
        Scan RDS instances for SSL/TLS enforcement
        """
        results = {
            'total': 0,
            'compliant': 0,
            'non_compliant': 0,
            'violations': []
        }
        
        try:
            # Get all RDS instances
            instances_response = self.rds_client.describe_db_instances()
            
            for instance in instances_response['DBInstances']:
                instance_id = instance['DBInstanceIdentifier']
                instance_arn = instance['DBInstanceArn']
                engine = instance['Engine']
                results['total'] += 1
                
                # Check if SSL is enforced based on engine type
                ssl_enforced = self._check_rds_ssl_enforcement(instance_id, engine)
                
                if ssl_enforced:
                    results['compliant'] += 1
                else:
                    results['non_compliant'] += 1
                    violation = EncryptionViolation(
                        resource_arn=instance_arn,
                        service='rds',
                        violation_type='ssl_not_enforced',
                        description=f'RDS instance {instance_id} does not enforce SSL connections',
                        severity='high',
                        detected_at=datetime.utcnow().isoformat(),
                        remediation_action=f'Enable SSL enforcement for {engine} database'
                    )
                    results['violations'].append(violation)
                    self._store_violation(violation)
        
        except Exception as e:
            logger.error(f"Error scanning RDS encryption compliance: {str(e)}")
        
        return results
    
    def _check_rds_ssl_enforcement(self, instance_id: str, engine: str) -> bool:
        """
        Check if RDS instance enforces SSL connections
        """
        try:
            # Get parameter group for the instance
            instance_details = self.rds_client.describe_db_instances(DBInstanceIdentifier=instance_id)
            instance = instance_details['DBInstances'][0]
            
            parameter_groups = instance.get('DBParameterGroups', [])
            
            for param_group in parameter_groups:
                param_group_name = param_group['DBParameterGroupName']
                
                # Get parameters for SSL enforcement based on engine
                if engine.startswith('mysql'):
                    ssl_param = 'require_secure_transport'
                elif engine.startswith('postgres'):
                    ssl_param = 'ssl'
                elif engine.startswith('oracle'):
                    ssl_param = 'ssl_cipher_suites'
                elif engine.startswith('sqlserver'):
                    ssl_param = 'force_ssl'
                else:
                    continue
                
                try:
                    params_response = self.rds_client.describe_db_parameters(
                        DBParameterGroupName=param_group_name,
                        ParameterName=ssl_param
                    )
                    
                    for param in params_response['Parameters']:
                        if param['ParameterName'] == ssl_param:
                            if engine.startswith('mysql') and param.get('ParameterValue') == 'ON':
                                return True
                            elif engine.startswith('postgres') and param.get('ParameterValue') == '1':
                                return True
                            elif engine.startswith('oracle') and param.get('ParameterValue'):
                                return True
                            elif engine.startswith('sqlserver') and param.get('ParameterValue') == '1':
                                return True
                
                except Exception as e:
                    logger.error(f"Error checking SSL parameter for {instance_id}: {str(e)}")
                    continue
            
            return False
            
        except Exception as e:
            logger.error(f"Error checking RDS SSL enforcement for {instance_id}: {str(e)}")
            return False
    
    def _scan_api_gateway_encryption_compliance(self) -> Dict[str, Any]:
        """
        Scan API Gateway APIs for HTTPS enforcement
        """
        results = {
            'total': 0,
            'compliant': 0,
            'non_compliant': 0,
            'violations': []
        }
        
        try:
            # Get all REST APIs
            apis_response = self.apigateway_client.get_rest_apis()
            
            for api in apis_response['items']:
                api_id = api['id']
                api_name = api['name']
                api_arn = f"arn:aws:apigateway:{self.region}::/restapis/{api_id}"
                results['total'] += 1
                
                # Check if API has HTTPS-only policy
                try:
                    policy_response = self.apigateway_client.get_rest_api(restApiId=api_id)
                    
                    # Check minimum TLS version
                    min_tls_version = policy_response.get('minimumCompressionSize')  # This is a placeholder
                    
                    # For now, assume compliant if API exists (API Gateway enforces HTTPS by default)
                    # In practice, you would check domain configurations and policies
                    results['compliant'] += 1
                
                except Exception as e:
                    logger.error(f"Error checking API Gateway {api_id}: {str(e)}")
                    results['non_compliant'] += 1
                    violation = EncryptionViolation(
                        resource_arn=api_arn,
                        service='apigateway',
                        violation_type='configuration_error',
                        description=f'Unable to verify encryption configuration for API {api_name}',
                        severity='medium',
                        detected_at=datetime.utcnow().isoformat(),
                        remediation_action='Review API Gateway encryption configuration'
                    )
                    results['violations'].append(violation)
                    self._store_violation(violation)
        
        except Exception as e:
            logger.error(f"Error scanning API Gateway encryption compliance: {str(e)}")
        
        return results
    
    def remediate_encryption_violations(self, violation_arns: List[str]) -> Dict[str, Any]:
        """
        Automatically remediate encryption in transit violations
        """
        remediation_results = {
            'total_violations': len(violation_arns),
            'successful_remediations': 0,
            'failed_remediations': 0,
            'results': []
        }
        
        for violation_arn in violation_arns:
            try:
                # Determine service from ARN
                service = violation_arn.split(':')[2]
                
                if service == 's3':
                    result = self._remediate_s3_https_enforcement(violation_arn)
                elif service == 'elasticloadbalancing':
                    result = self._remediate_elb_encryption(violation_arn)
                elif service == 'rds':
                    result = self._remediate_rds_ssl_enforcement(violation_arn)
                else:
                    result = {
                        'resource_arn': violation_arn,
                        'status': 'unsupported',
                        'message': f'Remediation not supported for service: {service}'
                    }
                
                remediation_results['results'].append(result)
                
                if result['status'] == 'success':
                    remediation_results['successful_remediations'] += 1
                else:
                    remediation_results['failed_remediations'] += 1
                    
            except Exception as e:
                remediation_results['results'].append({
                    'resource_arn': violation_arn,
                    'status': 'error',
                    'message': str(e)
                })
                remediation_results['failed_remediations'] += 1
        
        return remediation_results
    
    def _remediate_s3_https_enforcement(self, bucket_arn: str) -> Dict[str, Any]:
        """
        Add HTTPS enforcement policy to S3 bucket
        """
        try:
            bucket_name = bucket_arn.split(':')[-1]
            
            # Create HTTPS enforcement policy
            https_policy = {
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Sid": "DenyInsecureConnections",
                        "Effect": "Deny",
                        "Principal": "*",
                        "Action": "s3:*",
                        "Resource": [
                            f"arn:aws:s3:::{bucket_name}",
                            f"arn:aws:s3:::{bucket_name}/*"
                        ],
                        "Condition": {
                            "Bool": {
                                "aws:SecureTransport": "false"
                            }
                        }
                    }
                ]
            }
            
            # Apply the policy
            self.s3_client.put_bucket_policy(
                Bucket=bucket_name,
                Policy=json.dumps(https_policy)
            )
            
            logger.info(f"Applied HTTPS enforcement policy to bucket: {bucket_name}")
            
            return {
                'resource_arn': bucket_arn,
                'status': 'success',
                'message': 'HTTPS enforcement policy applied successfully'
            }
            
        except Exception as e:
            logger.error(f"Error remediating S3 HTTPS enforcement for {bucket_arn}: {str(e)}")
            return {
                'resource_arn': bucket_arn,
                'status': 'error',
                'message': str(e)
            }
    
    def _store_violation(self, violation: EncryptionViolation):
        """
        Store encryption violation in DynamoDB
        """
        try:
            self.violations_table.put_item(
                Item={
                    **asdict(violation),
                    'ttl': int((datetime.utcnow() + timedelta(days=90)).timestamp())
                }
            )
        except Exception as e:
            logger.error(f"Error storing violation: {str(e)}")
    
    def _store_compliance_results(self, results: Dict[str, Any]):
        """
        Store compliance scan results
        """
        try:
            self.compliance_table.put_item(
                Item={
                    'scan_id': f"scan-{int(datetime.utcnow().timestamp())}",
                    'scan_timestamp': results['scan_timestamp'],
                    'compliance_percentage': results['compliance_percentage'],
                    'total_resources': results['total_resources'],
                    'compliant_resources': results['compliant_resources'],
                    'non_compliant_resources': results['non_compliant_resources'],
                    'services_scanned': results['services_scanned'],
                    'ttl': int((datetime.utcnow() + timedelta(days=365)).timestamp())
                }
            )
        except Exception as e:
            logger.error(f"Error storing compliance results: {str(e)}")

# Example usage
if __name__ == "__main__":
    # Initialize encryption enforcer
    enforcer = EncryptionInTransitEnforcer()
    
    # Scan for encryption compliance
    compliance_results = enforcer.scan_encryption_compliance()
    print(f"Encryption compliance results: {json.dumps(compliance_results, indent=2, default=str)}")
    
    # Remediate violations if any found
    if compliance_results['violations']:
        violation_arns = [v.resource_arn for v in compliance_results['violations'][:5]]  # Limit to first 5
        remediation_results = enforcer.remediate_encryption_violations(violation_arns)
        print(f"Remediation results: {json.dumps(remediation_results, indent=2)}")

Example 2: Service Control Policies for Encryption Enforcement

View code
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DenyUnencryptedS3Operations",
      "Effect": "Deny",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "*",
      "Condition": {
        "Bool": {
          "aws:SecureTransport": "false"
        }
      }
    },
    {
      "Sid": "DenyS3BucketWithoutHTTPSPolicy",
      "Effect": "Deny",
      "Action": [
        "s3:CreateBucket",
        "s3:PutBucketPolicy"
      ],
      "Resource": "*",
      "Condition": {
        "Bool": {
          "s3:SecureTransport": "false"
        }
      }
    },
    {
      "Sid": "DenyUnencryptedELBListeners",
      "Effect": "Deny",
      "Action": [
        "elasticloadbalancing:CreateListener",
        "elasticloadbalancing:ModifyListener"
      ],
      "Resource": "*",
      "Condition": {
        "StringNotEquals": {
          "elasticloadbalancing:Protocol": [
            "HTTPS",
            "TLS"
          ]
        }
      }
    },
    {
      "Sid": "DenyWeakSSLPolicies",
      "Effect": "Deny",
      "Action": [
        "elasticloadbalancing:CreateListener",
        "elasticloadbalancing:ModifyListener"
      ],
      "Resource": "*",
      "Condition": {
        "StringNotLike": {
          "elasticloadbalancing:SSLPolicy": [
            "ELBSecurityPolicy-TLS-1-2-*",
            "ELBSecurityPolicy-FS-*",
            "ELBSecurityPolicy-TLS13-*"
          ]
        }
      }
    },
    {
      "Sid": "DenyUnencryptedRDSConnections",
      "Effect": "Deny",
      "Action": [
        "rds:CreateDBInstance",
        "rds:ModifyDBInstance"
      ],
      "Resource": "*",
      "Condition": {
        "Bool": {
          "rds:db-instance-force-ssl": "false"
        }
      }
    },
    {
      "Sid": "DenyUnencryptedElastiCacheClusters",
      "Effect": "Deny",
      "Action": [
        "elasticache:CreateCacheCluster",
        "elasticache:CreateReplicationGroup"
      ],
      "Resource": "*",
      "Condition": {
        "Bool": {
          "elasticache:TransitEncryptionEnabled": "false"
        }
      }
    },
    {
      "Sid": "DenyAPIGatewayWithoutMinTLS",
      "Effect": "Deny",
      "Action": [
        "apigateway:CreateDomainName",
        "apigateway:UpdateDomainName"
      ],
      "Resource": "*",
      "Condition": {
        "StringNotEquals": {
          "apigateway:SecurityPolicy": [
            "TLS_1_2"
          ]
        }
      }
    },
    {
      "Sid": "DenyUnencryptedCloudFrontDistributions",
      "Effect": "Deny",
      "Action": [
        "cloudfront:CreateDistribution",
        "cloudfront:UpdateDistribution"
      ],
      "Resource": "*",
      "Condition": {
        "StringNotEquals": {
          "cloudfront:ViewerProtocolPolicy": [
            "https-only",
            "redirect-to-https"
          ]
        }
      }
    },
    {
      "Sid": "DenyUnencryptedKinesisStreams",
      "Effect": "Deny",
      "Action": [
        "kinesis:CreateStream"
      ],
      "Resource": "*",
      "Condition": {
        "Null": {
          "kinesis:StreamEncryption": "true"
        }
      }
    },
    {
      "Sid": "DenyUnencryptedSQSQueues",
      "Effect": "Deny",
      "Action": [
        "sqs:CreateQueue"
      ],
      "Resource": "*",
      "Condition": {
        "Null": {
          "sqs:KmsMasterKeyId": "true"
        }
      }
    },
    {
      "Sid": "DenyUnencryptedSNSTopics",
      "Effect": "Deny",
      "Action": [
        "sns:CreateTopic"
      ],
      "Resource": "*",
      "Condition": {
        "Null": {
          "sns:KmsMasterKeyId": "true"
        }
      }
    },
    {
      "Sid": "DenyUnencryptedLambdaEnvironmentVariables",
      "Effect": "Deny",
      "Action": [
        "lambda:CreateFunction",
        "lambda:UpdateFunctionConfiguration"
      ],
      "Resource": "*",
      "Condition": {
        "Null": {
          "lambda:KMSKeyArn": "true"
        }
      }
    }
  ]
}

Example 3: AWS Config Rules for Encryption Monitoring

View code
# config_encryption_rules.py
import boto3
import json
from typing import Dict, List, Any
import logging

logger = logging.getLogger(__name__)

class ConfigEncryptionInTransitRules:
    """
    Deploy and manage AWS Config rules for encryption in transit monitoring
    """
    
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.config_client = boto3.client('config', region_name=region)
        self.lambda_client = boto3.client('lambda', region_name=region)
        
        # Define encryption in transit config rules
        self.encryption_rules = self._define_encryption_rules()
    
    def _define_encryption_rules(self) -> List[Dict[str, Any]]:
        """
        Define AWS Config rules for encryption in transit compliance
        """
        return [
            {
                'ConfigRuleName': 's3-bucket-ssl-requests-only',
                'Description': 'Checks that S3 buckets have policies requiring SSL requests',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'S3_BUCKET_SSL_REQUESTS_ONLY'
                },
                'Scope': {
                    'ComplianceResourceTypes': ['AWS::S3::Bucket']
                }
            },
            {
                'ConfigRuleName': 'elb-tls-https-listeners-only',
                'Description': 'Checks that ELB listeners use HTTPS or TLS protocols',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'ELB_TLS_HTTPS_LISTENERS_ONLY'
                },
                'Scope': {
                    'ComplianceResourceTypes': ['AWS::ElasticLoadBalancingV2::Listener']
                }
            },
            {
                'ConfigRuleName': 'rds-instance-ssl-enabled',
                'Description': 'Checks that RDS instances have SSL enabled',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'RDS_INSTANCE_SSL_ENABLED'
                },
                'Scope': {
                    'ComplianceResourceTypes': ['AWS::RDS::DBInstance']
                }
            },
            {
                'ConfigRuleName': 'elasticache-redis-cluster-encryption-in-transit',
                'Description': 'Checks that ElastiCache Redis clusters have encryption in transit enabled',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'ELASTICACHE_REDIS_CLUSTER_ENCRYPTION_IN_TRANSIT_ENABLED'
                },
                'Scope': {
                    'ComplianceResourceTypes': ['AWS::ElastiCache::CacheCluster']
                }
            },
            {
                'ConfigRuleName': 'api-gateway-ssl-enabled',
                'Description': 'Checks that API Gateway stages have SSL certificates configured',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'API_GW_SSL_ENABLED'
                },
                'Scope': {
                    'ComplianceResourceTypes': ['AWS::ApiGateway::Stage']
                }
            },
            {
                'ConfigRuleName': 'cloudfront-viewer-policy-https',
                'Description': 'Checks that CloudFront distributions use HTTPS viewer protocol policy',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'CLOUDFRONT_VIEWER_POLICY_HTTPS'
                },
                'Scope': {
                    'ComplianceResourceTypes': ['AWS::CloudFront::Distribution']
                }
            },
            {
                'ConfigRuleName': 'alb-http-drop-invalid-header-enabled',
                'Description': 'Checks that ALBs drop invalid HTTP headers',
                'Source': {
                    'Owner': 'AWS',
                    'SourceIdentifier': 'ALB_HTTP_DROP_INVALID_HEADER_ENABLED'
                },
                'Scope': {
                    'ComplianceResourceTypes': ['AWS::ElasticLoadBalancingV2::LoadBalancer']
                }
            },
            {
                'ConfigRuleName': 'custom-ssl-policy-check',
                'Description': 'Custom rule to check SSL policy compliance',
                'Source': {
                    'Owner': 'AWS_CONFIG_RULE',
                    'SourceIdentifier': self._create_custom_ssl_policy_lambda()
                },
                'Scope': {
                    'ComplianceResourceTypes': [
                        'AWS::ElasticLoadBalancingV2::Listener',
                        'AWS::CloudFront::Distribution'
                    ]
                }
            }
        ]
    
    def _create_custom_ssl_policy_lambda(self) -> str:
        """
        Create custom Lambda function for SSL policy validation
        """
        lambda_code = '''
import json
import boto3

def lambda_handler(event, context):
    """
    Custom Config rule to validate SSL/TLS policies
    """
    
    config_client = boto3.client('config')
    
    # Get the configuration item
    configuration_item = event['configurationItem']
    resource_type = configuration_item['resourceType']
    resource_id = configuration_item['resourceId']
    
    compliance_type = 'COMPLIANT'
    annotation = 'Resource is compliant with SSL/TLS policy requirements'
    
    try:
        if resource_type == 'AWS::ElasticLoadBalancingV2::Listener':
            compliance_type, annotation = check_elb_listener_ssl_policy(configuration_item)
        elif resource_type == 'AWS::CloudFront::Distribution':
            compliance_type, annotation = check_cloudfront_ssl_policy(configuration_item)
        
    except Exception as e:
        compliance_type = 'NOT_APPLICABLE'
        annotation = f'Error evaluating resource: {str(e)}'
    
    # Submit evaluation result
    evaluation = {
        'ComplianceResourceType': resource_type,
        'ComplianceResourceId': resource_id,
        'ComplianceType': compliance_type,
        'Annotation': annotation,
        'OrderingTimestamp': configuration_item['configurationItemCaptureTime']
    }
    
    config_client.put_evaluations(
        Evaluations=[evaluation],
        ResultToken=event['resultToken']
    )
    
    return {
        'statusCode': 200,
        'body': json.dumps('Evaluation completed')
    }

def check_elb_listener_ssl_policy(configuration_item):
    """Check ELB listener SSL policy"""
    configuration = configuration_item.get('configuration', {})
    protocol = configuration.get('protocol', '')
    ssl_policy = configuration.get('sslPolicy', '')
    
    if protocol not in ['HTTPS', 'TLS']:
        return 'NON_COMPLIANT', f'Listener uses unencrypted protocol: {protocol}'
    
    # Check for secure SSL policies
    secure_policies = [
        'ELBSecurityPolicy-TLS-1-2-2017-01',
        'ELBSecurityPolicy-TLS-1-2-Ext-2018-06',
        'ELBSecurityPolicy-FS-2018-06',
        'ELBSecurityPolicy-FS-1-2-2019-08',
        'ELBSecurityPolicy-FS-1-2-Res-2019-08',
        'ELBSecurityPolicy-FS-1-2-Res-2020-10',
        'ELBSecurityPolicy-TLS13-1-2-2021-06'
    ]
    
    if ssl_policy not in secure_policies:
        return 'NON_COMPLIANT', f'Listener uses weak SSL policy: {ssl_policy}'
    
    return 'COMPLIANT', f'Listener uses secure SSL policy: {ssl_policy}'

def check_cloudfront_ssl_policy(configuration_item):
    """Check CloudFront distribution SSL policy"""
    configuration = configuration_item.get('configuration', {})
    distribution_config = configuration.get('distributionConfig', {})
    
    # Check viewer protocol policy
    default_cache_behavior = distribution_config.get('defaultCacheBehavior', {})
    viewer_protocol_policy = default_cache_behavior.get('viewerProtocolPolicy', '')
    
    if viewer_protocol_policy not in ['https-only', 'redirect-to-https']:
        return 'NON_COMPLIANT', f'Distribution allows HTTP: {viewer_protocol_policy}'
    
    # Check minimum protocol version
    viewer_certificate = distribution_config.get('viewerCertificate', {})
    minimum_protocol_version = viewer_certificate.get('minimumProtocolVersion', '')
    
    secure_versions = ['TLSv1.2_2018', 'TLSv1.2_2019', 'TLSv1.2_2021']
    if minimum_protocol_version and minimum_protocol_version not in secure_versions:
        return 'NON_COMPLIANT', f'Distribution uses weak TLS version: {minimum_protocol_version}'
    
    return 'COMPLIANT', 'Distribution uses secure HTTPS configuration'
'''
        
        try:
            # Create Lambda function for custom Config rule
            function_name = 'config-ssl-policy-checker'
            
            response = self.lambda_client.create_function(
                FunctionName=function_name,
                Runtime='python3.9',
                Role=f'arn:aws:iam::{self._get_account_id()}:role/ConfigRuleLambdaRole',
                Handler='index.lambda_handler',
                Code={'ZipFile': lambda_code.encode()},
                Description='Custom Config rule for SSL/TLS policy validation',
                Timeout=60,
                Tags={
                    'Purpose': 'ConfigRule',
                    'Type': 'SSLPolicyChecker'
                }
            )
            
            return response['FunctionArn']
            
        except self.lambda_client.exceptions.ResourceConflictException:
            # Function already exists
            response = self.lambda_client.get_function(FunctionName=function_name)
            return response['Configuration']['FunctionArn']
        except Exception as e:
            logger.error(f"Error creating custom Lambda function: {str(e)}")
            return ''
    
    def deploy_encryption_rules(self) -> Dict[str, Any]:
        """
        Deploy all encryption in transit monitoring Config rules
        """
        deployment_results = {
            'total_rules': len(self.encryption_rules),
            'successful_deployments': 0,
            'failed_deployments': 0,
            'results': []
        }
        
        for rule_config in self.encryption_rules:
            try:
                # Check if rule already exists
                try:
                    self.config_client.describe_config_rules(
                        ConfigRuleNames=[rule_config['ConfigRuleName']]
                    )
                    # Rule exists, update it
                    self.config_client.put_config_rule(ConfigRule=rule_config)
                    status = 'updated'
                except self.config_client.exceptions.NoSuchConfigRuleException:
                    # Rule doesn't exist, create it
                    self.config_client.put_config_rule(ConfigRule=rule_config)
                    status = 'created'
                
                deployment_results['successful_deployments'] += 1
                deployment_results['results'].append({
                    'rule_name': rule_config['ConfigRuleName'],
                    'status': status,
                    'message': f'Rule {status} successfully'
                })
                
                logger.info(f"Config rule {rule_config['ConfigRuleName']} {status} successfully")
                
            except Exception as e:
                deployment_results['failed_deployments'] += 1
                deployment_results['results'].append({
                    'rule_name': rule_config['ConfigRuleName'],
                    'status': 'failed',
                    'message': str(e)
                })
                logger.error(f"Failed to deploy Config rule {rule_config['ConfigRuleName']}: {str(e)}")
        
        return deployment_results
    
    def get_encryption_compliance_summary(self) -> Dict[str, Any]:
        """
        Get compliance summary for all encryption in transit rules
        """
        compliance_summary = {
            'timestamp': boto3.client('sts').get_caller_identity(),
            'rules_evaluated': 0,
            'compliant_resources': 0,
            'non_compliant_resources': 0,
            'rule_details': []
        }
        
        for rule_config in self.encryption_rules:
            rule_name = rule_config['ConfigRuleName']
            
            try:
                # Get compliance details for this rule
                compliance_response = self.config_client.get_compliance_details_by_config_rule(
                    ConfigRuleName=rule_name
                )
                
                rule_compliance = {
                    'rule_name': rule_name,
                    'compliant_count': 0,
                    'non_compliant_count': 0,
                    'not_applicable_count': 0,
                    'insufficient_data_count': 0
                }
                
                for result in compliance_response['EvaluationResults']:
                    compliance_type = result['ComplianceType']
                    
                    if compliance_type == 'COMPLIANT':
                        rule_compliance['compliant_count'] += 1
                        compliance_summary['compliant_resources'] += 1
                    elif compliance_type == 'NON_COMPLIANT':
                        rule_compliance['non_compliant_count'] += 1
                        compliance_summary['non_compliant_resources'] += 1
                    elif compliance_type == 'NOT_APPLICABLE':
                        rule_compliance['not_applicable_count'] += 1
                    elif compliance_type == 'INSUFFICIENT_DATA':
                        rule_compliance['insufficient_data_count'] += 1
                
                compliance_summary['rule_details'].append(rule_compliance)
                compliance_summary['rules_evaluated'] += 1
                
            except Exception as e:
                logger.error(f"Error getting compliance for rule {rule_name}: {str(e)}")
                compliance_summary['rule_details'].append({
                    'rule_name': rule_name,
                    'error': str(e)
                })
        
        # Calculate overall compliance percentage
        total_evaluated = compliance_summary['compliant_resources'] + compliance_summary['non_compliant_resources']
        if total_evaluated > 0:
            compliance_summary['compliance_percentage'] = round(
                (compliance_summary['compliant_resources'] / total_evaluated) * 100, 2
            )
        else:
            compliance_summary['compliance_percentage'] = 0.0
        
        return compliance_summary
    
    def _get_account_id(self) -> str:
        """Get AWS account ID"""
        return boto3.client('sts').get_caller_identity()['Account']

# Example usage
if __name__ == "__main__":
    # Initialize Config rules manager
    config_rules = ConfigEncryptionInTransitRules()
    
    # Deploy encryption monitoring rules
    deployment_results = config_rules.deploy_encryption_rules()
    print(f"Config rules deployment: {json.dumps(deployment_results, indent=2)}")
    
    # Get compliance summary
    compliance_summary = config_rules.get_encryption_compliance_summary()
    print(f"Compliance summary: {json.dumps(compliance_summary, indent=2, default=str)}")

Relevant AWS Services

Load Balancing and Content Delivery

  • Elastic Load Balancing (ELB): HTTPS/TLS termination with configurable SSL policies
  • Amazon CloudFront: Global content delivery with HTTPS enforcement
  • AWS Global Accelerator: Improved performance with TLS termination
  • Amazon API Gateway: API management with TLS termination and backend encryption

Database and Caching Services

  • Amazon RDS: SSL/TLS encryption for database connections
  • Amazon ElastiCache: Encryption in transit for Redis and Memcached
  • Amazon DynamoDB: HTTPS API endpoints with TLS encryption
  • Amazon DocumentDB: TLS encryption for MongoDB-compatible database

Storage and Messaging Services

  • Amazon S3: HTTPS-only bucket policies and SSL/TLS enforcement
  • Amazon EFS: Encryption in transit for file system access
  • Amazon SQS: HTTPS endpoints for message queue operations
  • Amazon SNS: TLS encryption for notification delivery

Networking and Connectivity

  • Amazon VPC: VPC endpoints for private encrypted connectivity
  • AWS PrivateLink: Private connectivity between VPCs and AWS services
  • AWS VPN: IPSec encryption for site-to-site and client VPN connections
  • AWS Direct Connect: MACsec encryption for dedicated network connections

Monitoring and Governance

  • AWS Config: Continuous compliance monitoring for encryption policies
  • AWS CloudTrail: Audit logging of encrypted and unencrypted API calls
  • Amazon CloudWatch: Monitoring and alerting for encryption violations
  • AWS Organizations: Service Control Policies for encryption enforcement

Compute and Application Services

  • AWS Lambda: Environment variable encryption and HTTPS endpoints
  • Amazon ECS/EKS: Container communication encryption
  • AWS App Runner: Automatic HTTPS for web applications
  • AWS Batch: Secure job submission and result retrieval

Benefits of Enforcing Encryption in Transit

Security Benefits

  • Data Confidentiality: Protection against eavesdropping and interception
  • Data Integrity: Prevention of data tampering during transmission
  • Authentication: Verification of communication endpoints
  • Non-Repudiation: Proof of data transmission and receipt

Compliance Benefits

  • Regulatory Adherence: Meet requirements for data protection regulations
  • Industry Standards: Compliance with PCI DSS, HIPAA, SOX, and other standards
  • Audit Readiness: Comprehensive audit trails for encrypted communications
  • Risk Management: Reduced risk of data breaches during transmission

Operational Benefits

  • Automated Enforcement: Consistent application of encryption policies
  • Centralized Management: Single point of control for encryption requirements
  • Performance Optimization: Modern TLS implementations with minimal overhead
  • Scalable Architecture: Encryption that scales with infrastructure growth

Business Benefits

  • Customer Trust: Enhanced customer confidence in data protection
  • Competitive Advantage: Strong security posture as business differentiator
  • Cost Avoidance: Prevention of data breach costs and penalties
  • Business Continuity: Secure communications supporting business operations

TLS/SSL Best Practices

Protocol Versions

  • Minimum TLS 1.2: Disable older protocols (SSL 2.0, SSL 3.0, TLS 1.0, TLS 1.1)
  • TLS 1.3 Preferred: Use TLS 1.3 where supported for improved security and performance
  • Protocol Negotiation: Implement proper protocol version negotiation
  • Fallback Protection: Prevent protocol downgrade attacks

Cipher Suite Selection

  • Strong Ciphers Only: Use AEAD ciphers (AES-GCM, ChaCha20-Poly1305)
  • Perfect Forward Secrecy: Prefer ECDHE and DHE key exchange methods
  • Avoid Weak Ciphers: Disable RC4, DES, 3DES, and export-grade ciphers
  • Cipher Ordering: Configure server-preferred cipher suite ordering

Certificate Management

  • Strong Key Sizes: Use RSA 2048+ or ECDSA P-256+ keys
  • Certificate Validation: Implement proper certificate chain validation
  • Certificate Pinning: Use certificate pinning for critical connections
  • Regular Renewal: Automate certificate renewal before expiration

Implementation Considerations

  • HSTS Headers: Implement HTTP Strict Transport Security
  • OCSP Stapling: Enable OCSP stapling for certificate validation
  • Session Management: Implement secure session resumption
  • Error Handling: Proper handling of TLS errors and failures

Common Implementation Patterns

API Gateway with Backend Encryption

View code
Client → HTTPS → API Gateway → HTTPS → Backend Service

Load Balancer SSL Termination

View code
Client → HTTPS → ALB (SSL Termination) → HTTP → Backend

End-to-End Encryption

View code
Client → HTTPS → ALB → HTTPS → Backend Service

Database Connection Encryption

View code
Application → SSL/TLS → RDS/ElastiCache

Service Mesh Encryption

View code
Service A → mTLS → Service B (via Service Mesh)

Monitoring and Alerting

Key Metrics to Monitor

  • TLS Handshake Success Rate: Monitor successful TLS negotiations
  • Certificate Expiration: Track certificate expiration dates
  • Protocol Version Usage: Monitor TLS version distribution
  • Cipher Suite Usage: Track cipher suite selection patterns
  • Encryption Violations: Count of unencrypted connection attempts

Alerting Scenarios

  • Certificate Expiration: Alert 30, 14, and 7 days before expiration
  • Weak Protocol Usage: Alert on TLS 1.0/1.1 usage
  • Unencrypted Connections: Immediate alert on HTTP usage for sensitive data
  • SSL Policy Changes: Alert on SSL policy modifications
  • Certificate Validation Failures: Alert on certificate validation errors

Compliance Reporting

  • Encryption Coverage: Percentage of services with encryption enabled
  • Policy Compliance: Adherence to organizational encryption policies
  • Vulnerability Assessment: Regular assessment of TLS configuration
  • Audit Trail: Complete record of encryption-related activities
View code