Skip to content
SEC11

SEC11-BP02: Automate testing throughout the development and release lifecycle

One-Click Remediation

Deploy CloudFormation stacks to implement this best practice with a single click.

IaC Security Scanning Pipeline
Provision a CI/CD pipeline that validates infrastructure changes with security and policy checks
Launch Stack

Stacks deploy to your AWS account. Review parameters before creating. Standard AWS charges apply.

SEC11-BP02: Automate testing throughout the development and release lifecycle

Overview

Implement automated security testing throughout the software development lifecycle (SDLC) to identify and remediate security vulnerabilities early and continuously. This includes static application security testing (SAST), dynamic application security testing (DAST), interactive application security testing (IAST), dependency scanning, and infrastructure as code (IaC) security testing.

Implementation Guidance

Automated security testing is essential for maintaining security at the speed of modern software development. Manual security testing alone cannot keep pace with continuous integration and deployment practices. By integrating automated security testing throughout the development lifecycle, organizations can identify vulnerabilities early when they are less expensive to fix, ensure consistent security validation, and maintain security standards across all releases.

Key Principles of Automated Security Testing

Shift-Left Security: Move security testing earlier in the development process to catch issues when they are easier and cheaper to fix. This includes integrating security testing into developer IDEs, pre-commit hooks, and early CI/CD pipeline stages.

Comprehensive Coverage: Implement multiple types of automated security testing to cover different aspects of application security, including source code, dependencies, runtime behavior, and infrastructure configuration.

Continuous Integration: Integrate security testing into CI/CD pipelines to ensure every code change is automatically tested for security issues before deployment.

Fast Feedback: Provide rapid feedback to developers about security issues so they can be addressed quickly without disrupting development velocity.

Risk-Based Approach: Prioritize security testing based on risk assessment, focusing more intensive testing on high-risk components and critical security controls.

Implementation Steps

Step 1: Implement Static Application Security Testing (SAST)

Deploy SAST tools to analyze source code for security vulnerabilities:

View code
# SAST Integration Framework
import boto3
import json
import subprocess
import os
from datetime import datetime
from typing import Dict, List, Optional

class SASTIntegration:
    def __init__(self):
        self.codebuild = boto3.client('codebuild')
        self.s3 = boto3.client('s3')
        self.sns = boto3.client('sns')
        self.dynamodb = boto3.resource('dynamodb')
        self.results_table = self.dynamodb.Table('sast-scan-results')
        
    def configure_sast_tools(self, project_config: Dict) -> Dict:
        """
        Configure multiple SAST tools for comprehensive coverage
        """
        sast_tools_config = {
            'sonarqube': {
                'enabled': True,
                'server_url': project_config.get('sonarqube_url', 'https://sonar.company.com'),
                'project_key': project_config['project_name'],
                'quality_gate': 'security_focused',
                'coverage_threshold': 80,
                'security_hotspot_threshold': 0,
                'vulnerability_threshold': 0,
                'languages': project_config.get('languages', ['java', 'python', 'javascript']),
                'exclusions': [
                    '**/test/**',
                    '**/tests/**',
                    '**/node_modules/**',
                    '**/vendor/**'
                ]
            },
            'semgrep': {
                'enabled': True,
                'ruleset': 'security',
                'custom_rules_path': 'security/semgrep-rules',
                'severity_threshold': 'WARNING',
                'languages': project_config.get('languages', ['python', 'javascript', 'go']),
                'exclude_patterns': [
                    'test_*.py',
                    '*.test.js',
                    'mock_*.py'
                ]
            },
            'bandit': {
                'enabled': project_config.get('languages', []).count('python') > 0,
                'config_file': '.bandit',
                'severity_level': 'medium',
                'confidence_level': 'medium',
                'exclude_dirs': ['tests', 'test'],
                'skip_tests': ['B101']  # Skip assert_used test
            },
            'eslint_security': {
                'enabled': 'javascript' in project_config.get('languages', []),
                'plugins': ['security', 'security-node'],
                'rules': {
                    'security/detect-object-injection': 'error',
                    'security/detect-non-literal-regexp': 'error',
                    'security/detect-unsafe-regex': 'error',
                    'security/detect-buffer-noassert': 'error',
                    'security/detect-child-process': 'error',
                    'security/detect-disable-mustache-escape': 'error',
                    'security/detect-eval-with-expression': 'error',
                    'security/detect-no-csrf-before-method-override': 'error',
                    'security/detect-non-literal-fs-filename': 'error',
                    'security/detect-non-literal-require': 'error',
                    'security/detect-possible-timing-attacks': 'error',
                    'security/detect-pseudoRandomBytes': 'error'
                }
            },
            'gosec': {
                'enabled': 'go' in project_config.get('languages', []),
                'severity': 'medium',
                'confidence': 'medium',
                'exclude_rules': [],
                'include_rules': ['G101', 'G102', 'G103', 'G104', 'G105']
            }
        }
        
        return sast_tools_config
    
    def create_sast_pipeline_stage(self, tool_config: Dict) -> str:
        """
        Create CodeBuild project for SAST scanning
        """
        buildspec = {
            'version': '0.2',
            'phases': {
                'install': {
                    'runtime-versions': {
                        'python': '3.9',
                        'nodejs': '16'
                    },
                    'commands': [
                        'echo "Installing SAST tools..."',
                        'pip install bandit semgrep',
                        'npm install -g eslint eslint-plugin-security',
                        'wget -O sonar-scanner.zip https://binaries.sonarsource.com/Distribution/sonar-scanner-cli/sonar-scanner-cli-4.7.0.2747-linux.zip',
                        'unzip sonar-scanner.zip',
                        'export PATH=$PATH:$PWD/sonar-scanner-4.7.0.2747-linux/bin'
                    ]
                },
                'pre_build': {
                    'commands': [
                        'echo "Preparing SAST scan..."',
                        'mkdir -p sast-results'
                    ]
                },
                'build': {
                    'commands': [
                        'echo "Running SAST scans..."',
                        self.generate_sast_commands(tool_config)
                    ]
                },
                'post_build': {
                    'commands': [
                        'echo "Processing SAST results..."',
                        'python scripts/process_sast_results.py',
                        'aws s3 cp sast-results/ s3://$SAST_RESULTS_BUCKET/$(date +%Y%m%d-%H%M%S)/ --recursive'
                    ]
                }
            },
            'artifacts': {
                'files': [
                    'sast-results/**/*'
                ]
            }
        }
        
        project_config = {
            'name': f"sast-scan-{tool_config['project_name']}",
            'source': {
                'type': 'CODEPIPELINE',
                'buildspec': json.dumps(buildspec, indent=2)
            },
            'artifacts': {
                'type': 'CODEPIPELINE'
            },
            'environment': {
                'type': 'LINUX_CONTAINER',
                'image': 'aws/codebuild/amazonlinux2-x86_64-standard:3.0',
                'computeType': 'BUILD_GENERAL1_MEDIUM',
                'environmentVariables': [
                    {
                        'name': 'SAST_RESULTS_BUCKET',
                        'value': tool_config.get('results_bucket', 'security-scan-results')
                    },
                    {
                        'name': 'SONAR_TOKEN',
                        'value': 'sonar-token',
                        'type': 'SECRETS_MANAGER'
                    }
                ]
            },
            'serviceRole': tool_config.get('service_role_arn')
        }
        
        response = self.codebuild.create_project(**project_config)
        return response['project']['arn']
    
    def generate_sast_commands(self, tool_config: Dict) -> str:
        """
        Generate SAST scanning commands based on enabled tools
        """
        commands = []
        
        # SonarQube scan
        if tool_config.get('sonarqube', {}).get('enabled'):
            sonar_config = tool_config['sonarqube']
            commands.extend([
                f'sonar-scanner \\',
                f'  -Dsonar.projectKey={sonar_config["project_key"]} \\',
                f'  -Dsonar.sources=. \\',
                f'  -Dsonar.host.url={sonar_config["server_url"]} \\',
                f'  -Dsonar.login=$SONAR_TOKEN \\',
                f'  -Dsonar.exclusions="{",".join(sonar_config["exclusions"])}" \\',
                f'  -Dsonar.qualitygate.wait=true',
                'sonar_exit_code=$?'
            ])
        
        # Semgrep scan
        if tool_config.get('semgrep', {}).get('enabled'):
            semgrep_config = tool_config['semgrep']
            commands.extend([
                f'semgrep --config=auto \\',
                f'  --severity={semgrep_config["severity_threshold"]} \\',
                f'  --json \\',
                f'  --output=sast-results/semgrep-results.json \\',
                f'  --exclude="{" --exclude=".join(semgrep_config["exclude_patterns"])}" \\',
                f'  .',
                'semgrep_exit_code=$?'
            ])
        
        # Bandit scan for Python
        if tool_config.get('bandit', {}).get('enabled'):
            bandit_config = tool_config['bandit']
            commands.extend([
                f'bandit -r . \\',
                f'  -f json \\',
                f'  -o sast-results/bandit-results.json \\',
                f'  -ll \\',
                f'  -i \\',
                f'  --exclude={",".join(bandit_config["exclude_dirs"])} \\',
                f'  --skip={",".join(bandit_config["skip_tests"])} || true',
                'bandit_exit_code=$?'
            ])
        
        # ESLint security scan for JavaScript
        if tool_config.get('eslint_security', {}).get('enabled'):
            commands.extend([
                'eslint . \\',
                '  --ext .js,.jsx,.ts,.tsx \\',
                '  --format json \\',
                '  --output-file sast-results/eslint-security-results.json \\',
                '  --no-error-on-unmatched-pattern || true',
                'eslint_exit_code=$?'
            ])
        
        # Gosec scan for Go
        if tool_config.get('gosec', {}).get('enabled'):
            gosec_config = tool_config['gosec']
            commands.extend([
                f'gosec -fmt json -out sast-results/gosec-results.json ./...',
                'gosec_exit_code=$?'
            ])
        
        # Combine all commands
        return ' && '.join(commands)
    
    def process_sast_results(self, scan_results_path: str, project_name: str) -> Dict:
        """
        Process and normalize SAST results from multiple tools
        """
        consolidated_results = {
            'scan_id': f"SAST-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
            'project_name': project_name,
            'scan_timestamp': datetime.now().isoformat(),
            'tools_used': [],
            'summary': {
                'total_issues': 0,
                'critical_issues': 0,
                'high_issues': 0,
                'medium_issues': 0,
                'low_issues': 0,
                'info_issues': 0
            },
            'issues': [],
            'metrics': {
                'lines_of_code': 0,
                'files_scanned': 0,
                'scan_duration_seconds': 0
            }
        }
        
        # Process SonarQube results
        sonar_results_file = os.path.join(scan_results_path, 'sonar-results.json')
        if os.path.exists(sonar_results_file):
            sonar_issues = self.parse_sonarqube_results(sonar_results_file)
            consolidated_results['tools_used'].append('SonarQube')
            consolidated_results['issues'].extend(sonar_issues)
        
        # Process Semgrep results
        semgrep_results_file = os.path.join(scan_results_path, 'semgrep-results.json')
        if os.path.exists(semgrep_results_file):
            semgrep_issues = self.parse_semgrep_results(semgrep_results_file)
            consolidated_results['tools_used'].append('Semgrep')
            consolidated_results['issues'].extend(semgrep_issues)
        
        # Process Bandit results
        bandit_results_file = os.path.join(scan_results_path, 'bandit-results.json')
        if os.path.exists(bandit_results_file):
            bandit_issues = self.parse_bandit_results(bandit_results_file)
            consolidated_results['tools_used'].append('Bandit')
            consolidated_results['issues'].extend(bandit_issues)
        
        # Process ESLint results
        eslint_results_file = os.path.join(scan_results_path, 'eslint-security-results.json')
        if os.path.exists(eslint_results_file):
            eslint_issues = self.parse_eslint_results(eslint_results_file)
            consolidated_results['tools_used'].append('ESLint Security')
            consolidated_results['issues'].extend(eslint_issues)
        
        # Process Gosec results
        gosec_results_file = os.path.join(scan_results_path, 'gosec-results.json')
        if os.path.exists(gosec_results_file):
            gosec_issues = self.parse_gosec_results(gosec_results_file)
            consolidated_results['tools_used'].append('Gosec')
            consolidated_results['issues'].extend(gosec_issues)
        
        # Calculate summary statistics
        consolidated_results = self.calculate_summary_stats(consolidated_results)
        
        # Store results in DynamoDB
        self.store_scan_results(consolidated_results)
        
        return consolidated_results
    
    def parse_sonarqube_results(self, results_file: str) -> List[Dict]:
        """
        Parse SonarQube results and normalize format
        """
        issues = []
        try:
            with open(results_file, 'r') as f:
                sonar_data = json.load(f)
            
            for issue in sonar_data.get('issues', []):
                normalized_issue = {
                    'tool': 'SonarQube',
                    'rule_id': issue.get('rule'),
                    'severity': self.normalize_severity(issue.get('severity')),
                    'message': issue.get('message'),
                    'file_path': issue.get('component', '').replace(f"{sonar_data.get('projectKey', '')}:", ''),
                    'line_number': issue.get('line', 0),
                    'category': issue.get('type', 'VULNERABILITY'),
                    'cwe_id': self.extract_cwe_from_sonar(issue),
                    'confidence': 'HIGH',
                    'effort_minutes': issue.get('effort', 0)
                }
                issues.append(normalized_issue)
                
        except Exception as e:
            print(f"Error parsing SonarQube results: {e}")
        
        return issues
    
    def parse_semgrep_results(self, results_file: str) -> List[Dict]:
        """
        Parse Semgrep results and normalize format
        """
        issues = []
        try:
            with open(results_file, 'r') as f:
                semgrep_data = json.load(f)
            
            for result in semgrep_data.get('results', []):
                normalized_issue = {
                    'tool': 'Semgrep',
                    'rule_id': result.get('check_id'),
                    'severity': self.normalize_severity(result.get('extra', {}).get('severity')),
                    'message': result.get('extra', {}).get('message'),
                    'file_path': result.get('path'),
                    'line_number': result.get('start', {}).get('line', 0),
                    'category': 'VULNERABILITY',
                    'cwe_id': self.extract_cwe_from_semgrep(result),
                    'confidence': result.get('extra', {}).get('metadata', {}).get('confidence', 'MEDIUM'),
                    'owasp_category': result.get('extra', {}).get('metadata', {}).get('owasp')
                }
                issues.append(normalized_issue)
                
        except Exception as e:
            print(f"Error parsing Semgrep results: {e}")
        
        return issues
    
    def parse_bandit_results(self, results_file: str) -> List[Dict]:
        """
        Parse Bandit results and normalize format
        """
        issues = []
        try:
            with open(results_file, 'r') as f:
                bandit_data = json.load(f)
            
            for result in bandit_data.get('results', []):
                normalized_issue = {
                    'tool': 'Bandit',
                    'rule_id': result.get('test_id'),
                    'severity': self.normalize_severity(result.get('issue_severity')),
                    'message': result.get('issue_text'),
                    'file_path': result.get('filename'),
                    'line_number': result.get('line_number', 0),
                    'category': 'VULNERABILITY',
                    'cwe_id': result.get('issue_cwe', {}).get('id'),
                    'confidence': result.get('issue_confidence'),
                    'more_info': result.get('more_info')
                }
                issues.append(normalized_issue)
                
        except Exception as e:
            print(f"Error parsing Bandit results: {e}")
        
        return issues
    
    def normalize_severity(self, severity: str) -> str:
        """
        Normalize severity levels across different tools
        """
        severity_mapping = {
            # SonarQube
            'BLOCKER': 'CRITICAL',
            'CRITICAL': 'CRITICAL',
            'MAJOR': 'HIGH',
            'MINOR': 'MEDIUM',
            'INFO': 'LOW',
            
            # Semgrep
            'ERROR': 'CRITICAL',
            'WARNING': 'HIGH',
            'INFO': 'LOW',
            
            # Bandit
            'HIGH': 'HIGH',
            'MEDIUM': 'MEDIUM',
            'LOW': 'LOW',
            
            # ESLint
            '2': 'HIGH',
            '1': 'MEDIUM',
            '0': 'LOW'
        }
        
        return severity_mapping.get(str(severity).upper(), 'MEDIUM')
    
    def calculate_summary_stats(self, results: Dict) -> Dict:
        """
        Calculate summary statistics for scan results
        """
        severity_counts = {
            'CRITICAL': 0,
            'HIGH': 0,
            'MEDIUM': 0,
            'LOW': 0,
            'INFO': 0
        }
        
        for issue in results['issues']:
            severity = issue.get('severity', 'MEDIUM')
            if severity in severity_counts:
                severity_counts[severity] += 1
        
        results['summary'] = {
            'total_issues': len(results['issues']),
            'critical_issues': severity_counts['CRITICAL'],
            'high_issues': severity_counts['HIGH'],
            'medium_issues': severity_counts['MEDIUM'],
            'low_issues': severity_counts['LOW'],
            'info_issues': severity_counts['INFO']
        }
        
        return results
    
    def store_scan_results(self, results: Dict):
        """
        Store scan results in DynamoDB for tracking and reporting
        """
        try:
            self.results_table.put_item(Item=results)
        except Exception as e:
            print(f"Error storing scan results: {e}")
    
    def create_security_gates(self, gate_config: Dict) -> Dict:
        """
        Create security gates based on SAST results
        """
        security_gates = {
            'gate_id': f"GATE-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
            'project_name': gate_config['project_name'],
            'gate_rules': {
                'critical_issues_threshold': gate_config.get('critical_threshold', 0),
                'high_issues_threshold': gate_config.get('high_threshold', 5),
                'medium_issues_threshold': gate_config.get('medium_threshold', 20),
                'total_issues_threshold': gate_config.get('total_threshold', 50),
                'new_issues_threshold': gate_config.get('new_issues_threshold', 0)
            },
            'actions': {
                'block_deployment': gate_config.get('block_deployment', True),
                'notify_security_team': gate_config.get('notify_security', True),
                'create_jira_tickets': gate_config.get('create_tickets', True),
                'fail_build': gate_config.get('fail_build', True)
            }
        }
        
        return security_gates

