Skip to content
SEC11

SEC11-BP04: Conduct code reviews

SEC11-BP04: Conduct code reviews

Overview

Implement systematic code review processes to identify security vulnerabilities, ensure adherence to secure coding practices, and maintain code quality. Code reviews should combine automated tools with manual inspection by security-aware developers to catch issues that automated tools might miss.

Implementation Guidance

Code reviews are a critical security control that provides human oversight of code changes before they reach production. While automated security testing tools can identify many common vulnerabilities, manual code reviews can catch complex logic flaws, business logic vulnerabilities, and subtle security issues that require human understanding and context.

Key Principles of Security Code Reviews

Security-Focused Review Process: Integrate security considerations into all code reviews, not just dedicated security reviews. Every code change should be evaluated for potential security implications.

Multi-Layered Review Approach: Combine automated tools, peer reviews, and specialized security reviews to achieve comprehensive coverage of potential security issues.

Risk-Based Review Intensity: Apply more rigorous review processes to high-risk code changes, such as authentication systems, data handling logic, and external integrations.

Continuous Learning: Use code reviews as opportunities to educate developers about secure coding practices and share security knowledge across the team.

Actionable Feedback: Provide specific, actionable feedback that helps developers understand security issues and how to fix them effectively.

Implementation Steps

Step 1: Establish Security Code Review Framework

Create a comprehensive framework for conducting security-focused code reviews:

View code
# Security Code Review Framework
import boto3
import json
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import ast
import subprocess

