Skip to content
SEC10

SEC10-BP03: Prepare forensic capabilities

SEC10-BP03: Prepare forensic capabilities

Overview

Ahead of a security incident, consider developing forensics capabilities to support security event investigations.

Concepts from traditional on-premises forensics apply to AWS. For key information to start building forensics capabilities in the AWS Cloud, see Forensic investigation environment strategies in the AWS Cloud.

Once you have your environment and AWS account structure set up for forensics, define the technologies required to effectively perform forensically sound methodologies across the four phases:

  • Collection: Collect relevant AWS logs, such as AWS CloudTrail, AWS Config, VPC Flow Logs, and host-level logs. Collect snapshots, backups, and memory dumps of impacted AWS resources where available.
  • Examination: Examine the data collected by extracting and assessing the relevant information.
  • Analysis: Analyze the data collected in order to understand the incident and draw conclusions from it.
  • Reporting: Present the information resulting from the analysis phase.

Implementation Steps

Prepare your forensics environment

AWS Organizations helps you centrally manage and govern an AWS environment as you grow and scale AWS resources. An AWS organization consolidates your AWS accounts so that you can administer them as a single unit. You can use organizational units (OUs) to group accounts together to administer as a single unit.

For incident response, it’s helpful to have an AWS account structure that supports the functions of incident response, which includes a security OU and a forensics OU.

Within the security OU, you should have accounts for:

  • Log archival: Aggregate logs in a log archival AWS account with limited permissions
  • Security tools: Centralize security services in a security tool AWS account. This account operates as the delegated administrator for security services

Within the forensics OU, you have the option to implement a single forensics account or accounts for each Region that you operate in, depending on which works best for your business and operational model. If you create a forensics account per Region, you can block the creation of AWS resources outside of that Region and reduce the risk of resources being copied to an unintended region.

For example, if you only operate in US East (N. Virginia) Region (us-east-1) and US West (Oregon) (us-west-2), then you would have two accounts in the forensics OU: one for us-east-1 and one for us-west-2.

You can create a forensics AWS account for multiple Regions. You should exercise caution in copying AWS resources to that account to verify you’re aligning with your data sovereignty requirements.

Because it takes time to provision new accounts, it is imperative to create and instrument the forensics accounts well ahead of an incident so that responders can be prepared to effectively use them for response.

Capture backups and snapshots

Setting up backups of key systems and databases are critical for recovering from a security incident and for forensics purposes. With backups in place, you can restore your systems to their previous safe state. On AWS, you can take snapshots of various resources. Snapshots provide you with point-in-time backups of those resources.

There are many AWS services that can support you in backup and recovery. For detail on these services and approaches for backup and recovery, see Backup and Recovery Prescriptive Guidance and Use backups to recover from security incidents.

Especially when it comes to situations such as ransomware, it’s critical for your backups to be well protected. For guidance on securing your backups, see Top 10 security best practices for securing backups in AWS.

In addition to securing your backups, you should regularly test your backup and restore processes to verify that the technology and processes you have in place work as expected.

Automate forensics

During a security event, your incident response team must be able to collect and analyze evidence quickly while maintaining accuracy for the time period surrounding the event (such as capturing logs related to a specific event or resource or collecting memory dump of an Amazon EC2 instance). It’s both challenging and time consuming for the incident response team to manually collect the relevant evidence, especially across a large number of instances and accounts. Additionally, manual collection can be prone to human error.

For these reasons, you should develop and implement automation for forensics as much as possible. AWS offers a number of automation resources for forensics, which are listed in the Resources section. These resources are examples of forensics patterns that we have developed and customers have implemented. While they might be a useful reference architecture to start with, consider modifying them or creating new forensics automation patterns based on your environment, requirements, tools, and forensics processes.

Implementation Examples

Example 1: Comprehensive Forensic Capabilities Framework

View code
# forensic_capabilities_manager.py
import boto3
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
import logging
import hashlib
import base64

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ForensicPhase(Enum):
    COLLECTION = "collection"
    EXAMINATION = "examination"
    ANALYSIS = "analysis"
    REPORTING = "reporting"

class EvidenceType(Enum):
    LOGS = "logs"
    SNAPSHOTS = "snapshots"
    MEMORY_DUMPS = "memory_dumps"
    NETWORK_CAPTURES = "network_captures"
    CONFIGURATION = "configuration"
    METADATA = "metadata"

@dataclass
class ForensicEvidence:
    evidence_id: str
    evidence_type: EvidenceType
    source_resource: str
    collection_timestamp: str
    chain_of_custody: List[Dict[str, str]]
    hash_values: Dict[str, str]
    storage_location: str
    retention_period: str
    classification: str
    collection_method: str
    integrity_verified: bool
    metadata: Dict[str, Any]

@dataclass
class ForensicAccount:
    account_id: str
    account_name: str
    region: str
    purpose: str
    cross_account_roles: List[str]
    storage_buckets: List[str]
    analysis_tools: List[str]
    access_controls: Dict[str, Any]
    data_sovereignty_compliance: List[str]