# Example usage
sast_integration = SASTIntegration()

# Configure SAST tools for a project
project_config = {
    'project_name': 'secure-web-app',
    'languages': ['python', 'javascript'],
    'sonarqube_url': 'https://sonar.company.com',
    'results_bucket': 'security-scan-results-bucket'
}

sast_config = sast_integration.configure_sast_tools(project_config)
print("SAST Configuration:")
print(json.dumps(sast_config, indent=2))

# Create SAST pipeline stage
pipeline_arn = sast_integration.create_sast_pipeline_stage(sast_config)
print(f"\\nSAST Pipeline ARN: {pipeline_arn}")

Step 2: Implement Dynamic Application Security Testing (DAST)

Deploy DAST tools to test running applications for security vulnerabilities:

View code
# DAST Integration Framework
import boto3
import json
import requests
import time
from datetime import datetime
from typing import Dict, List, Optional

class DASTIntegration:
    def __init__(self):
        self.ecs = boto3.client('ecs')
        self.ec2 = boto3.client('ec2')
        self.ssm = boto3.client('ssm')
        self.s3 = boto3.client('s3')
        self.dynamodb = boto3.resource('dynamodb')
        self.results_table = self.dynamodb.Table('dast-scan-results')
        
    def configure_dast_tools(self, application_config: Dict) -> Dict:
        """
        Configure DAST tools for comprehensive runtime security testing
        """
        dast_tools_config = {
            'owasp_zap': {
                'enabled': True,
                'docker_image': 'owasp/zap2docker-stable',
                'scan_types': ['baseline', 'full', 'api'],
                'authentication': {
                    'enabled': application_config.get('requires_auth', False),
                    'auth_type': application_config.get('auth_type', 'form'),
                    'login_url': application_config.get('login_url'),
                    'username_field': 'username',
                    'password_field': 'password',
                    'credentials_secret': application_config.get('test_credentials_secret')
                },
                'scan_policies': {
                    'baseline': {
                        'passive_scan_only': True,
                        'max_duration_minutes': 30,
                        'alert_threshold': 'MEDIUM'
                    },
                    'full': {
                        'active_scan': True,
                        'max_duration_minutes': 120,
                        'alert_threshold': 'LOW',
                        'attack_strength': 'MEDIUM'
                    },
                    'api': {
                        'api_definition': application_config.get('openapi_spec_url'),
                        'max_duration_minutes': 60,
                        'alert_threshold': 'MEDIUM'
                    }
                },
                'exclusions': [
                    '/logout',
                    '/admin/delete',
                    '/api/v1/users/delete'
                ]
            },
            'nuclei': {
                'enabled': True,
                'docker_image': 'projectdiscovery/nuclei',
                'templates': [
                    'cves',
                    'vulnerabilities',
                    'security-misconfiguration',
                    'exposed-panels',
                    'technologies'
                ],
                'severity_threshold': 'medium',
                'rate_limit': 150,  # requests per second
                'timeout': 10,
                'retries': 1
            },
            'nikto': {
                'enabled': True,
                'docker_image': 'sullo/nikto',
                'scan_options': [
                    '-Tuning', '1,2,3,4,5,6,7,8,9,0',
                    '-Format', 'json',
                    '-maxtime', '3600'
                ],
                'plugins': 'ALL'
            },
            'custom_security_tests': {
                'enabled': True,
                'test_suite_image': 'company/security-test-suite:latest',
                'test_categories': [
                    'authentication_bypass',
                    'authorization_flaws',
                    'input_validation',
                    'session_management',
                    'business_logic'
                ]
            }
        }
        
        return dast_tools_config
    
    def create_dast_environment(self, app_config: Dict) -> Dict:
        """
        Create isolated environment for DAST testing
        """
        # Create VPC for DAST testing
        vpc_response = self.ec2.create_vpc(
            CidrBlock='10.0.0.0/16',
            TagSpecifications=[
                {
                    'ResourceType': 'vpc',
                    'Tags': [
                        {'Key': 'Name', 'Value': f"dast-vpc-{app_config['app_name']}"},
                        {'Key': 'Purpose', 'Value': 'DAST-Testing'},
                        {'Key': 'Environment', 'Value': 'testing'}
                    ]
                }
            ]
        )
        vpc_id = vpc_response['Vpc']['VpcId']
        
        # Create subnet
        subnet_response = self.ec2.create_subnet(
            VpcId=vpc_id,
            CidrBlock='10.0.1.0/24',
            TagSpecifications=[
                {
                    'ResourceType': 'subnet',
                    'Tags': [
                        {'Key': 'Name', 'Value': f"dast-subnet-{app_config['app_name']}"}
                    ]
                }
            ]
        )
        subnet_id = subnet_response['Subnet']['SubnetId']
        
        # Create security group for DAST testing
        sg_response = self.ec2.create_security_group(
            GroupName=f"dast-sg-{app_config['app_name']}",
            Description='Security group for DAST testing environment',
            VpcId=vpc_id,
            TagSpecifications=[
                {
                    'ResourceType': 'security-group',
                    'Tags': [
                        {'Key': 'Name', 'Value': f"dast-sg-{app_config['app_name']}"}
                    ]
                }
            ]
        )
        sg_id = sg_response['GroupId']
        
        # Configure security group rules
        self.ec2.authorize_security_group_ingress(
            GroupId=sg_id,
            IpPermissions=[
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 80,
                    'ToPort': 80,
                    'IpRanges': [{'CidrIp': '10.0.0.0/16'}]
                },
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 443,
                    'ToPort': 443,
                    'IpRanges': [{'CidrIp': '10.0.0.0/16'}]
                },
                {
                    'IpProtocol': 'tcp',
                    'FromPort': 8080,
                    'ToPort': 8090,
                    'IpRanges': [{'CidrIp': '10.0.0.0/16'}]
                }
            ]
        )
        
        # Create ECS cluster for DAST tools
        cluster_response = self.ecs.create_cluster(
            clusterName=f"dast-cluster-{app_config['app_name']}",
            tags=[
                {'key': 'Purpose', 'value': 'DAST-Testing'},
                {'key': 'Application', 'value': app_config['app_name']}
            ]
        )
        
        environment_config = {
            'vpc_id': vpc_id,
            'subnet_id': subnet_id,
            'security_group_id': sg_id,
            'cluster_arn': cluster_response['cluster']['clusterArn'],
            'cluster_name': cluster_response['cluster']['clusterName']
        }
        
        return environment_config
    
    def deploy_application_under_test(self, app_config: Dict, environment: Dict) -> Dict:
        """
        Deploy application in isolated environment for testing
        """
        task_definition = {
            'family': f"dast-app-{app_config['app_name']}",
            'networkMode': 'awsvpc',
            'requiresCompatibilities': ['FARGATE'],
            'cpu': '512',
            'memory': '1024',
            'executionRoleArn': app_config.get('execution_role_arn'),
            'taskRoleArn': app_config.get('task_role_arn'),
            'containerDefinitions': [
                {
                    'name': 'application',
                    'image': app_config['docker_image'],
                    'portMappings': [
                        {
                            'containerPort': app_config.get('port', 8080),
                            'protocol': 'tcp'
                        }
                    ],
                    'environment': [
                        {'name': 'ENV', 'value': 'dast-testing'},
                        {'name': 'DEBUG', 'value': 'false'},
                        {'name': 'LOG_LEVEL', 'value': 'INFO'}
                    ],
                    'secrets': [
                        {
                            'name': 'DB_PASSWORD',
                            'valueFrom': app_config.get('db_secret_arn')
                        }
                    ] if app_config.get('db_secret_arn') else [],
                    'logConfiguration': {
                        'logDriver': 'awslogs',
                        'options': {
                            'awslogs-group': f"/ecs/dast-{app_config['app_name']}",
                            'awslogs-region': 'us-east-1',
                            'awslogs-stream-prefix': 'ecs'
                        }
                    },
                    'healthCheck': {
                        'command': [
                            'CMD-SHELL',
                            f"curl -f http://localhost:{app_config.get('port', 8080)}/health || exit 1"
                        ],
                        'interval': 30,
                        'timeout': 5,
                        'retries': 3,
                        'startPeriod': 60
                    }
                }
            ]
        }
        
        # Register task definition
        task_def_response = self.ecs.register_task_definition(**task_definition)
        
        # Create service
        service_config = {
            'cluster': environment['cluster_name'],
            'serviceName': f"dast-service-{app_config['app_name']}",
            'taskDefinition': task_def_response['taskDefinition']['taskDefinitionArn'],
            'desiredCount': 1,
            'launchType': 'FARGATE',
            'networkConfiguration': {
                'awsvpcConfiguration': {
                    'subnets': [environment['subnet_id']],
                    'securityGroups': [environment['security_group_id']],
                    'assignPublicIp': 'ENABLED'
                }
            },
            'tags': [
                {'key': 'Purpose', 'value': 'DAST-Testing'},
                {'key': 'Application', 'value': app_config['app_name']}
            ]
        }
        
        service_response = self.ecs.create_service(**service_config)
        
        # Wait for service to be stable
        waiter = self.ecs.get_waiter('services_stable')
        waiter.wait(
            cluster=environment['cluster_name'],
            services=[service_config['serviceName']],
            WaiterConfig={'delay': 15, 'maxAttempts': 40}
        )
        
        # Get service endpoint
        service_endpoint = self.get_service_endpoint(
            environment['cluster_name'],
            service_config['serviceName']
        )
        
        deployment_info = {
            'task_definition_arn': task_def_response['taskDefinition']['taskDefinitionArn'],
            'service_arn': service_response['service']['serviceArn'],
            'service_name': service_config['serviceName'],
            'endpoint': service_endpoint,
            'health_check_url': f"http://{service_endpoint}/health"
        }
        
        return deployment_info
    
    def run_owasp_zap_scan(self, target_url: str, scan_config: Dict) -> Dict:
        """
        Run OWASP ZAP security scan
        """
        scan_id = f"ZAP-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        
        # Create ZAP task definition
        zap_task_definition = {
            'family': f'zap-scanner-{scan_id}',
            'networkMode': 'awsvpc',
            'requiresCompatibilities': ['FARGATE'],
            'cpu': '1024',
            'memory': '2048',
            'executionRoleArn': scan_config.get('execution_role_arn'),
            'containerDefinitions': [
                {
                    'name': 'zap-scanner',
                    'image': scan_config['owasp_zap']['docker_image'],
                    'command': self.build_zap_command(target_url, scan_config),
                    'environment': [
                        {'name': 'ZAP_PORT', 'value': '8080'},
                        {'name': 'TARGET_URL', 'value': target_url}
                    ],
                    'logConfiguration': {
                        'logDriver': 'awslogs',
                        'options': {
                            'awslogs-group': f'/ecs/zap-scanner',
                            'awslogs-region': 'us-east-1',
                            'awslogs-stream-prefix': scan_id
                        }
                    }
                }
            ]
        }
        
        # Register and run ZAP task
        task_def_response = self.ecs.register_task_definition(**zap_task_definition)
        
        run_task_response = self.ecs.run_task(
            cluster=scan_config['cluster_name'],
            taskDefinition=task_def_response['taskDefinition']['taskDefinitionArn'],
            launchType='FARGATE',
            networkConfiguration={
                'awsvpcConfiguration': {
                    'subnets': [scan_config['subnet_id']],
                    'securityGroups': [scan_config['security_group_id']],
                    'assignPublicIp': 'ENABLED'
                }
            },
            tags=[
                {'key': 'ScanType', 'value': 'DAST'},
                {'key': 'Tool', 'value': 'OWASP-ZAP'},
                {'key': 'ScanId', 'value': scan_id}
            ]
        )
        
        task_arn = run_task_response['tasks'][0]['taskArn']
        
        # Wait for task completion
        waiter = self.ecs.get_waiter('tasks_stopped')
        waiter.wait(
            cluster=scan_config['cluster_name'],
            tasks=[task_arn],
            WaiterConfig={'delay': 30, 'maxAttempts': 120}
        )
        
        # Retrieve scan results
        scan_results = self.retrieve_zap_results(task_arn, scan_id)
        
        return {
            'scan_id': scan_id,
            'task_arn': task_arn,
            'target_url': target_url,
            'scan_type': 'OWASP_ZAP',
            'results': scan_results,
            'timestamp': datetime.now().isoformat()
        }
    
    def build_zap_command(self, target_url: str, scan_config: Dict) -> List[str]:
        """
        Build OWASP ZAP command based on scan configuration
        """
        zap_config = scan_config['owasp_zap']
        scan_policy = zap_config['scan_policies']['full']  # Default to full scan
        
        command = [
            'zap-full-scan.py',
            '-t', target_url,
            '-J', f'/zap/wrk/zap-report-{datetime.now().strftime("%Y%m%d-%H%M%S")}.json',
            '-r', f'/zap/wrk/zap-report-{datetime.now().strftime("%Y%m%d-%H%M%S")}.html'
        ]
        
        # Add authentication if configured
        if zap_config['authentication']['enabled']:
            command.extend([
                '-z', f"auth.loginurl={zap_config['authentication']['login_url']}",
                '-z', f"auth.username={zap_config['authentication']['username_field']}",
                '-z', f"auth.password={zap_config['authentication']['password_field']}"
            ])
        
        # Add exclusions
        for exclusion in zap_config['exclusions']:
            command.extend(['-z', f"spider.excludeurl={exclusion}"])
        
        # Add scan duration limit
        command.extend(['-m', str(scan_policy['max_duration_minutes'])])
        
        # Add alert threshold
        command.extend(['-l', scan_policy['alert_threshold']])
        
        return command
    
    def run_nuclei_scan(self, target_url: str, scan_config: Dict) -> Dict:
        """
        Run Nuclei vulnerability scanner
        """
        scan_id = f"NUCLEI-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        
        nuclei_config = scan_config['nuclei']
        
        # Build Nuclei command
        nuclei_command = [
            'nuclei',
            '-u', target_url,
            '-json',
            '-o', f'/results/nuclei-{scan_id}.json',
            '-severity', nuclei_config['severity_threshold'],
            '-rate-limit', str(nuclei_config['rate_limit']),
            '-timeout', str(nuclei_config['timeout']),
            '-retries', str(nuclei_config['retries'])
        ]
        
        # Add templates
        for template in nuclei_config['templates']:
            nuclei_command.extend(['-t', template])
        
        # Create Nuclei task definition
        nuclei_task_definition = {
            'family': f'nuclei-scanner-{scan_id}',
            'networkMode': 'awsvpc',
            'requiresCompatibilities': ['FARGATE'],
            'cpu': '512',
            'memory': '1024',
            'executionRoleArn': scan_config.get('execution_role_arn'),
            'containerDefinitions': [
                {
                    'name': 'nuclei-scanner',
                    'image': nuclei_config['docker_image'],
                    'command': nuclei_command,
                    'logConfiguration': {
                        'logDriver': 'awslogs',
                        'options': {
                            'awslogs-group': '/ecs/nuclei-scanner',
                            'awslogs-region': 'us-east-1',
                            'awslogs-stream-prefix': scan_id
                        }
                    }
                }
            ]
        }
        
        # Register and run Nuclei task
        task_def_response = self.ecs.register_task_definition(**nuclei_task_definition)
        
        run_task_response = self.ecs.run_task(
            cluster=scan_config['cluster_name'],
            taskDefinition=task_def_response['taskDefinition']['taskDefinitionArn'],
            launchType='FARGATE',
            networkConfiguration={
                'awsvpcConfiguration': {
                    'subnets': [scan_config['subnet_id']],
                    'securityGroups': [scan_config['security_group_id']],
                    'assignPublicIp': 'ENABLED'
                }
            }
        )
        
        task_arn = run_task_response['tasks'][0]['taskArn']
        
        # Wait for completion and retrieve results
        waiter = self.ecs.get_waiter('tasks_stopped')
        waiter.wait(
            cluster=scan_config['cluster_name'],
            tasks=[task_arn],
            WaiterConfig={'delay': 15, 'maxAttempts': 60}
        )
        
        scan_results = self.retrieve_nuclei_results(task_arn, scan_id)
        
        return {
            'scan_id': scan_id,
            'task_arn': task_arn,
            'target_url': target_url,
            'scan_type': 'NUCLEI',
            'results': scan_results,
            'timestamp': datetime.now().isoformat()
        }
    
    def orchestrate_dast_pipeline(self, app_config: Dict, dast_config: Dict) -> Dict:
        """
        Orchestrate complete DAST pipeline
        """
        pipeline_id = f"DAST-PIPELINE-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        
        pipeline_results = {
            'pipeline_id': pipeline_id,
            'application': app_config['app_name'],
            'start_time': datetime.now().isoformat(),
            'stages': [],
            'overall_status': 'RUNNING',
            'security_gate_status': 'PENDING'
        }
        
        try:
            # Stage 1: Create DAST environment
            print("Creating DAST environment...")
            environment = self.create_dast_environment(app_config)
            pipeline_results['stages'].append({
                'stage': 'environment_setup',
                'status': 'COMPLETED',
                'duration_seconds': 120,
                'details': environment
            })
            
            # Stage 2: Deploy application under test
            print("Deploying application under test...")
            deployment = self.deploy_application_under_test(app_config, environment)
            pipeline_results['stages'].append({
                'stage': 'application_deployment',
                'status': 'COMPLETED',
                'duration_seconds': 300,
                'details': deployment
            })
            
            # Stage 3: Wait for application readiness
            print("Waiting for application readiness...")
            self.wait_for_application_ready(deployment['health_check_url'])
            
            # Stage 4: Run DAST scans
            scan_results = []
            
            # OWASP ZAP scan
            if dast_config['owasp_zap']['enabled']:
                print("Running OWASP ZAP scan...")
                zap_results = self.run_owasp_zap_scan(
                    deployment['endpoint'], 
                    {**dast_config, **environment}
                )
                scan_results.append(zap_results)
            
            # Nuclei scan
            if dast_config['nuclei']['enabled']:
                print("Running Nuclei scan...")
                nuclei_results = self.run_nuclei_scan(
                    deployment['endpoint'],
                    {**dast_config, **environment}
                )
                scan_results.append(nuclei_results)
            
            pipeline_results['stages'].append({
                'stage': 'security_scanning',
                'status': 'COMPLETED',
                'duration_seconds': 1800,
                'scan_results': scan_results
            })
            
            # Stage 5: Process and consolidate results
            consolidated_results = self.consolidate_dast_results(scan_results)
            pipeline_results['consolidated_results'] = consolidated_results
            
            # Stage 6: Apply security gates
            gate_result = self.apply_security_gates(consolidated_results, dast_config)
            pipeline_results['security_gate_status'] = gate_result['status']
            pipeline_results['security_gate_details'] = gate_result
            
            # Stage 7: Cleanup
            print("Cleaning up DAST environment...")
            self.cleanup_dast_environment(environment, deployment)
            
            pipeline_results['overall_status'] = 'COMPLETED'
            pipeline_results['end_time'] = datetime.now().isoformat()
            
        except Exception as e:
            pipeline_results['overall_status'] = 'FAILED'
            pipeline_results['error'] = str(e)
            pipeline_results['end_time'] = datetime.now().isoformat()
        
        # Store pipeline results
        self.store_dast_results(pipeline_results)
        
        return pipeline_results
    
    def consolidate_dast_results(self, scan_results: List[Dict]) -> Dict:
        """
        Consolidate results from multiple DAST tools
        """
        consolidated = {
            'total_vulnerabilities': 0,
            'critical_count': 0,
            'high_count': 0,
            'medium_count': 0,
            'low_count': 0,
            'info_count': 0,
            'vulnerabilities_by_category': {},
            'tools_used': [],
            'scan_coverage': {},
            'detailed_findings': []
        }
        
        for scan_result in scan_results:
            tool_name = scan_result['scan_type']
            consolidated['tools_used'].append(tool_name)
            
            # Process tool-specific results
            if tool_name == 'OWASP_ZAP':
                zap_findings = self.process_zap_findings(scan_result['results'])
                consolidated['detailed_findings'].extend(zap_findings)
            elif tool_name == 'NUCLEI':
                nuclei_findings = self.process_nuclei_findings(scan_result['results'])
                consolidated['detailed_findings'].extend(nuclei_findings)
        
        # Calculate summary statistics
        for finding in consolidated['detailed_findings']:
            severity = finding.get('severity', 'INFO').upper()
            if severity == 'CRITICAL':
                consolidated['critical_count'] += 1
            elif severity == 'HIGH':
                consolidated['high_count'] += 1
            elif severity == 'MEDIUM':
                consolidated['medium_count'] += 1
            elif severity == 'LOW':
                consolidated['low_count'] += 1
            else:
                consolidated['info_count'] += 1
            
            # Count by category
            category = finding.get('category', 'Other')
            consolidated['vulnerabilities_by_category'][category] = \
                consolidated['vulnerabilities_by_category'].get(category, 0) + 1
        
        consolidated['total_vulnerabilities'] = len(consolidated['detailed_findings'])
        
        return consolidated