class SecurityCodeReviewFramework:
    def __init__(self):
        self.dynamodb = boto3.resource('dynamodb')
        self.s3 = boto3.client('s3')
        self.codecommit = boto3.client('codecommit')
        self.codeguru_reviewer = boto3.client('codeguru-reviewer')
        
        # DynamoDB tables for tracking
        self.reviews_table = self.dynamodb.Table('code-reviews')
        self.findings_table = self.dynamodb.Table('code-review-findings')
        self.metrics_table = self.dynamodb.Table('code-review-metrics')
        
    def create_security_review_framework(self, framework_config: Dict) -> Dict:
        """
        Create comprehensive security code review framework
        """
        framework = {
            'framework_id': f"SCR-FRAMEWORK-{datetime.now().strftime('%Y%m%d')}",
            'organization': framework_config['organization_name'],
            'review_types': {
                'standard_review': {
                    'description': 'Regular peer review with security considerations',
                    'required_reviewers': 1,
                    'security_checklist_required': True,
                    'automated_tools_required': True,
                    'review_time_target': '2 hours'
                },
                'security_focused_review': {
                    'description': 'Dedicated security review for high-risk changes',
                    'required_reviewers': 2,
                    'security_expert_required': True,
                    'threat_modeling_required': True,
                    'review_time_target': '4 hours'
                },
                'critical_security_review': {
                    'description': 'Comprehensive review for security-critical components',
                    'required_reviewers': 3,
                    'security_architect_required': True,
                    'penetration_tester_input': True,
                    'formal_sign_off_required': True,
                    'review_time_target': '8 hours'
                }
            },
            'review_triggers': {
                'automatic_triggers': [
                    'authentication_code_changes',
                    'authorization_logic_modifications',
                    'cryptographic_implementations',
                    'input_validation_changes',
                    'database_query_modifications',
                    'external_api_integrations',
                    'file_upload_functionality',
                    'session_management_changes'
                ],
                'risk_based_triggers': [
                    'changes_to_critical_business_logic',
                    'modifications_to_payment_processing',
                    'updates_to_user_data_handling',
                    'changes_to_admin_functionality',
                    'third_party_library_integrations'
                ],
                'compliance_triggers': [
                    'pci_dss_scope_changes',
                    'hipaa_covered_entity_modifications',
                    'gdpr_data_processing_changes',
                    'sox_financial_reporting_updates'
                ]
            },
            'security_checklist': {
                'input_validation': [
                    'All user inputs are validated and sanitized',
                    'Input validation is performed on server-side',
                    'Whitelist validation is used where possible',
                    'Input length limits are enforced',
                    'Special characters are properly handled'
                ],
                'authentication_authorization': [
                    'Authentication mechanisms are properly implemented',
                    'Authorization checks are performed at appropriate points',
                    'Principle of least privilege is followed',
                    'Session management is secure',
                    'Multi-factor authentication is considered'
                ],
                'data_protection': [
                    'Sensitive data is encrypted at rest and in transit',
                    'Proper key management practices are followed',
                    'Data classification is respected',
                    'PII handling follows privacy requirements',
                    'Data retention policies are implemented'
                ],
                'error_handling': [
                    'Error messages do not leak sensitive information',
                    'Proper logging is implemented for security events',
                    'Exception handling does not expose system details',
                    'Security-relevant errors are monitored',
                    'Graceful degradation is implemented'
                ],
                'secure_communications': [
                    'TLS/SSL is properly configured',
                    'Certificate validation is implemented',
                    'Secure protocols are used for all communications',
                    'API security best practices are followed',
                    'Cross-origin requests are properly handled'
                ]
            },
            'automated_tools_integration': {
                'static_analysis_tools': [
                    'sonarqube',
                    'checkmarx',
                    'veracode',
                    'semgrep',
                    'bandit'
                ],
                'dependency_scanners': [
                    'snyk',
                    'npm_audit',
                    'safety',
                    'owasp_dependency_check'
                ],
                'code_quality_tools': [
                    'eslint_security',
                    'pmd',
                    'spotbugs',
                    'pylint'
                ]
            },
            'reviewer_qualifications': {
                'standard_reviewer': {
                    'minimum_experience': '2 years',
                    'security_training_required': True,
                    'secure_coding_certification': False,
                    'domain_expertise_required': True
                },
                'security_reviewer': {
                    'minimum_experience': '5 years',
                    'security_training_required': True,
                    'secure_coding_certification': True,
                    'security_certifications': ['CISSP', 'CSSLP', 'CEH'],
                    'penetration_testing_experience': True
                },
                'security_architect': {
                    'minimum_experience': '8 years',
                    'security_architecture_experience': True,
                    'threat_modeling_expertise': True,
                    'compliance_knowledge': True,
                    'leadership_experience': True
                }
            },
            'metrics_and_kpis': {
                'review_effectiveness': [
                    'security_issues_found_per_review',
                    'false_positive_rate',
                    'time_to_complete_review',
                    'reviewer_agreement_rate'
                ],
                'process_efficiency': [
                    'review_cycle_time',
                    'review_backlog_size',
                    'reviewer_utilization',
                    'automated_tool_coverage'
                ],
                'security_outcomes': [
                    'production_security_incidents',
                    'vulnerability_escape_rate',
                    'security_debt_accumulation',
                    'compliance_violation_rate'
                ]
            }
        }
        
        return framework
    
    def initiate_code_review(self, review_request: Dict) -> Dict:
        """
        Initiate a security-focused code review
        """
        review_id = f"SCR-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{review_request.get('pull_request_id', 'manual')}"
        
        # Analyze code changes to determine review type
        review_type = self.determine_review_type(review_request)
        
        # Get required reviewers based on review type
        required_reviewers = self.get_required_reviewers(review_type, review_request)
        
        # Run automated security analysis
        automated_analysis = self.run_automated_security_analysis(review_request)
        
        code_review = {
            'review_id': review_id,
            'pull_request_id': review_request.get('pull_request_id'),
            'repository': review_request.get('repository'),
            'branch': review_request.get('branch'),
            'author': review_request.get('author'),
            'created_date': datetime.now().isoformat(),
            'review_type': review_type,
            'status': 'pending_review',
            'required_reviewers': required_reviewers,
            'assigned_reviewers': [],
            'completed_reviewers': [],
            'automated_analysis': automated_analysis,
            'manual_findings': [],
            'security_checklist_status': {},
            'overall_security_rating': 'pending',
            'approval_status': 'pending',
            'code_changes': {
                'files_modified': review_request.get('files_modified', []),
                'lines_added': review_request.get('lines_added', 0),
                'lines_deleted': review_request.get('lines_deleted', 0),
                'complexity_score': self.calculate_complexity_score(review_request)
            },
            'risk_assessment': {
                'risk_level': self.assess_change_risk_level(review_request),
                'security_impact_areas': self.identify_security_impact_areas(review_request),
                'compliance_implications': self.assess_compliance_implications(review_request)
            }
        }
        
        # Store review record
        self.reviews_table.put_item(Item=code_review)
        
        # Assign reviewers
        self.assign_reviewers(review_id, required_reviewers)
        
        # Send notifications
        self.send_review_notifications(code_review)
        
        return code_review
    
    def determine_review_type(self, review_request: Dict) -> str:
        """
        Determine the appropriate review type based on code changes
        """
        files_modified = review_request.get('files_modified', [])
        change_description = review_request.get('description', '').lower()
        
        # Check for critical security triggers
        critical_patterns = [
            r'auth.*',
            r'.*password.*',
            r'.*crypto.*',
            r'.*security.*',
            r'.*admin.*',
            r'.*payment.*'
        ]
        
        security_patterns = [
            r'.*validation.*',
            r'.*session.*',
            r'.*permission.*',
            r'.*access.*',
            r'.*api.*'
        ]
        
        # Analyze file paths and content
        has_critical_changes = any(
            any(re.match(pattern, file_path, re.IGNORECASE) for pattern in critical_patterns)
            for file_path in files_modified
        )
        
        has_security_changes = any(
            any(re.match(pattern, file_path, re.IGNORECASE) for pattern in security_patterns)
            for file_path in files_modified
        )
        
        # Check change description
        has_critical_description = any(
            pattern.replace('.*', '').replace(r'\.', '.') in change_description
            for pattern in critical_patterns
        )
        
        # Determine review type
        if has_critical_changes or has_critical_description:
            return 'critical_security_review'
        elif has_security_changes or len(files_modified) > 20:
            return 'security_focused_review'
        else:
            return 'standard_review'
    
    def run_automated_security_analysis(self, review_request: Dict) -> Dict:
        """
        Run automated security analysis on code changes
        """
        analysis_results = {
            'analysis_id': f"AUTO-{datetime.now().strftime('%Y%m%d%H%M%S')}",
            'timestamp': datetime.now().isoformat(),
            'tools_used': [],
            'findings': [],
            'metrics': {
                'total_issues': 0,
                'critical_issues': 0,
                'high_issues': 0,
                'medium_issues': 0,
                'low_issues': 0
            },
            'code_quality_score': 0,
            'security_score': 0
        }
        
        # Static analysis with multiple tools
        static_analysis_results = self.run_static_analysis(review_request)
        analysis_results['findings'].extend(static_analysis_results['findings'])
        analysis_results['tools_used'].extend(static_analysis_results['tools_used'])
        
        # Dependency vulnerability scanning
        dependency_scan_results = self.run_dependency_scan(review_request)
        analysis_results['findings'].extend(dependency_scan_results['findings'])
        analysis_results['tools_used'].extend(dependency_scan_results['tools_used'])
        
        # Custom security pattern matching
        pattern_analysis_results = self.run_security_pattern_analysis(review_request)
        analysis_results['findings'].extend(pattern_analysis_results['findings'])
        
        # Calculate metrics
        analysis_results['metrics'] = self.calculate_analysis_metrics(analysis_results['findings'])
        analysis_results['security_score'] = self.calculate_security_score(analysis_results)
        
        return analysis_results
    
    def run_static_analysis(self, review_request: Dict) -> Dict:
        """
        Run static analysis tools on code changes
        """
        static_analysis_results = {
            'tools_used': [],
            'findings': []
        }
        
        repository = review_request.get('repository')
        branch = review_request.get('branch')
        
        # Language-specific static analysis
        languages = self.detect_languages(review_request.get('files_modified', []))
        
        for language in languages:
            if language == 'python':
                bandit_results = self.run_bandit_analysis(repository, branch)
                static_analysis_results['findings'].extend(bandit_results)
                static_analysis_results['tools_used'].append('bandit')
                
                pylint_results = self.run_pylint_security_analysis(repository, branch)
                static_analysis_results['findings'].extend(pylint_results)
                static_analysis_results['tools_used'].append('pylint')
                
            elif language == 'javascript':
                eslint_results = self.run_eslint_security_analysis(repository, branch)
                static_analysis_results['findings'].extend(eslint_results)
                static_analysis_results['tools_used'].append('eslint-security')
                
            elif language == 'java':
                spotbugs_results = self.run_spotbugs_analysis(repository, branch)
                static_analysis_results['findings'].extend(spotbugs_results)
                static_analysis_results['tools_used'].append('spotbugs')
        
        # Universal tools
        semgrep_results = self.run_semgrep_analysis(repository, branch)
        static_analysis_results['findings'].extend(semgrep_results)
        static_analysis_results['tools_used'].append('semgrep')
        
        return static_analysis_results
    
    def run_security_pattern_analysis(self, review_request: Dict) -> Dict:
        """
        Run custom security pattern analysis
        """
        pattern_results = {
            'findings': []
        }
        
        # Define security anti-patterns
        security_patterns = {
            'hardcoded_secrets': {
                'patterns': [
                    r'password\s*=\s*["\'][^"\']+["\']',
                    r'api_key\s*=\s*["\'][^"\']+["\']',
                    r'secret\s*=\s*["\'][^"\']+["\']',
                    r'token\s*=\s*["\'][^"\']+["\']'
                ],
                'severity': 'critical',
                'description': 'Hardcoded secrets detected'
            },
            'sql_injection_risk': {
                'patterns': [
                    r'execute\s*\(\s*["\'].*%.*["\']',
                    r'query\s*\(\s*["\'].*\+.*["\']',
                    r'SELECT.*\+.*FROM',
                    r'INSERT.*\+.*VALUES'
                ],
                'severity': 'high',
                'description': 'Potential SQL injection vulnerability'
            },
            'xss_risk': {
                'patterns': [
                    r'innerHTML\s*=\s*.*\+',
                    r'document\.write\s*\(',
                    r'eval\s*\(',
                    r'dangerouslySetInnerHTML'
                ],
                'severity': 'high',
                'description': 'Potential XSS vulnerability'
            },
            'insecure_random': {
                'patterns': [
                    r'Math\.random\(\)',
                    r'Random\(\)',
                    r'rand\(\)'
                ],
                'severity': 'medium',
                'description': 'Use of insecure random number generator'
            },
            'debug_code': {
                'patterns': [
                    r'console\.log\(',
                    r'print\(',
                    r'System\.out\.println',
                    r'debugger;'
                ],
                'severity': 'low',
                'description': 'Debug code left in production'
            }
        }
        
        # Analyze code changes for patterns
        files_content = self.get_files_content(review_request)
        
        for file_path, content in files_content.items():
            for pattern_name, pattern_config in security_patterns.items():
                for pattern in pattern_config['patterns']:
                    matches = re.finditer(pattern, content, re.IGNORECASE | re.MULTILINE)
                    
                    for match in matches:
                        line_number = content[:match.start()].count('\\n') + 1
                        
                        finding = {
                            'type': 'security_pattern',
                            'pattern_name': pattern_name,
                            'severity': pattern_config['severity'],
                            'description': pattern_config['description'],
                            'file_path': file_path,
                            'line_number': line_number,
                            'matched_text': match.group(),
                            'recommendation': self.get_pattern_recommendation(pattern_name)
                        }
                        
                        pattern_results['findings'].append(finding)
        
        return pattern_results
    
    def conduct_manual_security_review(self, review_id: str, reviewer: str, review_data: Dict) -> Dict:
        """
        Conduct manual security review
        """
        manual_review = {
            'review_id': review_id,
            'reviewer': reviewer,
            'review_date': datetime.now().isoformat(),
            'review_type': 'manual_security_review',
            'security_checklist_results': {},
            'manual_findings': [],
            'code_quality_assessment': {},
            'security_recommendations': [],
            'overall_assessment': {
                'security_rating': 'pending',
                'approval_recommendation': 'pending',
                'confidence_level': 'medium'
            }
        }
        
        # Process security checklist
        checklist_items = review_data.get('security_checklist', {})
        for category, items in checklist_items.items():
            manual_review['security_checklist_results'][category] = {
                'items_checked': len([item for item in items if item.get('status') == 'pass']),
                'total_items': len(items),
                'failed_items': [item for item in items if item.get('status') == 'fail'],
                'category_score': self.calculate_checklist_category_score(items)
            }
        
        # Process manual findings
        for finding in review_data.get('manual_findings', []):
            processed_finding = {
                'finding_id': f"MAN-{datetime.now().strftime('%Y%m%d%H%M%S')}-{len(manual_review['manual_findings'])}",
                'category': finding.get('category'),
                'severity': finding.get('severity'),
                'title': finding.get('title'),
                'description': finding.get('description'),
                'file_path': finding.get('file_path'),
                'line_number': finding.get('line_number'),
                'code_snippet': finding.get('code_snippet'),
                'security_impact': finding.get('security_impact'),
                'remediation_suggestion': finding.get('remediation_suggestion'),
                'confidence': finding.get('confidence', 'medium'),
                'false_positive_likelihood': finding.get('false_positive_likelihood', 'low')
            }
            manual_review['manual_findings'].append(processed_finding)
        
        # Assess code quality from security perspective
        manual_review['code_quality_assessment'] = {
            'input_validation_quality': review_data.get('input_validation_score', 0),
            'error_handling_quality': review_data.get('error_handling_score', 0),
            'authentication_implementation': review_data.get('auth_implementation_score', 0),
            'data_protection_measures': review_data.get('data_protection_score', 0),
            'logging_and_monitoring': review_data.get('logging_score', 0)
        }
        
        # Generate security recommendations
        manual_review['security_recommendations'] = self.generate_security_recommendations(
            manual_review['manual_findings'],
            manual_review['security_checklist_results']
        )
        
        # Calculate overall assessment
        manual_review['overall_assessment'] = self.calculate_overall_security_assessment(manual_review)
        
        # Store manual review
        self.findings_table.put_item(Item=manual_review)
        
        # Update main review record
        self.update_review_with_manual_findings(review_id, manual_review)
        
        return manual_review
    
    def generate_security_recommendations(self, findings: List[Dict], checklist_results: Dict) -> List[Dict]:
        """
        Generate actionable security recommendations
        """
        recommendations = []
        
        # Analyze findings patterns
        finding_categories = {}
        for finding in findings:
            category = finding.get('category', 'other')
            if category not in finding_categories:
                finding_categories[category] = []
            finding_categories[category].append(finding)
        
        # Generate category-specific recommendations
        for category, category_findings in finding_categories.items():
            if category == 'input_validation':
                recommendations.append({
                    'category': 'input_validation',
                    'priority': 'high',
                    'title': 'Implement Comprehensive Input Validation',
                    'description': 'Add server-side input validation and sanitization for all user inputs',
                    'implementation_steps': [
                        'Use whitelist validation where possible',
                        'Implement input length limits',
                        'Sanitize special characters',
                        'Validate data types and formats',
                        'Use parameterized queries for database operations'
                    ],
                    'affected_findings': [f['finding_id'] for f in category_findings],
                    'estimated_effort': 'medium',
                    'security_impact': 'high'
                })
            
            elif category == 'authentication':
                recommendations.append({
                    'category': 'authentication',
                    'priority': 'critical',
                    'title': 'Strengthen Authentication Mechanisms',
                    'description': 'Improve authentication security and implement best practices',
                    'implementation_steps': [
                        'Implement multi-factor authentication',
                        'Use secure password hashing (bcrypt, Argon2)',
                        'Implement account lockout mechanisms',
                        'Add session timeout controls',
                        'Implement secure password reset flows'
                    ],
                    'affected_findings': [f['finding_id'] for f in category_findings],
                    'estimated_effort': 'high',
                    'security_impact': 'critical'
                })
            
            elif category == 'data_protection':
                recommendations.append({
                    'category': 'data_protection',
                    'priority': 'high',
                    'title': 'Enhance Data Protection Measures',
                    'description': 'Implement proper data encryption and protection controls',
                    'implementation_steps': [
                        'Encrypt sensitive data at rest',
                        'Use TLS for data in transit',
                        'Implement proper key management',
                        'Add data classification labels',
                        'Implement data retention policies'
                    ],
                    'affected_findings': [f['finding_id'] for f in category_findings],
                    'estimated_effort': 'medium',
                    'security_impact': 'high'
                })
        
        # Add checklist-based recommendations
        for category, results in checklist_results.items():
            if results['category_score'] < 80:  # Below acceptable threshold
                recommendations.append({
                    'category': f'checklist_{category}',
                    'priority': 'medium',
                    'title': f'Address {category.replace("_", " ").title()} Checklist Items',
                    'description': f'Complete remaining {category} security checklist items',
                    'implementation_steps': [
                        f'Review failed {category} checklist items',
                        'Implement missing security controls',
                        'Update code to meet security standards',
                        'Add appropriate documentation'
                    ],
                    'failed_items': results['failed_items'],
                    'estimated_effort': 'low',
                    'security_impact': 'medium'
                })
        
        # Sort recommendations by priority
        priority_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
        recommendations.sort(key=lambda x: priority_order.get(x['priority'], 3))
        
        return recommendations
    
    def calculate_overall_security_assessment(self, manual_review: Dict) -> Dict:
        """
        Calculate overall security assessment
        """
        # Count findings by severity
        findings = manual_review['manual_findings']
        severity_counts = {
            'critical': len([f for f in findings if f.get('severity') == 'critical']),
            'high': len([f for f in findings if f.get('severity') == 'high']),
            'medium': len([f for f in findings if f.get('severity') == 'medium']),
            'low': len([f for f in findings if f.get('severity') == 'low'])
        }
        
        # Calculate checklist score
        checklist_results = manual_review['security_checklist_results']
        total_checklist_score = 0
        total_categories = len(checklist_results)
        
        for category_results in checklist_results.values():
            total_checklist_score += category_results['category_score']
        
        average_checklist_score = total_checklist_score / total_categories if total_categories > 0 else 0
        
        # Calculate security rating
        security_score = 100
        security_score -= severity_counts['critical'] * 25
        security_score -= severity_counts['high'] * 15
        security_score -= severity_counts['medium'] * 8
        security_score -= severity_counts['low'] * 3
        
        # Factor in checklist score
        security_score = (security_score + average_checklist_score) / 2
        
        # Determine security rating
        if security_score >= 90:
            security_rating = 'excellent'
        elif security_score >= 80:
            security_rating = 'good'
        elif security_score >= 70:
            security_rating = 'acceptable'
        elif security_score >= 60:
            security_rating = 'needs_improvement'
        else:
            security_rating = 'poor'
        
        # Determine approval recommendation
        if severity_counts['critical'] > 0:
            approval_recommendation = 'reject'
        elif severity_counts['high'] > 2:
            approval_recommendation = 'conditional'
        elif security_score >= 70:
            approval_recommendation = 'approve'
        else:
            approval_recommendation = 'conditional'
        
        # Determine confidence level
        total_findings = sum(severity_counts.values())
        if total_findings == 0 and average_checklist_score > 90:
            confidence_level = 'high'
        elif total_findings <= 3 and average_checklist_score > 80:
            confidence_level = 'medium'
        else:
            confidence_level = 'low'
        
        return {
            'security_rating': security_rating,
            'security_score': security_score,
            'approval_recommendation': approval_recommendation,
            'confidence_level': confidence_level,
            'severity_breakdown': severity_counts,
            'checklist_score': average_checklist_score,
            'key_concerns': self.identify_key_security_concerns(findings),
            'strengths': self.identify_security_strengths(checklist_results)
        }

