SEC11-BP08 - Build a program that embeds security ownership in workload teams
Implementation guidance
Embedding security ownership in workload teams is essential for creating a security-conscious culture where every team member takes responsibility for security outcomes. By distributing security knowledge and accountability throughout the organization, you create multiple layers of defense and ensure security is considered at every stage of development and operations.
Key steps for implementing this best practice:
-
Establish security ownership framework:
- Define clear security roles and responsibilities for each team member
- Create security accountability metrics and KPIs
- Implement security ownership documentation and processes
- Establish security decision-making authority within teams
- Create escalation paths for security issues and incidents
-
Implement security champions program:
- Identify and train security champions within each workload team
- Provide advanced security training and certification opportunities
- Create security champion networks and communities of practice
- Establish regular security champion meetings and knowledge sharing
- Recognize and reward security champion contributions
-
Provide comprehensive security training:
- Develop role-based security training programs
- Implement hands-on security workshops and labs
- Create security awareness campaigns and communications
- Establish continuous learning paths and certification programs
- Measure training effectiveness and knowledge retention
-
Create security feedback and improvement mechanisms:
- Implement security metrics and dashboards for teams
- Establish regular security reviews and retrospectives
- Create security incident post-mortem processes
- Implement security suggestion and improvement programs
- Establish security maturity assessment frameworks
-
Integrate security into team processes:
- Embed security requirements in development workflows
- Implement security checkpoints in deployment pipelines
- Create security-focused code review processes
- Establish security testing and validation procedures
- Integrate security considerations into planning and design
-
Foster security culture and collaboration:
- Promote security-first mindset across all team activities
- Encourage proactive security thinking and innovation
- Create cross-team security collaboration opportunities
- Establish security knowledge sharing platforms
- Celebrate security achievements and learnings
Implementation examples
Example 1: Security champions program management system
View code
import json
import boto3
from datetime import datetime, timedelta
from typing import Dict, List, Any
import uuid
class SecurityChampionsProgram:
def __init__(self):
self.dynamodb = boto3.resource('dynamodb')
self.sns = boto3.client('sns')
self.ses = boto3.client('ses')
self.lambda_client = boto3.client('lambda')
# DynamoDB tables
self.champions_table = self.dynamodb.Table('SecurityChampions')
self.training_table = self.dynamodb.Table('SecurityTraining')
self.activities_table = self.dynamodb.Table('SecurityActivities')
self.metrics_table = self.dynamodb.Table('SecurityMetrics')
# Program configuration
self.program_config = {
'champion_requirements': {
'min_training_hours': 40,
'required_certifications': ['AWS Security Specialty', 'Security+'],
'min_activities_per_quarter': 4,
'peer_nominations_required': 2
},
'training_tracks': {
'foundation': {
'duration_hours': 16,
'modules': ['Security Fundamentals', 'Threat Modeling', 'Secure Coding']
},
'advanced': {
'duration_hours': 24,
'modules': ['Advanced Threats', 'Incident Response', 'Security Architecture']
},
'leadership': {
'duration_hours': 12,
'modules': ['Security Leadership', 'Risk Management', 'Compliance']
}
},
'recognition_levels': {
'bronze': {'points_required': 100, 'benefits': ['Certificate', 'Badge']},
'silver': {'points_required': 250, 'benefits': ['Certificate', 'Badge', 'Conference Ticket']},
'gold': {'points_required': 500, 'benefits': ['Certificate', 'Badge', 'Conference Ticket', 'Bonus']}
}
}
def nominate_champion(self, nominee_data: Dict[str, Any]) -> Dict[str, Any]:
"""Nominate a team member as a security champion"""
nomination_id = str(uuid.uuid4())
nomination = {
'nomination_id': nomination_id,
'nominee_email': nominee_data['email'],
'nominee_name': nominee_data['name'],
'team': nominee_data['team'],
'department': nominee_data['department'],
'nominator_email': nominee_data['nominator_email'],
'nominator_name': nominee_data['nominator_name'],
'nomination_reason': nominee_data['reason'],
'skills_assessment': nominee_data.get('skills', {}),
'nomination_timestamp': datetime.utcnow().isoformat(),
'status': 'pending_review',
'review_scores': [],
'final_decision': None
}
# Store nomination
self.champions_table.put_item(Item=nomination)
# Send notification to security team for review
self.send_nomination_notification(nomination)
# Send confirmation to nominator
self.send_nomination_confirmation(nomination)
return {
'nomination_id': nomination_id,
'status': 'submitted',
'next_steps': 'Nomination will be reviewed by the security team within 5 business days'
}
def review_nomination(self, nomination_id: str, reviewer_data: Dict[str, Any]) -> Dict[str, Any]:
"""Review and score a champion nomination"""
# Get nomination
response = self.champions_table.get_item(Key={'nomination_id': nomination_id})
if 'Item' not in response:
return {'error': 'Nomination not found'}
nomination = response['Item']
# Add review score
review_score = {
'reviewer_email': reviewer_data['reviewer_email'],
'reviewer_name': reviewer_data['reviewer_name'],
'technical_score': reviewer_data['technical_score'], # 1-10
'leadership_score': reviewer_data['leadership_score'], # 1-10
'communication_score': reviewer_data['communication_score'], # 1-10
'motivation_score': reviewer_data['motivation_score'], # 1-10
'overall_score': reviewer_data['overall_score'], # 1-10
'comments': reviewer_data.get('comments', ''),
'recommendation': reviewer_data['recommendation'], # approve/reject/needs_more_info
'review_timestamp': datetime.utcnow().isoformat()
}
# Update nomination with review
nomination['review_scores'].append(review_score)
# Check if we have enough reviews to make a decision
if len(nomination['review_scores']) >= 2:
decision = self.make_nomination_decision(nomination)
nomination['final_decision'] = decision
nomination['status'] = 'approved' if decision['approved'] else 'rejected'
# Process approved nomination
if decision['approved']:
champion_result = self.onboard_new_champion(nomination)
nomination['champion_id'] = champion_result['champion_id']
# Update nomination
self.champions_table.put_item(Item=nomination)
# Send status update
self.send_nomination_status_update(nomination)
return {
'nomination_id': nomination_id,
'status': nomination['status'],
'reviews_completed': len(nomination['review_scores']),
'decision': nomination.get('final_decision')
}
def onboard_new_champion(self, nomination: Dict[str, Any]) -> Dict[str, Any]:
"""Onboard a newly approved security champion"""
champion_id = str(uuid.uuid4())
champion = {
'champion_id': champion_id,
'email': nomination['nominee_email'],
'name': nomination['nominee_name'],
'team': nomination['team'],
'department': nomination['department'],
'start_date': datetime.utcnow().isoformat(),
'status': 'active',
'level': 'bronze',
'points': 0,
'training_completed': [],
'certifications': [],
'activities': [],
'mentorship': {
'mentor_assigned': False,
'mentor_id': None,
'mentorship_start_date': None
},
'performance_metrics': {
'training_hours': 0,
'activities_completed': 0,
'team_impact_score': 0,
'peer_feedback_score': 0
}
}
# Store champion record
self.champions_table.put_item(Item=champion)
# Assign mentor
mentor_result = self.assign_mentor(champion_id)
if mentor_result['mentor_assigned']:
champion['mentorship'] = mentor_result
self.champions_table.put_item(Item=champion)
# Create onboarding training plan
training_plan = self.create_training_plan(champion_id, 'foundation')
# Send welcome package
self.send_champion_welcome_package(champion, training_plan)
# Schedule initial check-in
self.schedule_champion_checkin(champion_id, days_from_now=30)
return {
'champion_id': champion_id,
'onboarding_status': 'completed',
'training_plan_id': training_plan['plan_id'],
'mentor_assigned': mentor_result['mentor_assigned']
}
def create_training_plan(self, champion_id: str, track: str) -> Dict[str, Any]:
"""Create personalized training plan for champion"""
plan_id = str(uuid.uuid4())
track_config = self.program_config['training_tracks'][track]
training_plan = {
'plan_id': plan_id,
'champion_id': champion_id,
'track': track,
'total_hours': track_config['duration_hours'],
'modules': [],
'created_date': datetime.utcnow().isoformat(),
'target_completion_date': (datetime.utcnow() + timedelta(days=90)).isoformat(),
'status': 'active',
'progress': {
'completed_modules': 0,
'total_modules': len(track_config['modules']),
'completion_percentage': 0
}
}
# Create training modules
for i, module_name in enumerate(track_config['modules']):
module = {
'module_id': str(uuid.uuid4()),
'name': module_name,
'order': i + 1,
'estimated_hours': track_config['duration_hours'] // len(track_config['modules']),
'status': 'not_started',
'start_date': None,
'completion_date': None,
'score': None,
'resources': self.get_training_resources(module_name)
}
training_plan['modules'].append(module)
# Store training plan
self.training_table.put_item(Item=training_plan)
return training_plan
def track_champion_activity(self, activity_data: Dict[str, Any]) -> Dict[str, Any]:
"""Track and record champion security activities"""
activity_id = str(uuid.uuid4())
activity = {
'activity_id': activity_id,
'champion_id': activity_data['champion_id'],
'activity_type': activity_data['type'], # training, presentation, code_review, incident_response, etc.
'title': activity_data['title'],
'description': activity_data['description'],
'date': activity_data['date'],
'duration_hours': activity_data.get('duration_hours', 0),
'impact_level': activity_data.get('impact_level', 'medium'), # low, medium, high
'team_members_impacted': activity_data.get('team_members_impacted', 0),
'artifacts': activity_data.get('artifacts', []), # documents, presentations, code, etc.
'feedback': activity_data.get('feedback', {}),
'points_earned': self.calculate_activity_points(activity_data),
'verification_status': 'pending',
'verified_by': None,
'verification_date': None
}
# Store activity
self.activities_table.put_item(Item=activity)
# Update champion points (pending verification)
self.update_champion_points(activity_data['champion_id'], activity['points_earned'], pending=True)
# Send verification request
self.send_activity_verification_request(activity)
return {
'activity_id': activity_id,
'points_earned': activity['points_earned'],
'verification_status': 'pending'
}
def verify_champion_activity(self, activity_id: str, verifier_data: Dict[str, Any]) -> Dict[str, Any]:
"""Verify and approve champion activity"""
# Get activity
response = self.activities_table.get_item(Key={'activity_id': activity_id})
if 'Item' not in response:
return {'error': 'Activity not found'}
activity = response['Item']
# Update verification status
activity['verification_status'] = verifier_data['status'] # approved, rejected, needs_revision
activity['verified_by'] = verifier_data['verifier_email']
activity['verification_date'] = datetime.utcnow().isoformat()
activity['verification_comments'] = verifier_data.get('comments', '')
# Adjust points if needed
if verifier_data['status'] == 'approved':
adjusted_points = verifier_data.get('adjusted_points', activity['points_earned'])
activity['points_earned'] = adjusted_points
# Update champion points (confirmed)
self.update_champion_points(activity['champion_id'], adjusted_points, pending=False)
elif verifier_data['status'] == 'rejected':
# Remove pending points
self.update_champion_points(activity['champion_id'], -activity['points_earned'], pending=True)
# Update activity
self.activities_table.put_item(Item=activity)
# Send notification to champion
self.send_activity_verification_result(activity)
return {
'activity_id': activity_id,
'verification_status': activity['verification_status'],
'points_awarded': activity['points_earned'] if verifier_data['status'] == 'approved' else 0
}
def generate_champion_metrics(self, champion_id: str, period: str = 'quarterly') -> Dict[str, Any]:
"""Generate comprehensive metrics for a security champion"""
# Get champion data
champion_response = self.champions_table.get_item(Key={'champion_id': champion_id})
if 'Item' not in champion_response:
return {'error': 'Champion not found'}
champion = champion_response['Item']
# Calculate date range
end_date = datetime.utcnow()
if period == 'quarterly':
start_date = end_date - timedelta(days=90)
elif period == 'monthly':
start_date = end_date - timedelta(days=30)
elif period == 'yearly':
start_date = end_date - timedelta(days=365)
else:
start_date = datetime.fromisoformat(champion['start_date'])
# Get activities in period
activities = self.get_champion_activities(champion_id, start_date, end_date)
# Calculate metrics
metrics = {
'champion_id': champion_id,
'champion_name': champion['name'],
'period': period,
'start_date': start_date.isoformat(),
'end_date': end_date.isoformat(),
'current_level': champion['level'],
'total_points': champion['points'],
'period_metrics': {
'activities_completed': len(activities),
'total_hours_contributed': sum(a.get('duration_hours', 0) for a in activities),
'points_earned': sum(a.get('points_earned', 0) for a in activities if a.get('verification_status') == 'approved'),
'team_members_impacted': sum(a.get('team_members_impacted', 0) for a in activities),
'high_impact_activities': len([a for a in activities if a.get('impact_level') == 'high'])
},
'activity_breakdown': self.analyze_activity_breakdown(activities),
'training_progress': self.get_training_progress(champion_id),
'peer_feedback': self.get_peer_feedback(champion_id, start_date, end_date),
'recommendations': self.generate_champion_recommendations(champion, activities)
}
# Store metrics
metrics_record = {
'metric_id': str(uuid.uuid4()),
'champion_id': champion_id,
'period': period,
'generated_date': datetime.utcnow().isoformat(),
'metrics': metrics
}
self.metrics_table.put_item(Item=metrics_record)
return metrics
def calculate_activity_points(self, activity_data: Dict[str, Any]) -> int:
"""Calculate points for a champion activity"""
base_points = {
'training': 10,
'presentation': 20,
'code_review': 15,
'incident_response': 25,
'mentoring': 20,
'documentation': 15,
'tool_development': 30,
'vulnerability_discovery': 40,
'process_improvement': 25
}
activity_type = activity_data['type']
points = base_points.get(activity_type, 10)
# Multiply by impact level
impact_multiplier = {
'low': 1.0,
'medium': 1.5,
'high': 2.0
}
points *= impact_multiplier.get(activity_data.get('impact_level', 'medium'), 1.5)
# Add bonus for team impact
team_impact = activity_data.get('team_members_impacted', 0)
if team_impact > 10:
points *= 1.5
elif team_impact > 5:
points *= 1.2
return int(points)
def assign_mentor(self, champion_id: str) -> Dict[str, Any]:
"""Assign a mentor to a new security champion"""
# Find available mentors
available_mentors = self.find_available_mentors()
if not available_mentors:
return {
'mentor_assigned': False,
'reason': 'No available mentors at this time'
}
# Select best mentor based on team, department, and availability
selected_mentor = self.select_best_mentor(champion_id, available_mentors)
# Create mentorship relationship
mentorship = {
'mentor_assigned': True,
'mentor_id': selected_mentor['champion_id'],
'mentor_name': selected_mentor['name'],
'mentorship_start_date': datetime.utcnow().isoformat(),
'mentorship_duration_months': 6,
'meeting_frequency': 'bi-weekly',
'goals': [
'Complete foundation training track',
'Lead first security presentation',
'Participate in security code reviews',
'Develop security expertise in team domain'
]
}
# Update mentor's mentee list
self.add_mentee_to_mentor(selected_mentor['champion_id'], champion_id)
# Send mentorship notifications
self.send_mentorship_notifications(champion_id, selected_mentor['champion_id'])
return mentorship
def send_nomination_notification(self, nomination: Dict[str, Any]):
"""Send nomination notification to security team"""
message = {
'subject': f'New Security Champion Nomination: {nomination["nominee_name"]}',
'body': f"""
A new security champion nomination has been submitted:
Nominee: {nomination['nominee_name']} ({nomination['nominee_email']})
Team: {nomination['team']}
Department: {nomination['department']}
Nominator: {nomination['nominator_name']} ({nomination['nominator_email']})
Reason for Nomination:
{nomination['nomination_reason']}
Please review this nomination in the Security Champions portal.
Nomination ID: {nomination['nomination_id']}
""",
'recipients': ['security-team@company.com']
}
self.send_email_notification(message)
def send_champion_welcome_package(self, champion: Dict[str, Any], training_plan: Dict[str, Any]):
"""Send welcome package to new security champion"""
message = {
'subject': 'Welcome to the Security Champions Program!',
'body': f"""
Congratulations {champion['name']}!
You have been selected to join our Security Champions Program. We're excited to have you on board!
Your Champion ID: {champion['champion_id']}
Starting Level: {champion['level']}
Next Steps:
1. Complete your foundation training track ({training_plan['total_hours']} hours)
2. Meet with your assigned mentor
3. Attend the monthly Security Champions meeting
4. Start contributing to your team's security initiatives
Resources:
- Security Champions Portal: https://security-champions.company.com
- Training Materials: https://training.company.com/security
- Slack Channel: #security-champions
Welcome to the team!
Security Champions Program Team
""",
'recipients': [champion['email']]
}
self.send_email_notification(message)
def send_email_notification(self, message: Dict[str, Any]):
"""Send email notification using SES"""
try:
self.ses.send_email(
Source='security-champions@company.com',
Destination={'ToAddresses': message['recipients']},
Message={
'Subject': {'Data': message['subject']},
'Body': {'Text': {'Data': message['body']}}
}
)
except Exception as e:
print(f"Error sending email: {str(e)}")
def lambda_handler(event, context):
"""Lambda function for Security Champions Program management"""
program = SecurityChampionsProgram()
action = event.get('action')
if action == 'nominate_champion':
result = program.nominate_champion(event['nominee_data'])
elif action == 'review_nomination':
result = program.review_nomination(event['nomination_id'], event['reviewer_data'])
elif action == 'track_activity':
result = program.track_champion_activity(event['activity_data'])
elif action == 'verify_activity':
result = program.verify_champion_activity(event['activity_id'], event['verifier_data'])
elif action == 'generate_metrics':
result = program.generate_champion_metrics(event['champion_id'], event.get('period', 'quarterly'))
else:
result = {'error': 'Invalid action specified'}
return {
'statusCode': 200,
'body': json.dumps(result)
}Example 2: Security training and competency management system
View code
import json
import boto3
from datetime import datetime, timedelta
from typing import Dict, List, Any
import uuid
class SecurityTrainingManager:
def __init__(self):
self.dynamodb = boto3.resource('dynamodb')
self.s3 = boto3.client('s3')
self.lambda_client = boto3.client('lambda')
self.ses = boto3.client('ses')
# DynamoDB tables
self.employees_table = self.dynamodb.Table('Employees')
self.training_catalog_table = self.dynamodb.Table('TrainingCatalog')
self.training_records_table = self.dynamodb.Table('TrainingRecords')
self.competency_table = self.dynamodb.Table('SecurityCompetencies')
self.assessments_table = self.dynamodb.Table('SecurityAssessments')
# Training configuration
self.training_config = {
'role_based_requirements': {
'developer': {
'required_courses': [
'secure-coding-fundamentals',
'owasp-top-10',
'threat-modeling-basics',
'code-review-security'
],
'annual_hours_required': 20,
'competency_areas': ['secure_coding', 'vulnerability_assessment', 'code_review']
},
'devops': {
'required_courses': [
'infrastructure-security',
'container-security',
'pipeline-security',
'cloud-security-aws'
],
'annual_hours_required': 24,
'competency_areas': ['infrastructure_security', 'pipeline_security', 'incident_response']
},
'architect': {
'required_courses': [
'security-architecture',
'threat-modeling-advanced',
'compliance-frameworks',
'risk-assessment'
],
'annual_hours_required': 30,
'competency_areas': ['security_architecture', 'threat_modeling', 'compliance', 'risk_management']
},
'manager': {
'required_courses': [
'security-leadership',
'risk-management',
'incident-management',
'security-governance'
],
'annual_hours_required': 16,
'competency_areas': ['security_leadership', 'risk_management', 'governance']
}
},
'competency_levels': {
'novice': {'score_range': [0, 40], 'description': 'Basic understanding'},
'competent': {'score_range': [41, 70], 'description': 'Can perform with guidance'},
'proficient': {'score_range': [71, 85], 'description': 'Can perform independently'},
'expert': {'score_range': [86, 100], 'description': 'Can teach and mentor others'}
},
'assessment_frequency': {
'initial': 'upon_hire',
'regular': 'quarterly',
'post_training': 'immediate',
'annual': 'yearly'
}
}
def create_personalized_training_plan(self, employee_id: str) -> Dict[str, Any]:
"""Create a personalized training plan based on role and current competencies"""
# Get employee information
employee_response = self.employees_table.get_item(Key={'employee_id': employee_id})
if 'Item' not in employee_response:
return {'error': 'Employee not found'}
employee = employee_response['Item']
role = employee.get('role', 'developer')
# Get current competency assessment
current_competencies = self.get_current_competencies(employee_id)
# Get role requirements
role_requirements = self.training_config['role_based_requirements'].get(role,
self.training_config['role_based_requirements']['developer'])
# Create training plan
plan_id = str(uuid.uuid4())
training_plan = {
'plan_id': plan_id,
'employee_id': employee_id,
'employee_name': employee['name'],
'role': role,
'created_date': datetime.utcnow().isoformat(),
'plan_year': datetime.utcnow().year,
'status': 'active',
'required_courses': [],
'recommended_courses': [],
'competency_goals': [],
'progress': {
'courses_completed': 0,
'total_courses': 0,
'hours_completed': 0,
'required_hours': role_requirements['annual_hours_required'],
'completion_percentage': 0
}
}
# Add required courses
for course_id in role_requirements['required_courses']:
course_info = self.get_course_info(course_id)
if course_info and not self.is_course_completed(employee_id, course_id):
training_plan['required_courses'].append({
'course_id': course_id,
'course_name': course_info['name'],
'duration_hours': course_info['duration_hours'],
'priority': 'required',
'due_date': (datetime.utcnow() + timedelta(days=90)).isoformat(),
'status': 'not_started'
})
# Add recommended courses based on competency gaps
recommended_courses = self.identify_recommended_courses(current_competencies, role_requirements)
training_plan['recommended_courses'] = recommended_courses
# Set competency goals
for competency_area in role_requirements['competency_areas']:
current_level = current_competencies.get(competency_area, {}).get('level', 'novice')
target_level = 'proficient' if current_level in ['novice', 'competent'] else 'expert'
training_plan['competency_goals'].append({
'competency_area': competency_area,
'current_level': current_level,
'target_level': target_level,
'target_date': (datetime.utcnow() + timedelta(days=180)).isoformat()
})
# Update progress totals
training_plan['progress']['total_courses'] = len(training_plan['required_courses']) + len(training_plan['recommended_courses'])
# Store training plan
self.training_records_table.put_item(Item=training_plan)
# Send training plan notification
self.send_training_plan_notification(training_plan)
return training_plan
def track_training_completion(self, completion_data: Dict[str, Any]) -> Dict[str, Any]:
"""Track and record training completion"""
completion_id = str(uuid.uuid4())
completion_record = {
'completion_id': completion_id,
'employee_id': completion_data['employee_id'],
'course_id': completion_data['course_id'],
'completion_date': completion_data['completion_date'],
'duration_hours': completion_data['duration_hours'],
'score': completion_data.get('score'),
'certificate_url': completion_data.get('certificate_url'),
'instructor': completion_data.get('instructor'),
'training_method': completion_data.get('method', 'online'), # online, classroom, workshop
'feedback': completion_data.get('feedback', {}),
'verification_status': 'verified' if completion_data.get('auto_verified') else 'pending'
}
# Store completion record
self.training_records_table.put_item(Item=completion_record)
# Update employee training plan
self.update_training_plan_progress(completion_data['employee_id'], completion_data['course_id'])
# Trigger post-training assessment if required
course_info = self.get_course_info(completion_data['course_id'])
if course_info and course_info.get('requires_assessment'):
self.schedule_post_training_assessment(completion_data['employee_id'], completion_data['course_id'])
# Update competency scores
self.update_competency_scores(completion_data['employee_id'], completion_data['course_id'], completion_data.get('score'))
# Send completion notification
self.send_training_completion_notification(completion_record)
return {
'completion_id': completion_id,
'status': 'recorded',
'verification_status': completion_record['verification_status']
}
def conduct_security_assessment(self, assessment_data: Dict[str, Any]) -> Dict[str, Any]:
"""Conduct security competency assessment"""
assessment_id = str(uuid.uuid4())
assessment = {
'assessment_id': assessment_id,
'employee_id': assessment_data['employee_id'],
'assessment_type': assessment_data['type'], # initial, quarterly, post_training, annual
'competency_areas': assessment_data['competency_areas'],
'assessment_date': datetime.utcnow().isoformat(),
'assessor': assessment_data.get('assessor'),
'method': assessment_data.get('method', 'online_quiz'), # online_quiz, practical_exercise, interview
'results': {},
'overall_score': 0,
'recommendations': [],
'status': 'in_progress'
}
# Conduct assessment for each competency area
total_score = 0
for competency_area in assessment_data['competency_areas']:
area_result = self.assess_competency_area(
assessment_data['employee_id'],
competency_area,
assessment_data.get('questions', {}).get(competency_area, [])
)
assessment['results'][competency_area] = area_result
total_score += area_result['score']
# Calculate overall score
assessment['overall_score'] = total_score / len(assessment_data['competency_areas'])
assessment['status'] = 'completed'
# Generate recommendations
assessment['recommendations'] = self.generate_assessment_recommendations(assessment)
# Store assessment
self.assessments_table.put_item(Item=assessment)
# Update employee competency records
self.update_employee_competencies(assessment_data['employee_id'], assessment['results'])
# Send assessment results
self.send_assessment_results(assessment)
return assessment
def assess_competency_area(self, employee_id: str, competency_area: str, questions: List[Dict]) -> Dict[str, Any]:
"""Assess a specific competency area"""
if not questions:
# Use default questions for the competency area
questions = self.get_default_questions(competency_area)
total_points = 0
earned_points = 0
for question in questions:
total_points += question.get('points', 1)
if question.get('correct', False):
earned_points += question.get('points', 1)
score = (earned_points / total_points * 100) if total_points > 0 else 0
level = self.determine_competency_level(score)
return {
'competency_area': competency_area,
'score': score,
'level': level,
'questions_answered': len(questions),
'correct_answers': sum(1 for q in questions if q.get('correct', False)),
'areas_for_improvement': self.identify_improvement_areas(competency_area, questions)
}
def generate_team_security_dashboard(self, team_id: str) -> Dict[str, Any]:
"""Generate security training and competency dashboard for a team"""
# Get team members
team_members = self.get_team_members(team_id)
dashboard = {
'team_id': team_id,
'generated_date': datetime.utcnow().isoformat(),
'team_size': len(team_members),
'overall_metrics': {
'training_compliance_rate': 0,
'average_competency_score': 0,
'security_champions': 0,
'overdue_training': 0
},
'competency_breakdown': {},
'training_status': {
'completed_this_quarter': 0,
'in_progress': 0,
'overdue': 0,
'upcoming_due': 0
},
'top_performers': [],
'improvement_opportunities': [],
'recommended_actions': []
}
total_compliance = 0
total_competency_score = 0
competency_scores = {}
for member in team_members:
employee_id = member['employee_id']
# Get training compliance
compliance = self.calculate_training_compliance(employee_id)
total_compliance += compliance['compliance_percentage']
# Get competency scores
competencies = self.get_current_competencies(employee_id)
member_avg_score = 0
for area, comp_data in competencies.items():
score = comp_data.get('score', 0)
if area not in competency_scores:
competency_scores[area] = []
competency_scores[area].append(score)
member_avg_score += score
if competencies:
member_avg_score /= len(competencies)
total_competency_score += member_avg_score
# Check for security champion status
if member.get('security_champion', False):
dashboard['overall_metrics']['security_champions'] += 1
# Check for overdue training
if compliance['overdue_courses'] > 0:
dashboard['overall_metrics']['overdue_training'] += 1
# Calculate overall metrics
if team_members:
dashboard['overall_metrics']['training_compliance_rate'] = total_compliance / len(team_members)
dashboard['overall_metrics']['average_competency_score'] = total_competency_score / len(team_members)
# Calculate competency breakdown
for area, scores in competency_scores.items():
dashboard['competency_breakdown'][area] = {
'average_score': sum(scores) / len(scores),
'level_distribution': self.calculate_level_distribution(scores),
'improvement_needed': len([s for s in scores if s < 70])
}
# Identify top performers
dashboard['top_performers'] = self.identify_top_performers(team_members)
# Generate recommendations
dashboard['recommended_actions'] = self.generate_team_recommendations(dashboard)
return dashboard
def schedule_training_reminders(self, employee_id: str) -> Dict[str, Any]:
"""Schedule automated training reminders"""
# Get employee training plan
training_plan = self.get_active_training_plan(employee_id)
if not training_plan:
return {'error': 'No active training plan found'}
reminders_scheduled = []
# Schedule reminders for required courses
for course in training_plan.get('required_courses', []):
if course['status'] == 'not_started':
due_date = datetime.fromisoformat(course['due_date'])
# Schedule reminder 30 days before due date
reminder_date = due_date - timedelta(days=30)
if reminder_date > datetime.utcnow():
reminder_id = self.schedule_reminder(
employee_id,
course['course_id'],
reminder_date,
'course_due_soon'
)
reminders_scheduled.append({
'reminder_id': reminder_id,
'course_id': course['course_id'],
'reminder_date': reminder_date.isoformat(),
'type': 'course_due_soon'
})
# Schedule reminder 7 days before due date
urgent_reminder_date = due_date - timedelta(days=7)
if urgent_reminder_date > datetime.utcnow():
reminder_id = self.schedule_reminder(
employee_id,
course['course_id'],
urgent_reminder_date,
'course_due_urgent'
)
reminders_scheduled.append({
'reminder_id': reminder_id,
'course_id': course['course_id'],
'reminder_date': urgent_reminder_date.isoformat(),
'type': 'course_due_urgent'
})
return {
'employee_id': employee_id,
'reminders_scheduled': len(reminders_scheduled),
'reminders': reminders_scheduled
}
def generate_security_culture_metrics(self, organization_level: str = 'company') -> Dict[str, Any]:
"""Generate organization-wide security culture metrics"""
metrics = {
'organization_level': organization_level,
'generated_date': datetime.utcnow().isoformat(),
'period': 'quarterly',
'overall_metrics': {
'total_employees': 0,
'security_trained_employees': 0,
'security_champions': 0,
'average_competency_score': 0,
'training_compliance_rate': 0
},
'competency_trends': {},
'training_effectiveness': {},
'security_incidents_correlation': {},
'culture_indicators': {
'proactive_security_reports': 0,
'security_suggestions_submitted': 0,
'cross_team_security_collaboration': 0,
'security_innovation_projects': 0
},
'recommendations': []
}
# Get all employees
all_employees = self.get_all_employees()
metrics['overall_metrics']['total_employees'] = len(all_employees)
total_competency_score = 0
total_compliance = 0
security_champions = 0
trained_employees = 0
for employee in all_employees:
employee_id = employee['employee_id']
# Check training status
compliance = self.calculate_training_compliance(employee_id)
if compliance['compliance_percentage'] > 0:
trained_employees += 1
total_compliance += compliance['compliance_percentage']
# Check competency scores
competencies = self.get_current_competencies(employee_id)
if competencies:
avg_score = sum(comp['score'] for comp in competencies.values()) / len(competencies)
total_competency_score += avg_score
# Check security champion status
if employee.get('security_champion', False):
security_champions += 1
# Calculate overall metrics
metrics['overall_metrics']['security_trained_employees'] = trained_employees
metrics['overall_metrics']['security_champions'] = security_champions
if trained_employees > 0:
metrics['overall_metrics']['training_compliance_rate'] = total_compliance / trained_employees
metrics['overall_metrics']['average_competency_score'] = total_competency_score / trained_employees
# Generate culture indicators
metrics['culture_indicators'] = self.calculate_culture_indicators()
# Generate recommendations
metrics['recommendations'] = self.generate_culture_recommendations(metrics)
return metrics
def send_training_plan_notification(self, training_plan: Dict[str, Any]):
"""Send training plan notification to employee"""
message = {
'subject': f'Your {training_plan["plan_year"]} Security Training Plan is Ready',
'body': f"""
Hello {training_plan['employee_name']},
Your personalized security training plan for {training_plan['plan_year']} has been created.
Training Requirements:
- Required Courses: {len(training_plan['required_courses'])}
- Recommended Courses: {len(training_plan['recommended_courses'])}
- Total Hours Required: {training_plan['progress']['required_hours']}
Competency Goals:
{chr(10).join([f"- {goal['competency_area']}: {goal['current_level']} → {goal['target_level']}" for goal in training_plan['competency_goals']])}
Please log into the training portal to begin your courses.
Training Portal: https://training.company.com/security
Best regards,
Security Training Team
""",
'recipients': [self.get_employee_email(training_plan['employee_id'])]
}
self.send_email_notification(message)
def send_email_notification(self, message: Dict[str, Any]):
"""Send email notification using SES"""
try:
self.ses.send_email(
Source='security-training@company.com',
Destination={'ToAddresses': message['recipients']},
Message={
'Subject': {'Data': message['subject']},
'Body': {'Text': {'Data': message['body']}}
}
)
except Exception as e:
print(f"Error sending email: {str(e)}")
def lambda_handler(event, context):
"""Lambda function for Security Training Management"""
training_manager = SecurityTrainingManager()
action = event.get('action')
if action == 'create_training_plan':
result = training_manager.create_personalized_training_plan(event['employee_id'])
elif action == 'track_completion':
result = training_manager.track_training_completion(event['completion_data'])
elif action == 'conduct_assessment':
result = training_manager.conduct_security_assessment(event['assessment_data'])
elif action == 'generate_team_dashboard':
result = training_manager.generate_team_security_dashboard(event['team_id'])
elif action == 'schedule_reminders':
result = training_manager.schedule_training_reminders(event['employee_id'])
elif action == 'generate_culture_metrics':
result = training_manager.generate_security_culture_metrics(event.get('organization_level', 'company'))
else:
result = {'error': 'Invalid action specified'}
return {
'statusCode': 200,
'body': json.dumps(result)
}Example 3: Security ownership integration with development workflows
View code
# .github/workflows/security-ownership-integration.yml
name: Security Ownership Integration
on:
push:
branches: [ main, develop ]
pull_request:
branches: [ main ]
schedule:
- cron: '0 9 * * 1' # Weekly security ownership check
env:
SECURITY_OWNERSHIP_API: https://api.company.com/security-ownership
TEAM_SECURITY_DASHBOARD: https://dashboard.company.com/security
jobs:
security-ownership-check:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: |
pip install requests boto3 pyyaml
- name: Check security ownership assignments
run: |
python scripts/check-security-ownership.py
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
- name: Validate security champion involvement
run: |
python scripts/validate-champion-involvement.py
- name: Generate security ownership report
run: |
python scripts/generate-ownership-report.py
- name: Upload security ownership report
uses: actions/upload-artifact@v3
with:
name: security-ownership-report
path: reports/security-ownership-report.json
security-training-compliance:
runs-on: ubuntu-latest
steps:
- name: Check team training compliance
run: |
python scripts/check-training-compliance.py --team="${{ github.repository_owner }}"
- name: Identify training gaps
run: |
python scripts/identify-training-gaps.py
- name: Schedule training reminders
if: github.event_name == 'schedule'
run: |
python scripts/schedule-training-reminders.py
security-culture-metrics:
runs-on: ubuntu-latest
if: github.event_name == 'schedule'
steps:
- name: Collect security culture metrics
run: |
python scripts/collect-culture-metrics.py
- name: Update security dashboard
run: |
python scripts/update-security-dashboard.py
- name: Send weekly security report
run: |
python scripts/send-weekly-report.pyView code
# scripts/check-security-ownership.py
import json
import requests
import os
from typing import Dict, List, Any
import boto3
class SecurityOwnershipChecker:
def __init__(self):
self.github_token = os.environ.get('GITHUB_TOKEN')
self.repo_owner = os.environ.get('GITHUB_REPOSITORY_OWNER')
self.repo_name = os.environ.get('GITHUB_REPOSITORY', '').split('/')[-1]
self.dynamodb = boto3.resource('dynamodb')
# DynamoDB tables
self.ownership_table = self.dynamodb.Table('SecurityOwnership')
self.teams_table = self.dynamodb.Table('Teams')
# Security ownership requirements
self.ownership_requirements = {
'security_champion_required': True,
'security_reviewer_required': True,
'security_training_compliance': 80, # percentage
'security_documentation_required': True,
'incident_response_contact_required': True
}
def check_repository_security_ownership(self) -> Dict[str, Any]:
"""Check security ownership for the current repository"""
ownership_check = {
'repository': f"{self.repo_owner}/{self.repo_name}",
'check_timestamp': datetime.utcnow().isoformat(),
'ownership_status': 'compliant',
'issues': [],
'recommendations': [],
'team_info': {},
'security_contacts': {}
}
try:
# Get repository team information
team_info = self.get_repository_team_info()
ownership_check['team_info'] = team_info
# Check for security champion
champion_check = self.check_security_champion(team_info)
if not champion_check['has_champion']:
ownership_check['issues'].append({
'type': 'missing_security_champion',
'severity': 'high',
'message': 'No security champion assigned to this repository',
'recommendation': 'Assign a trained security champion to the team'
})
ownership_check['ownership_status'] = 'non_compliant'
else:
ownership_check['security_contacts']['champion'] = champion_check['champion_info']
# Check for security reviewer
reviewer_check = self.check_security_reviewer(team_info)
if not reviewer_check['has_reviewer']:
ownership_check['issues'].append({
'type': 'missing_security_reviewer',
'severity': 'medium',
'message': 'No designated security reviewer for code changes',
'recommendation': 'Designate team members as security reviewers'
})
else:
ownership_check['security_contacts']['reviewers'] = reviewer_check['reviewer_info']
# Check training compliance
training_check = self.check_team_training_compliance(team_info)
if training_check['compliance_percentage'] < self.ownership_requirements['security_training_compliance']:
ownership_check['issues'].append({
'type': 'low_training_compliance',
'severity': 'medium',
'message': f"Team training compliance ({training_check['compliance_percentage']}%) below required threshold ({self.ownership_requirements['security_training_compliance']}%)",
'recommendation': 'Complete required security training courses'
})
# Check security documentation
docs_check = self.check_security_documentation()
if not docs_check['has_security_docs']:
ownership_check['issues'].append({
'type': 'missing_security_documentation',
'severity': 'low',
'message': 'Security documentation not found in repository',
'recommendation': 'Create SECURITY.md file with security practices and contacts'
})
# Check incident response contact
incident_check = self.check_incident_response_contact(team_info)
if not incident_check['has_contact']:
ownership_check['issues'].append({
'type': 'missing_incident_contact',
'severity': 'high',
'message': 'No incident response contact defined',
'recommendation': 'Define incident response contact in team configuration'
})
else:
ownership_check['security_contacts']['incident_response'] = incident_check['contact_info']
# Generate overall recommendations
ownership_check['recommendations'] = self.generate_ownership_recommendations(ownership_check)
# Store ownership check results
self.store_ownership_check_results(ownership_check)
return ownership_check
except Exception as e:
ownership_check['error'] = str(e)
ownership_check['ownership_status'] = 'error'
return ownership_check
def get_repository_team_info(self) -> Dict[str, Any]:
"""Get team information for the repository"""
# Get repository collaborators
headers = {'Authorization': f'token {self.github_token}'}
# Get repository info
repo_response = requests.get(
f'https://api.github.com/repos/{self.repo_owner}/{self.repo_name}',
headers=headers
)
if repo_response.status_code != 200:
return {'error': 'Failed to get repository information'}
repo_data = repo_response.json()
# Get collaborators
collaborators_response = requests.get(
f'https://api.github.com/repos/{self.repo_owner}/{self.repo_name}/collaborators',
headers=headers
)
collaborators = []
if collaborators_response.status_code == 200:
collaborators = collaborators_response.json()
# Get team assignments from DynamoDB
team_assignments = self.get_team_assignments(f"{self.repo_owner}/{self.repo_name}")
return {
'repository_name': repo_data['name'],
'repository_owner': repo_data['owner']['login'],
'primary_language': repo_data.get('language'),
'collaborators': [{'login': c['login'], 'permissions': c.get('permissions', {})} for c in collaborators],
'team_assignments': team_assignments,
'repository_topics': repo_data.get('topics', [])
}
def check_security_champion(self, team_info: Dict[str, Any]) -> Dict[str, Any]:
"""Check if team has a designated security champion"""
team_assignments = team_info.get('team_assignments', {})
# Check for security champion in team assignments
security_champion = team_assignments.get('security_champion')
if security_champion:
# Verify champion is still active and trained
champion_status = self.verify_champion_status(security_champion['employee_id'])
return {
'has_champion': champion_status['active'],
'champion_info': {
'name': security_champion['name'],
'email': security_champion['email'],
'employee_id': security_champion['employee_id'],
'certification_status': champion_status['certification_status'],
'last_training_date': champion_status['last_training_date']
}
}
# Check if any collaborators are security champions
for collaborator in team_info.get('collaborators', []):
champion_status = self.check_if_user_is_champion(collaborator['login'])
if champion_status['is_champion']:
return {
'has_champion': True,
'champion_info': champion_status['champion_info']
}
return {
'has_champion': False,
'recommendation': 'Assign a security champion to this repository team'
}
def check_security_reviewer(self, team_info: Dict[str, Any]) -> Dict[str, Any]:
"""Check if team has designated security reviewers"""
team_assignments = team_info.get('team_assignments', {})
security_reviewers = team_assignments.get('security_reviewers', [])
if security_reviewers:
# Verify reviewers are trained and active
active_reviewers = []
for reviewer in security_reviewers:
reviewer_status = self.verify_reviewer_status(reviewer['employee_id'])
if reviewer_status['qualified']:
active_reviewers.append({
'name': reviewer['name'],
'email': reviewer['email'],
'employee_id': reviewer['employee_id'],
'specializations': reviewer_status['specializations']
})
return {
'has_reviewer': len(active_reviewers) > 0,
'reviewer_info': active_reviewers
}
return {
'has_reviewer': False,
'recommendation': 'Designate and train security reviewers for code changes'
}
def check_team_training_compliance(self, team_info: Dict[str, Any]) -> Dict[str, Any]:
"""Check team security training compliance"""
collaborators = team_info.get('collaborators', [])
if not collaborators:
return {
'compliance_percentage': 0,
'compliant_members': 0,
'total_members': 0
}
compliant_members = 0
total_members = len(collaborators)
member_details = []
for collaborator in collaborators:
# Get training status for each team member
training_status = self.get_member_training_status(collaborator['login'])
member_details.append({
'username': collaborator['login'],
'training_compliant': training_status['compliant'],
'completion_percentage': training_status['completion_percentage'],
'overdue_courses': training_status['overdue_courses']
})
if training_status['compliant']:
compliant_members += 1
compliance_percentage = (compliant_members / total_members * 100) if total_members > 0 else 0
return {
'compliance_percentage': compliance_percentage,
'compliant_members': compliant_members,
'total_members': total_members,
'member_details': member_details
}
def check_security_documentation(self) -> Dict[str, Any]:
"""Check if repository has security documentation"""
headers = {'Authorization': f'token {self.github_token}'}
# Check for SECURITY.md file
security_files = ['SECURITY.md', 'docs/SECURITY.md', '.github/SECURITY.md', 'security/README.md']
for file_path in security_files:
response = requests.get(
f'https://api.github.com/repos/{self.repo_owner}/{self.repo_name}/contents/{file_path}',
headers=headers
)
if response.status_code == 200:
return {
'has_security_docs': True,
'security_file_path': file_path,
'last_updated': response.json().get('commit', {}).get('committer', {}).get('date')
}
return {
'has_security_docs': False,
'recommendation': 'Create SECURITY.md file with security practices and contact information'
}
def check_incident_response_contact(self, team_info: Dict[str, Any]) -> Dict[str, Any]:
"""Check if team has incident response contact defined"""
team_assignments = team_info.get('team_assignments', {})
incident_contact = team_assignments.get('incident_response_contact')
if incident_contact:
return {
'has_contact': True,
'contact_info': {
'name': incident_contact['name'],
'email': incident_contact['email'],
'phone': incident_contact.get('phone'),
'escalation_path': incident_contact.get('escalation_path', [])
}
}
return {
'has_contact': False,
'recommendation': 'Define incident response contact and escalation path'
}
def generate_ownership_recommendations(self, ownership_check: Dict[str, Any]) -> List[str]:
"""Generate recommendations based on ownership check results"""
recommendations = []
# High priority recommendations
high_priority_issues = [issue for issue in ownership_check['issues'] if issue['severity'] == 'high']
if high_priority_issues:
recommendations.append("URGENT: Address high-priority security ownership issues immediately")
for issue in high_priority_issues:
recommendations.append(f"• {issue['recommendation']}")
# Medium priority recommendations
medium_priority_issues = [issue for issue in ownership_check['issues'] if issue['severity'] == 'medium']
if medium_priority_issues:
recommendations.append("Address medium-priority security ownership issues within 2 weeks")
for issue in medium_priority_issues:
recommendations.append(f"• {issue['recommendation']}")
# General recommendations
if ownership_check['ownership_status'] == 'compliant':
recommendations.append("✅ Security ownership is compliant - maintain current practices")
else:
recommendations.append("Establish comprehensive security ownership framework for this repository")
# Training recommendations
team_info = ownership_check.get('team_info', {})
if team_info.get('collaborators'):
recommendations.append("Schedule regular security training updates for all team members")
recommendations.append("Consider implementing security champions program if not already in place")
return recommendations
def store_ownership_check_results(self, ownership_check: Dict[str, Any]):
"""Store ownership check results in DynamoDB"""
try:
self.ownership_table.put_item(Item={
'repository': ownership_check['repository'],
'check_timestamp': ownership_check['check_timestamp'],
'ownership_status': ownership_check['ownership_status'],
'issues_count': len(ownership_check['issues']),
'issues': ownership_check['issues'],
'recommendations': ownership_check['recommendations'],
'team_info': ownership_check['team_info'],
'security_contacts': ownership_check['security_contacts']
})
except Exception as e:
print(f"Error storing ownership check results: {str(e)}")
def get_team_assignments(self, repository: str) -> Dict[str, Any]:
"""Get team security assignments from DynamoDB"""
try:
response = self.teams_table.get_item(Key={'repository': repository})
if 'Item' in response:
return response['Item'].get('security_assignments', {})
except Exception as e:
print(f"Error getting team assignments: {str(e)}")
return {}
def verify_champion_status(self, employee_id: str) -> Dict[str, Any]:
"""Verify security champion status and training"""
# This would integrate with the Security Champions Program system
# For now, return mock data
return {
'active': True,
'certification_status': 'current',
'last_training_date': '2024-01-15T00:00:00Z'
}
def get_member_training_status(self, username: str) -> Dict[str, Any]:
"""Get training status for a team member"""
# This would integrate with the Security Training Management system
# For now, return mock data
return {
'compliant': True,
'completion_percentage': 85,
'overdue_courses': 0
}
def main():
"""Main function to run security ownership check"""
checker = SecurityOwnershipChecker()
# Perform ownership check
ownership_check = checker.check_repository_security_ownership()
# Print results
print("Security Ownership Check Results:")
print("=" * 50)
print(f"Repository: {ownership_check['repository']}")
print(f"Status: {ownership_check['ownership_status']}")
print(f"Issues Found: {len(ownership_check['issues'])}")
if ownership_check['issues']:
print("\nIssues:")
for issue in ownership_check['issues']:
print(f" • [{issue['severity'].upper()}] {issue['message']}")
if ownership_check['recommendations']:
print("\nRecommendations:")
for rec in ownership_check['recommendations']:
print(f" • {rec}")
# Save detailed report
os.makedirs('reports', exist_ok=True)
with open('reports/security-ownership-report.json', 'w') as f:
json.dump(ownership_check, f, indent=2)
# Exit with appropriate code
if ownership_check['ownership_status'] == 'compliant':
print("\n✅ Security ownership check passed!")
exit(0)
else:
print("\n❌ Security ownership check failed!")
exit(1)
if __name__ == "__main__":
main()Example 4: Security culture measurement and improvement framework
View code
import json
import boto3
from datetime import datetime, timedelta
from typing import Dict, List, Any
import statistics
class SecurityCultureFramework:
def __init__(self):
self.dynamodb = boto3.resource('dynamodb')
self.cloudwatch = boto3.client('cloudwatch')
self.sns = boto3.client('sns')
# DynamoDB tables
self.culture_metrics_table = self.dynamodb.Table('SecurityCultureMetrics')
self.feedback_table = self.dynamodb.Table('SecurityFeedback')
self.initiatives_table = self.dynamodb.Table('SecurityInitiatives')
self.surveys_table = self.dynamodb.Table('SecuritySurveys')
# Culture measurement framework
self.culture_dimensions = {
'security_awareness': {
'weight': 0.25,
'indicators': [
'security_training_completion_rate',
'security_incident_reporting_rate',
'proactive_security_suggestions',
'security_tool_adoption_rate'
]
},
'security_ownership': {
'weight': 0.30,
'indicators': [
'security_champion_participation',
'security_code_review_coverage',
'security_testing_integration',
'security_documentation_quality'
]
},
'security_collaboration': {
'weight': 0.20,
'indicators': [
'cross_team_security_projects',
'security_knowledge_sharing',
'security_mentoring_activities',
'security_community_engagement'
]
},
'security_innovation': {
'weight': 0.15,
'indicators': [
'security_automation_initiatives',
'security_tool_development',
'security_process_improvements',
'security_research_contributions'
]
},
'security_resilience': {
'weight': 0.10,
'indicators': [
'incident_response_effectiveness',
'security_recovery_time',
'lessons_learned_implementation',
'security_preparedness_exercises'
]
}
}
def measure_security_culture(self, organization_unit: str, period: str = 'quarterly') -> Dict[str, Any]:
"""Measure security culture across multiple dimensions"""
measurement_id = str(uuid.uuid4())
culture_measurement = {
'measurement_id': measurement_id,
'organization_unit': organization_unit,
'measurement_period': period,
'measurement_date': datetime.utcnow().isoformat(),
'overall_culture_score': 0,
'culture_grade': 'F',
'dimension_scores': {},
'trends': {},
'strengths': [],
'improvement_areas': [],
'recommendations': [],
'action_plan': {}
}
# Calculate date range for measurement
end_date = datetime.utcnow()
if period == 'quarterly':
start_date = end_date - timedelta(days=90)
elif period == 'monthly':
start_date = end_date - timedelta(days=30)
elif period == 'yearly':
start_date = end_date - timedelta(days=365)
else:
start_date = end_date - timedelta(days=90)
total_weighted_score = 0
# Measure each culture dimension
for dimension, config in self.culture_dimensions.items():
dimension_score = self.measure_culture_dimension(
organization_unit, dimension, config, start_date, end_date
)
culture_measurement['dimension_scores'][dimension] = dimension_score
total_weighted_score += dimension_score['score'] * config['weight']
# Calculate overall culture score
culture_measurement['overall_culture_score'] = total_weighted_score
culture_measurement['culture_grade'] = self.determine_culture_grade(total_weighted_score)
# Analyze trends
culture_measurement['trends'] = self.analyze_culture_trends(organization_unit, period)
# Identify strengths and improvement areas
culture_measurement['strengths'] = self.identify_culture_strengths(culture_measurement['dimension_scores'])
culture_measurement['improvement_areas'] = self.identify_improvement_areas(culture_measurement['dimension_scores'])
# Generate recommendations
culture_measurement['recommendations'] = self.generate_culture_recommendations(culture_measurement)
# Create action plan
culture_measurement['action_plan'] = self.create_culture_action_plan(culture_measurement)
# Store measurement results
self.store_culture_measurement(culture_measurement)
# Send culture report
self.send_culture_report(culture_measurement)
return culture_measurement
def measure_culture_dimension(self, organization_unit: str, dimension: str, config: Dict[str, Any],
start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Measure a specific culture dimension"""
dimension_result = {
'dimension': dimension,
'score': 0,
'max_score': 100,
'indicator_scores': {},
'data_sources': [],
'measurement_confidence': 'high'
}
total_indicator_score = 0
indicators_measured = 0
# Measure each indicator in the dimension
for indicator in config['indicators']:
try:
indicator_score = self.measure_culture_indicator(
organization_unit, indicator, start_date, end_date
)
dimension_result['indicator_scores'][indicator] = indicator_score
total_indicator_score += indicator_score['score']
indicators_measured += 1
dimension_result['data_sources'].extend(indicator_score.get('data_sources', []))
except Exception as e:
print(f"Error measuring indicator {indicator}: {str(e)}")
dimension_result['measurement_confidence'] = 'medium'
# Calculate dimension score
if indicators_measured > 0:
dimension_result['score'] = total_indicator_score / indicators_measured
return dimension_result
def measure_culture_indicator(self, organization_unit: str, indicator: str,
start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Measure a specific culture indicator"""
indicator_result = {
'indicator': indicator,
'score': 0,
'measurement_method': 'automated',
'data_sources': [],
'raw_data': {},
'calculation_details': {}
}
# Define measurement methods for each indicator
measurement_methods = {
'security_training_completion_rate': self.measure_training_completion_rate,
'security_incident_reporting_rate': self.measure_incident_reporting_rate,
'proactive_security_suggestions': self.measure_proactive_suggestions,
'security_tool_adoption_rate': self.measure_tool_adoption_rate,
'security_champion_participation': self.measure_champion_participation,
'security_code_review_coverage': self.measure_code_review_coverage,
'security_testing_integration': self.measure_testing_integration,
'security_documentation_quality': self.measure_documentation_quality,
'cross_team_security_projects': self.measure_cross_team_projects,
'security_knowledge_sharing': self.measure_knowledge_sharing,
'security_mentoring_activities': self.measure_mentoring_activities,
'security_community_engagement': self.measure_community_engagement,
'security_automation_initiatives': self.measure_automation_initiatives,
'security_tool_development': self.measure_tool_development,
'security_process_improvements': self.measure_process_improvements,
'security_research_contributions': self.measure_research_contributions,
'incident_response_effectiveness': self.measure_incident_response_effectiveness,
'security_recovery_time': self.measure_recovery_time,
'lessons_learned_implementation': self.measure_lessons_learned,
'security_preparedness_exercises': self.measure_preparedness_exercises
}
# Measure the indicator
if indicator in measurement_methods:
try:
measurement_result = measurement_methods[indicator](
organization_unit, start_date, end_date
)
indicator_result.update(measurement_result)
except Exception as e:
print(f"Error measuring {indicator}: {str(e)}")
indicator_result['score'] = 0
indicator_result['error'] = str(e)
return indicator_result
def measure_training_completion_rate(self, organization_unit: str,
start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Measure security training completion rate"""
# Get training data from training management system
training_data = self.get_training_data(organization_unit, start_date, end_date)
total_employees = training_data.get('total_employees', 0)
completed_training = training_data.get('completed_training', 0)
completion_rate = (completed_training / total_employees * 100) if total_employees > 0 else 0
return {
'score': min(completion_rate, 100),
'raw_data': {
'total_employees': total_employees,
'completed_training': completed_training,
'completion_rate': completion_rate
},
'data_sources': ['training_management_system'],
'calculation_details': {
'formula': 'completed_training / total_employees * 100',
'target_threshold': 90
}
}
def measure_incident_reporting_rate(self, organization_unit: str,
start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Measure proactive security incident reporting rate"""
# Get incident data
incident_data = self.get_incident_data(organization_unit, start_date, end_date)
total_incidents = incident_data.get('total_incidents', 0)
proactive_reports = incident_data.get('proactive_reports', 0)
reporting_rate = (proactive_reports / total_incidents * 100) if total_incidents > 0 else 0
# Also consider the absolute number of proactive reports
proactive_score = min(proactive_reports * 10, 50) # Up to 50 points for volume
rate_score = min(reporting_rate, 50) # Up to 50 points for rate
total_score = proactive_score + rate_score
return {
'score': min(total_score, 100),
'raw_data': {
'total_incidents': total_incidents,
'proactive_reports': proactive_reports,
'reporting_rate': reporting_rate
},
'data_sources': ['incident_management_system'],
'calculation_details': {
'formula': 'proactive_reports_score + reporting_rate_score',
'target_threshold': 80
}
}
def measure_champion_participation(self, organization_unit: str,
start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Measure security champion participation and activity"""
# Get champion data
champion_data = self.get_champion_data(organization_unit, start_date, end_date)
total_teams = champion_data.get('total_teams', 0)
teams_with_champions = champion_data.get('teams_with_champions', 0)
active_champions = champion_data.get('active_champions', 0)
champion_activities = champion_data.get('champion_activities', 0)
# Calculate participation metrics
champion_coverage = (teams_with_champions / total_teams * 100) if total_teams > 0 else 0
champion_activity_score = min(champion_activities * 2, 50) # Up to 50 points for activities
coverage_score = min(champion_coverage, 50) # Up to 50 points for coverage
total_score = champion_activity_score + coverage_score
return {
'score': min(total_score, 100),
'raw_data': {
'total_teams': total_teams,
'teams_with_champions': teams_with_champions,
'active_champions': active_champions,
'champion_activities': champion_activities,
'champion_coverage': champion_coverage
},
'data_sources': ['security_champions_system'],
'calculation_details': {
'formula': 'champion_activity_score + coverage_score',
'target_threshold': 85
}
}
def conduct_security_culture_survey(self, survey_config: Dict[str, Any]) -> Dict[str, Any]:
"""Conduct security culture survey"""
survey_id = str(uuid.uuid4())
survey = {
'survey_id': survey_id,
'survey_name': survey_config['name'],
'survey_type': survey_config.get('type', 'culture_assessment'),
'target_audience': survey_config['target_audience'],
'questions': survey_config['questions'],
'launch_date': datetime.utcnow().isoformat(),
'response_deadline': (datetime.utcnow() + timedelta(days=14)).isoformat(),
'status': 'active',
'responses': [],
'response_rate': 0,
'results': {}
}
# Store survey
self.surveys_table.put_item(Item=survey)
# Send survey invitations
self.send_survey_invitations(survey)
return {
'survey_id': survey_id,
'status': 'launched',
'response_deadline': survey['response_deadline']
}
def analyze_survey_results(self, survey_id: str) -> Dict[str, Any]:
"""Analyze security culture survey results"""
# Get survey data
survey_response = self.surveys_table.get_item(Key={'survey_id': survey_id})
if 'Item' not in survey_response:
return {'error': 'Survey not found'}
survey = survey_response['Item']
responses = survey.get('responses', [])
if not responses:
return {'error': 'No survey responses available'}
analysis = {
'survey_id': survey_id,
'survey_name': survey['survey_name'],
'analysis_date': datetime.utcnow().isoformat(),
'total_responses': len(responses),
'response_rate': survey.get('response_rate', 0),
'question_analysis': {},
'demographic_breakdown': {},
'sentiment_analysis': {},
'key_insights': [],
'recommendations': []
}
# Analyze each question
for question in survey['questions']:
question_id = question['question_id']
question_responses = [r.get('answers', {}).get(question_id) for r in responses if question_id in r.get('answers', {})]
if question['type'] == 'rating':
# Analyze rating questions
ratings = [int(r) for r in question_responses if r is not None]
if ratings:
analysis['question_analysis'][question_id] = {
'question_text': question['text'],
'average_rating': statistics.mean(ratings),
'median_rating': statistics.median(ratings),
'rating_distribution': {str(i): ratings.count(i) for i in range(1, 6)},
'response_count': len(ratings)
}
elif question['type'] == 'multiple_choice':
# Analyze multiple choice questions
choices = [r for r in question_responses if r is not None]
if choices:
choice_counts = {}
for choice in choices:
choice_counts[choice] = choice_counts.get(choice, 0) + 1
analysis['question_analysis'][question_id] = {
'question_text': question['text'],
'choice_distribution': choice_counts,
'most_common_choice': max(choice_counts, key=choice_counts.get),
'response_count': len(choices)
}
elif question['type'] == 'text':
# Analyze text responses
text_responses = [r for r in question_responses if r is not None and r.strip()]
analysis['question_analysis'][question_id] = {
'question_text': question['text'],
'response_count': len(text_responses),
'sample_responses': text_responses[:5] # First 5 responses as samples
}
# Generate insights and recommendations
analysis['key_insights'] = self.generate_survey_insights(analysis)
analysis['recommendations'] = self.generate_survey_recommendations(analysis)
# Update survey with results
survey['results'] = analysis
survey['status'] = 'completed'
self.surveys_table.put_item(Item=survey)
return analysis
def create_culture_improvement_initiative(self, initiative_data: Dict[str, Any]) -> Dict[str, Any]:
"""Create a security culture improvement initiative"""
initiative_id = str(uuid.uuid4())
initiative = {
'initiative_id': initiative_id,
'name': initiative_data['name'],
'description': initiative_data['description'],
'target_dimension': initiative_data['target_dimension'],
'target_indicators': initiative_data.get('target_indicators', []),
'owner': initiative_data['owner'],
'stakeholders': initiative_data.get('stakeholders', []),
'start_date': initiative_data['start_date'],
'target_completion_date': initiative_data['target_completion_date'],
'budget': initiative_data.get('budget', 0),
'success_metrics': initiative_data['success_metrics'],
'milestones': initiative_data.get('milestones', []),
'status': 'planning',
'progress': {
'completion_percentage': 0,
'milestones_completed': 0,
'total_milestones': len(initiative_data.get('milestones', [])),
'current_phase': 'planning'
},
'impact_assessment': {
'baseline_metrics': {},
'current_metrics': {},
'target_metrics': initiative_data.get('target_metrics', {}),
'roi_calculation': {}
}
}
# Store initiative
self.initiatives_table.put_item(Item=initiative)
# Send initiative launch notification
self.send_initiative_notification(initiative)
return {
'initiative_id': initiative_id,
'status': 'created',
'next_steps': 'Initiative planning phase initiated'
}
def track_initiative_progress(self, initiative_id: str, progress_update: Dict[str, Any]) -> Dict[str, Any]:
"""Track progress of a security culture improvement initiative"""
# Get initiative
initiative_response = self.initiatives_table.get_item(Key={'initiative_id': initiative_id})
if 'Item' not in initiative_response:
return {'error': 'Initiative not found'}
initiative = initiative_response['Item']
# Update progress
initiative['progress'].update(progress_update.get('progress', {}))
initiative['status'] = progress_update.get('status', initiative['status'])
# Update impact metrics
if 'current_metrics' in progress_update:
initiative['impact_assessment']['current_metrics'].update(progress_update['current_metrics'])
# Add progress entry
if 'progress_entries' not in initiative:
initiative['progress_entries'] = []
initiative['progress_entries'].append({
'date': datetime.utcnow().isoformat(),
'update': progress_update.get('description', ''),
'metrics': progress_update.get('current_metrics', {}),
'updated_by': progress_update.get('updated_by', 'system')
})
# Calculate ROI if enough data is available
if initiative['impact_assessment']['baseline_metrics'] and initiative['impact_assessment']['current_metrics']:
initiative['impact_assessment']['roi_calculation'] = self.calculate_initiative_roi(initiative)
# Update initiative
self.initiatives_table.put_item(Item=initiative)
# Send progress notification
self.send_progress_notification(initiative, progress_update)
return {
'initiative_id': initiative_id,
'status': initiative['status'],
'completion_percentage': initiative['progress']['completion_percentage'],
'roi': initiative['impact_assessment']['roi_calculation'].get('roi_percentage', 'N/A')
}
def generate_culture_dashboard(self, organization_unit: str) -> Dict[str, Any]:
"""Generate comprehensive security culture dashboard"""
dashboard = {
'organization_unit': organization_unit,
'generated_date': datetime.utcnow().isoformat(),
'dashboard_type': 'security_culture_overview',
'current_culture_score': 0,
'culture_grade': 'F',
'trend_direction': 'stable',
'dimension_scores': {},
'key_metrics': {},
'active_initiatives': [],
'recent_achievements': [],
'upcoming_milestones': [],
'recommendations': []
}
# Get latest culture measurement
latest_measurement = self.get_latest_culture_measurement(organization_unit)
if latest_measurement:
dashboard['current_culture_score'] = latest_measurement['overall_culture_score']
dashboard['culture_grade'] = latest_measurement['culture_grade']
dashboard['dimension_scores'] = latest_measurement['dimension_scores']
dashboard['trend_direction'] = self.calculate_trend_direction(organization_unit)
# Get key metrics
dashboard['key_metrics'] = self.get_key_culture_metrics(organization_unit)
# Get active initiatives
dashboard['active_initiatives'] = self.get_active_initiatives(organization_unit)
# Get recent achievements
dashboard['recent_achievements'] = self.get_recent_achievements(organization_unit)
# Get upcoming milestones
dashboard['upcoming_milestones'] = self.get_upcoming_milestones(organization_unit)
# Generate recommendations
dashboard['recommendations'] = self.generate_dashboard_recommendations(dashboard)
return dashboard
def lambda_handler(event, context):
"""Lambda function for Security Culture Framework"""
culture_framework = SecurityCultureFramework()
action = event.get('action')
if action == 'measure_culture':
result = culture_framework.measure_security_culture(
event['organization_unit'],
event.get('period', 'quarterly')
)
elif action == 'conduct_survey':
result = culture_framework.conduct_security_culture_survey(event['survey_config'])
elif action == 'analyze_survey':
result = culture_framework.analyze_survey_results(event['survey_id'])
elif action == 'create_initiative':
result = culture_framework.create_culture_improvement_initiative(event['initiative_data'])
elif action == 'track_progress':
result = culture_framework.track_initiative_progress(event['initiative_id'], event['progress_update'])
elif action == 'generate_dashboard':
result = culture_framework.generate_culture_dashboard(event['organization_unit'])
else:
result = {'error': 'Invalid action specified'}
return {
'statusCode': 200,
'body': json.dumps(result)
}AWS services to consider
Amazon DynamoDB
NoSQL database service for storing security champion data, training records, competency assessments, and culture metrics.
AWS Lambda
Serverless compute service for running security ownership management functions, training tracking, and culture measurement automation.
Amazon SES
Email service for sending training notifications, security champion communications, and culture survey invitations.
Amazon SNS
Messaging service for sending alerts about security ownership issues, training compliance, and culture metric thresholds.
Amazon CloudWatch
Monitoring service for tracking security culture metrics, training completion rates, and security ownership KPIs.
AWS Step Functions
Workflow orchestration service for managing complex security training workflows and culture improvement initiatives.
Amazon S3
Object storage service for storing training materials, security documentation, and culture assessment reports.
AWS Systems Manager
Management service for maintaining security ownership configurations and automating security culture processes.
Benefits of building a program that embeds security ownership in workload teams
- Distributed security responsibility: Creates multiple layers of security ownership throughout the organization
- Improved security awareness: Increases security knowledge and consciousness across all team members
- Faster security response: Enables quicker identification and resolution of security issues
- Enhanced security culture: Fosters a culture where security is everyone’s responsibility
- Reduced security debt: Prevents security issues through proactive ownership and accountability
- Better security outcomes: Improves overall security posture through embedded expertise
- Increased team autonomy: Empowers teams to make security decisions independently
- Sustainable security practices: Creates long-term security capabilities within teams