# Example usage
dast_integration = DASTIntegration()

# Configure application for DAST testing
app_config = {
    'app_name': 'secure-web-app',
    'docker_image': 'company/secure-web-app:latest',
    'port': 8080,
    'requires_auth': True,
    'auth_type': 'form',
    'login_url': '/login',
    'test_credentials_secret': 'arn:aws:secretsmanager:us-east-1:123456789012:secret:dast-test-creds',
    'execution_role_arn': 'arn:aws:iam::123456789012:role/ecsTaskExecutionRole'
}

# Configure DAST tools
dast_config = dast_integration.configure_dast_tools(app_config)
print("DAST Configuration:")
print(json.dumps(dast_config, indent=2))

# Run complete DAST pipeline
pipeline_results = dast_integration.orchestrate_dast_pipeline(app_config, dast_config)
print("\\nDAST Pipeline Results:")
print(json.dumps(pipeline_results, indent=2))

Step 3: Implement Dependency Scanning

Deploy dependency scanning tools to identify vulnerabilities in third-party libraries and components:

View code
# Dependency Scanning Framework
import boto3
import json
import subprocess
import os
from datetime import datetime
from typing import Dict, List, Optional

class DependencyScanning:
    def __init__(self):
        self.codebuild = boto3.client('codebuild')
        self.s3 = boto3.client('s3')
        self.dynamodb = boto3.resource('dynamodb')
        self.results_table = self.dynamodb.Table('dependency-scan-results')
        
    def configure_dependency_scanners(self, project_config: Dict) -> Dict:
        """
        Configure dependency scanning tools for different package managers
        """
        scanner_config = {
            'npm_audit': {
                'enabled': 'package.json' in project_config.get('manifest_files', []),
                'audit_level': 'moderate',
                'production_only': True,
                'registry': 'https://registry.npmjs.org/',
                'output_format': 'json'
            },
            'pip_audit': {
                'enabled': any(f in project_config.get('manifest_files', []) 
                             for f in ['requirements.txt', 'Pipfile', 'pyproject.toml']),
                'vulnerability_database': 'pypa',
                'output_format': 'json',
                'ignore_vulns': project_config.get('ignored_vulnerabilities', [])
            },
            'snyk': {
                'enabled': True,
                'severity_threshold': 'medium',
                'monitor': True,
                'test_all_projects': True,
                'fail_on': 'upgradable',
                'package_managers': ['npm', 'pip', 'maven', 'gradle', 'go', 'nuget'],
                'exclude_dev_dependencies': True
            },
            'safety': {
                'enabled': 'python' in project_config.get('languages', []),
                'database': 'safety-db',
                'output_format': 'json',
                'ignore_ids': project_config.get('safety_ignore_ids', [])
            },
            'retire_js': {
                'enabled': 'javascript' in project_config.get('languages', []),
                'severity': ['high', 'medium'],
                'output_format': 'json',
                'ignore_file': '.retireignore'
            },
            'owasp_dependency_check': {
                'enabled': True,
                'formats': ['JSON', 'HTML'],
                'suppression_file': 'dependency-check-suppressions.xml',
                'fail_build_on_cvss': 7.0,
                'enable_experimental': False,
                'enable_retired': True
            }
        }
        
        return scanner_config
    
    def create_dependency_scan_pipeline(self, scanner_config: Dict) -> str:
        """
        Create CodeBuild project for dependency scanning
        """
        buildspec = {
            'version': '0.2',
            'phases': {
                'install': {
                    'runtime-versions': {
                        'python': '3.9',
                        'nodejs': '16',
                        'java': 'corretto11'
                    },
                    'commands': [
                        'echo "Installing dependency scanners..."',
                        'npm install -g npm-audit-resolver retire @snyk/cli',
                        'pip install pip-audit safety',
                        'wget -O dependency-check.zip https://github.com/jeremylong/DependencyCheck/releases/download/v7.4.4/dependency-check-7.4.4-release.zip',
                        'unzip dependency-check.zip',
                        'export PATH=$PATH:$PWD/dependency-check/bin'
                    ]
                },
                'pre_build': {
                    'commands': [
                        'echo "Preparing dependency scan..."',
                        'mkdir -p dependency-scan-results',
                        'echo "Scanning for manifest files..."',
                        'find . -name "package.json" -o -name "requirements.txt" -o -name "pom.xml" -o -name "build.gradle" -o -name "go.mod" -o -name "Cargo.toml" | tee manifest-files.txt'
                    ]
                },
                'build': {
                    'commands': [
                        'echo "Running dependency scans..."',
                        self.generate_dependency_scan_commands(scanner_config)
                    ]
                },
                'post_build': {
                    'commands': [
                        'echo "Processing dependency scan results..."',
                        'python scripts/consolidate_dependency_results.py',
                        'aws s3 cp dependency-scan-results/ s3://$DEPENDENCY_SCAN_BUCKET/$(date +%Y%m%d-%H%M%S)/ --recursive'
                    ]
                }
            },
            'artifacts': {
                'files': [
                    'dependency-scan-results/**/*'
                ]
            }
        }
        
        project_config = {
            'name': f"dependency-scan-{scanner_config['project_name']}",
            'source': {
                'type': 'CODEPIPELINE',
                'buildspec': json.dumps(buildspec, indent=2)
            },
            'artifacts': {
                'type': 'CODEPIPELINE'
            },
            'environment': {
                'type': 'LINUX_CONTAINER',
                'image': 'aws/codebuild/amazonlinux2-x86_64-standard:3.0',
                'computeType': 'BUILD_GENERAL1_MEDIUM',
                'environmentVariables': [
                    {
                        'name': 'DEPENDENCY_SCAN_BUCKET',
                        'value': scanner_config.get('results_bucket', 'dependency-scan-results')
                    },
                    {
                        'name': 'SNYK_TOKEN',
                        'value': 'snyk-api-token',
                        'type': 'SECRETS_MANAGER'
                    }
                ]
            },
            'serviceRole': scanner_config.get('service_role_arn')
        }
        
        response = self.codebuild.create_project(**project_config)
        return response['project']['arn']
    
    def generate_dependency_scan_commands(self, scanner_config: Dict) -> str:
        """
        Generate dependency scanning commands based on enabled tools
        """
        commands = []
        
        # NPM Audit
        if scanner_config.get('npm_audit', {}).get('enabled'):
            npm_config = scanner_config['npm_audit']
            commands.extend([
                'if [ -f "package.json" ]; then',
                f'  npm audit --audit-level={npm_config["audit_level"]} --json > dependency-scan-results/npm-audit.json || true',
                'fi'
            ])
        
        # Pip Audit
        if scanner_config.get('pip_audit', {}).get('enabled'):
            pip_config = scanner_config['pip_audit']
            commands.extend([
                'if [ -f "requirements.txt" ]; then',
                f'  pip-audit --format={pip_config["output_format"]} --output=dependency-scan-results/pip-audit.json || true',
                'fi'
            ])
        
        # Snyk
        if scanner_config.get('snyk', {}).get('enabled'):
            snyk_config = scanner_config['snyk']
            commands.extend([
                f'snyk test --json --severity-threshold={snyk_config["severity_threshold"]} > dependency-scan-results/snyk-test.json || true',
                'snyk monitor || true'
            ])
        
        # Safety (Python)
        if scanner_config.get('safety', {}).get('enabled'):
            safety_config = scanner_config['safety']
            commands.extend([
                'if [ -f "requirements.txt" ]; then',
                f'  safety check --json --output dependency-scan-results/safety.json || true',
                'fi'
            ])
        
        # Retire.js
        if scanner_config.get('retire_js', {}).get('enabled'):
            retire_config = scanner_config['retire_js']
            commands.extend([
                'if [ -f "package.json" ]; then',
                f'  retire --outputformat=json --outputpath=dependency-scan-results/retire-js.json || true',
                'fi'
            ])
        
        # OWASP Dependency Check
        if scanner_config.get('owasp_dependency_check', {}).get('enabled'):
            owasp_config = scanner_config['owasp_dependency_check']
            commands.extend([
                f'dependency-check.sh --project "Security Scan" --scan . --format JSON --format HTML --out dependency-scan-results/ --failOnCVSS {owasp_config["fail_build_on_cvss"]} || true'
            ])
        
        return ' && '.join(commands)
    
    def process_dependency_scan_results(self, results_path: str) -> Dict:
        """
        Process and consolidate dependency scan results
        """
        consolidated_results = {
            'scan_id': f"DEP-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
            'scan_timestamp': datetime.now().isoformat(),
            'tools_used': [],
            'summary': {
                'total_vulnerabilities': 0,
                'critical_vulnerabilities': 0,
                'high_vulnerabilities': 0,
                'medium_vulnerabilities': 0,
                'low_vulnerabilities': 0,
                'packages_scanned': 0,
                'vulnerable_packages': 0
            },
            'vulnerabilities': [],
            'package_managers': [],
            'recommendations': []
        }
        
        # Process NPM Audit results
        npm_results_file = os.path.join(results_path, 'npm-audit.json')
        if os.path.exists(npm_results_file):
            npm_vulns = self.parse_npm_audit_results(npm_results_file)
            consolidated_results['tools_used'].append('npm-audit')
            consolidated_results['package_managers'].append('npm')
            consolidated_results['vulnerabilities'].extend(npm_vulns)
        
        # Process Snyk results
        snyk_results_file = os.path.join(results_path, 'snyk-test.json')
        if os.path.exists(snyk_results_file):
            snyk_vulns = self.parse_snyk_results(snyk_results_file)
            consolidated_results['tools_used'].append('snyk')
            consolidated_results['vulnerabilities'].extend(snyk_vulns)
        
        # Process Safety results
        safety_results_file = os.path.join(results_path, 'safety.json')
        if os.path.exists(safety_results_file):
            safety_vulns = self.parse_safety_results(safety_results_file)
            consolidated_results['tools_used'].append('safety')
            consolidated_results['package_managers'].append('pip')
            consolidated_results['vulnerabilities'].extend(safety_vulns)
        
        # Process OWASP Dependency Check results
        owasp_results_file = os.path.join(results_path, 'dependency-check-report.json')
        if os.path.exists(owasp_results_file):
            owasp_vulns = self.parse_owasp_dependency_check_results(owasp_results_file)
            consolidated_results['tools_used'].append('owasp-dependency-check')
            consolidated_results['vulnerabilities'].extend(owasp_vulns)
        
        # Calculate summary statistics
        consolidated_results = self.calculate_dependency_summary(consolidated_results)
        
        # Generate recommendations
        consolidated_results['recommendations'] = self.generate_dependency_recommendations(
            consolidated_results['vulnerabilities']
        )
        
        # Store results
        self.store_dependency_scan_results(consolidated_results)
        
        return consolidated_results
    
    def parse_npm_audit_results(self, results_file: str) -> List[Dict]:
        """
        Parse NPM audit results
        """
        vulnerabilities = []
        try:
            with open(results_file, 'r') as f:
                npm_data = json.load(f)
            
            for vuln_id, vuln_data in npm_data.get('vulnerabilities', {}).items():
                vulnerability = {
                    'tool': 'npm-audit',
                    'vulnerability_id': vuln_id,
                    'package_name': vuln_data.get('name'),
                    'installed_version': vuln_data.get('version'),
                    'severity': self.normalize_severity(vuln_data.get('severity')),
                    'title': vuln_data.get('title'),
                    'description': vuln_data.get('overview'),
                    'cwe_ids': vuln_data.get('cwe', []),
                    'cvss_score': vuln_data.get('cvss', {}).get('score'),
                    'references': vuln_data.get('references', []),
                    'patched_versions': vuln_data.get('patched_versions'),
                    'vulnerable_versions': vuln_data.get('vulnerable_versions'),
                    'recommendation': vuln_data.get('recommendation'),
                    'package_manager': 'npm'
                }
                vulnerabilities.append(vulnerability)
                
        except Exception as e:
            print(f"Error parsing NPM audit results: {e}")
        
        return vulnerabilities
    
    def parse_snyk_results(self, results_file: str) -> List[Dict]:
        """
        Parse Snyk scan results
        """
        vulnerabilities = []
        try:
            with open(results_file, 'r') as f:
                snyk_data = json.load(f)
            
            for vuln in snyk_data.get('vulnerabilities', []):
                vulnerability = {
                    'tool': 'snyk',
                    'vulnerability_id': vuln.get('id'),
                    'package_name': vuln.get('packageName'),
                    'installed_version': vuln.get('version'),
                    'severity': self.normalize_severity(vuln.get('severity')),
                    'title': vuln.get('title'),
                    'description': vuln.get('description'),
                    'cve_ids': [vuln.get('identifiers', {}).get('CVE', [])],
                    'cwe_ids': [vuln.get('identifiers', {}).get('CWE', [])],
                    'cvss_score': vuln.get('cvssScore'),
                    'references': vuln.get('references', []),
                    'upgrade_path': vuln.get('upgradePath', []),
                    'is_patchable': vuln.get('isPatchable', False),
                    'is_upgradable': vuln.get('isUpgradable', False),
                    'package_manager': vuln.get('packageManager', 'unknown')
                }
                vulnerabilities.append(vulnerability)
                
        except Exception as e:
            print(f"Error parsing Snyk results: {e}")
        
        return vulnerabilities
    
    def create_dependency_security_gates(self, gate_config: Dict) -> Dict:
        """
        Create security gates for dependency scanning
        """
        security_gates = {
            'gate_id': f"DEP-GATE-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
            'gate_rules': {
                'critical_vulnerabilities_threshold': gate_config.get('critical_threshold', 0),
                'high_vulnerabilities_threshold': gate_config.get('high_threshold', 0),
                'medium_vulnerabilities_threshold': gate_config.get('medium_threshold', 10),
                'total_vulnerabilities_threshold': gate_config.get('total_threshold', 50),
                'cvss_score_threshold': gate_config.get('cvss_threshold', 7.0),
                'age_threshold_days': gate_config.get('age_threshold', 30),
                'allow_dev_dependencies': gate_config.get('allow_dev_deps', True)
            },
            'actions': {
                'block_deployment': gate_config.get('block_deployment', True),
                'create_security_tickets': gate_config.get('create_tickets', True),
                'notify_security_team': gate_config.get('notify_security', True),
                'auto_create_pr_for_updates': gate_config.get('auto_pr', False)
            },
            'exceptions': {
                'allowed_vulnerabilities': gate_config.get('allowed_vulns', []),
                'temporary_exceptions': gate_config.get('temp_exceptions', []),
                'business_justifications': gate_config.get('business_justifications', [])
            }
        }
        
        return security_gates
    
    def generate_dependency_recommendations(self, vulnerabilities: List[Dict]) -> List[Dict]:
        """
        Generate actionable recommendations for dependency vulnerabilities
        """
        recommendations = []
        
        # Group vulnerabilities by package
        packages_with_vulns = {}
        for vuln in vulnerabilities:
            package_name = vuln.get('package_name')
            if package_name not in packages_with_vulns:
                packages_with_vulns[package_name] = []
            packages_with_vulns[package_name].append(vuln)
        
        # Generate recommendations for each package
        for package_name, package_vulns in packages_with_vulns.items():
            highest_severity = max(
                [self.severity_to_numeric(v.get('severity', 'LOW')) for v in package_vulns]
            )
            
            recommendation = {
                'package_name': package_name,
                'vulnerability_count': len(package_vulns),
                'highest_severity': self.numeric_to_severity(highest_severity),
                'recommendation_type': 'UPDATE',
                'priority': 'HIGH' if highest_severity >= 3 else 'MEDIUM',
                'actions': []
            }
            
            # Check if updates are available
            upgradable_vulns = [v for v in package_vulns if v.get('is_upgradable')]
            if upgradable_vulns:
                recommendation['actions'].append({
                    'action': 'UPDATE_PACKAGE',
                    'description': f'Update {package_name} to latest secure version',
                    'automated': True,
                    'effort_estimate': 'LOW'
                })
            
            # Check if patches are available
            patchable_vulns = [v for v in package_vulns if v.get('is_patchable')]
            if patchable_vulns:
                recommendation['actions'].append({
                    'action': 'APPLY_PATCH',
                    'description': f'Apply security patches for {package_name}',
                    'automated': True,
                    'effort_estimate': 'LOW'
                })
            
            # If no automatic fixes available
            if not upgradable_vulns and not patchable_vulns:
                recommendation['actions'].append({
                    'action': 'MANUAL_REVIEW',
                    'description': f'Manual review required for {package_name} vulnerabilities',
                    'automated': False,
                    'effort_estimate': 'HIGH'
                })
            
            recommendations.append(recommendation)
        
        return recommendations