# Example usage
security_review_framework = SecurityCodeReviewFramework()

# Create security review framework
framework_config = {
    'organization_name': 'SecureCompany Inc.',
    'compliance_requirements': ['PCI-DSS', 'SOC2'],
    'security_standards': ['OWASP', 'NIST']
}

framework = security_review_framework.create_security_review_framework(framework_config)
print("Security Code Review Framework:")
print(json.dumps(framework['review_types'], indent=2))

# Initiate a code review
review_request = {
    'pull_request_id': 'PR-12345',
    'repository': 'secure-web-app',
    'branch': 'feature/authentication-update',
    'author': 'developer@company.com',
    'files_modified': [
        'src/auth/login.py',
        'src/auth/session.py',
        'src/models/user.py'
    ],
    'lines_added': 150,
    'lines_deleted': 45,
    'description': 'Update authentication system with MFA support'
}

code_review = security_review_framework.initiate_code_review(review_request)
print(f"\\nCode Review Initiated: {code_review['review_id']}")
print(f"Review Type: {code_review['review_type']}")

Step 2: Integrate Automated Code Review Tools

Implement comprehensive automated code review tools to augment manual reviews:

View code
# Automated Code Review Tools Integration
import boto3
import json
import subprocess
import os
from datetime import datetime
from typing import Dict, List, Optional