class ForensicCapabilitiesManager:
    """
    Comprehensive forensic capabilities management for AWS environments
    """
    
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.organizations_client = boto3.client('organizations', region_name=region)
        self.ec2_client = boto3.client('ec2', region_name=region)
        self.s3_client = boto3.client('s3', region_name=region)
        self.cloudtrail_client = boto3.client('cloudtrail', region_name=region)
        self.logs_client = boto3.client('logs', region_name=region)
        self.ssm_client = boto3.client('ssm', region_name=region)
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.dynamodb = boto3.resource('dynamodb', region_name=region)
        
        # DynamoDB tables for forensic management
        self.evidence_table = self.dynamodb.Table('forensic-evidence-registry')
        self.accounts_table = self.dynamodb.Table('forensic-accounts')
        self.procedures_table = self.dynamodb.Table('forensic-procedures')
        
        # Initialize forensic capabilities
        self.forensic_phases = self._define_forensic_phases()
        self.evidence_types = self._define_evidence_types()
    
    def _define_forensic_phases(self) -> Dict[str, Dict[str, Any]]:
        """
        Define the four phases of digital forensics with specific AWS implementations
        """
        return {
            ForensicPhase.COLLECTION.value: {
                'description': 'Collect and preserve digital evidence from AWS resources',
                'objectives': [
                    'Identify and locate relevant digital evidence',
                    'Preserve evidence integrity and chain of custody',
                    'Collect evidence in a forensically sound manner',
                    'Document collection procedures and metadata'
                ],
                'aws_services': [
                    'EC2 Snapshots', 'EBS Snapshots', 'CloudTrail', 'VPC Flow Logs',
                    'CloudWatch Logs', 'Config', 'S3 Access Logs', 'Systems Manager'
                ],
                'key_activities': [
                    'Create forensic snapshots of EC2 instances and EBS volumes',
                    'Export CloudTrail logs for the incident timeframe',
                    'Collect VPC Flow Logs and network traffic data',
                    'Gather application and system logs from CloudWatch',
                    'Export AWS Config configuration history',
                    'Collect memory dumps using Systems Manager'
                ],
                'automation_tools': [
                    'AWS Lambda for automated collection',
                    'Systems Manager Automation for orchestration',
                    'Step Functions for workflow management',
                    'EventBridge for trigger-based collection'
                ]
            },
            ForensicPhase.EXAMINATION.value: {
                'description': 'Extract and assess relevant information from collected evidence',
                'objectives': [
                    'Extract data from collected evidence',
                    'Identify relevant artifacts and indicators',
                    'Validate evidence integrity and authenticity',
                    'Prepare data for detailed analysis'
                ],
                'aws_services': [
                    'EC2 for analysis workstations', 'S3 for evidence storage',
                    'Athena for log analysis', 'Glue for data processing',
                    'EMR for large-scale data processing'
                ],
                'key_activities': [
                    'Mount and examine forensic disk images',
                    'Parse and extract log entries and artifacts',
                    'Validate file system integrity and timestamps',
                    'Extract network traffic and communication data',
                    'Identify deleted or hidden files and data',
                    'Create searchable indexes of evidence data'
                ],
                'automation_tools': [
                    'Athena queries for log analysis',
                    'Glue ETL jobs for data transformation',
                    'Lambda functions for artifact extraction',
                    'EMR clusters for large-scale processing'
                ]
            },
            ForensicPhase.ANALYSIS.value: {
                'description': 'Analyze evidence to understand the incident and draw conclusions',
                'objectives': [
                    'Correlate evidence across multiple sources',
                    'Reconstruct incident timeline and attack vectors',
                    'Identify indicators of compromise and attribution',
                    'Assess impact and scope of the incident'
                ],
                'aws_services': [
                    'Detective for investigation graphs', 'QuickSight for visualization',
                    'SageMaker for ML-based analysis', 'OpenSearch for search and analytics'
                ],
                'key_activities': [
                    'Timeline analysis and event correlation',
                    'Network traffic analysis and communication patterns',
                    'Malware analysis and reverse engineering',
                    'User behavior analysis and anomaly detection',
                    'Impact assessment and data classification',
                    'Attribution analysis and threat intelligence correlation'
                ],
                'automation_tools': [
                    'Detective for automated investigation',
                    'SageMaker for anomaly detection',
                    'OpenSearch for correlation analysis',
                    'QuickSight for data visualization'
                ]
            },
            ForensicPhase.REPORTING.value: {
                'description': 'Present findings and conclusions from forensic analysis',
                'objectives': [
                    'Document findings and conclusions',
                    'Create executive and technical reports',
                    'Provide expert testimony if required',
                    'Support legal and regulatory requirements'
                ],
                'aws_services': [
                    'S3 for report storage', 'CloudFront for secure distribution',
                    'WorkDocs for collaboration', 'Macie for data classification'
                ],
                'key_activities': [
                    'Create detailed technical forensic reports',
                    'Develop executive summary and business impact assessment',
                    'Prepare evidence exhibits and supporting documentation',
                    'Create timeline visualizations and attack diagrams',
                    'Document chain of custody and evidence handling',
                    'Prepare for legal proceedings and expert testimony'
                ],
                'automation_tools': [
                    'Lambda for report generation',
                    'QuickSight for automated dashboards',
                    'WorkDocs for collaborative reporting',
                    'S3 for secure report distribution'
                ]
            }
        }
    
    def _define_evidence_types(self) -> Dict[str, Dict[str, Any]]:
        """
        Define types of digital evidence and their collection methods
        """
        return {
            EvidenceType.LOGS.value: {
                'description': 'System, application, and service logs',
                'sources': [
                    'CloudTrail API logs', 'VPC Flow Logs', 'CloudWatch Logs',
                    'Application Load Balancer logs', 'S3 access logs', 'Route 53 query logs'
                ],
                'collection_methods': [
                    'CloudTrail log export', 'CloudWatch Logs export',
                    'S3 log aggregation', 'Kinesis Data Firehose streaming'
                ],
                'retention_requirements': '7 years minimum for security incidents',
                'integrity_verification': 'SHA-256 hash validation and CloudTrail log file validation'
            },
            EvidenceType.SNAPSHOTS.value: {
                'description': 'Point-in-time copies of storage volumes and instances',
                'sources': [
                    'EBS volume snapshots', 'EC2 instance snapshots',
                    'RDS database snapshots', 'EFS file system backups'
                ],
                'collection_methods': [
                    'Automated snapshot creation', 'Cross-region snapshot copying',
                    'Encrypted snapshot storage', 'Snapshot sharing with forensic accounts'
                ],
                'retention_requirements': '3 years minimum for forensic analysis',
                'integrity_verification': 'Snapshot checksums and encryption validation'
            },
            EvidenceType.MEMORY_DUMPS.value: {
                'description': 'Memory contents from running systems',
                'sources': [
                    'EC2 instance memory', 'Container memory dumps',
                    'Lambda function memory', 'RDS process memory'
                ],
                'collection_methods': [
                    'Systems Manager memory dump automation',
                    'EC2 hibernation for memory preservation',
                    'Container memory extraction', 'Custom memory dump tools'
                ],
                'retention_requirements': '1 year minimum for active investigations',
                'integrity_verification': 'Memory dump hash validation and chain of custody'
            },
            EvidenceType.NETWORK_CAPTURES.value: {
                'description': 'Network traffic and communication data',
                'sources': [
                    'VPC Flow Logs', 'VPC Traffic Mirroring',
                    'Network Load Balancer logs', 'NAT Gateway logs'
                ],
                'collection_methods': [
                    'Flow log aggregation', 'Traffic mirroring to analysis tools',
                    'Packet capture automation', 'Network monitoring integration'
                ],
                'retention_requirements': '90 days minimum for network analysis',
                'integrity_verification': 'Packet capture checksums and timestamp validation'
            },
            EvidenceType.CONFIGURATION.value: {
                'description': 'System and service configuration data',
                'sources': [
                    'AWS Config configuration items', 'CloudFormation templates',
                    'Systems Manager inventory', 'IAM policies and roles'
                ],
                'collection_methods': [
                    'Config configuration export', 'CloudFormation template extraction',
                    'Systems Manager inventory collection', 'IAM policy documentation'
                ],
                'retention_requirements': '5 years minimum for compliance',
                'integrity_verification': 'Configuration hash validation and version tracking'
            },
            EvidenceType.METADATA.value: {
                'description': 'Resource metadata and tagging information',
                'sources': [
                    'Resource tags', 'CloudTrail event metadata',
                    'Instance metadata', 'Service metadata'
                ],
                'collection_methods': [
                    'Tag inventory collection', 'Metadata API queries',
                    'CloudTrail metadata extraction', 'Service discovery automation'
                ],
                'retention_requirements': '3 years minimum for investigation support',
                'integrity_verification': 'Metadata consistency validation and audit trails'
            }
        }
    
    def setup_forensic_accounts(self, 
                               organization_id: str,
                               regions: List[str],
                               account_strategy: str = 'per_region') -> Dict[str, Any]:
        """
        Set up forensic accounts within AWS Organizations structure
        """
        try:
            forensic_accounts = []
            
            if account_strategy == 'per_region':
                # Create one forensic account per region
                for region in regions:
                    account_result = self._create_forensic_account(
                        f"Forensics-{region.upper()}",
                        region,
                        organization_id
                    )
                    if account_result['status'] == 'success':
                        forensic_accounts.append(account_result['account'])
            else:
                # Create single multi-region forensic account
                account_result = self._create_forensic_account(
                    "Forensics-MultiRegion",
                    "multi-region",
                    organization_id
                )
                if account_result['status'] == 'success':
                    forensic_accounts.append(account_result['account'])
            
            # Set up cross-account roles and permissions
            for account in forensic_accounts:
                self._setup_forensic_account_permissions(account)
                self._setup_forensic_storage(account)
                self._setup_forensic_tools(account)
            
            logger.info(f"Set up {len(forensic_accounts)} forensic accounts")
            
            return {
                'status': 'success',
                'accounts_created': len(forensic_accounts),
                'forensic_accounts': forensic_accounts,
                'message': f"Successfully set up forensic accounts for {len(regions)} regions"
            }
            
        except Exception as e:
            logger.error(f"Error setting up forensic accounts: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def collect_forensic_evidence(self, 
                                incident_id: str,
                                resource_identifiers: List[str],
                                evidence_types: List[str],
                                time_range: Dict[str, str]) -> Dict[str, Any]:
        """
        Automated collection of forensic evidence from AWS resources
        """
        try:
            collection_results = []
            evidence_registry = []
            
            for resource_id in resource_identifiers:
                for evidence_type in evidence_types:
                    collection_result = self._collect_evidence_by_type(
                        incident_id,
                        resource_id,
                        evidence_type,
                        time_range
                    )
                    
                    if collection_result['status'] == 'success':
                        evidence = ForensicEvidence(
                            evidence_id=collection_result['evidence_id'],
                            evidence_type=EvidenceType(evidence_type),
                            source_resource=resource_id,
                            collection_timestamp=datetime.utcnow().isoformat(),
                            chain_of_custody=[{
                                'handler': 'Automated Collection System',
                                'timestamp': datetime.utcnow().isoformat(),
                                'action': 'Evidence Collection',
                                'location': collection_result['storage_location']
                            }],
                            hash_values=collection_result['hash_values'],
                            storage_location=collection_result['storage_location'],
                            retention_period=collection_result['retention_period'],
                            classification='Confidential',
                            collection_method=collection_result['collection_method'],
                            integrity_verified=True,
                            metadata=collection_result['metadata']
                        )
                        
                        # Store evidence in registry
                        self.evidence_table.put_item(Item=asdict(evidence))
                        evidence_registry.append(evidence)
                    
                    collection_results.append(collection_result)
            
            successful_collections = sum(1 for result in collection_results if result['status'] == 'success')
            
            return {
                'status': 'success',
                'incident_id': incident_id,
                'total_collections': len(collection_results),
                'successful_collections': successful_collections,
                'evidence_collected': len(evidence_registry),
                'evidence_registry': [asdict(evidence) for evidence in evidence_registry],
                'collection_results': collection_results
            }
            
        except Exception as e:
            logger.error(f"Error collecting forensic evidence: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def _create_forensic_account(self, account_name: str, region: str, organization_id: str) -> Dict[str, Any]:
        """
        Create a dedicated forensic account within AWS Organizations
        """
        try:
            # Create account (simplified - in practice, this requires proper account creation process)
            account_id = f"123456789{hash(account_name) % 1000:03d}"  # Simulated account ID
            
            forensic_account = ForensicAccount(
                account_id=account_id,
                account_name=account_name,
                region=region,
                purpose="Digital forensics and incident response",
                cross_account_roles=[
                    f"arn:aws:iam::{account_id}:role/ForensicInvestigatorRole",
                    f"arn:aws:iam::{account_id}:role/EvidenceCollectionRole"
                ],
                storage_buckets=[
                    f"forensic-evidence-{account_id.lower()}",
                    f"forensic-reports-{account_id.lower()}"
                ],
                analysis_tools=[
                    "Amazon Detective", "Amazon Athena", "Amazon QuickSight",
                    "Amazon OpenSearch", "Amazon SageMaker"
                ],
                access_controls={
                    "mfa_required": True,
                    "ip_restrictions": ["10.0.0.0/8", "172.16.0.0/12"],
                    "session_duration": "4 hours",
                    "break_glass_access": True
                },
                data_sovereignty_compliance=["US", "EU"] if region != "multi-region" else ["Global"]
            )
            
            # Store account information
            self.accounts_table.put_item(Item=asdict(forensic_account))
            
            return {
                'status': 'success',
                'account': forensic_account,
                'message': f"Successfully created forensic account {account_name}"
            }
            
        except Exception as e:
            logger.error(f"Error creating forensic account: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def _collect_evidence_by_type(self, 
                                 incident_id: str,
                                 resource_id: str,
                                 evidence_type: str,
                                 time_range: Dict[str, str]) -> Dict[str, Any]:
        """
        Collect specific type of evidence from AWS resource
        """
        try:
            evidence_id = f"{incident_id}_{resource_id}_{evidence_type}_{int(datetime.utcnow().timestamp())}"
            
            if evidence_type == EvidenceType.LOGS.value:
                return self._collect_logs_evidence(evidence_id, resource_id, time_range)
            elif evidence_type == EvidenceType.SNAPSHOTS.value:
                return self._collect_snapshot_evidence(evidence_id, resource_id)
            elif evidence_type == EvidenceType.MEMORY_DUMPS.value:
                return self._collect_memory_evidence(evidence_id, resource_id)
            elif evidence_type == EvidenceType.NETWORK_CAPTURES.value:
                return self._collect_network_evidence(evidence_id, resource_id, time_range)
            elif evidence_type == EvidenceType.CONFIGURATION.value:
                return self._collect_configuration_evidence(evidence_id, resource_id)
            elif evidence_type == EvidenceType.METADATA.value:
                return self._collect_metadata_evidence(evidence_id, resource_id)
            else:
                return {
                    'status': 'error',
                    'message': f'Unsupported evidence type: {evidence_type}'
                }
                
        except Exception as e:
            logger.error(f"Error collecting evidence: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def _collect_logs_evidence(self, evidence_id: str, resource_id: str, time_range: Dict[str, str]) -> Dict[str, Any]:
        """
        Collect log evidence from CloudWatch Logs and CloudTrail
        """
        try:
            # Export CloudWatch Logs
            log_group_name = f"/aws/ec2/{resource_id}"
            export_task_id = f"export-{evidence_id}"
            
            # Create log export task (simplified)
            storage_location = f"s3://forensic-evidence-bucket/logs/{evidence_id}/"
            
            # Calculate hash of collected logs
            log_content = f"Simulated log content for {resource_id}"
            log_hash = hashlib.sha256(log_content.encode()).hexdigest()
            
            return {
                'status': 'success',
                'evidence_id': evidence_id,
                'storage_location': storage_location,
                'collection_method': 'CloudWatch Logs Export',
                'hash_values': {'sha256': log_hash},
                'retention_period': '7 years',
                'metadata': {
                    'log_group': log_group_name,
                    'export_task_id': export_task_id,
                    'time_range': time_range,
                    'log_size_bytes': len(log_content)
                }
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def _collect_snapshot_evidence(self, evidence_id: str, resource_id: str) -> Dict[str, Any]:
        """
        Collect snapshot evidence from EC2 instances and EBS volumes
        """
        try:
            # Create EBS snapshot
            snapshot_id = f"snap-{evidence_id[-8:]}"
            
            # Simulate snapshot creation
            storage_location = f"snapshot://{snapshot_id}"
            
            # Calculate snapshot hash (simplified)
            snapshot_content = f"Simulated snapshot content for {resource_id}"
            snapshot_hash = hashlib.sha256(snapshot_content.encode()).hexdigest()
            
            return {
                'status': 'success',
                'evidence_id': evidence_id,
                'storage_location': storage_location,
                'collection_method': 'EBS Snapshot Creation',
                'hash_values': {'sha256': snapshot_hash},
                'retention_period': '3 years',
                'metadata': {
                    'snapshot_id': snapshot_id,
                    'source_volume': resource_id,
                    'snapshot_size_gb': 100,
                    'encryption_status': 'encrypted'
                }
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def _collect_memory_evidence(self, evidence_id: str, resource_id: str) -> Dict[str, Any]:
        """
        Collect memory dump evidence from EC2 instances
        """
        try:
            # Create memory dump using Systems Manager
            storage_location = f"s3://forensic-evidence-bucket/memory/{evidence_id}.mem"
            
            # Calculate memory dump hash
            memory_content = f"Simulated memory dump for {resource_id}"
            memory_hash = hashlib.sha256(memory_content.encode()).hexdigest()
            
            return {
                'status': 'success',
                'evidence_id': evidence_id,
                'storage_location': storage_location,
                'collection_method': 'Systems Manager Memory Dump',
                'hash_values': {'sha256': memory_hash},
                'retention_period': '1 year',
                'metadata': {
                    'instance_id': resource_id,
                    'memory_size_mb': 8192,
                    'dump_format': 'raw',
                    'collection_tool': 'SSM Agent'
                }
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'message': str(e)
            }

# Example usage
if __name__ == "__main__":
    # Initialize forensic capabilities manager
    forensic_manager = ForensicCapabilitiesManager()
    
    # Set up forensic accounts
    accounts_result = forensic_manager.setup_forensic_accounts(
        organization_id="o-example123456",
        regions=["us-east-1", "us-west-2"],
        account_strategy="per_region"
    )
    print(f"Forensic accounts setup: {json.dumps(accounts_result, indent=2, default=str)}")
    
    # Collect forensic evidence
    evidence_result = forensic_manager.collect_forensic_evidence(
        incident_id="INC-2024-001",
        resource_identifiers=["i-1234567890abcdef0", "vol-0987654321fedcba0"],
        evidence_types=["logs", "snapshots", "memory_dumps"],
        time_range={
            "start_time": "2024-01-01T00:00:00Z",
            "end_time": "2024-01-01T23:59:59Z"
        }
    )
    print(f"Evidence collection: {json.dumps(evidence_result, indent=2, default=str)}")

Example 2: Automated Forensic Evidence Collection and Analysis

View code
# automated_forensic_orchestrator.py
import boto3
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class AutomatedForensicOrchestrator:
    """
    Orchestrates automated forensic evidence collection and initial analysis
    """
    
    def __init__(self, region: str = 'us-east-1'):
        self.region = region
        self.stepfunctions_client = boto3.client('stepfunctions', region_name=region)
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.ec2_client = boto3.client('ec2', region_name=region)
        self.s3_client = boto3.client('s3', region_name=region)
        self.ssm_client = boto3.client('ssm', region_name=region)
        self.athena_client = boto3.client('athena', region_name=region)
        self.detective_client = boto3.client('detective', region_name=region)
    
    def create_forensic_workflow(self) -> Dict[str, Any]:
        """
        Create Step Functions workflow for automated forensic evidence collection
        """
        try:
            # Define the forensic collection workflow
            workflow_definition = {
                "Comment": "Automated Forensic Evidence Collection Workflow",
                "StartAt": "InitiateForensicCollection",
                "States": {
                    "InitiateForensicCollection": {
                        "Type": "Task",
                        "Resource": "arn:aws:states:::lambda:invoke",
                        "Parameters": {
                            "FunctionName": "forensic-collection-initiator",
                            "Payload.$": "$"
                        },
                        "Next": "ParallelEvidenceCollection"
                    },
                    "ParallelEvidenceCollection": {
                        "Type": "Parallel",
                        "Branches": [
                            {
                                "StartAt": "CollectSnapshots",
                                "States": {
                                    "CollectSnapshots": {
                                        "Type": "Task",
                                        "Resource": "arn:aws:states:::lambda:invoke",
                                        "Parameters": {
                                            "FunctionName": "forensic-snapshot-collector",
                                            "Payload.$": "$"
                                        },
                                        "End": True
                                    }
                                }
                            },
                            {
                                "StartAt": "CollectLogs",
                                "States": {
                                    "CollectLogs": {
                                        "Type": "Task",
                                        "Resource": "arn:aws:states:::lambda:invoke",
                                        "Parameters": {
                                            "FunctionName": "forensic-log-collector",
                                            "Payload.$": "$"
                                        },
                                        "End": True
                                    }
                                }
                            },
                            {
                                "StartAt": "CollectMemoryDumps",
                                "States": {
                                    "CollectMemoryDumps": {
                                        "Type": "Task",
                                        "Resource": "arn:aws:states:::lambda:invoke",
                                        "Parameters": {
                                            "FunctionName": "forensic-memory-collector",
                                            "Payload.$": "$"
                                        },
                                        "End": True
                                    }
                                }
                            },
                            {
                                "StartAt": "CollectNetworkData",
                                "States": {
                                    "CollectNetworkData": {
                                        "Type": "Task",
                                        "Resource": "arn:aws:states:::lambda:invoke",
                                        "Parameters": {
                                            "FunctionName": "forensic-network-collector",
                                            "Payload.$": "$"
                                        },
                                        "End": True
                                    }
                                }
                            }
                        ],
                        "Next": "ValidateEvidenceIntegrity"
                    },
                    "ValidateEvidenceIntegrity": {
                        "Type": "Task",
                        "Resource": "arn:aws:states:::lambda:invoke",
                        "Parameters": {
                            "FunctionName": "forensic-integrity-validator",
                            "Payload.$": "$"
                        },
                        "Next": "InitialAnalysis"
                    },
                    "InitialAnalysis": {
                        "Type": "Task",
                        "Resource": "arn:aws:states:::lambda:invoke",
                        "Parameters": {
                            "FunctionName": "forensic-initial-analyzer",
                            "Payload.$": "$"
                        },
                        "Next": "GenerateForensicReport"
                    },
                    "GenerateForensicReport": {
                        "Type": "Task",
                        "Resource": "arn:aws:states:::lambda:invoke",
                        "Parameters": {
                            "FunctionName": "forensic-report-generator",
                            "Payload.$": "$"
                        },
                        "Next": "NotifyInvestigators"
                    },
                    "NotifyInvestigators": {
                        "Type": "Task",
                        "Resource": "arn:aws:states:::sns:publish",
                        "Parameters": {
                            "TopicArn": "arn:aws:sns:us-east-1:123456789012:forensic-notifications",
                            "Message.$": "$.forensic_report_summary"
                        },
                        "End": True
                    }
                }
            }
            
            # Create the Step Functions state machine
            response = self.stepfunctions_client.create_state_machine(
                name='ForensicEvidenceCollectionWorkflow',
                definition=json.dumps(workflow_definition),
                roleArn='arn:aws:iam::123456789012:role/StepFunctionsForensicRole',
                type='STANDARD',
                tags=[
                    {
                        'key': 'Purpose',
                        'value': 'ForensicInvestigation'
                    },
                    {
                        'key': 'Automation',
                        'value': 'EvidenceCollection'
                    }
                ]
            )
            
            return {
                'status': 'success',
                'state_machine_arn': response['stateMachineArn'],
                'message': 'Successfully created forensic workflow'
            }
            
        except Exception as e:
            logger.error(f"Error creating forensic workflow: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def deploy_forensic_lambda_functions(self) -> Dict[str, Any]:
        """
        Deploy Lambda functions for forensic evidence collection
        """
        try:
            lambda_functions = []
            
            # Forensic Snapshot Collector
            snapshot_collector_code = '''
import boto3
import json
from datetime import datetime

def lambda_handler(event, context):
    ec2 = boto3.client('ec2')
    
    instance_ids = event.get('instance_ids', [])
    incident_id = event.get('incident_id', 'unknown')
    
    snapshots_created = []
    
    for instance_id in instance_ids:
        try:
            # Get instance volumes
            response = ec2.describe_instances(InstanceIds=[instance_id])
            
            for reservation in response['Reservations']:
                for instance in reservation['Instances']:
                    for block_device in instance.get('BlockDeviceMappings', []):
                        volume_id = block_device['Ebs']['VolumeId']
                        
                        # Create forensic snapshot
                        snapshot_response = ec2.create_snapshot(
                            VolumeId=volume_id,
                            Description=f'Forensic snapshot for incident {incident_id} - {instance_id}',
                            TagSpecifications=[
                                {
                                    'ResourceType': 'snapshot',
                                    'Tags': [
                                        {'Key': 'Purpose', 'Value': 'Forensic'},
                                        {'Key': 'IncidentId', 'Value': incident_id},
                                        {'Key': 'SourceInstance', 'Value': instance_id},
                                        {'Key': 'CreatedBy', 'Value': 'ForensicAutomation'}
                                    ]
                                }
                            ]
                        )
                        
                        snapshots_created.append({
                            'snapshot_id': snapshot_response['SnapshotId'],
                            'volume_id': volume_id,
                            'instance_id': instance_id
                        })
                        
        except Exception as e:
            print(f"Error creating snapshot for {instance_id}: {str(e)}")
    
    return {
        'statusCode': 200,
        'body': {
            'snapshots_created': snapshots_created,
            'total_snapshots': len(snapshots_created)
        }
    }
'''
            
            # Forensic Log Collector
            log_collector_code = '''
import boto3
import json
from datetime import datetime, timedelta

def lambda_handler(event, context):
    logs_client = boto3.client('logs')
    s3_client = boto3.client('s3')
    
    incident_id = event.get('incident_id', 'unknown')
    time_range = event.get('time_range', {})
    log_groups = event.get('log_groups', [])
    
    export_tasks = []
    
    for log_group in log_groups:
        try:
            # Create log export task
            export_task_response = logs_client.create_export_task(
                logGroupName=log_group,
                fromTime=int(datetime.fromisoformat(time_range['start_time'].replace('Z', '+00:00')).timestamp() * 1000),
                to=int(datetime.fromisoformat(time_range['end_time'].replace('Z', '+00:00')).timestamp() * 1000),
                destination=f'forensic-evidence-{incident_id}',
                destinationPrefix=f'logs/{log_group.replace("/", "_")}/'
            )
            
            export_tasks.append({
                'task_id': export_task_response['taskId'],
                'log_group': log_group,
                'status': 'PENDING'
            })
            
        except Exception as e:
            print(f"Error exporting logs for {log_group}: {str(e)}")
    
    return {
        'statusCode': 200,
        'body': {
            'export_tasks': export_tasks,
            'total_exports': len(export_tasks)
        }
    }
'''
            
            # Forensic Memory Collector
            memory_collector_code = '''
import boto3
import json

def lambda_handler(event, context):
    ssm_client = boto3.client('ssm')
    
    instance_ids = event.get('instance_ids', [])
    incident_id = event.get('incident_id', 'unknown')
    
    memory_dumps = []
    
    for instance_id in instance_ids:
        try:
            # Execute memory dump command via Systems Manager
            response = ssm_client.send_command(
                InstanceIds=[instance_id],
                DocumentName='AWS-RunShellScript',
                Parameters={
                    'commands': [
                        f'sudo dd if=/proc/kcore of=/tmp/memory_dump_{incident_id}_{instance_id}.mem bs=1M count=1024',
                        f'aws s3 cp /tmp/memory_dump_{incident_id}_{instance_id}.mem s3://forensic-evidence-{incident_id}/memory/',
                        f'rm /tmp/memory_dump_{incident_id}_{instance_id}.mem'
                    ]
                },
                Comment=f'Forensic memory dump for incident {incident_id}'
            )
            
            memory_dumps.append({
                'command_id': response['Command']['CommandId'],
                'instance_id': instance_id,
                'status': 'InProgress'
            })
            
        except Exception as e:
            print(f"Error collecting memory dump for {instance_id}: {str(e)}")
    
    return {
        'statusCode': 200,
        'body': {
            'memory_dumps': memory_dumps,
            'total_dumps': len(memory_dumps)
        }
    }
'''
            
            # Deploy Lambda functions
            functions_to_deploy = [
                {
                    'name': 'forensic-snapshot-collector',
                    'code': snapshot_collector_code,
                    'description': 'Automated forensic snapshot collection'
                },
                {
                    'name': 'forensic-log-collector',
                    'code': log_collector_code,
                    'description': 'Automated forensic log collection'
                },
                {
                    'name': 'forensic-memory-collector',
                    'code': memory_collector_code,
                    'description': 'Automated forensic memory dump collection'
                }
            ]
            
            for func_config in functions_to_deploy:
                try:
                    response = self.lambda_client.create_function(
                        FunctionName=func_config['name'],
                        Runtime='python3.9',
                        Role='arn:aws:iam::123456789012:role/ForensicLambdaExecutionRole',
                        Handler='index.lambda_handler',
                        Code={'ZipFile': func_config['code'].encode()},
                        Description=func_config['description'],
                        Timeout=900,  # 15 minutes
                        MemorySize=512,
                        Tags={
                            'Purpose': 'ForensicInvestigation',
                            'Component': 'EvidenceCollection'
                        }
                    )
                    
                    lambda_functions.append({
                        'function_name': func_config['name'],
                        'function_arn': response['FunctionArn'],
                        'status': 'deployed'
                    })
                    
                except Exception as e:
                    logger.error(f"Error deploying {func_config['name']}: {str(e)}")
                    lambda_functions.append({
                        'function_name': func_config['name'],
                        'status': 'failed',
                        'error': str(e)
                    })
            
            return {
                'status': 'success',
                'functions_deployed': len([f for f in lambda_functions if f['status'] == 'deployed']),
                'lambda_functions': lambda_functions,
                'message': 'Successfully deployed forensic Lambda functions'
            }
            
        except Exception as e:
            logger.error(f"Error deploying forensic Lambda functions: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def setup_forensic_analysis_environment(self, forensic_account_id: str) -> Dict[str, Any]:
        """
        Set up forensic analysis environment with necessary tools and services
        """
        try:
            analysis_components = []
            
            # Set up Athena for log analysis
            athena_setup = self._setup_athena_forensic_analysis(forensic_account_id)
            analysis_components.append(athena_setup)
            
            # Set up Detective for investigation graphs
            detective_setup = self._setup_detective_forensic_analysis(forensic_account_id)
            analysis_components.append(detective_setup)
            
            # Set up forensic workstation EC2 instances
            workstation_setup = self._setup_forensic_workstations(forensic_account_id)
            analysis_components.append(workstation_setup)
            
            # Set up secure evidence storage
            storage_setup = self._setup_forensic_evidence_storage(forensic_account_id)
            analysis_components.append(storage_setup)
            
            successful_setups = sum(1 for component in analysis_components if component['status'] == 'success')
            
            return {
                'status': 'success',
                'forensic_account_id': forensic_account_id,
                'components_configured': successful_setups,
                'analysis_components': analysis_components,
                'message': f'Successfully set up forensic analysis environment in account {forensic_account_id}'
            }
            
        except Exception as e:
            logger.error(f"Error setting up forensic analysis environment: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    
    def _setup_athena_forensic_analysis(self, account_id: str) -> Dict[str, Any]:
        """
        Set up Athena for forensic log analysis
        """
        try:
            # Create Athena workgroup for forensic analysis
            workgroup_name = 'forensic-investigation'
            
            # Create forensic analysis queries
            forensic_queries = {
                'cloudtrail_analysis': '''
                    SELECT eventTime, eventName, sourceIPAddress, userIdentity, 
                           awsRegion, errorCode, errorMessage
                    FROM cloudtrail_logs 
                    WHERE eventTime BETWEEN '{start_time}' AND '{end_time}'
                    AND (errorCode IS NOT NULL OR eventName LIKE '%Delete%' OR eventName LIKE '%Terminate%')
                    ORDER BY eventTime DESC
                ''',
                'vpc_flow_analysis': '''
                    SELECT srcaddr, dstaddr, srcport, dstport, protocol, 
                           packets, bytes, windowstart, windowend, action
                    FROM vpc_flow_logs 
                    WHERE windowstart BETWEEN {start_timestamp} AND {end_timestamp}
                    AND action = 'REJECT'
                    ORDER BY windowstart DESC
                ''',
                'security_group_changes': '''
                    SELECT eventTime, eventName, sourceIPAddress, userIdentity,
                           requestParameters, responseElements
                    FROM cloudtrail_logs 
                    WHERE eventName IN ('AuthorizeSecurityGroupIngress', 'RevokeSecurityGroupIngress',
                                       'AuthorizeSecurityGroupEgress', 'RevokeSecurityGroupEgress')
                    AND eventTime BETWEEN '{start_time}' AND '{end_time}'
                    ORDER BY eventTime DESC
                '''
            }
            
            return {
                'status': 'success',
                'component': 'athena',
                'workgroup': workgroup_name,
                'queries_available': len(forensic_queries),
                'message': 'Successfully configured Athena for forensic analysis'
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'component': 'athena',
                'message': str(e)
            }
    
    def _setup_detective_forensic_analysis(self, account_id: str) -> Dict[str, Any]:
        """
        Set up Amazon Detective for forensic investigation graphs
        """
        try:
            # Enable Detective (simplified - requires proper setup)
            detective_graph_arn = f"arn:aws:detective:us-east-1:{account_id}:graph:forensic-investigation"
            
            return {
                'status': 'success',
                'component': 'detective',
                'graph_arn': detective_graph_arn,
                'message': 'Successfully configured Detective for forensic analysis'
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'component': 'detective',
                'message': str(e)
            }
    
    def _setup_forensic_workstations(self, account_id: str) -> Dict[str, Any]:
        """
        Set up secure forensic analysis workstations
        """
        try:
            # Launch forensic workstation instances
            workstation_config = {
                'instance_type': 'm5.2xlarge',
                'ami_id': 'ami-0abcdef1234567890',  # Forensic analysis AMI
                'security_group': 'sg-forensic-workstation',
                'subnet_id': 'subnet-forensic-analysis',
                'key_pair': 'forensic-investigation-key'
            }
            
            workstations = []
            for i in range(2):  # Create 2 workstations
                workstation_id = f"i-forensic-workstation-{i+1}"
                workstations.append({
                    'instance_id': workstation_id,
                    'instance_type': workstation_config['instance_type'],
                    'purpose': 'Forensic Analysis',
                    'tools_installed': [
                        'SIFT Workstation', 'Volatility', 'Autopsy',
                        'Wireshark', 'Sleuth Kit', 'AWS CLI'
                    ]
                })
            
            return {
                'status': 'success',
                'component': 'workstations',
                'workstations_created': len(workstations),
                'workstations': workstations,
                'message': 'Successfully configured forensic workstations'
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'component': 'workstations',
                'message': str(e)
            }
    
    def _setup_forensic_evidence_storage(self, account_id: str) -> Dict[str, Any]:
        """
        Set up secure evidence storage with proper access controls
        """
        try:
            # Create evidence storage buckets
            evidence_buckets = [
                {
                    'bucket_name': f'forensic-evidence-{account_id}',
                    'purpose': 'Primary evidence storage',
                    'encryption': 'AES-256',
                    'versioning': True,
                    'mfa_delete': True
                },
                {
                    'bucket_name': f'forensic-reports-{account_id}',
                    'purpose': 'Forensic reports and documentation',
                    'encryption': 'AES-256',
                    'versioning': True,
                    'mfa_delete': True
                },
                {
                    'bucket_name': f'forensic-backups-{account_id}',
                    'purpose': 'Evidence backups and archives',
                    'encryption': 'AES-256',
                    'versioning': True,
                    'mfa_delete': True
                }
            ]
            
            return {
                'status': 'success',
                'component': 'storage',
                'buckets_created': len(evidence_buckets),
                'evidence_buckets': evidence_buckets,
                'message': 'Successfully configured forensic evidence storage'
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'component': 'storage',
                'message': str(e)
            }

# Example usage
if __name__ == "__main__":
    # Initialize automated forensic orchestrator
    forensic_orchestrator = AutomatedForensicOrchestrator()
    
    # Create forensic workflow
    workflow_result = forensic_orchestrator.create_forensic_workflow()
    print(f"Forensic workflow creation: {json.dumps(workflow_result, indent=2)}")
    
    # Deploy Lambda functions
    lambda_result = forensic_orchestrator.deploy_forensic_lambda_functions()
    print(f"Lambda functions deployment: {json.dumps(lambda_result, indent=2)}")
    
    # Set up analysis environment
    analysis_result = forensic_orchestrator.setup_forensic_analysis_environment("123456789012")
    print(f"Analysis environment setup: {json.dumps(analysis_result, indent=2)}")

Resources

Best Practices and Guidelines

  • Maintain proper chain of custody documentation for all evidence
  • Use write-blocking tools when acquiring disk images
  • Implement time synchronization across all forensic systems
  • Regularly test forensic procedures and tools
  • Ensure forensic personnel have appropriate training and certifications
  • Document all forensic procedures and maintain detailed case notes
  • Implement secure evidence storage with appropriate access controls
  • Regular backup and testing of forensic tools and environments