# Example usage
dependency_scanner = DependencyScanning()

# Configure dependency scanners
project_config = {
    'project_name': 'secure-web-app',
    'languages': ['python', 'javascript'],
    'manifest_files': ['package.json', 'requirements.txt'],
    'ignored_vulnerabilities': ['GHSA-example-1234'],
    'safety_ignore_ids': ['12345']
}

scanner_config = dependency_scanner.configure_dependency_scanners(project_config)
print("Dependency Scanner Configuration:")
print(json.dumps(scanner_config, indent=2))

# Create dependency scan pipeline
pipeline_arn = dependency_scanner.create_dependency_scan_pipeline({
    **scanner_config,
    'project_name': project_config['project_name'],
    'results_bucket': 'dependency-scan-results-bucket'
})
print(f"\\nDependency Scan Pipeline ARN: {pipeline_arn}")

Step 4: Implement Infrastructure as Code (IaC) Security Testing

Deploy IaC security scanning tools to identify misconfigurations and security issues in infrastructure code:

View code
# Infrastructure as Code Security Testing Framework
import boto3
import json
import yaml
import subprocess
import os
from datetime import datetime
from typing import Dict, List, Optional

class IaCSecurityTesting:
    def __init__(self):
        self.codebuild = boto3.client('codebuild')
        self.s3 = boto3.client('s3')
        self.dynamodb = boto3.resource('dynamodb')
        self.results_table = self.dynamodb.Table('iac-scan-results')
        
    def configure_iac_scanners(self, project_config: Dict) -> Dict:
        """
        Configure IaC security scanning tools
        """
        scanner_config = {
            'checkov': {
                'enabled': True,
                'frameworks': ['cloudformation', 'terraform', 'kubernetes', 'dockerfile'],
                'severity_threshold': 'MEDIUM',
                'output_format': 'json',
                'skip_checks': project_config.get('checkov_skip_checks', []),
                'custom_policies_dir': 'security/checkov-policies',
                'enable_secrets_scan': True
            },
            'tfsec': {
                'enabled': 'terraform' in project_config.get('iac_types', []),
                'severity_threshold': 'MEDIUM',
                'output_format': 'json',
                'exclude_checks': project_config.get('tfsec_exclude_checks', []),
                'custom_checks_dir': 'security/tfsec-checks',
                'include_ignored': False
            },
            'cfn_nag': {
                'enabled': 'cloudformation' in project_config.get('iac_types', []),
                'output_format': 'json',
                'rule_directory': 'security/cfn-nag-rules',
                'profile_path': 'security/cfn-nag-profile.json',
                'fail_on_warnings': True
            },
            'kube_score': {
                'enabled': 'kubernetes' in project_config.get('iac_types', []),
                'output_format': 'json',
                'ignore_tests': project_config.get('kube_score_ignore', []),
                'enable_optional_tests': True
            },
            'terrascan': {
                'enabled': True,
                'policy_type': ['aws', 'azure', 'gcp', 'kubernetes'],
                'severity': 'medium',
                'output_format': 'json',
                'config_file': 'security/terrascan-config.toml',
                'skip_rules': project_config.get('terrascan_skip_rules', [])
            },
            'aws_config_rules': {
                'enabled': 'cloudformation' in project_config.get('iac_types', []),
                'rule_sets': ['security-best-practices', 'operational-best-practices'],
                'custom_rules': project_config.get('custom_config_rules', [])
            }
        }
        
        return scanner_config
    
    def create_iac_scan_pipeline(self, scanner_config: Dict) -> str:
        """
        Create CodeBuild project for IaC security scanning
        """
        buildspec = {
            'version': '0.2',
            'phases': {
                'install': {
                    'runtime-versions': {
                        'python': '3.9',
                        'nodejs': '16'
                    },
                    'commands': [
                        'echo "Installing IaC security scanners..."',
                        'pip install checkov',
                        'curl -s https://raw.githubusercontent.com/aquasecurity/tfsec/master/scripts/install_linux.sh | bash',
                        'gem install cfn-nag',
                        'wget -O kube-score.tar.gz https://github.com/zegl/kube-score/releases/download/v1.16.1/kube-score_1.16.1_linux_amd64.tar.gz',
                        'tar -xzf kube-score.tar.gz',
                        'chmod +x kube-score',
                        'mv kube-score /usr/local/bin/',
                        'curl -L "$(curl -s https://api.github.com/repos/tenable/terrascan/releases/latest | grep -o -E "https://.+?_Linux_x86_64.tar.gz")" > terrascan.tar.gz',
                        'tar -xf terrascan.tar.gz terrascan && rm terrascan.tar.gz',
                        'install terrascan /usr/local/bin && rm terrascan'
                    ]
                },
                'pre_build': {
                    'commands': [
                        'echo "Preparing IaC security scan..."',
                        'mkdir -p iac-scan-results',
                        'echo "Discovering IaC files..."',
                        'find . -name "*.tf" -o -name "*.yaml" -o -name "*.yml" -o -name "*.json" -o -name "Dockerfile" | grep -E "\\.(tf|yaml|yml|json|Dockerfile)$" | tee iac-files.txt'
                    ]
                },
                'build': {
                    'commands': [
                        'echo "Running IaC security scans..."',
                        self.generate_iac_scan_commands(scanner_config)
                    ]
                },
                'post_build': {
                    'commands': [
                        'echo "Processing IaC scan results..."',
                        'python scripts/consolidate_iac_results.py',
                        'aws s3 cp iac-scan-results/ s3://$IAC_SCAN_BUCKET/$(date +%Y%m%d-%H%M%S)/ --recursive'
                    ]
                }
            },
            'artifacts': {
                'files': [
                    'iac-scan-results/**/*'
                ]
            }
        }
        
        project_config = {
            'name': f"iac-scan-{scanner_config['project_name']}",
            'source': {
                'type': 'CODEPIPELINE',
                'buildspec': json.dumps(buildspec, indent=2)
            },
            'artifacts': {
                'type': 'CODEPIPELINE'
            },
            'environment': {
                'type': 'LINUX_CONTAINER',
                'image': 'aws/codebuild/amazonlinux2-x86_64-standard:3.0',
                'computeType': 'BUILD_GENERAL1_MEDIUM',
                'environmentVariables': [
                    {
                        'name': 'IAC_SCAN_BUCKET',
                        'value': scanner_config.get('results_bucket', 'iac-scan-results')
                    }
                ]
            },
            'serviceRole': scanner_config.get('service_role_arn')
        }
        
        response = self.codebuild.create_project(**project_config)
        return response['project']['arn']
    
    def generate_iac_scan_commands(self, scanner_config: Dict) -> str:
        """
        Generate IaC scanning commands based on enabled tools
        """
        commands = []
        
        # Checkov scan
        if scanner_config.get('checkov', {}).get('enabled'):
            checkov_config = scanner_config['checkov']
            frameworks = ','.join(checkov_config['frameworks'])
            skip_checks = ','.join(checkov_config.get('skip_checks', []))
            
            checkov_cmd = [
                'checkov',
                '--directory .',
                f'--framework {frameworks}',
                f'--output {checkov_config["output_format"]}',
                '--output-file iac-scan-results/checkov-results.json'
            ]
            
            if skip_checks:
                checkov_cmd.append(f'--skip-check {skip_checks}')
            
            if checkov_config.get('enable_secrets_scan'):
                checkov_cmd.append('--enable-secret-scan-all-files')
            
            commands.append(' '.join(checkov_cmd) + ' || true')
        
        # TFSec scan
        if scanner_config.get('tfsec', {}).get('enabled'):
            tfsec_config = scanner_config['tfsec']
            exclude_checks = ','.join(tfsec_config.get('exclude_checks', []))
            
            tfsec_cmd = [
                'tfsec .',
                f'--format {tfsec_config["output_format"]}',
                '--out iac-scan-results/tfsec-results.json'
            ]
            
            if exclude_checks:
                tfsec_cmd.append(f'--exclude-checks {exclude_checks}')
            
            commands.append(' '.join(tfsec_cmd) + ' || true')
        
        # CFN-Nag scan
        if scanner_config.get('cfn_nag', {}).get('enabled'):
            cfn_nag_config = scanner_config['cfn_nag']
            commands.extend([
                'find . -name "*.yaml" -o -name "*.yml" -o -name "*.json" | grep -E "template|cloudformation" > cf-templates.txt || true',
                'if [ -s cf-templates.txt ]; then',
                f'  cfn_nag_scan --input-path . --output-format {cfn_nag_config["output_format"]} --output-file iac-scan-results/cfn-nag-results.json || true',
                'fi'
            ])
        
        # Kube-score scan
        if scanner_config.get('kube_score', {}).get('enabled'):
            kube_score_config = scanner_config['kube_score']
            commands.extend([
                'find . -name "*.yaml" -o -name "*.yml" | grep -E "k8s|kubernetes|deployment|service" > k8s-files.txt || true',
                'if [ -s k8s-files.txt ]; then',
                f'  kube-score score --output-format {kube_score_config["output_format"]} $(cat k8s-files.txt) > iac-scan-results/kube-score-results.json || true',
                'fi'
            ])
        
        # Terrascan
        if scanner_config.get('terrascan', {}).get('enabled'):
            terrascan_config = scanner_config['terrascan']
            policy_types = ','.join(terrascan_config['policy_type'])
            
            terrascan_cmd = [
                'terrascan scan',
                '--iac-type all',
                f'--policy-type {policy_types}',
                f'--severity {terrascan_config["severity"]}',
                f'--output {terrascan_config["output_format"]}',
                '--output-file iac-scan-results/terrascan-results.json'
            ]
            
            commands.append(' '.join(terrascan_cmd) + ' || true')
        
        return ' && '.join(commands)
    
    def process_iac_scan_results(self, results_path: str) -> Dict:
        """
        Process and consolidate IaC scan results
        """
        consolidated_results = {
            'scan_id': f"IAC-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
            'scan_timestamp': datetime.now().isoformat(),
            'tools_used': [],
            'summary': {
                'total_issues': 0,
                'critical_issues': 0,
                'high_issues': 0,
                'medium_issues': 0,
                'low_issues': 0,
                'info_issues': 0,
                'files_scanned': 0,
                'passed_checks': 0,
                'failed_checks': 0
            },
            'issues_by_category': {},
            'issues_by_resource_type': {},
            'detailed_findings': [],
            'compliance_status': {},
            'remediation_suggestions': []
        }
        
        # Process Checkov results
        checkov_results_file = os.path.join(results_path, 'checkov-results.json')
        if os.path.exists(checkov_results_file):
            checkov_issues = self.parse_checkov_results(checkov_results_file)
            consolidated_results['tools_used'].append('checkov')
            consolidated_results['detailed_findings'].extend(checkov_issues)
        
        # Process TFSec results
        tfsec_results_file = os.path.join(results_path, 'tfsec-results.json')
        if os.path.exists(tfsec_results_file):
            tfsec_issues = self.parse_tfsec_results(tfsec_results_file)
            consolidated_results['tools_used'].append('tfsec')
            consolidated_results['detailed_findings'].extend(tfsec_issues)
        
        # Process CFN-Nag results
        cfn_nag_results_file = os.path.join(results_path, 'cfn-nag-results.json')
        if os.path.exists(cfn_nag_results_file):
            cfn_nag_issues = self.parse_cfn_nag_results(cfn_nag_results_file)
            consolidated_results['tools_used'].append('cfn-nag')
            consolidated_results['detailed_findings'].extend(cfn_nag_issues)
        
        # Process Terrascan results
        terrascan_results_file = os.path.join(results_path, 'terrascan-results.json')
        if os.path.exists(terrascan_results_file):
            terrascan_issues = self.parse_terrascan_results(terrascan_results_file)
            consolidated_results['tools_used'].append('terrascan')
            consolidated_results['detailed_findings'].extend(terrascan_issues)
        
        # Calculate summary statistics
        consolidated_results = self.calculate_iac_summary(consolidated_results)
        
        # Generate compliance status
        consolidated_results['compliance_status'] = self.assess_compliance_status(
            consolidated_results['detailed_findings']
        )
        
        # Generate remediation suggestions
        consolidated_results['remediation_suggestions'] = self.generate_iac_remediation_suggestions(
            consolidated_results['detailed_findings']
        )
        
        # Store results
        self.store_iac_scan_results(consolidated_results)
        
        return consolidated_results
    
    def parse_checkov_results(self, results_file: str) -> List[Dict]:
        """
        Parse Checkov scan results
        """
        issues = []
        try:
            with open(results_file, 'r') as f:
                checkov_data = json.load(f)
            
            for result in checkov_data.get('results', {}).get('failed_checks', []):
                issue = {
                    'tool': 'checkov',
                    'check_id': result.get('check_id'),
                    'check_name': result.get('check_name'),
                    'severity': self.normalize_severity(result.get('severity', 'MEDIUM')),
                    'resource_type': result.get('resource'),
                    'resource_name': result.get('resource_name', ''),
                    'file_path': result.get('file_path'),
                    'line_range': result.get('file_line_range', []),
                    'description': result.get('description'),
                    'guideline': result.get('guideline'),
                    'category': self.categorize_iac_issue(result.get('check_id', '')),
                    'remediation': result.get('fixed_definition'),
                    'compliance_frameworks': result.get('bc_check_id', '').split('_')[0] if result.get('bc_check_id') else None
                }
                issues.append(issue)
                
        except Exception as e:
            print(f"Error parsing Checkov results: {e}")
        
        return issues
    
    def parse_tfsec_results(self, results_file: str) -> List[Dict]:
        """
        Parse TFSec scan results
        """
        issues = []
        try:
            with open(results_file, 'r') as f:
                tfsec_data = json.load(f)
            
            for result in tfsec_data.get('results', []):
                issue = {
                    'tool': 'tfsec',
                    'check_id': result.get('rule_id'),
                    'check_name': result.get('rule_description'),
                    'severity': self.normalize_severity(result.get('severity', 'MEDIUM')),
                    'resource_type': result.get('resource_type'),
                    'resource_name': result.get('resource_name', ''),
                    'file_path': result.get('location', {}).get('filename'),
                    'line_range': [result.get('location', {}).get('start_line', 0)],
                    'description': result.get('description'),
                    'impact': result.get('impact'),
                    'resolution': result.get('resolution'),
                    'category': self.categorize_iac_issue(result.get('rule_id', '')),
                    'links': result.get('links', [])
                }
                issues.append(issue)
                
        except Exception as e:
            print(f"Error parsing TFSec results: {e}")
        
        return issues
    
    def categorize_iac_issue(self, check_id: str) -> str:
        """
        Categorize IaC security issues
        """
        category_mapping = {
            'encryption': ['encrypt', 'kms', 'ssl', 'tls'],
            'access_control': ['iam', 'policy', 'permission', 'access'],
            'network_security': ['security_group', 'nacl', 'vpc', 'subnet'],
            'logging_monitoring': ['logging', 'cloudtrail', 'monitoring'],
            'backup_recovery': ['backup', 'snapshot', 'versioning'],
            'compliance': ['compliance', 'cis', 'pci', 'hipaa'],
            'secrets_management': ['secret', 'password', 'key', 'credential'],
            'resource_configuration': ['config', 'setting', 'parameter']
        }
        
        check_id_lower = check_id.lower()
        for category, keywords in category_mapping.items():
            if any(keyword in check_id_lower for keyword in keywords):
                return category
        
        return 'other'
    
    def assess_compliance_status(self, findings: List[Dict]) -> Dict:
        """
        Assess compliance status based on findings
        """
        compliance_frameworks = {
            'CIS': {'total_checks': 0, 'passed_checks': 0, 'failed_checks': 0},
            'PCI-DSS': {'total_checks': 0, 'passed_checks': 0, 'failed_checks': 0},
            'SOC2': {'total_checks': 0, 'passed_checks': 0, 'failed_checks': 0},
            'NIST': {'total_checks': 0, 'passed_checks': 0, 'failed_checks': 0}
        }
        
        for finding in findings:
            # Map findings to compliance frameworks
            frameworks = self.map_finding_to_compliance(finding)
            for framework in frameworks:
                if framework in compliance_frameworks:
                    compliance_frameworks[framework]['total_checks'] += 1
                    compliance_frameworks[framework]['failed_checks'] += 1
        
        # Calculate compliance percentages
        for framework, stats in compliance_frameworks.items():
            if stats['total_checks'] > 0:
                stats['compliance_percentage'] = (
                    stats['passed_checks'] / stats['total_checks']
                ) * 100
            else:
                stats['compliance_percentage'] = 100
        
        return compliance_frameworks
    
    def generate_iac_remediation_suggestions(self, findings: List[Dict]) -> List[Dict]:
        """
        Generate remediation suggestions for IaC issues
        """
        suggestions = []
        
        # Group findings by category
        findings_by_category = {}
        for finding in findings:
            category = finding.get('category', 'other')
            if category not in findings_by_category:
                findings_by_category[category] = []
            findings_by_category[category].append(finding)
        
        # Generate category-specific suggestions
        for category, category_findings in findings_by_category.items():
            suggestion = {
                'category': category,
                'issue_count': len(category_findings),
                'priority': self.calculate_category_priority(category_findings),
                'remediation_steps': self.get_category_remediation_steps(category),
                'automation_potential': self.assess_automation_potential(category),
                'estimated_effort': self.estimate_remediation_effort(category_findings)
            }
            suggestions.append(suggestion)
        
        return suggestions
    
    def create_iac_security_policies(self, policy_config: Dict) -> Dict:
        """
        Create custom security policies for IaC scanning
        """
        policies = {
            'checkov_custom_policies': self.create_checkov_policies(policy_config),
            'tfsec_custom_checks': self.create_tfsec_checks(policy_config),
            'cfn_nag_custom_rules': self.create_cfn_nag_rules(policy_config),
            'terrascan_custom_policies': self.create_terrascan_policies(policy_config)
        }
        
        return policies
    
    def create_checkov_policies(self, policy_config: Dict) -> List[Dict]:
        """
        Create custom Checkov policies
        """
        custom_policies = [
            {
                'policy_name': 'CompanyS3BucketEncryption',
                'policy_description': 'Ensure S3 buckets use company-approved encryption',
                'resource_types': ['aws_s3_bucket'],
                'check_logic': {
                    'condition': 'encryption.server_side_encryption_configuration.rule.apply_server_side_encryption_by_default.sse_algorithm',
                    'operator': 'in',
                    'value': ['AES256', 'aws:kms']
                },
                'severity': 'HIGH',
                'category': 'encryption'
            },
            {
                'policy_name': 'CompanyIAMPasswordPolicy',
                'policy_description': 'Ensure IAM password policy meets company requirements',
                'resource_types': ['aws_iam_account_password_policy'],
                'check_logic': {
                    'conditions': [
                        {'field': 'minimum_password_length', 'operator': '>=', 'value': 12},
                        {'field': 'require_uppercase_characters', 'operator': '==', 'value': True},
                        {'field': 'require_lowercase_characters', 'operator': '==', 'value': True},
                        {'field': 'require_numbers', 'operator': '==', 'value': True},
                        {'field': 'require_symbols', 'operator': '==', 'value': True}
                    ]
                },
                'severity': 'MEDIUM',
                'category': 'access_control'
            }
        ]
        
        return custom_policies