class AutomatedCodeReviewIntegration:
    def __init__(self):
        self.codebuild = boto3.client('codebuild')
        self.codecommit = boto3.client('codecommit')
        self.codeguru_reviewer = boto3.client('codeguru-reviewer')
        self.s3 = boto3.client('s3')
        self.lambda_client = boto3.client('lambda')
        
    def setup_automated_review_pipeline(self, pipeline_config: Dict) -> Dict:
        """
        Set up automated code review pipeline
        """
        pipeline_setup = {
            'pipeline_id': f"AUTO-REVIEW-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
            'repository': pipeline_config['repository'],
            'tools_configured': [],
            'webhooks_created': [],
            'build_projects': [],
            'lambda_functions': []
        }
        
        # Configure CodeGuru Reviewer
        codeguru_config = self.setup_codeguru_reviewer(pipeline_config)
        pipeline_setup['tools_configured'].append('codeguru_reviewer')
        
        # Set up static analysis build project
        static_analysis_project = self.create_static_analysis_project(pipeline_config)
        pipeline_setup['build_projects'].append(static_analysis_project)
        
        # Create webhook for automated reviews
        webhook_config = self.create_review_webhook(pipeline_config)
        pipeline_setup['webhooks_created'].append(webhook_config)
        
        # Set up review orchestration Lambda
        orchestration_lambda = self.create_review_orchestration_lambda(pipeline_config)
        pipeline_setup['lambda_functions'].append(orchestration_lambda)
        
        return pipeline_setup
    
    def setup_codeguru_reviewer(self, config: Dict) -> Dict:
        """
        Set up Amazon CodeGuru Reviewer
        """
        repository_arn = f"arn:aws:codecommit:{boto3.Session().region_name}:{boto3.client('sts').get_caller_identity()['Account']}:{config['repository']}"
        
        try:
            # Associate repository with CodeGuru Reviewer
            response = self.codeguru_reviewer.associate_repository(
                Repository={
                    'CodeCommit': {
                        'Name': config['repository']
                    }
                },
                Type='PullRequest',
                ClientRequestToken=f"setup-{datetime.now().strftime('%Y%m%d%H%M%S')}"
            )
            
            return {
                'status': 'success',
                'association_arn': response['RepositoryAssociation']['AssociationArn'],
                'repository_arn': repository_arn
            }
            
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def create_static_analysis_project(self, config: Dict) -> Dict:
        """
        Create CodeBuild project for static analysis
        """
        buildspec = {
            'version': '0.2',
            'phases': {
                'install': {
                    'runtime-versions': {
                        'python': '3.9',
                        'nodejs': '16',
                        'java': 'corretto11'
                    },
                    'commands': [
                        'echo "Installing static analysis tools..."',
                        'pip install bandit safety semgrep',
                        'npm install -g eslint eslint-plugin-security',
                        'curl -L "https://github.com/returntocorp/semgrep/releases/latest/download/semgrep-linux" -o /usr/local/bin/semgrep',
                        'chmod +x /usr/local/bin/semgrep'
                    ]
                },
                'pre_build': {
                    'commands': [
                        'echo "Preparing static analysis..."',
                        'mkdir -p analysis-results',
                        'echo "Analyzing changed files..."',
                        'git diff --name-only HEAD~1 HEAD > changed-files.txt'
                    ]
                },
                'build': {
                    'commands': [
                        'echo "Running static analysis tools..."',
                        self.generate_static_analysis_commands(config)
                    ]
                },
                'post_build': {
                    'commands': [
                        'echo "Processing analysis results..."',
                        'python scripts/process_analysis_results.py',
                        'python scripts/create_review_comments.py',
                        'aws s3 cp analysis-results/ s3://$ANALYSIS_RESULTS_BUCKET/$(date +%Y%m%d-%H%M%S)/ --recursive'
                    ]
                }
            },
            'artifacts': {
                'files': [
                    'analysis-results/**/*',
                    'review-comments.json'
                ]
            }
        }
        
        project_config = {
            'name': f"static-analysis-{config['repository']}",
            'source': {
                'type': 'CODECOMMIT',
                'location': f"https://git-codecommit.{boto3.Session().region_name}.amazonaws.com/v1/repos/{config['repository']}",
                'buildspec': json.dumps(buildspec, indent=2)
            },
            'artifacts': {
                'type': 'S3',
                'location': f"{config.get('artifacts_bucket', 'code-analysis-artifacts')}/static-analysis"
            },
            'environment': {
                'type': 'LINUX_CONTAINER',
                'image': 'aws/codebuild/amazonlinux2-x86_64-standard:3.0',
                'computeType': 'BUILD_GENERAL1_MEDIUM',
                'environmentVariables': [
                    {
                        'name': 'ANALYSIS_RESULTS_BUCKET',
                        'value': config.get('results_bucket', 'code-analysis-results')
                    },
                    {
                        'name': 'REPOSITORY_NAME',
                        'value': config['repository']
                    }
                ]
            },
            'serviceRole': config.get('service_role_arn')
        }
        
        response = self.codebuild.create_project(**project_config)
        return {
            'project_name': project_config['name'],
            'project_arn': response['project']['arn']
        }
    
    def generate_static_analysis_commands(self, config: Dict) -> str:
        """
        Generate static analysis commands based on project configuration
        """
        commands = []
        
        # Language detection and tool selection
        languages = config.get('languages', ['python', 'javascript', 'java'])
        
        # Python analysis
        if 'python' in languages:
            commands.extend([
                '# Python static analysis',
                'if find . -name "*.py" -not -path "./venv/*" -not -path "./.venv/*" | head -1; then',
                '  echo "Running Bandit for Python security analysis..."',
                '  bandit -r . -f json -o analysis-results/bandit-results.json -x ./venv,./test,./tests || true',
                '  echo "Running Safety for dependency vulnerability scanning..."',
                '  safety check --json --output analysis-results/safety-results.json || true',
                'fi'
            ])
        
        # JavaScript/Node.js analysis
        if 'javascript' in languages:
            commands.extend([
                '# JavaScript static analysis',
                'if find . -name "*.js" -o -name "*.jsx" -o -name "*.ts" -o -name "*.tsx" | head -1; then',
                '  echo "Running ESLint security analysis..."',
                '  npx eslint . --ext .js,.jsx,.ts,.tsx --format json --output-file analysis-results/eslint-results.json || true',
                '  echo "Running npm audit..."',
                '  npm audit --json > analysis-results/npm-audit-results.json || true',
                'fi'
            ])
        
        # Java analysis
        if 'java' in languages:
            commands.extend([
                '# Java static analysis',
                'if find . -name "*.java" | head -1; then',
                '  echo "Running SpotBugs for Java security analysis..."',
                '  # SpotBugs analysis would go here',
                '  echo "Java analysis placeholder" > analysis-results/java-analysis.json',
                'fi'
            ])
        
        # Universal tools
        commands.extend([
            '# Universal static analysis',
            'echo "Running Semgrep for multi-language security analysis..."',
            'semgrep --config=auto --json --output=analysis-results/semgrep-results.json . || true',
            '',
            '# Custom security pattern analysis',
            'echo "Running custom security pattern analysis..."',
            'python scripts/custom_security_patterns.py > analysis-results/custom-patterns.json || true'
        ])
        
        return ' && '.join(commands)
    
    def create_review_webhook(self, config: Dict) -> Dict:
        """
        Create webhook for automated code reviews
        """
        # Create API Gateway for webhook
        apigateway = boto3.client('apigateway')
        
        # Create REST API
        api_response = apigateway.create_rest_api(
            name=f"code-review-webhook-{config['repository']}",
            description=f"Webhook for automated code reviews in {config['repository']}",
            endpointConfiguration={
                'types': ['REGIONAL']
            }
        )
        
        api_id = api_response['id']
        
        # Get root resource
        resources_response = apigateway.get_resources(restApiId=api_id)
        root_resource_id = resources_response['items'][0]['id']
        
        # Create webhook resource
        webhook_resource = apigateway.create_resource(
            restApiId=api_id,
            parentId=root_resource_id,
            pathPart='webhook'
        )
        
        # Create POST method
        apigateway.put_method(
            restApiId=api_id,
            resourceId=webhook_resource['id'],
            httpMethod='POST',
            authorizationType='NONE'
        )
        
        # Create Lambda integration
        lambda_arn = f"arn:aws:lambda:{boto3.Session().region_name}:{boto3.client('sts').get_caller_identity()['Account']}:function:code-review-webhook-handler"
        
        apigateway.put_integration(
            restApiId=api_id,
            resourceId=webhook_resource['id'],
            httpMethod='POST',
            type='AWS_PROXY',
            integrationHttpMethod='POST',
            uri=f"arn:aws:apigateway:{boto3.Session().region_name}:lambda:path/2015-03-31/functions/{lambda_arn}/invocations"
        )
        
        # Deploy API
        deployment = apigateway.create_deployment(
            restApiId=api_id,
            stageName='prod'
        )
        
        webhook_url = f"https://{api_id}.execute-api.{boto3.Session().region_name}.amazonaws.com/prod/webhook"
        
        return {
            'api_id': api_id,
            'webhook_url': webhook_url,
            'deployment_id': deployment['id']
        }
    
    def create_review_orchestration_lambda(self, config: Dict) -> Dict:
        """
        Create Lambda function for review orchestration
        """
        lambda_code = '''
import json
import boto3
import os
from datetime import datetime

def lambda_handler(event, context):
    """
    Handle code review webhook events
    """
    print(f"Received event: {json.dumps(event)}")
    
    # Parse webhook payload
    if 'body' in event:
        body = json.loads(event['body']) if isinstance(event['body'], str) else event['body']
    else:
        body = event
    
    # Extract pull request information
    pr_info = extract_pr_info(body)
    
    if not pr_info:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid webhook payload'})
        }
    
    # Trigger automated review process
    review_result = trigger_automated_review(pr_info)
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'Automated review triggered',
            'review_id': review_result.get('review_id'),
            'status': review_result.get('status')
        })
    }

def extract_pr_info(webhook_body):
    """
    Extract pull request information from webhook
    """
    try:
        # Handle CodeCommit pull request events
        if 'pullRequestId' in webhook_body:
            return {
                'pull_request_id': webhook_body['pullRequestId'],
                'repository': webhook_body.get('repositoryName'),
                'source_branch': webhook_body.get('sourceReference'),
                'destination_branch': webhook_body.get('destinationReference'),
                'author': webhook_body.get('author', {}).get('arn', ''),
                'event_type': webhook_body.get('eventType', 'pullRequestCreated')
            }
        
        return None
        
    except Exception as e:
        print(f"Error extracting PR info: {str(e)}")
        return None

def trigger_automated_review(pr_info):
    """
    Trigger automated code review process
    """
    codebuild = boto3.client('codebuild')
    
    try:
        # Start static analysis build
        build_response = codebuild.start_build(
            projectName=f"static-analysis-{pr_info['repository']}",
            environmentVariablesOverride=[
                {
                    'name': 'PULL_REQUEST_ID',
                    'value': pr_info['pull_request_id']
                },
                {
                    'name': 'SOURCE_BRANCH',
                    'value': pr_info['source_branch']
                },
                {
                    'name': 'DESTINATION_BRANCH',
                    'value': pr_info['destination_branch']
                }
            ]
        )
        
        return {
            'review_id': f"AUTO-{datetime.now().strftime('%Y%m%d%H%M%S')}",
            'status': 'started',
            'build_id': build_response['build']['id']
        }
        
    except Exception as e:
        print(f"Error triggering automated review: {str(e)}")
        return {
            'review_id': None,
            'status': 'error',
            'error': str(e)
        }
        '''
        
        # Create Lambda function
        lambda_response = self.lambda_client.create_function(
            FunctionName=f"code-review-orchestrator-{config['repository']}",
            Runtime='python3.9',
            Role=config.get('lambda_role_arn'),
            Handler='index.lambda_handler',
            Code={
                'ZipFile': lambda_code.encode('utf-8')
            },
            Description=f'Code review orchestration for {config["repository"]}',
            Timeout=300,
            Environment={
                'Variables': {
                    'REPOSITORY_NAME': config['repository'],
                    'ANALYSIS_RESULTS_BUCKET': config.get('results_bucket', 'code-analysis-results')
                }
            }
        )
        
        return {
            'function_name': lambda_response['FunctionName'],
            'function_arn': lambda_response['FunctionArn']
        }
    
    def create_review_comment_generator(self) -> str:
        """
        Create script for generating review comments from analysis results
        """
        script_content = '''#!/usr/bin/env python3
"""
Generate review comments from static analysis results
"""
import json
import os
from typing import Dict, List

def main():
    """
    Main function to process analysis results and generate review comments
    """
    analysis_results_dir = 'analysis-results'
    review_comments = []
    
    # Process different tool results
    if os.path.exists(f'{analysis_results_dir}/bandit-results.json'):
        bandit_comments = process_bandit_results(f'{analysis_results_dir}/bandit-results.json')
        review_comments.extend(bandit_comments)
    
    if os.path.exists(f'{analysis_results_dir}/eslint-results.json'):
        eslint_comments = process_eslint_results(f'{analysis_results_dir}/eslint-results.json')
        review_comments.extend(eslint_comments)
    
    if os.path.exists(f'{analysis_results_dir}/semgrep-results.json'):
        semgrep_comments = process_semgrep_results(f'{analysis_results_dir}/semgrep-results.json')
        review_comments.extend(semgrep_comments)
    
    # Filter and prioritize comments
    filtered_comments = filter_and_prioritize_comments(review_comments)
    
    # Generate final review comments
    final_comments = generate_review_comments(filtered_comments)
    
    # Save comments to file
    with open('review-comments.json', 'w') as f:
        json.dump(final_comments, f, indent=2)
    
    print(f"Generated {len(final_comments)} review comments")

def process_bandit_results(results_file: str) -> List[Dict]:
    """
    Process Bandit security analysis results
    """
    comments = []
    
    try:
        with open(results_file, 'r') as f:
            bandit_data = json.load(f)
        
        for result in bandit_data.get('results', []):
            comment = {
                'tool': 'bandit',
                'file_path': result.get('filename', ''),
                'line_number': result.get('line_number', 0),
                'severity': result.get('issue_severity', 'MEDIUM').lower(),
                'confidence': result.get('issue_confidence', 'MEDIUM').lower(),
                'title': result.get('test_name', ''),
                'message': result.get('issue_text', ''),
                'recommendation': get_bandit_recommendation(result.get('test_id', '')),
                'more_info': result.get('more_info', '')
            }
            comments.append(comment)
    
    except Exception as e:
        print(f"Error processing Bandit results: {e}")
    
    return comments

def process_eslint_results(results_file: str) -> List[Dict]:
    """
    Process ESLint security analysis results
    """
    comments = []
    
    try:
        with open(results_file, 'r') as f:
            eslint_data = json.load(f)
        
        for file_result in eslint_data:
            file_path = file_result.get('filePath', '')
            
            for message in file_result.get('messages', []):
                # Only include security-related rules
                if 'security' in message.get('ruleId', '').lower():
                    comment = {
                        'tool': 'eslint',
                        'file_path': file_path,
                        'line_number': message.get('line', 0),
                        'column': message.get('column', 0),
                        'severity': 'high' if message.get('severity') == 2 else 'medium',
                        'rule_id': message.get('ruleId', ''),
                        'message': message.get('message', ''),
                        'recommendation': get_eslint_recommendation(message.get('ruleId', ''))
                    }
                    comments.append(comment)
    
    except Exception as e:
        print(f"Error processing ESLint results: {e}")
    
    return comments

def filter_and_prioritize_comments(comments: List[Dict]) -> List[Dict]:
    """
    Filter and prioritize review comments
    """
    # Remove duplicates based on file path and line number
    seen_locations = set()
    filtered_comments = []
    
    for comment in comments:
        location_key = f"{comment.get('file_path', '')}:{comment.get('line_number', 0)}"
        
        if location_key not in seen_locations:
            seen_locations.add(location_key)
            filtered_comments.append(comment)
    
    # Sort by severity and confidence
    severity_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
    confidence_order = {'high': 0, 'medium': 1, 'low': 2}
    
    filtered_comments.sort(key=lambda x: (
        severity_order.get(x.get('severity', 'low'), 3),
        confidence_order.get(x.get('confidence', 'low'), 2)
    ))
    
    # Limit to top 20 comments to avoid overwhelming reviewers
    return filtered_comments[:20]

def generate_review_comments(comments: List[Dict]) -> List[Dict]:
    """
    Generate formatted review comments
    """
    formatted_comments = []
    
    for comment in comments:
        formatted_comment = {
            'file_path': comment.get('file_path', ''),
            'line_number': comment.get('line_number', 0),
            'comment_text': format_comment_text(comment),
            'severity': comment.get('severity', 'medium'),
            'tool': comment.get('tool', 'unknown')
        }
        formatted_comments.append(formatted_comment)
    
    return formatted_comments

def format_comment_text(comment: Dict) -> str:
    """
    Format comment text for review
    """
    tool = comment.get('tool', 'Security Tool').title()
    severity = comment.get('severity', 'medium').upper()
    message = comment.get('message', 'Security issue detected')
    recommendation = comment.get('recommendation', 'Please review and address this security concern.')
    
    comment_text = f"🔒 **{tool} Security Finding** ({severity})\n\n"
    comment_text += f"{message}\n\n"
    comment_text += f"**Recommendation:** {recommendation}\n\n"
    
    if comment.get('more_info'):
        comment_text += f"**More Information:** {comment.get('more_info')}\n\n"
    
    comment_text += "_This comment was generated by automated security analysis._"
    
    return comment_text

def get_bandit_recommendation(test_id: str) -> str:
    """
    Get recommendation for Bandit test ID
    """
    recommendations = {
        'B101': 'Avoid using assert statements in production code. Use proper error handling instead.',
        'B102': 'Avoid using exec(). Consider safer alternatives for dynamic code execution.',
        'B103': 'Set file permissions explicitly. Avoid using overly permissive file permissions.',
        'B104': 'Avoid binding to all network interfaces (0.0.0.0). Bind to specific interfaces when possible.',
        'B105': 'Avoid using hardcoded passwords. Use environment variables or secure credential storage.',
        'B106': 'Avoid using hardcoded passwords in function arguments.',
        'B107': 'Avoid using hardcoded passwords in default function arguments.',
        'B108': 'Avoid using insecure temporary file creation. Use tempfile module with secure defaults.',
        'B110': 'Avoid using try/except/pass blocks. Handle exceptions appropriately.',
        'B112': 'Avoid using try/except/continue blocks. Handle exceptions appropriately.'
    }
    
    return recommendations.get(test_id, 'Review this security finding and implement appropriate fixes.')

def get_eslint_recommendation(rule_id: str) -> str:
    """
    Get recommendation for ESLint rule ID
    """
    recommendations = {
        'security/detect-object-injection': 'Avoid using user input directly in object property access. Validate and sanitize input.',
        'security/detect-non-literal-regexp': 'Avoid using user input in regular expressions. Use literal patterns when possible.',
        'security/detect-unsafe-regex': 'Review regular expression for ReDoS vulnerabilities. Consider using safer alternatives.',
        'security/detect-buffer-noassert': 'Use assert versions of Buffer methods to prevent buffer overflows.',
        'security/detect-child-process': 'Review child process usage for command injection vulnerabilities.',
        'security/detect-disable-mustache-escape': 'Avoid disabling mustache escaping. Use proper output encoding.',
        'security/detect-eval-with-expression': 'Avoid using eval() with user input. Consider safer alternatives.',
        'security/detect-no-csrf-before-method-override': 'Implement CSRF protection before method override middleware.',
        'security/detect-non-literal-fs-filename': 'Avoid using user input directly in file system operations.',
        'security/detect-non-literal-require': 'Avoid using user input in require() statements.',
        'security/detect-possible-timing-attacks': 'Use constant-time comparison for sensitive operations.',
        'security/detect-pseudoRandomBytes': 'Use cryptographically secure random number generation.'
    }
    
    return recommendations.get(rule_id, 'Review this security finding and implement appropriate fixes.')

if __name__ == '__main__':
    main()
'''
        
        return script_content

# Example usage
automated_review = AutomatedCodeReviewIntegration()

# Set up automated review pipeline
pipeline_config = {
    'repository': 'secure-web-app',
    'languages': ['python', 'javascript'],
    'service_role_arn': 'arn:aws:iam::123456789012:role/CodeBuildServiceRole',
    'lambda_role_arn': 'arn:aws:iam::123456789012:role/LambdaExecutionRole',
    'results_bucket': 'code-analysis-results-bucket',
    'artifacts_bucket': 'code-analysis-artifacts-bucket'
}

pipeline_setup = automated_review.setup_automated_review_pipeline(pipeline_config)
print("Automated Review Pipeline Setup:")
print(json.dumps(pipeline_setup, indent=2))

