SEC11-BP05: Centralize services for packages and dependencies
Provide centralized services for your builders to acquire and verify packages and dependencies. This allows you to validate packages before they are included in your software, and provides a centralized platform for patching and updating packages that address security issues.
Implementation guidance
Centralizing package and dependency management is crucial for maintaining security, consistency, and compliance across your development lifecycle. By providing centralized services, you can ensure that all packages are vetted, secure, and up-to-date before being used in your applications.
Key steps for implementing this best practice:
- Establish centralized package repositories:
- Set up private package repositories for different languages and frameworks
- Configure artifact repositories with security scanning capabilities
- Implement package approval workflows and governance policies
- Create mirrors of public repositories with additional security controls
- Establish package versioning and lifecycle management policies
- Implement package security scanning:
- Configure automated vulnerability scanning for all packages
- Set up license compliance checking and approval processes
- Implement malware and supply chain attack detection
- Create security policies for package approval and rejection
- Establish continuous monitoring for newly discovered vulnerabilities
- Create package validation and approval processes:
- Define security criteria for package acceptance
- Implement automated and manual review processes
- Create package metadata and documentation requirements
- Establish digital signature verification for packages
- Set up package provenance and integrity checking
- Configure development environment integration:
- Integrate centralized repositories with CI/CD pipelines
- Configure development tools to use centralized repositories
- Implement package caching and distribution optimization
- Create developer onboarding and training materials
- Establish troubleshooting and support processes
- Implement package lifecycle management:
- Create automated update and patching processes
- Establish deprecation and end-of-life policies
- Implement rollback and recovery procedures
- Set up monitoring and alerting for package issues
- Create reporting and compliance dashboards
- Establish governance and compliance:
- Define roles and responsibilities for package management
- Create audit trails and compliance reporting
- Implement access controls and authentication
- Establish incident response procedures for package security issues
- Create metrics and KPIs for package management effectiveness
Implementation examples
Example 1: AWS CodeArtifact setup for centralized package management
Example 2: Package security scanning with AWS Inspector and Lambda
Example 3: CI/CD integration with centralized package management
python import json import boto3 from datetime import datetime, timedelta
class PackageGovernanceEngine: def init(self): self.dynamodb = boto3.resource(‘dynamodb’) self.sns = boto3.client(‘sns’) self.stepfunctions = boto3.client(‘stepfunctions’)
# DynamoDB tables
self.packages_table = self.dynamodb.Table('PackageRegistry')
self.policies_table = self.dynamodb.Table('PackagePolicies')
self.approvals_table = self.dynamodb.Table('PackageApprovals')
def evaluate_package_policy(self, package_name, package_version, metadata):
"""Evaluate package against governance policies"""
# Get applicable policies
policies = self.get_applicable_policies(package_name, metadata)
evaluation_result = {
'package_name': package_name,
'package_version': package_version,
'evaluation_timestamp': datetime.utcnow().isoformat(),
'policies_evaluated': [],
'violations': [],
'approval_required': False,
'auto_approved': False
}
for policy in policies:
policy_result = self.evaluate_single_policy(policy, metadata)
evaluation_result['policies_evaluated'].append(policy_result)
if not policy_result['compliant']:
evaluation_result['violations'].append({
'policy_id': policy['policy_id'],
'policy_name': policy['name'],
'violation_type': policy_result['violation_type'],
'severity': policy_result['severity'],
'message': policy_result['message']
})
# Determine approval status
evaluation_result['approval_required'] = self.requires_manual_approval(evaluation_result)
evaluation_result['auto_approved'] = self.can_auto_approve(evaluation_result)
# Store evaluation result
self.store_evaluation_result(evaluation_result)
# Trigger approval workflow if needed
if evaluation_result['approval_required'] and not evaluation_result['auto_approved']:
self.trigger_approval_workflow(evaluation_result)
return evaluation_result
def get_applicable_policies(self, package_name, metadata):
"""Get policies applicable to the package"""
response = self.policies_table.scan(
FilterExpression='attribute_exists(active) AND active = :active',
ExpressionAttributeValues={':active': True}
)
applicable_policies = []
for policy in response['Items']:
if self.policy_applies_to_package(policy, package_name, metadata):
applicable_policies.append(policy)
return applicable_policies
def policy_applies_to_package(self, policy, package_name, metadata):
"""Check if policy applies to the specific package"""
# Check package name patterns
if 'package_patterns' in policy:
import re
for pattern in policy['package_patterns']:
if re.match(pattern, package_name):
return True
# Check package categories
if 'categories' in policy and 'category' in metadata:
if metadata['category'] in policy['categories']:
return True
# Check package ecosystems
if 'ecosystems' in policy and 'ecosystem' in metadata:
if metadata['ecosystem'] in policy['ecosystems']:
return True
# Default policies apply to all packages
return policy.get('applies_to_all', False)
def evaluate_single_policy(self, policy, metadata):
"""Evaluate a single policy against package metadata"""
policy_result = {
'policy_id': policy['policy_id'],
'policy_name': policy['name'],
'compliant': True,
'violation_type': None,
'severity': 'LOW',
'message': 'Policy compliant'
}
# Evaluate security requirements
if 'security_requirements' in policy:
security_result = self.evaluate_security_requirements(
policy['security_requirements'], metadata
)
if not security_result['compliant']:
policy_result.update(security_result)
return policy_result
# Evaluate license requirements
if 'license_requirements' in policy:
license_result = self.evaluate_license_requirements(
policy['license_requirements'], metadata
)
if not license_result['compliant']:
policy_result.update(license_result)
return policy_result
# Evaluate version requirements
if 'version_requirements' in policy:
version_result = self.evaluate_version_requirements(
policy['version_requirements'], metadata
)
if not version_result['compliant']:
policy_result.update(version_result)
return policy_result
# Evaluate maintenance requirements
if 'maintenance_requirements' in policy:
maintenance_result = self.evaluate_maintenance_requirements(
policy['maintenance_requirements'], metadata
)
if not maintenance_result['compliant']:
policy_result.update(maintenance_result)
return policy_result
return policy_result
def evaluate_security_requirements(self, requirements, metadata):
"""Evaluate security-related policy requirements"""
# Check vulnerability thresholds
if 'max_vulnerabilities' in requirements:
vuln_count = len(metadata.get('vulnerabilities', []))
max_allowed = requirements['max_vulnerabilities']
if vuln_count > max_allowed:
return {
'compliant': False,
'violation_type': 'SECURITY_VULNERABILITY_THRESHOLD',
'severity': 'HIGH',
'message': f'Package has {vuln_count} vulnerabilities, exceeds limit of {max_allowed}'
}
# Check security score threshold
if 'min_security_score' in requirements:
security_score = metadata.get('security_score', 0)
min_score = requirements['min_security_score']
if security_score < min_score:
return {
'compliant': False,
'violation_type': 'SECURITY_SCORE_THRESHOLD',
'severity': 'MEDIUM',
'message': f'Package security score {security_score} below minimum {min_score}'
}
# Check for critical vulnerabilities
if requirements.get('no_critical_vulnerabilities', False):
critical_vulns = [
v for v in metadata.get('vulnerabilities', [])
if v.get('severity') == 'CRITICAL'
]
if critical_vulns:
return {
'compliant': False,
'violation_type': 'CRITICAL_VULNERABILITY',
'severity': 'CRITICAL',
'message': f'Package contains {len(critical_vulns)} critical vulnerabilities'
}
return {'compliant': True}
def evaluate_license_requirements(self, requirements, metadata):
"""Evaluate license-related policy requirements"""
package_license = metadata.get('license', 'Unknown')
# Check approved licenses
if 'approved_licenses' in requirements:
if package_license not in requirements['approved_licenses']:
return {
'compliant': False,
'violation_type': 'UNAPPROVED_LICENSE',
'severity': 'MEDIUM',
'message': f'Package license {package_license} not in approved list'
}
# Check prohibited licenses
if 'prohibited_licenses' in requirements:
if package_license in requirements['prohibited_licenses']:
return {
'compliant': False,
'violation_type': 'PROHIBITED_LICENSE',
'severity': 'HIGH',
'message': f'Package license {package_license} is prohibited'
}
return {'compliant': True}
def evaluate_version_requirements(self, requirements, metadata):
"""Evaluate version-related policy requirements"""
# Check for pre-release versions
if requirements.get('no_prerelease', False):
version = metadata.get('version', '')
if any(marker in version.lower() for marker in ['alpha', 'beta', 'rc', 'pre']):
return {
'compliant': False,
'violation_type': 'PRERELEASE_VERSION',
'severity': 'MEDIUM',
'message': f'Pre-release version {version} not allowed'
}
# Check version age
if 'max_age_days' in requirements:
published_date = metadata.get('published_date')
if published_date:
age_days = (datetime.utcnow() - datetime.fromisoformat(published_date)).days
max_age = requirements['max_age_days']
if age_days > max_age:
return {
'compliant': False,
'violation_type': 'VERSION_TOO_OLD',
'severity': 'LOW',
'message': f'Package version is {age_days} days old, exceeds limit of {max_age}'
}
return {'compliant': True}
def evaluate_maintenance_requirements(self, requirements, metadata):
"""Evaluate maintenance-related policy requirements"""
# Check last update date
if 'max_days_since_update' in requirements:
last_update = metadata.get('last_update_date')
if last_update:
days_since_update = (datetime.utcnow() - datetime.fromisoformat(last_update)).days
max_days = requirements['max_days_since_update']
if days_since_update > max_days:
return {
'compliant': False,
'violation_type': 'STALE_PACKAGE',
'severity': 'MEDIUM',
'message': f'Package not updated for {days_since_update} days, exceeds limit of {max_days}'
}
# Check maintainer activity
if requirements.get('active_maintainer_required', False):
maintainer_active = metadata.get('maintainer_active', False)
if not maintainer_active:
return {
'compliant': False,
'violation_type': 'INACTIVE_MAINTAINER',
'severity': 'MEDIUM',
'message': 'Package maintainer appears to be inactive'
}
return {'compliant': True}
def requires_manual_approval(self, evaluation_result):
"""Determine if package requires manual approval"""
# Always require approval for violations
if evaluation_result['violations']:
return True
# Require approval for new packages from unknown sources
# Additional logic can be added here
return False
def can_auto_approve(self, evaluation_result):
"""Determine if package can be automatically approved"""
# Don't auto-approve if there are violations
if evaluation_result['violations']:
return False
# Don't auto-approve if manual approval is required
if evaluation_result['approval_required']:
return False
return True
def store_evaluation_result(self, evaluation_result):
"""Store policy evaluation result"""
self.packages_table.put_item(Item=evaluation_result)
def trigger_approval_workflow(self, evaluation_result):
"""Trigger manual approval workflow"""
workflow_input = {
'package_name': evaluation_result['package_name'],
'package_version': evaluation_result['package_version'],
'evaluation_result': evaluation_result,
'approval_required_reason': 'Policy violations detected'
}
# Start Step Functions workflow for approval process
self.stepfunctions.start_execution(
stateMachineArn='arn:aws:states:us-west-2:123456789012:stateMachine:PackageApprovalWorkflow',
input=json.dumps(workflow_input)
)
# Send notification to security team
self.sns.publish(
TopicArn='arn:aws:sns:us-west-2:123456789012:PackageApprovalRequired',
Subject=f'Package Approval Required: {evaluation_result["package_name"]}',
Message=json.dumps(evaluation_result, indent=2)
)
Example usage
def lambda_handler(event, context): “"”Lambda function for package governance evaluation”””
governance_engine = PackageGovernanceEngine()
# Process package evaluation request
package_name = event.get('package_name')
package_version = event.get('package_version')
metadata = event.get('metadata', {})
if package_name and package_version:
result = governance_engine.evaluate_package_policy(
package_name, package_version, metadata
)
return {
'statusCode': 200,
'body': json.dumps(result)
}
return {
'statusCode': 400,
'body': json.dumps({'error': 'Missing required parameters'})
} ```
AWS services to consider
Benefits of centralizing services for packages and dependencies
- Enhanced security posture: Centralized scanning and validation of all packages before use
- Improved compliance: Consistent application of security and licensing policies
- Reduced supply chain risk: Protection against malicious packages and supply chain attacks
- Operational efficiency: Streamlined package management and automated security processes
- Cost optimization: Reduced bandwidth usage through package caching and mirroring
- Developer productivity: Simplified access to approved packages with clear governance
- Audit capabilities: Complete visibility into package usage and security status
- Faster incident response: Centralized tracking enables quick identification of affected systems