# Example usage
iac_scanner = IaCSecurityTesting()

# Configure IaC scanners
project_config = {
    'project_name': 'secure-infrastructure',
    'iac_types': ['terraform', 'cloudformation', 'kubernetes'],
    'checkov_skip_checks': ['CKV_AWS_20'],
    'tfsec_exclude_checks': ['aws-s3-enable-logging'],
    'terrascan_skip_rules': ['AC_AWS_0001']
}

scanner_config = iac_scanner.configure_iac_scanners(project_config)
print("IaC Scanner Configuration:")
print(json.dumps(scanner_config, indent=2))

# Create IaC scan pipeline
pipeline_arn = iac_scanner.create_iac_scan_pipeline({
    **scanner_config,
    'project_name': project_config['project_name'],
    'results_bucket': 'iac-scan-results-bucket'
})
print(f"\\nIaC Scan Pipeline ARN: {pipeline_arn}")

Step 5: Integrate Security Testing into CI/CD Pipeline

Create a comprehensive CI/CD pipeline that integrates all security testing tools:

View code
# Comprehensive Security Testing CI/CD Pipeline
import boto3
import json
from datetime import datetime
from typing import Dict, List, Optional

class SecurityTestingPipeline:
    def __init__(self):
        self.codepipeline = boto3.client('codepipeline')
        self.codebuild = boto3.client('codebuild')
        self.s3 = boto3.client('s3')
        self.sns = boto3.client('sns')
        self.dynamodb = boto3.resource('dynamodb')
        
    def create_comprehensive_security_pipeline(self, pipeline_config: Dict) -> Dict:
        """
        Create comprehensive security testing pipeline
        """
        pipeline_definition = {
            'name': f"security-pipeline-{pipeline_config['application_name']}",
            'roleArn': pipeline_config['pipeline_role_arn'],
            'artifactStore': {
                'type': 'S3',
                'location': pipeline_config['artifact_bucket']
            },
            'stages': [
                # Stage 1: Source
                {
                    'name': 'Source',
                    'actions': [
                        {
                            'name': 'SourceAction',
                            'actionTypeId': {
                                'category': 'Source',
                                'owner': 'AWS',
                                'provider': 'CodeCommit',
                                'version': '1'
                            },
                            'configuration': {
                                'RepositoryName': pipeline_config['repository_name'],
                                'BranchName': pipeline_config.get('branch_name', 'main')
                            },
                            'outputArtifacts': [{'name': 'SourceOutput'}]
                        }
                    ]
                },
                
                # Stage 2: Pre-commit Security Checks
                {
                    'name': 'PreCommitSecurity',
                    'actions': [
                        {
                            'name': 'SecretsScanning',
                            'actionTypeId': {
                                'category': 'Build',
                                'owner': 'AWS',
                                'provider': 'CodeBuild',
                                'version': '1'
                            },
                            'configuration': {
                                'ProjectName': f"secrets-scan-{pipeline_config['application_name']}"
                            },
                            'inputArtifacts': [{'name': 'SourceOutput'}],
                            'outputArtifacts': [{'name': 'SecretsOutput'}],
                            'runOrder': 1
                        },
                        {
                            'name': 'LicenseCompliance',
                            'actionTypeId': {
                                'category': 'Build',
                                'owner': 'AWS',
                                'provider': 'CodeBuild',
                                'version': '1'
                            },
                            'configuration': {
                                'ProjectName': f"license-check-{pipeline_config['application_name']}"
                            },
                            'inputArtifacts': [{'name': 'SourceOutput'}],
                            'outputArtifacts': [{'name': 'LicenseOutput'}],
                            'runOrder': 1
                        }
                    ]
                },
                
                # Stage 3: Static Analysis Security Testing (SAST)
                {
                    'name': 'StaticAnalysis',
                    'actions': [
                        {
                            'name': 'SASTScan',
                            'actionTypeId': {
                                'category': 'Build',
                                'owner': 'AWS',
                                'provider': 'CodeBuild',
                                'version': '1'
                            },
                            'configuration': {
                                'ProjectName': f"sast-scan-{pipeline_config['application_name']}"
                            },
                            'inputArtifacts': [{'name': 'SourceOutput'}],
                            'outputArtifacts': [{'name': 'SASTOutput'}],
                            'runOrder': 1
                        },
                        {
                            'name': 'IaCScan',
                            'actionTypeId': {
                                'category': 'Build',
                                'owner': 'AWS',
                                'provider': 'CodeBuild',
                                'version': '1'
                            },
                            'configuration': {
                                'ProjectName': f"iac-scan-{pipeline_config['application_name']}"
                            },
                            'inputArtifacts': [{'name': 'SourceOutput'}],
                            'outputArtifacts': [{'name': 'IaCOutput'}],
                            'runOrder': 1
                        }
                    ]
                },
                
                # Stage 4: Dependency Security Testing
                {
                    'name': 'DependencyAnalysis',
                    'actions': [
                        {
                            'name': 'DependencyScan',
                            'actionTypeId': {
                                'category': 'Build',
                                'owner': 'AWS',
                                'provider': 'CodeBuild',
                                'version': '1'
                            },
                            'configuration': {
                                'ProjectName': f"dependency-scan-{pipeline_config['application_name']}"
                            },
                            'inputArtifacts': [{'name': 'SourceOutput'}],
                            'outputArtifacts': [{'name': 'DependencyOutput'}],
                            'runOrder': 1
                        }
                    ]
                },
                
                # Stage 5: Build and Package
                {
                    'name': 'Build',
                    'actions': [
                        {
                            'name': 'BuildApplication',
                            'actionTypeId': {
                                'category': 'Build',
                                'owner': 'AWS',
                                'provider': 'CodeBuild',
                                'version': '1'
                            },
                            'configuration': {
                                'ProjectName': f"build-{pipeline_config['application_name']}"
                            },
                            'inputArtifacts': [{'name': 'SourceOutput'}],
                            'outputArtifacts': [{'name': 'BuildOutput'}],
                            'runOrder': 1
                        }
                    ]
                },
                
                # Stage 6: Container Security Scanning
                {
                    'name': 'ContainerSecurity',
                    'actions': [
                        {
                            'name': 'ContainerScan',
                            'actionTypeId': {
                                'category': 'Build',
                                'owner': 'AWS',
                                'provider': 'CodeBuild',
                                'version': '1'
                            },
                            'configuration': {
                                'ProjectName': f"container-scan-{pipeline_config['application_name']}"
                            },
                            'inputArtifacts': [{'name': 'BuildOutput'}],
                            'outputArtifacts': [{'name': 'ContainerOutput'}],
                            'runOrder': 1
                        }
                    ]
                },
                
                # Stage 7: Deploy to Test Environment
                {
                    'name': 'DeployTest',
                    'actions': [
                        {
                            'name': 'DeployToTest',
                            'actionTypeId': {
                                'category': 'Deploy',
                                'owner': 'AWS',
                                'provider': 'ECS',
                                'version': '1'
                            },
                            'configuration': {
                                'ClusterName': pipeline_config['test_cluster'],
                                'ServiceName': f"test-{pipeline_config['application_name']}",
                                'FileName': 'imagedefinitions.json'
                            },
                            'inputArtifacts': [{'name': 'BuildOutput'}],
                            'runOrder': 1
                        }
                    ]
                },
                
                # Stage 8: Dynamic Application Security Testing (DAST)
                {
                    'name': 'DynamicTesting',
                    'actions': [
                        {
                            'name': 'DASTScan',
                            'actionTypeId': {
                                'category': 'Build',
                                'owner': 'AWS',
                                'provider': 'CodeBuild',
                                'version': '1'
                            },
                            'configuration': {
                                'ProjectName': f"dast-scan-{pipeline_config['application_name']}"
                            },
                            'inputArtifacts': [{'name': 'BuildOutput'}],
                            'outputArtifacts': [{'name': 'DASTOutput'}],
                            'runOrder': 1
                        },
                        {
                            'name': 'APISecurityTest',
                            'actionTypeId': {
                                'category': 'Build',
                                'owner': 'AWS',
                                'provider': 'CodeBuild',
                                'version': '1'
                            },
                            'configuration': {
                                'ProjectName': f"api-security-test-{pipeline_config['application_name']}"
                            },
                            'inputArtifacts': [{'name': 'BuildOutput'}],
                            'outputArtifacts': [{'name': 'APITestOutput'}],
                            'runOrder': 2
                        }
                    ]
                },
                
                # Stage 9: Security Gate and Approval
                {
                    'name': 'SecurityGate',
                    'actions': [
                        {
                            'name': 'SecurityResultsConsolidation',
                            'actionTypeId': {
                                'category': 'Build',
                                'owner': 'AWS',
                                'provider': 'CodeBuild',
                                'version': '1'
                            },
                            'configuration': {
                                'ProjectName': f"security-gate-{pipeline_config['application_name']}"
                            },
                            'inputArtifacts': [
                                {'name': 'SASTOutput'},
                                {'name': 'DependencyOutput'},
                                {'name': 'IaCOutput'},
                                {'name': 'DASTOutput'},
                                {'name': 'ContainerOutput'}
                            ],
                            'outputArtifacts': [{'name': 'SecurityGateOutput'}],
                            'runOrder': 1
                        },
                        {
                            'name': 'SecurityApproval',
                            'actionTypeId': {
                                'category': 'Approval',
                                'owner': 'AWS',
                                'provider': 'Manual',
                                'version': '1'
                            },
                            'configuration': {
                                'NotificationArn': pipeline_config['approval_topic_arn'],
                                'CustomData': 'Please review security scan results before approving deployment to production.'
                            },
                            'runOrder': 2
                        }
                    ]
                },
                
                # Stage 10: Production Deployment
                {
                    'name': 'DeployProduction',
                    'actions': [
                        {
                            'name': 'DeployToProduction',
                            'actionTypeId': {
                                'category': 'Deploy',
                                'owner': 'AWS',
                                'provider': 'ECS',
                                'version': '1'
                            },
                            'configuration': {
                                'ClusterName': pipeline_config['prod_cluster'],
                                'ServiceName': f"prod-{pipeline_config['application_name']}",
                                'FileName': 'imagedefinitions.json'
                            },
                            'inputArtifacts': [{'name': 'BuildOutput'}],
                            'runOrder': 1
                        }
                    ]
                }
            ]
        }
        
        # Create the pipeline
        response = self.codepipeline.create_pipeline(pipeline=pipeline_definition)
        
        return {
            'pipeline_name': pipeline_definition['name'],
            'pipeline_arn': response['pipeline']['name'],
            'stages_count': len(pipeline_definition['stages']),
            'security_stages': [
                'PreCommitSecurity',
                'StaticAnalysis', 
                'DependencyAnalysis',
                'ContainerSecurity',
                'DynamicTesting',
                'SecurityGate'
            ]
        }
    
    def create_security_gate_logic(self, gate_config: Dict) -> Dict:
        """
        Create security gate logic for pipeline
        """
        security_gate_buildspec = {
            'version': '0.2',
            'phases': {
                'install': {
                    'runtime-versions': {
                        'python': '3.9'
                    },
                    'commands': [
                        'pip install boto3 jq'
                    ]
                },
                'build': {
                    'commands': [
                        'echo "Consolidating security scan results..."',
                        'python scripts/security_gate_processor.py',
                        'echo "Applying security gates..."',
                        'python scripts/apply_security_gates.py'
                    ]
                }
            },
            'artifacts': {
                'files': [
                    'security-gate-report.json',
                    'security-gate-decision.json'
                ]
            }
        }
        
        gate_logic = {
            'buildspec': security_gate_buildspec,
            'gate_rules': {
                'critical_vulnerabilities_threshold': gate_config.get('critical_threshold', 0),
                'high_vulnerabilities_threshold': gate_config.get('high_threshold', 5),
                'medium_vulnerabilities_threshold': gate_config.get('medium_threshold', 20),
                'dependency_vulnerabilities_threshold': gate_config.get('dependency_threshold', 10),
                'iac_violations_threshold': gate_config.get('iac_threshold', 15),
                'container_vulnerabilities_threshold': gate_config.get('container_threshold', 8),
                'dast_findings_threshold': gate_config.get('dast_threshold', 12)
            },
            'gate_actions': {
                'block_deployment_on_critical': True,
                'require_approval_on_high': True,
                'auto_create_security_tickets': True,
                'notify_security_team': True,
                'generate_security_report': True
            }
        }
        
        return gate_logic
    
    def create_security_dashboard(self, dashboard_config: Dict) -> Dict:
        """
        Create security testing dashboard
        """
        dashboard_definition = {
            'dashboard_name': f"SecurityTesting-{dashboard_config['application_name']}",
            'widgets': [
                {
                    'type': 'metric',
                    'properties': {
                        'metrics': [
                            ['AWS/CodePipeline', 'PipelineExecutionSuccess', 'PipelineName', dashboard_config['pipeline_name']],
                            ['AWS/CodePipeline', 'PipelineExecutionFailure', 'PipelineName', dashboard_config['pipeline_name']]
                        ],
                        'period': 300,
                        'stat': 'Sum',
                        'region': 'us-east-1',
                        'title': 'Pipeline Execution Status'
                    }
                },
                {
                    'type': 'log',
                    'properties': {
                        'query': f'''
                        SOURCE '/aws/codebuild/sast-scan-{dashboard_config["application_name"]}'
                        | fields @timestamp, @message
                        | filter @message like /CRITICAL|HIGH/
                        | sort @timestamp desc
                        | limit 100
                        ''',
                        'region': 'us-east-1',
                        'title': 'Critical Security Findings',
                        'view': 'table'
                    }
                },
                {
                    'type': 'metric',
                    'properties': {
                        'metrics': [
                            ['Custom/Security', 'VulnerabilitiesFound', 'Application', dashboard_config['application_name'], 'Severity', 'Critical'],
                            ['Custom/Security', 'VulnerabilitiesFound', 'Application', dashboard_config['application_name'], 'Severity', 'High'],
                            ['Custom/Security', 'VulnerabilitiesFound', 'Application', dashboard_config['application_name'], 'Severity', 'Medium']
                        ],
                        'period': 3600,
                        'stat': 'Average',
                        'region': 'us-east-1',
                        'title': 'Vulnerability Trends'
                    }
                }
            ]
        }
        
        return dashboard_definition
    
    def implement_security_feedback_loop(self, feedback_config: Dict) -> Dict:
        """
        Implement security feedback loop for continuous improvement
        """
        feedback_system = {
            'automated_feedback': {
                'vulnerability_trending': {
                    'enabled': True,
                    'analysis_period_days': 30,
                    'trend_threshold_percentage': 20,
                    'notification_channels': ['slack', 'email', 'jira']
                },
                'false_positive_learning': {
                    'enabled': True,
                    'ml_model_training': True,
                    'feedback_collection_method': 'developer_annotation',
                    'model_update_frequency': 'weekly'
                },
                'security_metrics_tracking': {
                    'enabled': True,
                    'metrics': [
                        'mean_time_to_detection',
                        'mean_time_to_remediation',
                        'vulnerability_density',
                        'security_debt_ratio',
                        'compliance_score'
                    ],
                    'reporting_frequency': 'daily'
                }
            },
            'manual_feedback': {
                'security_champion_reviews': {
                    'enabled': True,
                    'review_frequency': 'weekly',
                    'review_scope': ['high_severity_findings', 'new_vulnerability_types'],
                    'feedback_integration': 'tool_configuration_updates'
                },
                'developer_feedback_collection': {
                    'enabled': True,
                    'feedback_methods': ['survey', 'interview', 'tool_usage_analytics'],
                    'feedback_frequency': 'monthly',
                    'improvement_tracking': True
                }
            },
            'continuous_improvement': {
                'tool_effectiveness_analysis': {
                    'enabled': True,
                    'analysis_metrics': [
                        'true_positive_rate',
                        'false_positive_rate',
                        'coverage_percentage',
                        'performance_impact'
                    ],
                    'improvement_actions': [
                        'tool_configuration_tuning',
                        'custom_rule_development',
                        'tool_replacement_evaluation'
                    ]
                },
                'process_optimization': {
                    'enabled': True,
                    'optimization_areas': [
                        'scan_execution_time',
                        'result_processing_efficiency',
                        'developer_workflow_integration',
                        'security_gate_accuracy'
                    ]
                }
            }
        }
        
        return feedback_system