# Generate review comment script
comment_script = automated_review.create_review_comment_generator()
print("\\nReview comment generator script created")

Step 3: Implement Security-Focused Review Process

Create structured processes for conducting security-focused code reviews:

View code
# Security-Focused Code Review Process
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional

class SecurityFocusedReviewProcess:
    def __init__(self):
        self.dynamodb = boto3.resource('dynamodb')
        self.sns = boto3.client('sns')
        self.ses = boto3.client('ses')
        
        # DynamoDB tables
        self.reviews_table = self.dynamodb.Table('security-code-reviews')
        self.reviewers_table = self.dynamodb.Table('security-reviewers')
        self.training_table = self.dynamodb.Table('reviewer-training')
        
    def create_security_review_checklist(self, review_type: str) -> Dict:
        """
        Create comprehensive security review checklist
        """
        base_checklist = {
            'input_validation': {
                'category_description': 'Verify all user inputs are properly validated and sanitized',
                'items': [
                    {
                        'id': 'IV001',
                        'description': 'All user inputs are validated on the server side',
                        'severity': 'critical',
                        'check_method': 'manual_inspection',
                        'guidance': 'Look for input validation at entry points, not just client-side'
                    },
                    {
                        'id': 'IV002',
                        'description': 'Input validation uses whitelist approach where possible',
                        'severity': 'high',
                        'check_method': 'code_pattern_analysis',
                        'guidance': 'Prefer allowing known good inputs over blocking known bad inputs'
                    },
                    {
                        'id': 'IV003',
                        'description': 'Input length limits are enforced',
                        'severity': 'medium',
                        'check_method': 'manual_inspection',
                        'guidance': 'Check for buffer overflow prevention and DoS protection'
                    },
                    {
                        'id': 'IV004',
                        'description': 'Special characters are properly escaped or encoded',
                        'severity': 'high',
                        'check_method': 'manual_inspection',
                        'guidance': 'Look for XSS and injection prevention measures'
                    },
                    {
                        'id': 'IV005',
                        'description': 'File upload validation includes type, size, and content checks',
                        'severity': 'critical',
                        'check_method': 'manual_inspection',
                        'guidance': 'Verify file uploads cannot execute malicious code'
                    }
                ]
            },
            'authentication_authorization': {
                'category_description': 'Ensure proper authentication and authorization controls',
                'items': [
                    {
                        'id': 'AA001',
                        'description': 'Authentication is required for all protected resources',
                        'severity': 'critical',
                        'check_method': 'manual_inspection',
                        'guidance': 'Verify no protected endpoints are accessible without authentication'
                    },
                    {
                        'id': 'AA002',
                        'description': 'Authorization checks are performed at the appropriate level',
                        'severity': 'critical',
                        'check_method': 'manual_inspection',
                        'guidance': 'Check for proper role-based or attribute-based access control'
                    },
                    {
                        'id': 'AA003',
                        'description': 'Session management is implemented securely',
                        'severity': 'high',
                        'check_method': 'manual_inspection',
                        'guidance': 'Look for secure session tokens, timeout, and invalidation'
                    },
                    {
                        'id': 'AA004',
                        'description': 'Password handling follows security best practices',
                        'severity': 'critical',
                        'check_method': 'code_pattern_analysis',
                        'guidance': 'Verify proper hashing, salting, and storage of passwords'
                    },
                    {
                        'id': 'AA005',
                        'description': 'Multi-factor authentication is implemented where required',
                        'severity': 'high',
                        'check_method': 'manual_inspection',
                        'guidance': 'Check for MFA implementation in high-risk scenarios'
                    }
                ]
            },
            'data_protection': {
                'category_description': 'Verify sensitive data is properly protected',
                'items': [
                    {
                        'id': 'DP001',
                        'description': 'Sensitive data is encrypted at rest',
                        'severity': 'critical',
                        'check_method': 'manual_inspection',
                        'guidance': 'Verify encryption of PII, financial data, and credentials'
                    },
                    {
                        'id': 'DP002',
                        'description': 'Data in transit is encrypted using TLS',
                        'severity': 'critical',
                        'check_method': 'configuration_review',
                        'guidance': 'Check for proper TLS configuration and certificate validation'
                    },
                    {
                        'id': 'DP003',
                        'description': 'Cryptographic keys are managed securely',
                        'severity': 'critical',
                        'check_method': 'manual_inspection',
                        'guidance': 'Verify keys are not hardcoded and use proper key management'
                    },
                    {
                        'id': 'DP004',
                        'description': 'Data classification is respected in handling',
                        'severity': 'high',
                        'check_method': 'manual_inspection',
                        'guidance': 'Check that data handling matches its classification level'
                    },
                    {
                        'id': 'DP005',
                        'description': 'Data retention and deletion policies are implemented',
                        'severity': 'medium',
                        'check_method': 'manual_inspection',
                        'guidance': 'Verify automatic data cleanup and retention compliance'
                    }
                ]
            },
            'error_handling_logging': {
                'category_description': 'Ensure proper error handling and security logging',
                'items': [
                    {
                        'id': 'EL001',
                        'description': 'Error messages do not leak sensitive information',
                        'severity': 'high',
                        'check_method': 'manual_inspection',
                        'guidance': 'Check for information disclosure in error responses'
                    },
                    {
                        'id': 'EL002',
                        'description': 'Security events are properly logged',
                        'severity': 'high',
                        'check_method': 'manual_inspection',
                        'guidance': 'Verify logging of authentication, authorization, and security events'
                    },
                    {
                        'id': 'EL003',
                        'description': 'Log data does not contain sensitive information',
                        'severity': 'medium',
                        'check_method': 'manual_inspection',
                        'guidance': 'Check that logs do not expose passwords, tokens, or PII'
                    },
                    {
                        'id': 'EL004',
                        'description': 'Exception handling prevents information disclosure',
                        'severity': 'medium',
                        'check_method': 'manual_inspection',
                        'guidance': 'Verify stack traces and debug info are not exposed'
                    },
                    {
                        'id': 'EL005',
                        'description': 'Security monitoring and alerting is implemented',
                        'severity': 'medium',
                        'check_method': 'configuration_review',
                        'guidance': 'Check for proper security event monitoring and alerting'
                    }
                ]
            },
            'secure_communications': {
                'category_description': 'Verify secure communication practices',
                'items': [
                    {
                        'id': 'SC001',
                        'description': 'TLS/SSL is properly configured',
                        'severity': 'critical',
                        'check_method': 'configuration_review',
                        'guidance': 'Check TLS version, cipher suites, and certificate validation'
                    },
                    {
                        'id': 'SC002',
                        'description': 'API security best practices are followed',
                        'severity': 'high',
                        'check_method': 'manual_inspection',
                        'guidance': 'Verify API authentication, rate limiting, and input validation'
                    },
                    {
                        'id': 'SC003',
                        'description': 'Cross-origin requests are properly handled',
                        'severity': 'high',
                        'check_method': 'configuration_review',
                        'guidance': 'Check CORS configuration and CSP headers'
                    },
                    {
                        'id': 'SC004',
                        'description': 'Security headers are implemented',
                        'severity': 'medium',
                        'check_method': 'configuration_review',
                        'guidance': 'Verify HSTS, CSP, X-Frame-Options, and other security headers'
                    },
                    {
                        'id': 'SC005',
                        'description': 'Third-party integrations are secured',
                        'severity': 'high',
                        'check_method': 'manual_inspection',
                        'guidance': 'Check authentication and data validation for external APIs'
                    }
                ]
            }
        }
        
        # Customize checklist based on review type
        if review_type == 'critical_security_review':
            # Add additional items for critical reviews
            base_checklist['business_logic'] = {
                'category_description': 'Verify business logic security',
                'items': [
                    {
                        'id': 'BL001',
                        'description': 'Business logic cannot be bypassed',
                        'severity': 'critical',
                        'check_method': 'manual_inspection',
                        'guidance': 'Check for logic flaws that could bypass security controls'
                    },
                    {
                        'id': 'BL002',
                        'description': 'Race conditions are prevented',
                        'severity': 'high',
                        'check_method': 'manual_inspection',
                        'guidance': 'Look for potential race conditions in critical operations'
                    },
                    {
                        'id': 'BL003',
                        'description': 'Time-of-check to time-of-use issues are addressed',
                        'severity': 'high',
                        'check_method': 'manual_inspection',
                        'guidance': 'Verify atomic operations for security-critical checks'
                    }
                ]
            }
        
        return {
            'checklist_id': f"SEC-CHECKLIST-{review_type.upper()}-{datetime.now().strftime('%Y%m%d')}",
            'review_type': review_type,
            'created_date': datetime.now().isoformat(),
            'categories': base_checklist,
            'total_items': sum(len(category['items']) for category in base_checklist.values()),
            'critical_items': sum(
                len([item for item in category['items'] if item['severity'] == 'critical'])
                for category in base_checklist.values()
            )
        }
    
    def conduct_security_review_session(self, review_id: str, session_data: Dict) -> Dict:
        """
        Conduct a structured security review session
        """
        review_session = {
            'session_id': f"SESSION-{review_id}-{datetime.now().strftime('%Y%m%d%H%M')}",
            'review_id': review_id,
            'reviewer': session_data['reviewer'],
            'session_start': datetime.now().isoformat(),
            'session_type': session_data.get('session_type', 'individual_review'),
            'checklist_results': {},
            'manual_findings': [],
            'code_quality_notes': [],
            'security_recommendations': [],
            'time_spent_minutes': 0,
            'completion_status': 'in_progress'
        }
        
        # Process checklist items
        checklist_data = session_data.get('checklist_results', {})
        for category, category_results in checklist_data.items():
            review_session['checklist_results'][category] = {
                'items_reviewed': len(category_results.get('items', [])),
                'items_passed': len([item for item in category_results.get('items', []) if item.get('status') == 'pass']),
                'items_failed': len([item for item in category_results.get('items', []) if item.get('status') == 'fail']),
                'items_not_applicable': len([item for item in category_results.get('items', []) if item.get('status') == 'na']),
                'category_score': self.calculate_category_score(category_results.get('items', [])),
                'critical_failures': [
                    item for item in category_results.get('items', [])
                    if item.get('status') == 'fail' and item.get('severity') == 'critical'
                ]
            }
        
        # Process manual findings
        for finding in session_data.get('manual_findings', []):
            processed_finding = {
                'finding_id': f"MF-{datetime.now().strftime('%Y%m%d%H%M%S')}-{len(review_session['manual_findings'])}",
                'category': finding.get('category'),
                'severity': finding.get('severity'),
                'title': finding.get('title'),
                'description': finding.get('description'),
                'file_path': finding.get('file_path'),
                'line_range': finding.get('line_range', []),
                'code_snippet': finding.get('code_snippet'),
                'security_impact': finding.get('security_impact'),
                'exploitability': finding.get('exploitability', 'unknown'),
                'remediation_effort': finding.get('remediation_effort', 'unknown'),
                'remediation_suggestion': finding.get('remediation_suggestion'),
                'references': finding.get('references', []),
                'reviewer_confidence': finding.get('confidence', 'medium')
            }
            review_session['manual_findings'].append(processed_finding)
        
        # Process code quality notes
        review_session['code_quality_notes'] = session_data.get('code_quality_notes', [])
        
        # Generate security recommendations
        review_session['security_recommendations'] = self.generate_session_recommendations(
            review_session['checklist_results'],
            review_session['manual_findings']
        )
        
        # Calculate time spent
        review_session['time_spent_minutes'] = session_data.get('time_spent_minutes', 0)
        review_session['completion_status'] = 'completed'
        review_session['session_end'] = datetime.now().isoformat()
        
        # Store session results
        self.reviews_table.put_item(Item=review_session)
        
        return review_session
    
    def create_security_reviewer_training_program(self) -> Dict:
        """
        Create comprehensive training program for security reviewers
        """
        training_program = {
            'program_id': f"SEC-REVIEWER-TRAINING-{datetime.now().strftime('%Y%m%d')}",
            'program_name': 'Security Code Review Training Program',
            'created_date': datetime.now().isoformat(),
            'training_tracks': {
                'foundation_track': {
                    'target_audience': 'New security reviewers',
                    'duration_hours': 40,
                    'modules': [
                        {
                            'module_id': 'SEC-REV-001',
                            'title': 'Security Code Review Fundamentals',
                            'duration_hours': 8,
                            'learning_objectives': [
                                'Understand the role of code reviews in security',
                                'Learn common vulnerability patterns',
                                'Master security review methodologies',
                                'Practice using security checklists'
                            ],
                            'content_topics': [
                                'OWASP Top 10 vulnerabilities',
                                'Secure coding principles',
                                'Code review best practices',
                                'Security testing integration'
                            ],
                            'hands_on_exercises': [
                                'Review vulnerable code samples',
                                'Identify security issues in real code',
                                'Practice using review checklists',
                                'Write security-focused review comments'
                            ]
                        },
                        {
                            'module_id': 'SEC-REV-002',
                            'title': 'Input Validation and Injection Prevention',
                            'duration_hours': 6,
                            'learning_objectives': [
                                'Identify injection vulnerabilities',
                                'Understand input validation techniques',
                                'Review sanitization implementations',
                                'Assess parameterized query usage'
                            ],
                            'practical_labs': [
                                'SQL injection vulnerability review',
                                'XSS prevention assessment',
                                'Command injection identification',
                                'LDAP injection detection'
                            ]
                        },
                        {
                            'module_id': 'SEC-REV-003',
                            'title': 'Authentication and Authorization Review',
                            'duration_hours': 8,
                            'learning_objectives': [
                                'Review authentication mechanisms',
                                'Assess authorization implementations',
                                'Evaluate session management',
                                'Check privilege escalation prevention'
                            ],
                            'practical_labs': [
                                'Authentication bypass identification',
                                'Authorization flaw detection',
                                'Session management review',
                                'Privilege escalation assessment'
                            ]
                        },
                        {
                            'module_id': 'SEC-REV-004',
                            'title': 'Cryptography and Data Protection Review',
                            'duration_hours': 6,
                            'learning_objectives': [
                                'Assess cryptographic implementations',
                                'Review key management practices',
                                'Evaluate data protection measures',
                                'Check compliance requirements'
                            ],
                            'practical_labs': [
                                'Cryptographic weakness identification',
                                'Key management assessment',
                                'Data encryption review',
                                'PII handling evaluation'
                            ]
                        },
                        {
                            'module_id': 'SEC-REV-005',
                            'title': 'Business Logic and Advanced Security Review',
                            'duration_hours': 8,
                            'learning_objectives': [
                                'Identify business logic flaws',
                                'Assess race condition vulnerabilities',
                                'Review error handling implementations',
                                'Evaluate logging and monitoring'
                            ],
                            'practical_labs': [
                                'Business logic flaw identification',
                                'Race condition assessment',
                                'Error handling review',
                                'Security logging evaluation'
                            ]
                        },
                        {
                            'module_id': 'SEC-REV-006',
                            'title': 'Review Tools and Automation',
                            'duration_hours': 4,
                            'learning_objectives': [
                                'Use automated security analysis tools',
                                'Integrate tools into review process',
                                'Interpret tool results effectively',
                                'Combine automated and manual review'
                            ],
                            'tool_training': [
                                'Static analysis tool usage',
                                'Dependency scanner integration',
                                'Code quality tool interpretation',
                                'Review workflow automation'
                            ]
                        }
                    ],
                    'assessment_requirements': [
                        'Complete all module exercises',
                        'Pass written examination (80% minimum)',
                        'Conduct supervised code review',
                        'Demonstrate tool proficiency'
                    ],
                    'certification': 'Certified Security Code Reviewer - Foundation'
                },
                'advanced_track': {
                    'target_audience': 'Experienced security reviewers',
                    'prerequisites': ['foundation_track_completion'],
                    'duration_hours': 32,
                    'modules': [
                        {
                            'module_id': 'SEC-REV-ADV-001',
                            'title': 'Advanced Threat Modeling for Code Review',
                            'duration_hours': 8,
                            'learning_objectives': [
                                'Apply threat modeling to code review',
                                'Identify complex attack vectors',
                                'Assess architectural security',
                                'Review security design patterns'
                            ]
                        },
                        {
                            'module_id': 'SEC-REV-ADV-002',
                            'title': 'Language-Specific Security Patterns',
                            'duration_hours': 12,
                            'learning_objectives': [
                                'Master language-specific vulnerabilities',
                                'Understand framework security features',
                                'Review platform-specific security',
                                'Assess third-party library usage'
                            ],
                            'language_coverage': [
                                'Python security patterns',
                                'JavaScript/Node.js security',
                                'Java security best practices',
                                'C# .NET security review',
                                'Go security considerations'
                            ]
                        },
                        {
                            'module_id': 'SEC-REV-ADV-003',
                            'title': 'Cloud Security Code Review',
                            'duration_hours': 8,
                            'learning_objectives': [
                                'Review cloud service integrations',
                                'Assess infrastructure as code',
                                'Evaluate serverless security',
                                'Check cloud configuration security'
                            ]
                        },
                        {
                            'module_id': 'SEC-REV-ADV-004',
                            'title': 'Security Review Leadership',
                            'duration_hours': 4,
                            'learning_objectives': [
                                'Lead security review processes',
                                'Mentor junior reviewers',
                                'Establish review standards',
                                'Measure review effectiveness'
                            ]
                        }
                    ],
                    'certification': 'Certified Security Code Reviewer - Advanced'
                },
                'specialist_track': {
                    'target_audience': 'Security architects and specialists',
                    'prerequisites': ['advanced_track_completion'],
                    'duration_hours': 24,
                    'modules': [
                        {
                            'module_id': 'SEC-REV-SPEC-001',
                            'title': 'Security Architecture Review',
                            'duration_hours': 8,
                            'learning_objectives': [
                                'Review security architecture decisions',
                                'Assess design pattern security',
                                'Evaluate security control effectiveness',
                                'Check compliance architecture'
                            ]
                        },
                        {
                            'module_id': 'SEC-REV-SPEC-002',
                            'title': 'Compliance-Focused Code Review',
                            'duration_hours': 8,
                            'learning_objectives': [
                                'Review for regulatory compliance',
                                'Assess privacy requirements',
                                'Check industry standards compliance',
                                'Evaluate audit trail requirements'
                            ],
                            'compliance_coverage': [
                                'PCI DSS code review',
                                'HIPAA compliance review',
                                'GDPR privacy review',
                                'SOX financial controls review'
                            ]
                        },
                        {
                            'module_id': 'SEC-REV-SPEC-003',
                            'title': 'Advanced Security Research and Review',
                            'duration_hours': 8,
                            'learning_objectives': [
                                'Research emerging vulnerabilities',
                                'Develop custom review techniques',
                                'Create security review tools',
                                'Contribute to security community'
                            ]
                        }
                    ],
                    'certification': 'Certified Security Code Reviewer - Specialist'
                }
            },
            'continuous_education': {
                'monthly_updates': 'Latest vulnerability patterns and review techniques',
                'quarterly_workshops': 'Hands-on sessions with new tools and methods',
                'annual_conference': 'Security code review best practices conference',
                'peer_learning': 'Regular peer review sessions and knowledge sharing'
            },
            'performance_tracking': {
                'review_quality_metrics': [
                    'Finding accuracy rate',
                    'False positive rate',
                    'Review completion time',
                    'Stakeholder satisfaction'
                ],
                'skill_assessments': [
                    'Annual competency evaluation',
                    'Peer review assessment',
                    'Tool proficiency testing',
                    'Continuous learning tracking'
                ]
            }
        }
        
        return training_program
    
    def track_reviewer_performance(self, reviewer_id: str, performance_period: Dict) -> Dict:
        """
        Track and analyze security reviewer performance
        """
        performance_metrics = {
            'reviewer_id': reviewer_id,
            'assessment_period': performance_period,
            'assessment_date': datetime.now().isoformat(),
            'quantitative_metrics': {
                'reviews_completed': 0,
                'average_review_time_hours': 0.0,
                'findings_identified': 0,
                'critical_findings_found': 0,
                'false_positive_rate': 0.0,
                'review_accuracy_score': 0.0,
                'checklist_completion_rate': 0.0
            },
            'qualitative_assessment': {
                'review_thoroughness': {
                    'score': 0,
                    'feedback': '',
                    'improvement_areas': []
                },
                'communication_effectiveness': {
                    'score': 0,
                    'feedback': '',
                    'strengths': []
                },
                'technical_competency': {
                    'score': 0,
                    'feedback': '',
                    'knowledge_gaps': []
                },
                'mentoring_contribution': {
                    'score': 0,
                    'feedback': '',
                    'mentoring_activities': []
                }
            },
            'development_recommendations': {
                'training_needs': [],
                'skill_development_areas': [],
                'certification_recommendations': [],
                'mentoring_opportunities': []
            },
            'overall_rating': 'satisfactory',
            'performance_trend': 'stable'
        }
        
        # Calculate quantitative metrics from review history
        performance_metrics = self.calculate_reviewer_metrics(reviewer_id, performance_metrics)
        
        # Store performance assessment
        self.reviewers_table.put_item(Item=performance_metrics)
        
        return performance_metrics
    
    def generate_review_quality_report(self, report_period: Dict) -> Dict:
        """
        Generate comprehensive review quality report
        """
        quality_report = {
            'report_id': f"QUALITY-REPORT-{datetime.now().strftime('%Y%m%d')}",
            'report_period': report_period,
            'generated_date': datetime.now().isoformat(),
            'summary_metrics': {
                'total_reviews_conducted': 0,
                'average_review_time_hours': 0.0,
                'total_security_findings': 0,
                'critical_findings_percentage': 0.0,
                'false_positive_rate': 0.0,
                'review_coverage_percentage': 0.0
            },
            'reviewer_performance': {
                'top_performers': [],
                'improvement_needed': [],
                'training_completion_rate': 0.0,
                'certification_status': {}
            },
            'process_effectiveness': {
                'review_type_distribution': {},
                'finding_category_distribution': {},
                'remediation_time_analysis': {},
                'stakeholder_satisfaction': 0.0
            },
            'trends_and_insights': {
                'common_vulnerability_patterns': [],
                'review_process_improvements': [],
                'tool_effectiveness_analysis': {},
                'training_impact_assessment': {}
            },
            'recommendations': {
                'process_improvements': [],
                'training_recommendations': [],
                'tool_enhancements': [],
                'resource_allocation': []
            }
        }
        
        # Calculate metrics from stored data
        quality_report = self.calculate_quality_metrics(report_period, quality_report)
        
        return quality_report

# Example usage
security_review_process = SecurityFocusedReviewProcess()

# Create security review checklist
checklist = security_review_process.create_security_review_checklist('security_focused_review')
print("Security Review Checklist:")
print(f"Total items: {checklist['total_items']}")
print(f"Critical items: {checklist['critical_items']}")

# Create reviewer training program
training_program = security_review_process.create_security_reviewer_training_program()
print("\\nReviewer Training Program:")
print(f"Foundation track duration: {training_program['training_tracks']['foundation_track']['duration_hours']} hours")
print(f"Number of modules: {len(training_program['training_tracks']['foundation_track']['modules'])}")

Step 4: Implement Metrics and Continuous Improvement

Establish comprehensive metrics and continuous improvement processes for code reviews:

View code
# Code Review Metrics and Continuous Improvement
import boto3
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import statistics

class CodeReviewMetricsAndImprovement:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.dynamodb = boto3.resource('dynamodb')
        self.quicksight = boto3.client('quicksight')
        
        # DynamoDB tables
        self.metrics_table = self.dynamodb.Table('code-review-metrics')
        self.trends_table = self.dynamodb.Table('security-trends')
        
    def collect_code_review_metrics(self, collection_period: Dict) -> Dict:
        """
        Collect comprehensive code review metrics
        """
        metrics_collection = {
            'collection_id': f"METRICS-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
            'collection_period': collection_period,
            'collection_date': datetime.now().isoformat(),
            'review_volume_metrics': {},
            'review_quality_metrics': {},
            'reviewer_performance_metrics': {},
            'security_effectiveness_metrics': {},
            'process_efficiency_metrics': {},
            'trend_analysis': {}
        }
        
        # Collect review volume metrics
        metrics_collection['review_volume_metrics'] = self.collect_volume_metrics(collection_period)
        
        # Collect review quality metrics
        metrics_collection['review_quality_metrics'] = self.collect_quality_metrics(collection_period)
        
        # Collect reviewer performance metrics
        metrics_collection['reviewer_performance_metrics'] = self.collect_reviewer_metrics(collection_period)
        
        # Collect security effectiveness metrics
        metrics_collection['security_effectiveness_metrics'] = self.collect_security_metrics(collection_period)
        
        # Collect process efficiency metrics
        metrics_collection['process_efficiency_metrics'] = self.collect_efficiency_metrics(collection_period)
        
        # Perform trend analysis
        metrics_collection['trend_analysis'] = self.analyze_trends(collection_period)
        
        # Store metrics
        self.metrics_table.put_item(Item=metrics_collection)
        
        # Send metrics to CloudWatch
        self.send_metrics_to_cloudwatch(metrics_collection)
        
        return metrics_collection
    
    def collect_volume_metrics(self, period: Dict) -> Dict:
        """
        Collect review volume and throughput metrics
        """
        return {
            'total_reviews_initiated': 0,
            'total_reviews_completed': 0,
            'reviews_by_type': {
                'standard_review': 0,
                'security_focused_review': 0,
                'critical_security_review': 0
            },
            'reviews_by_repository': {},
            'reviews_by_team': {},
            'average_reviews_per_day': 0.0,
            'peak_review_periods': [],
            'review_backlog_size': 0,
            'review_completion_rate': 0.0
        }
    
    def collect_quality_metrics(self, period: Dict) -> Dict:
        """
        Collect review quality and effectiveness metrics
        """
        return {
            'average_findings_per_review': 0.0,
            'findings_by_severity': {
                'critical': 0,
                'high': 0,
                'medium': 0,
                'low': 0
            },
            'findings_by_category': {
                'input_validation': 0,
                'authentication_authorization': 0,
                'data_protection': 0,
                'error_handling_logging': 0,
                'secure_communications': 0,
                'business_logic': 0
            },
            'false_positive_rate': 0.0,
            'finding_accuracy_rate': 0.0,
            'checklist_completion_rate': 0.0,
            'manual_vs_automated_findings': {
                'manual_findings': 0,
                'automated_findings': 0,
                'combined_findings': 0
            },
            'review_thoroughness_score': 0.0,
            'stakeholder_satisfaction_score': 0.0
        }
    
    def collect_reviewer_metrics(self, period: Dict) -> Dict:
        """
        Collect reviewer performance metrics
        """
        return {
            'active_reviewers_count': 0,
            'average_review_time_hours': 0.0,
            'reviewer_utilization_rate': 0.0,
            'reviewer_expertise_distribution': {
                'junior_reviewers': 0,
                'senior_reviewers': 0,
                'security_specialists': 0,
                'security_architects': 0
            },
            'training_completion_rates': {
                'foundation_track': 0.0,
                'advanced_track': 0.0,
                'specialist_track': 0.0
            },
            'certification_status': {
                'certified_reviewers': 0,
                'certification_pending': 0,
                'recertification_due': 0
            },
            'reviewer_performance_distribution': {
                'excellent': 0,
                'good': 0,
                'satisfactory': 0,
                'needs_improvement': 0
            },
            'mentoring_activities': {
                'mentors_active': 0,
                'mentees_supported': 0,
                'mentoring_sessions_conducted': 0
            }
        }
    
    def collect_security_metrics(self, period: Dict) -> Dict:
        """
        Collect security effectiveness metrics
        """
        return {
            'vulnerabilities_prevented': 0,
            'security_debt_reduction': 0.0,
            'compliance_violations_prevented': 0,
            'security_incidents_post_review': 0,
            'vulnerability_escape_rate': 0.0,
            'security_control_coverage': {
                'authentication_coverage': 0.0,
                'authorization_coverage': 0.0,
                'input_validation_coverage': 0.0,
                'data_protection_coverage': 0.0,
                'error_handling_coverage': 0.0
            },
            'risk_reduction_achieved': {
                'critical_risk_reduction': 0.0,
                'high_risk_reduction': 0.0,
                'medium_risk_reduction': 0.0,
                'overall_risk_score_improvement': 0.0
            },
            'security_awareness_improvement': {
                'developer_security_knowledge_score': 0.0,
                'security_best_practices_adoption': 0.0,
                'proactive_security_implementations': 0
            }
        }
    
    def collect_efficiency_metrics(self, period: Dict) -> Dict:
        """
        Collect process efficiency metrics
        """
        return {
            'average_review_cycle_time_hours': 0.0,
            'review_scheduling_efficiency': 0.0,
            'automated_tool_utilization': 0.0,
            'review_rework_rate': 0.0,
            'reviewer_availability_rate': 0.0,
            'review_process_automation_level': 0.0,
            'cost_per_review': 0.0,
            'roi_of_security_reviews': 0.0,
            'process_bottlenecks': [],
            'efficiency_improvement_opportunities': []
        }
    
    def analyze_trends(self, period: Dict) -> Dict:
        """
        Analyze trends in code review metrics
        """
        trend_analysis = {
            'volume_trends': {
                'review_volume_trend': 'stable',  # increasing, decreasing, stable
                'volume_change_percentage': 0.0,
                'seasonal_patterns': [],
                'growth_projections': {}
            },
            'quality_trends': {
                'finding_quality_trend': 'improving',
                'false_positive_trend': 'decreasing',
                'thoroughness_trend': 'stable',
                'quality_improvement_rate': 0.0
            },
            'security_trends': {
                'vulnerability_detection_trend': 'improving',
                'security_debt_trend': 'decreasing',
                'compliance_trend': 'stable',
                'security_maturity_progression': 0.0
            },
            'efficiency_trends': {
                'review_time_trend': 'decreasing',
                'automation_adoption_trend': 'increasing',
                'cost_efficiency_trend': 'improving',
                'process_optimization_rate': 0.0
            },
            'emerging_patterns': {
                'new_vulnerability_types': [],
                'technology_adoption_impacts': [],
                'team_performance_patterns': [],
                'tool_effectiveness_changes': []
            }
        }
        
        return trend_analysis
    
    def send_metrics_to_cloudwatch(self, metrics: Dict):
        """
        Send metrics to CloudWatch for monitoring and alerting
        """
        namespace = 'SecurityCodeReview'
        timestamp = datetime.now()
        
        # Send volume metrics
        volume_metrics = metrics['review_volume_metrics']
        self.cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=[
                {
                    'MetricName': 'ReviewsCompleted',
                    'Value': volume_metrics['total_reviews_completed'],
                    'Timestamp': timestamp,
                    'Unit': 'Count'
                },
                {
                    'MetricName': 'ReviewCompletionRate',
                    'Value': volume_metrics['review_completion_rate'],
                    'Timestamp': timestamp,
                    'Unit': 'Percent'
                },
                {
                    'MetricName': 'ReviewBacklogSize',
                    'Value': volume_metrics['review_backlog_size'],
                    'Timestamp': timestamp,
                    'Unit': 'Count'
                }
            ]
        )
        
        # Send quality metrics
        quality_metrics = metrics['review_quality_metrics']
        self.cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=[
                {
                    'MetricName': 'AverageFindingsPerReview',
                    'Value': quality_metrics['average_findings_per_review'],
                    'Timestamp': timestamp,
                    'Unit': 'Count'
                },
                {
                    'MetricName': 'FalsePositiveRate',
                    'Value': quality_metrics['false_positive_rate'],
                    'Timestamp': timestamp,
                    'Unit': 'Percent'
                },
                {
                    'MetricName': 'ReviewThoroughnessScore',
                    'Value': quality_metrics['review_thoroughness_score'],
                    'Timestamp': timestamp,
                    'Unit': 'None'
                }
            ]
        )
        
        # Send security effectiveness metrics
        security_metrics = metrics['security_effectiveness_metrics']
        self.cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=[
                {
                    'MetricName': 'VulnerabilitiesPrevented',
                    'Value': security_metrics['vulnerabilities_prevented'],
                    'Timestamp': timestamp,
                    'Unit': 'Count'
                },
                {
                    'MetricName': 'VulnerabilityEscapeRate',
                    'Value': security_metrics['vulnerability_escape_rate'],
                    'Timestamp': timestamp,
                    'Unit': 'Percent'
                },
                {
                    'MetricName': 'SecurityIncidentsPostReview',
                    'Value': security_metrics['security_incidents_post_review'],
                    'Timestamp': timestamp,
                    'Unit': 'Count'
                }
            ]
        )
    
    def create_improvement_recommendations(self, metrics: Dict) -> Dict:
        """
        Generate improvement recommendations based on metrics analysis
        """
        recommendations = {
            'recommendation_id': f"IMPROVE-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
            'generated_date': datetime.now().isoformat(),
            'metrics_analyzed': metrics['collection_id'],
            'priority_improvements': [],
            'process_optimizations': [],
            'training_recommendations': [],
            'tool_enhancements': [],
            'resource_adjustments': [],
            'implementation_roadmap': {}
        }
        
        # Analyze metrics and generate recommendations
        quality_metrics = metrics['review_quality_metrics']
        efficiency_metrics = metrics['process_efficiency_metrics']
        security_metrics = metrics['security_effectiveness_metrics']
        
        # Priority improvements based on critical metrics
        if quality_metrics['false_positive_rate'] > 15:
            recommendations['priority_improvements'].append({
                'area': 'False Positive Reduction',
                'priority': 'high',
                'description': 'High false positive rate impacting reviewer efficiency',
                'recommended_actions': [
                    'Tune automated analysis tools',
                    'Improve reviewer training on tool interpretation',
                    'Implement better filtering mechanisms',
                    'Add context-aware analysis'
                ],
                'expected_impact': 'Reduce false positive rate by 50%',
                'implementation_effort': 'medium',
                'timeline_weeks': 8
            })
        
        if security_metrics['vulnerability_escape_rate'] > 5:
            recommendations['priority_improvements'].append({
                'area': 'Vulnerability Detection',
                'priority': 'critical',
                'description': 'Vulnerabilities escaping to production',
                'recommended_actions': [
                    'Enhance review checklists',
                    'Increase reviewer training intensity',
                    'Add specialized security reviewers',
                    'Implement additional automated tools'
                ],
                'expected_impact': 'Reduce escape rate to <2%',
                'implementation_effort': 'high',
                'timeline_weeks': 12
            })
        
        if efficiency_metrics['average_review_cycle_time_hours'] > 48:
            recommendations['process_optimizations'].append({
                'area': 'Review Cycle Time',
                'priority': 'medium',
                'description': 'Review cycle time exceeding target',
                'recommended_actions': [
                    'Implement parallel review processes',
                    'Improve reviewer scheduling',
                    'Automate routine checks',
                    'Streamline approval workflows'
                ],
                'expected_impact': 'Reduce cycle time by 30%',
                'implementation_effort': 'medium',
                'timeline_weeks': 6
            })
        
        # Training recommendations
        reviewer_metrics = metrics['reviewer_performance_metrics']
        if reviewer_metrics['training_completion_rates']['foundation_track'] < 90:
            recommendations['training_recommendations'].append({
                'area': 'Foundation Training',
                'priority': 'high',
                'description': 'Low completion rate for foundation training',
                'recommended_actions': [
                    'Make training mandatory for all reviewers',
                    'Provide dedicated training time',
                    'Implement training incentives',
                    'Add manager accountability'
                ],
                'expected_impact': 'Achieve 95% completion rate',
                'implementation_effort': 'low',
                'timeline_weeks': 4
            })
        
        # Tool enhancements
        if quality_metrics['manual_vs_automated_findings']['automated_findings'] < quality_metrics['manual_vs_automated_findings']['manual_findings'] * 0.5:
            recommendations['tool_enhancements'].append({
                'area': 'Automation Enhancement',
                'priority': 'medium',
                'description': 'Low automated finding detection rate',
                'recommended_actions': [
                    'Evaluate and integrate additional tools',
                    'Customize existing tool configurations',
                    'Develop custom security patterns',
                    'Improve tool integration workflows'
                ],
                'expected_impact': 'Increase automated detection by 40%',
                'implementation_effort': 'high',
                'timeline_weeks': 10
            })
        
        # Create implementation roadmap
        recommendations['implementation_roadmap'] = self.create_implementation_roadmap(
            recommendations['priority_improvements'] + 
            recommendations['process_optimizations'] + 
            recommendations['training_recommendations'] + 
            recommendations['tool_enhancements']
        )
        
        return recommendations
    
    def create_implementation_roadmap(self, improvements: List[Dict]) -> Dict:
        """
        Create implementation roadmap for improvements
        """
        # Sort improvements by priority and timeline
        priority_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
        sorted_improvements = sorted(
            improvements,
            key=lambda x: (priority_order.get(x['priority'], 3), x.get('timeline_weeks', 0))
        )
        
        roadmap = {
            'total_improvements': len(improvements),
            'estimated_total_timeline_weeks': 0,
            'phases': [],
            'resource_requirements': {},
            'success_metrics': {},
            'risk_mitigation': []
        }
        
        # Create phases based on priority and dependencies
        current_week = 0
        phase_number = 1
        
        for improvement in sorted_improvements:
            phase = {
                'phase_number': phase_number,
                'phase_name': f"Phase {phase_number}: {improvement['area']}",
                'start_week': current_week + 1,
                'end_week': current_week + improvement.get('timeline_weeks', 4),
                'improvements': [improvement],
                'dependencies': [],
                'success_criteria': [improvement.get('expected_impact', 'Improvement implemented')]
            }
            
            roadmap['phases'].append(phase)
            current_week += improvement.get('timeline_weeks', 4)
            phase_number += 1
        
        roadmap['estimated_total_timeline_weeks'] = current_week
        
        return roadmap
    
    def generate_executive_dashboard(self, metrics: Dict) -> Dict:
        """
        Generate executive dashboard for code review program
        """
        dashboard = {
            'dashboard_id': f"EXEC-DASH-{datetime.now().strftime('%Y%m%d')}",
            'generated_date': datetime.now().isoformat(),
            'reporting_period': metrics['collection_period'],
            'executive_summary': {
                'program_health_score': 0,
                'key_achievements': [],
                'areas_of_concern': [],
                'investment_roi': 0.0
            },
            'key_metrics': {
                'reviews_completed': metrics['review_volume_metrics']['total_reviews_completed'],
                'vulnerabilities_prevented': metrics['security_effectiveness_metrics']['vulnerabilities_prevented'],
                'false_positive_rate': f"{metrics['review_quality_metrics']['false_positive_rate']:.1f}%",
                'average_review_time': f"{metrics['process_efficiency_metrics']['average_review_cycle_time_hours']:.1f} hours",
                'reviewer_satisfaction': f"{metrics['reviewer_performance_metrics'].get('satisfaction_score', 0):.1f}/5.0",
                'security_incident_reduction': f"{metrics['security_effectiveness_metrics'].get('incident_reduction_percentage', 0):.1f}%"
            },
            'trends': {
                'review_volume_trend': metrics['trend_analysis']['volume_trends']['review_volume_trend'],
                'quality_trend': metrics['trend_analysis']['quality_trends']['finding_quality_trend'],
                'efficiency_trend': metrics['trend_analysis']['efficiency_trends']['review_time_trend'],
                'security_effectiveness_trend': metrics['trend_analysis']['security_trends']['vulnerability_detection_trend']
            },
            'recommendations': {
                'immediate_actions': [],
                'strategic_investments': [],
                'resource_needs': []
            },
            'next_period_goals': {
                'review_completion_target': metrics['review_volume_metrics']['total_reviews_completed'] * 1.1,
                'false_positive_reduction_target': max(5, metrics['review_quality_metrics']['false_positive_rate'] * 0.8),
                'vulnerability_prevention_target': metrics['security_effectiveness_metrics']['vulnerabilities_prevented'] * 1.2,
                'efficiency_improvement_target': metrics['process_efficiency_metrics']['average_review_cycle_time_hours'] * 0.9
            }
        }
        
        # Calculate program health score
        dashboard['executive_summary']['program_health_score'] = self.calculate_program_health_score(metrics)
        
        return dashboard
    
    def calculate_program_health_score(self, metrics: Dict) -> int:
        """
        Calculate overall program health score (0-100)
        """
        score = 100
        
        # Deduct points for poor performance
        quality_metrics = metrics['review_quality_metrics']
        efficiency_metrics = metrics['process_efficiency_metrics']
        security_metrics = metrics['security_effectiveness_metrics']
        
        # Quality factors
        if quality_metrics['false_positive_rate'] > 20:
            score -= 15
        elif quality_metrics['false_positive_rate'] > 10:
            score -= 8
        
        if quality_metrics['finding_accuracy_rate'] < 80:
            score -= 10
        elif quality_metrics['finding_accuracy_rate'] < 90:
            score -= 5
        
        # Efficiency factors
        if efficiency_metrics['average_review_cycle_time_hours'] > 72:
            score -= 15
        elif efficiency_metrics['average_review_cycle_time_hours'] > 48:
            score -= 8
        
        # Security effectiveness factors
        if security_metrics['vulnerability_escape_rate'] > 10:
            score -= 20
        elif security_metrics['vulnerability_escape_rate'] > 5:
            score -= 10
        
        if security_metrics['security_incidents_post_review'] > 0:
            score -= 15
        
        return max(0, score)