# Example usage
security_pipeline = SecurityTestingPipeline()

# Configure comprehensive security pipeline
pipeline_config = {
    'application_name': 'secure-web-app',
    'repository_name': 'secure-web-app-repo',
    'branch_name': 'main',
    'pipeline_role_arn': 'arn:aws:iam::123456789012:role/CodePipelineServiceRole',
    'artifact_bucket': 'security-pipeline-artifacts',
    'test_cluster': 'test-cluster',
    'prod_cluster': 'prod-cluster',
    'approval_topic_arn': 'arn:aws:sns:us-east-1:123456789012:security-approval'
}

# Create comprehensive security pipeline
pipeline_result = security_pipeline.create_comprehensive_security_pipeline(pipeline_config)
print("Security Pipeline Created:")
print(json.dumps(pipeline_result, indent=2))

# Create security gate logic
gate_config = {
    'critical_threshold': 0,
    'high_threshold': 3,
    'medium_threshold': 15,
    'dependency_threshold': 8,
    'iac_threshold': 10,
    'container_threshold': 5,
    'dast_threshold': 8
}

gate_logic = security_pipeline.create_security_gate_logic(gate_config)
print("\\nSecurity Gate Logic:")
print(json.dumps(gate_logic, indent=2))

Best Practices for Automated Security Testing

1. Implement Shift-Left Security

Early Integration: Integrate security testing as early as possible in the development process, including pre-commit hooks, IDE plugins, and early CI/CD stages.

Developer-Friendly Tools: Choose tools that provide clear, actionable feedback and integrate well with developer workflows.

Fast Feedback Loops: Ensure security tests run quickly to avoid disrupting development velocity.

2. Use Multiple Testing Approaches

Layered Security Testing: Implement multiple types of security testing (SAST, DAST, IAST, dependency scanning) to achieve comprehensive coverage.

Tool Diversity: Use multiple tools for each testing type to reduce false negatives and increase detection coverage.

Complementary Techniques: Combine automated testing with manual security reviews and penetration testing.

3. Optimize for Accuracy and Performance

Reduce False Positives: Tune tools and create custom rules to minimize false positives that can lead to alert fatigue.

Prioritize Findings: Implement risk-based prioritization to focus on the most critical security issues first.

Performance Optimization: Optimize scan execution time and resource usage to maintain development velocity.

4. Establish Effective Security Gates

Risk-Based Thresholds: Set security gate thresholds based on risk assessment and business requirements.

Graduated Response: Implement different actions based on severity levels (block, require approval, notify).

Exception Handling: Provide mechanisms for handling legitimate exceptions while maintaining security standards.