# Example usage
metrics_improvement = CodeReviewMetricsAndImprovement()

# Collect metrics for the last 30 days
collection_period = {
    'start_date': (datetime.now() - timedelta(days=30)).isoformat(),
    'end_date': datetime.now().isoformat(),
    'period_type': 'monthly'
}

metrics = metrics_improvement.collect_code_review_metrics(collection_period)
print("Code Review Metrics Collected:")
print(f"Collection ID: {metrics['collection_id']}")

# Generate improvement recommendations
recommendations = metrics_improvement.create_improvement_recommendations(metrics)
print(f"\\nGenerated {len(recommendations['priority_improvements'])} priority improvements")

# Generate executive dashboard
dashboard = metrics_improvement.generate_executive_dashboard(metrics)
print(f"\\nProgram Health Score: {dashboard['executive_summary']['program_health_score']}/100")

Best Practices for Security Code Reviews

1. Establish Clear Review Standards

Consistent Criteria: Define clear, consistent criteria for what constitutes a thorough security review, including mandatory checklist items and quality standards.

Risk-Based Approach: Apply different levels of review rigor based on the risk level of code changes, with more intensive reviews for security-critical components.

Documentation Standards: Maintain comprehensive documentation of review processes, findings, and remediation actions for audit trails and knowledge sharing.