Common Challenges and Solutions

Challenge 1: Tool Integration Complexity

Problem: Difficulty integrating multiple security tools into existing CI/CD pipelines.

Solutions:

  • Use standardized APIs and output formats
  • Implement orchestration platforms (SOAR)
  • Create wrapper scripts for tool integration
  • Use containerized tools for consistency
  • Implement gradual rollout strategies

Challenge 2: False Positive Management

Problem: High false positive rates leading to alert fatigue and reduced effectiveness.

Solutions:

  • Implement machine learning for false positive reduction
  • Create custom rules and suppressions
  • Use multiple tools for validation
  • Implement developer feedback loops
  • Regular tool tuning and optimization

Challenge 3: Performance Impact

Problem: Security testing slowing down development and deployment processes.

Solutions:

  • Implement parallel scanning
  • Use incremental and differential scanning
  • Optimize tool configurations
  • Cache scan results where appropriate
  • Implement smart scheduling

Challenge 4: Results Management and Tracking

Problem: Difficulty managing and tracking security findings across multiple tools and projects.

Solutions:

  • Implement centralized vulnerability management
  • Use standardized vulnerability formats (SARIF)
  • Create unified dashboards and reporting
  • Implement automated ticket creation and tracking
  • Establish clear remediation workflows

Resources and Further Reading

AWS Documentation and Services

Security Testing Tools

  • OWASP ZAP - Dynamic application security testing
  • SonarQube - Static code analysis
  • Snyk - Dependency vulnerability scanning
  • Checkov - Infrastructure as code security scanning
  • Semgrep - Static analysis for security

Industry Standards and Frameworks

Best Practices and Guides


This documentation provides comprehensive guidance for implementing automated security testing throughout the development and release lifecycle. Regular updates ensure the content remains current with evolving security testing tools and practices.