2. Combine Automated and Manual Reviews

Tool Integration: Use automated security analysis tools to catch common vulnerabilities while reserving human reviewers for complex logic and business context issues.

Complementary Approaches: Ensure automated and manual reviews complement each other rather than duplicate efforts, with clear delineation of responsibilities.

Continuous Tool Improvement: Regularly evaluate and improve automated tools based on their effectiveness and false positive rates.

3. Invest in Reviewer Training and Development

Comprehensive Training: Provide thorough training on secure coding practices, common vulnerability patterns, and review methodologies.

Continuous Learning: Establish ongoing education programs to keep reviewers updated on emerging threats and new security techniques.

Specialization Tracks: Develop specialized training tracks for different types of security reviews and technology stacks.

4. Measure and Improve Continuously

Comprehensive Metrics: Track both process metrics (efficiency, throughput) and outcome metrics (security effectiveness, vulnerability prevention).

Regular Assessment: Conduct regular assessments of review quality and effectiveness, using both quantitative metrics and qualitative feedback.

Iterative Improvement: Use metrics and feedback to continuously improve review processes, tools, and training programs.

Common Challenges and Solutions

Challenge 1: Balancing Thoroughness with Development Velocity

Problem: Comprehensive security reviews can slow down development cycles.

Solutions:

  • Implement risk-based review intensity
  • Use automated tools for routine checks
  • Provide clear review guidelines and checklists
  • Train reviewers to be efficient and focused
  • Parallelize reviews where possible

Challenge 2: Maintaining Reviewer Expertise and Motivation

Problem: Keeping reviewers engaged and maintaining high-quality reviews over time.

Solutions:

  • Provide regular training and skill development opportunities
  • Rotate review assignments to prevent fatigue
  • Recognize and reward high-quality review contributions
  • Create career advancement paths for security reviewers
  • Foster a culture of security ownership

Challenge 3: Managing False Positives and Tool Noise

Problem: High false positive rates from automated tools reducing reviewer efficiency.

Solutions:

  • Tune tool configurations to reduce noise
  • Implement intelligent filtering and prioritization
  • Train reviewers to quickly identify false positives
  • Provide feedback loops to improve tool accuracy
  • Use multiple tools with different strengths

Challenge 4: Scaling Reviews with Team Growth

Problem: Maintaining review quality and coverage as development teams grow.

Solutions:

  • Implement scalable review processes and workflows
  • Develop internal reviewer training programs
  • Use automation to handle routine review tasks
  • Create reviewer specialization and expertise areas
  • Establish clear escalation and support processes

Resources and Further Reading

AWS Documentation and Services

Security Code Review Resources

Static Analysis Tools

Professional Development


This documentation provides comprehensive guidance for implementing effective security code review processes. Regular updates ensure the content remains current with evolving security threats and review best practices.