SEC07-BP03: Automate identification and classification
Overview
Manual data classification is time-consuming, error-prone, and doesn’t scale with modern data volumes. Automating identification and classification applies labels consistently and promptly as data assets are created, modified, or moved within your environment.
This best practice focuses on implementing automated systems that can discover, analyze, and classify data based on content, context, and metadata, enabling real-time application of appropriate protection controls and compliance measures.
Implementation Guidance
1. Implement Content-Based Classification
Deploy automated tools that analyze data content to identify sensitive information:
- Pattern Recognition: Use regular expressions and machine learning to identify PII, PHI, and financial data
- Contextual Analysis: Analyze data relationships and usage patterns
- Metadata Analysis: Examine file properties, database schemas, and system metadata
- Machine Learning Models: Train models to recognize organization-specific sensitive data patterns
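As a minimal sketch of the pattern-recognition approach, the snippet below pairs regular expressions with a Luhn checksum so that arbitrary 16-digit strings are not reported as card numbers. The `PATTERNS` table and the function names are hypothetical and chosen only for illustration; a production rule set would likely be broader.

```python
import re

# Hypothetical patterns for illustration only
PATTERNS = {
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
    "credit_card": re.compile(r"\b(?:\d{4}[-\s]?){3}\d{4}\b"),
}


def luhn_valid(number: str) -> bool:
    """Return True if the digits in `number` pass the Luhn checksum."""
    digits = [int(ch) for ch in number if ch.isdigit()]
    total = 0
    # Walk from the rightmost digit, doubling every second one
    for index, digit in enumerate(reversed(digits)):
        if index % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return len(digits) > 0 and total % 10 == 0


def find_sensitive(text: str) -> dict:
    """Map each detected data type to the list of matched strings."""
    found = {}
    for name, pattern in PATTERNS.items():
        matches = pattern.findall(text)
        if name == "credit_card":
            # Drop regex hits that fail the checksum to cut false positives
            matches = [m for m in matches if luhn_valid(m)]
        if matches:
            found[name] = matches
    return found
```

Combining a cheap regex pass with a validity check is one way to keep false positives down before handing uncertain content to a heavier model.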
2. Establish Real-Time Classification Workflows
Create automated workflows that classify data as it enters your environment:
- Data Ingestion Points: Classify data at entry points (APIs, file uploads, database inserts)
- Event-Driven Classification: Trigger classification on data creation, modification, or access events
- Streaming Classification: Process data streams in real time for immediate classification
- Batch Processing: Schedule regular classification jobs for existing data
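For event-driven classification, the first step is usually to work out which object an event refers to. The sketch below assumes the standard Amazon S3 event notification payload (a `Records` list with `eventName` and `s3.bucket.name` / `s3.object.key`); the function name `extract_s3_objects` is hypothetical.

```python
from urllib.parse import unquote_plus


def extract_s3_objects(event: dict) -> list:
    """Return (bucket, key) pairs for object-creation records in an S3 event."""
    objects = []
    for record in event.get("Records", []):
        # Only creation events should trigger classification here
        if not record.get("eventName", "").startswith("ObjectCreated:"):
            continue
        s3_info = record.get("s3", {})
        bucket = s3_info.get("bucket", {}).get("name")
        key = s3_info.get("object", {}).get("key")
        if bucket and key:
            # Object keys arrive URL-encoded in S3 notifications
            objects.append((bucket, unquote_plus(key)))
    return objects
```

A handler could pass each returned pair to whatever classifier is in use, which keeps event parsing separate from classification logic.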
3. Configure Multi-Service Integration
Integrate classification across your AWS environment:
- Cross-Service Tagging: Apply consistent classification tags across all AWS services
- API Integration: Use AWS APIs to propagate classification metadata
- Service-Specific Classification: Leverage native classification features in AWS services
- Third-Party Integration: Connect with external classification tools and systems
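One way to keep tags consistent across services is to build the tag set once and convert it to the shape each API expects. The sketch below is illustrative: the tag keys and helper names are assumptions, and the length limits (128 characters for keys, 256 for values) are the commonly documented AWS tag limits. Frameworks are joined with "/" because commas are not accepted in tag values by some services, including Amazon S3.

```python
def build_classification_tags(level: str, frameworks: list) -> dict:
    """Build one canonical tag map for a classified resource."""
    tags = {"DataClassification": level, "AutoClassified": "true"}
    if frameworks:
        # Sort for a stable value regardless of input order
        tags["ComplianceFrameworks"] = "/".join(sorted(frameworks))
    for key, value in tags.items():
        if len(key) > 128 or len(value) > 256:
            raise ValueError(f"Tag {key!r} exceeds AWS tag length limits")
    return tags


def as_tag_list(tags: dict) -> list:
    """Convert the map to the Key/Value list shape used by APIs such as S3."""
    return [{"Key": key, "Value": value} for key, value in tags.items()]
```

APIs that accept a plain map can take the result of `build_classification_tags` directly, while list-based APIs take `as_tag_list(tags)`.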
4. Implement Classification Validation and Quality Control
Ensure accuracy and consistency of automated classification:
- Confidence Scoring: Implement confidence levels for classification decisions
- Human Review Workflows: Route uncertain classifications for manual review
- Classification Auditing: Track and audit classification decisions and changes
- Feedback Loops: Improve classification accuracy through continuous learning
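Confidence scoring and human review can be tied together with a small routing function. The thresholds and route names below are hypothetical defaults, not recommended values; they only illustrate the idea of sending low-confidence results to a reviewer.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Thresholds:
    auto_apply: float = 0.9  # at or above: apply the label automatically
    review: float = 0.5      # below: send to a human reviewer


def route(confidence: float, thresholds: Thresholds = Thresholds()) -> str:
    """Pick a handling path for a classification decision."""
    if not 0.0 <= confidence <= 1.0:
        raise ValueError("confidence must be between 0 and 1")
    if confidence >= thresholds.auto_apply:
        return "auto_apply"
    if confidence >= thresholds.review:
        return "apply_and_monitor"
    return "human_review"
```

Keeping the thresholds in one object makes it straightforward to tune them as reviewer feedback accumulates.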
5. Establish Classification Governance and Monitoring
Monitor and govern your automated classification processes:
- Classification Metrics: Track classification coverage, accuracy, and performance
- Policy Enforcement: Automatically enforce policies based on classification
- Exception Handling: Manage classification exceptions and edge cases
- Compliance Reporting: Generate reports for regulatory and audit requirements
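Coverage and review-rate metrics can be derived from stored classification records. This sketch assumes each record is a dictionary with hypothetical `level` and `requires_review` fields, and that the total number of resources in scope is known from an inventory.

```python
from collections import Counter


def summarize(records: list, total_resources: int) -> dict:
    """Compute coverage, review rate, and per-level counts."""
    # A record counts as classified only if it carries a level
    classified = [r for r in records if r.get("level")]
    by_level = Counter(r["level"] for r in classified)
    needing_review = sum(1 for r in classified if r.get("requires_review"))
    coverage = len(classified) / total_resources if total_resources else 0.0
    review_rate = needing_review / len(classified) if classified else 0.0
    return {
        "coverage": coverage,
        "review_rate": review_rate,
        "by_level": dict(by_level),
    }
```

Tracking coverage separately from review rate helps distinguish "not yet scanned" from "scanned but uncertain".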
6. Enable Dynamic Reclassification
Implement systems that can reclassify data as conditions change:
- Temporal Classification: Adjust classification based on data age or lifecycle stage
- Context-Aware Reclassification: Update classification based on usage patterns or business context
- Regulatory Changes: Automatically reclassify data when regulations change
- Business Rule Updates: Apply new classification rules to existing data
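Temporal classification can be sketched as a rule that lowers a level by one step once data passes a given age. The level names match those used elsewhere in this practice, but the `DOWNGRADE_AFTER_DAYS` policy is entirely hypothetical; real retention and downgrade periods depend on your regulatory obligations.

```python
from datetime import datetime, timedelta, timezone

LEVELS = ["public", "internal", "confidential", "restricted"]

# Hypothetical policy: days after which a level may drop one step.
# None means the level is never downgraded automatically.
DOWNGRADE_AFTER_DAYS = {"restricted": None, "confidential": 730, "internal": 365}


def reclassify_by_age(level: str, created_at: datetime, now: datetime) -> str:
    """Return the level after applying the age-based downgrade rule once."""
    limit = DOWNGRADE_AFTER_DAYS.get(level)
    if limit is None:
        return level
    if now - created_at >= timedelta(days=limit):
        return LEVELS[LEVELS.index(level) - 1]
    return level
```

Passing `now` explicitly, rather than reading the clock inside the function, keeps the rule deterministic and easy to test.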
Implementation Examples
Example 1: Amazon Macie Automated Classification System
# macie_auto_classifier.py
import boto3
import json
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ClassificationResult:
resource_arn: str
classification_level: str
sensitive_data_types: List[str]
confidence_score: float
classification_timestamp: str
requires_human_review: bool
compliance_frameworks: List[str]
@dataclass
class CustomDataIdentifier:
name: str
description: str
regex: str
keywords: List[str]
ignore_words: List[str]
maximum_match_distance: int
class MacieAutoClassifier:
"""
Automated data classification system using Amazon Macie
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.macie_client = boto3.client('macie2', region_name=region)
self.s3_client = boto3.client('s3', region_name=region)
self.lambda_client = boto3.client('lambda', region_name=region)
self.sns_client = boto3.client('sns', region_name=region)
self.dynamodb = boto3.resource('dynamodb', region_name=region)
# Initialize classification tracking table
self.classification_table = self.dynamodb.Table('data-classification-results')
# Classification thresholds
self.confidence_thresholds = {
'high': 0.9,
'medium': 0.7,
'low': 0.5
}
# Sensitive data type mappings
self.sensitivity_mappings = {
'CREDIT_CARD_NUMBER': {'level': 'restricted', 'frameworks': ['PCI_DSS']},
'SSN': {'level': 'restricted', 'frameworks': ['HIPAA', 'GDPR']},
'PHONE_NUMBER': {'level': 'confidential', 'frameworks': ['GDPR']},
'EMAIL_ADDRESS': {'level': 'internal', 'frameworks': ['GDPR']},
'PERSON_NAME': {'level': 'confidential', 'frameworks': ['GDPR', 'HIPAA']},
'ADDRESS': {'level': 'confidential', 'frameworks': ['GDPR']},
'BANK_ACCOUNT_NUMBER': {'level': 'restricted', 'frameworks': ['PCI_DSS']},
'PASSPORT_NUMBER': {'level': 'restricted', 'frameworks': ['GDPR']},
'DRIVER_LICENSE': {'level': 'restricted', 'frameworks': ['GDPR']},
'MEDICAL_RECORD_NUMBER': {'level': 'restricted', 'frameworks': ['HIPAA']}
}
def setup_macie_environment(self) -> Dict[str, Any]:
"""
Set up Macie environment for automated classification
"""
setup_results = {
'macie_enabled': False,
'custom_identifiers_created': [],
'classification_jobs_configured': [],
'findings_export_configured': False,
'errors': []
}
try:
# Enable Macie if not already enabled
try:
self.macie_client.get_macie_session()
setup_results['macie_enabled'] = True
logger.info("Macie is already enabled")
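# Macie reports a disabled account through AccessDeniedException, so handle both
except (self.macie_client.exceptions.ResourceNotFoundException, self.macie_client.exceptions.AccessDeniedException):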
self.macie_client.enable_macie()
setup_results['macie_enabled'] = True
logger.info("Macie has been enabled")
# Create custom data identifiers
custom_identifiers = self._get_custom_data_identifiers()
for identifier in custom_identifiers:
try:
response = self.macie_client.create_custom_data_identifier(
name=identifier.name,
description=identifier.description,
regex=identifier.regex,
keywords=identifier.keywords,
ignoreWords=identifier.ignore_words,
maximumMatchDistance=identifier.maximum_match_distance
)
setup_results['custom_identifiers_created'].append({
'name': identifier.name,
'id': response['customDataIdentifierId']
})
logger.info(f"Created custom data identifier: {identifier.name}")
except Exception as e:
setup_results['errors'].append(f"Failed to create identifier {identifier.name}: {str(e)}")
# Configure findings export
self._configure_findings_export()
setup_results['findings_export_configured'] = True
except Exception as e:
setup_results['errors'].append(f"Setup error: {str(e)}")
logger.error(f"Macie setup error: {str(e)}")
return setup_results
def _get_custom_data_identifiers(self) -> List[CustomDataIdentifier]:
"""
Define custom data identifiers for organization-specific data
"""
return [
CustomDataIdentifier(
name="Employee_ID",
description="Company employee identification numbers",
regex=r"EMP-\d{6}",
keywords=["employee", "emp", "staff"],
ignore_words=["example", "sample", "test"],
maximum_match_distance=50
),
CustomDataIdentifier(
name="Customer_ID",
description="Customer identification numbers",
regex=r"CUST-[A-Z]{2}\d{8}",
keywords=["customer", "client", "account"],
ignore_words=["example", "sample", "demo"],
maximum_match_distance=50
),
CustomDataIdentifier(
name="Internal_Project_Code",
description="Internal project codes",
regex=r"PROJ-\d{4}-[A-Z]{3}",
keywords=["project", "initiative", "program"],
ignore_words=["example", "template"],
maximum_match_distance=30
),
CustomDataIdentifier(
name="API_Key",
description="API keys and tokens",
regex=r"[A-Za-z0-9]{32,}",
keywords=["api", "key", "token", "secret"],
ignore_words=["example", "placeholder"],
maximum_match_distance=20
)
]
def create_classification_job(self,
bucket_names: List[str],
job_name: str,
schedule_expression: Optional[str] = None) -> Dict[str, Any]:
"""
Create a Macie classification job for specified S3 buckets
"""
try:
            # Target the named buckets explicitly. bucketDefinitions takes the
            # owning account ID plus a list of bucket names; bucketCriteria is
            # for property-based selection and does not accept bucket names.
            bucket_definitions = [{
                'accountId': self._get_account_id(),
                'buckets': bucket_names
            }]
            # Create job parameters
            job_params = {
                'name': job_name,
                'jobType': 'SCHEDULED' if schedule_expression else 'ONE_TIME',
                's3JobDefinition': {
                    'bucketDefinitions': bucket_definitions,
                    'scoping': {
                        # Only analyze non-empty objects
                        'includes': {
                            'and': [
                                {
                                    'simpleScopeTerm': {
                                        'comparator': 'GT',
                                        'key': 'OBJECT_SIZE',
                                        'values': ['0']
                                    }
                                }
                            ]
                        },
                        # Skip archive and binary objects by file extension
                        'excludes': {
                            'and': [
                                {
                                    'simpleScopeTerm': {
                                        'comparator': 'EQ',
                                        'key': 'OBJECT_EXTENSION',
                                        'values': ['zip', 'tar', 'gz', 'exe', 'bin']
                                    }
                                }
                            ]
                        }
                    }
                },
                'samplingPercentage': 100,
                'description': f'Automated classification job for buckets: {", ".join(bucket_names)}'
            }
# Add schedule if provided
if schedule_expression:
job_params['scheduleFrequency'] = {
'dailySchedule': {}
}
# Create the job
response = self.macie_client.create_classification_job(**job_params)
logger.info(f"Created classification job: {job_name} (ID: {response['jobId']})")
return {
'job_id': response['jobId'],
'job_name': job_name,
'buckets': bucket_names,
'status': 'created',
'schedule': schedule_expression
}
except Exception as e:
logger.error(f"Failed to create classification job: {str(e)}")
return {
'error': str(e),
'job_name': job_name,
'buckets': bucket_names,
'status': 'failed'
}
def process_classification_findings(self, findings: List[Dict[str, Any]]) -> List[ClassificationResult]:
"""
Process Macie findings and generate classification results
"""
classification_results = []
for finding in findings:
try:
# Extract finding details
resource_arn = finding.get('resourcesAffected', {}).get('s3Bucket', {}).get('arn', '')
sensitive_data = finding.get('classificationDetails', {}).get('result', {}).get('sensitiveData', [])
# Determine classification level and confidence
classification_level, confidence_score, compliance_frameworks = self._determine_classification(sensitive_data)
# Extract sensitive data types
sensitive_data_types = []
for data_item in sensitive_data:
category = data_item.get('category', '')
if category:
sensitive_data_types.append(category)
# Determine if human review is required
requires_review = confidence_score < self.confidence_thresholds['high']
# Create classification result
result = ClassificationResult(
resource_arn=resource_arn,
classification_level=classification_level,
sensitive_data_types=sensitive_data_types,
confidence_score=confidence_score,
classification_timestamp=datetime.utcnow().isoformat(),
requires_human_review=requires_review,
compliance_frameworks=compliance_frameworks
)
classification_results.append(result)
# Store result in DynamoDB
self._store_classification_result(result)
# Apply classification tags
self._apply_classification_tags(result)
# Trigger human review if needed
if requires_review:
self._trigger_human_review(result)
except Exception as e:
logger.error(f"Error processing finding: {str(e)}")
continue
return classification_results
def _determine_classification(self, sensitive_data: List[Dict[str, Any]]) -> tuple:
"""
Determine classification level based on sensitive data found
"""
max_level = 'public'
total_confidence = 0.0
count = 0
compliance_frameworks = set()
level_hierarchy = {'public': 0, 'internal': 1, 'confidential': 2, 'restricted': 3}
        for data_item in sensitive_data:
            # Macie groups detections under a category (for example,
            # PERSONAL_INFORMATION); the data type is on each detection.
            # Keys in sensitivity_mappings must match Macie's detection type names.
            for detection in data_item.get('detections', []):
                data_type = detection.get('type', '')
                occurrences = detection.get('count', 0)
                if data_type not in self.sensitivity_mappings:
                    continue
                mapping = self.sensitivity_mappings[data_type]
                level = mapping['level']
                # Update max classification level
                if level_hierarchy[level] > level_hierarchy[max_level]:
                    max_level = level
                # Add compliance frameworks
                compliance_frameworks.update(mapping['frameworks'])
                # Calculate confidence based on occurrences
                confidence = min(0.9, 0.5 + (occurrences * 0.1))
                total_confidence += confidence
                count += 1
# Calculate average confidence
avg_confidence = total_confidence / count if count > 0 else 0.0
return max_level, avg_confidence, list(compliance_frameworks)
def _store_classification_result(self, result: ClassificationResult):
"""
Store classification result in DynamoDB
"""
try:
self.classification_table.put_item(
Item={
'resource_arn': result.resource_arn,
'classification_timestamp': result.classification_timestamp,
'classification_level': result.classification_level,
'sensitive_data_types': result.sensitive_data_types,
'confidence_score': str(result.confidence_score),
'requires_human_review': result.requires_human_review,
'compliance_frameworks': result.compliance_frameworks,
'ttl': int((datetime.utcnow() + timedelta(days=365)).timestamp())
}
)
logger.info(f"Stored classification result for {result.resource_arn}")
except Exception as e:
logger.error(f"Failed to store classification result: {str(e)}")
def _apply_classification_tags(self, result: ClassificationResult):
"""
Apply classification tags to AWS resources
"""
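        try:
            # Extract bucket name from ARN
            bucket_name = result.resource_arn.split(':')[-1]
            # Prepare tags
            tags = {
                'DataClassification': result.classification_level,
                'ClassificationTimestamp': result.classification_timestamp,
                'ConfidenceScore': str(result.confidence_score),
                'AutoClassified': 'true'
            }
            # Add compliance framework tags. Commas are not valid in S3 tag
            # values, so join with "/" instead.
            if result.compliance_frameworks:
                tags['ComplianceFrameworks'] = '/'.join(result.compliance_frameworks)
            # put_bucket_tagging replaces the whole tag set, so merge with any
            # tags already on the bucket to avoid deleting them.
            try:
                existing = self.s3_client.get_bucket_tagging(Bucket=bucket_name)['TagSet']
            except self.s3_client.exceptions.ClientError as err:
                if err.response['Error']['Code'] != 'NoSuchTagSet':
                    raise
                existing = []
            merged = {tag['Key']: tag['Value'] for tag in existing}
            merged.update(tags)
            # Apply tags to S3 bucket
            tag_set = [{'Key': k, 'Value': v} for k, v in merged.items()]
            self.s3_client.put_bucket_tagging(
                Bucket=bucket_name,
                Tagging={'TagSet': tag_set}
            )
            logger.info(f"Applied classification tags to {bucket_name}")
        except Exception as e:
            logger.error(f"Failed to apply classification tags: {str(e)}")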
def _trigger_human_review(self, result: ClassificationResult):
"""
Trigger human review workflow for uncertain classifications
"""
try:
# Prepare review message
message = {
'resource_arn': result.resource_arn,
'classification_level': result.classification_level,
'confidence_score': result.confidence_score,
'sensitive_data_types': result.sensitive_data_types,
'review_required': True,
'timestamp': result.classification_timestamp
}
# Send to SNS topic for human review
self.sns_client.publish(
TopicArn=f'arn:aws:sns:{self.region}:{self._get_account_id()}:data-classification-review',
Message=json.dumps(message, indent=2),
# SNS subjects are limited to 100 characters, so truncate long ARNs
Subject=f'Data Classification Review Required: {result.resource_arn}'[:100]
)
logger.info(f"Triggered human review for {result.resource_arn}")
except Exception as e:
logger.error(f"Failed to trigger human review: {str(e)}")
def _configure_findings_export(self):
"""
Configure Macie findings export to S3
"""
try:
# This would configure findings export to S3 bucket
# Implementation depends on your specific requirements
logger.info("Findings export configuration completed")
except Exception as e:
logger.error(f"Failed to configure findings export: {str(e)}")
def _get_account_id(self) -> str:
"""Get AWS account ID"""
return boto3.client('sts').get_caller_identity()['Account']
def get_classification_metrics(self, days: int = 30) -> Dict[str, Any]:
"""
Get classification metrics for the specified time period
"""
try:
# Calculate date range
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=days)
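            # Query classification results, following pagination because a
            # single scan call returns at most 1 MB of data
            scan_kwargs = {
                'FilterExpression': 'classification_timestamp BETWEEN :start AND :end',
                'ExpressionAttributeValues': {
                    ':start': start_date.isoformat(),
                    ':end': end_date.isoformat()
                }
            }
            items = []
            while True:
                response = self.classification_table.scan(**scan_kwargs)
                items.extend(response.get('Items', []))
                if 'LastEvaluatedKey' not in response:
                    break
                scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']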
# Calculate metrics
metrics = {
'total_classifications': len(items),
'by_level': {},
'by_confidence': {'high': 0, 'medium': 0, 'low': 0},
'requiring_review': 0,
'compliance_frameworks': {},
'average_confidence': 0.0
}
total_confidence = 0.0
for item in items:
# Classification level distribution
level = item['classification_level']
metrics['by_level'][level] = metrics['by_level'].get(level, 0) + 1
# Confidence distribution
confidence = float(item['confidence_score'])
total_confidence += confidence
if confidence >= self.confidence_thresholds['high']:
metrics['by_confidence']['high'] += 1
elif confidence >= self.confidence_thresholds['medium']:
metrics['by_confidence']['medium'] += 1
else:
metrics['by_confidence']['low'] += 1
# Review requirements
if item.get('requires_human_review', False):
metrics['requiring_review'] += 1
# Compliance frameworks
frameworks = item.get('compliance_frameworks', [])
for framework in frameworks:
metrics['compliance_frameworks'][framework] = metrics['compliance_frameworks'].get(framework, 0) + 1
# Calculate average confidence
if len(items) > 0:
metrics['average_confidence'] = total_confidence / len(items)
return metrics
except Exception as e:
logger.error(f"Failed to get classification metrics: {str(e)}")
return {'error': str(e)}
def reclassify_resources(self, resource_arns: List[str]) -> Dict[str, Any]:
"""
Trigger reclassification of specified resources
"""
results = {
'reclassified': [],
'failed': [],
'total': len(resource_arns)
}
for arn in resource_arns:
try:
# Extract bucket name from ARN
bucket_name = arn.split(':')[-1]
# Create new classification job for this bucket
job_result = self.create_classification_job(
bucket_names=[bucket_name],
job_name=f'reclassify-{bucket_name}-{int(time.time())}'
)
if 'error' not in job_result:
results['reclassified'].append({
'resource_arn': arn,
'job_id': job_result['job_id']
})
else:
results['failed'].append({
'resource_arn': arn,
'error': job_result['error']
})
except Exception as e:
results['failed'].append({
'resource_arn': arn,
'error': str(e)
})
return results
# Example usage and testing
if __name__ == "__main__":
# Initialize the auto classifier
classifier = MacieAutoClassifier()
# Set up Macie environment
print("Setting up Macie environment...")
setup_result = classifier.setup_macie_environment()
print(f"Setup result: {setup_result}")
# Create classification job for sample buckets
sample_buckets = ['my-data-bucket', 'customer-files-bucket']
job_result = classifier.create_classification_job(
bucket_names=sample_buckets,
job_name='automated-classification-job',
schedule_expression='daily'
)
print(f"Classification job result: {job_result}")
# Get classification metrics
metrics = classifier.get_classification_metrics(days=30)
print(f"Classification metrics: {json.dumps(metrics, indent=2)}")
Example 2: Event-Driven Real-Time Classification System
# event_driven_classifier.py
import boto3
import json
import re
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ClassificationRule:
name: str
pattern: str
classification_level: str
confidence_weight: float
compliance_frameworks: List[str]
description: str
@dataclass
class ClassificationEvent:
event_source: str
resource_arn: str
event_type: str
timestamp: str
metadata: Dict[str, Any]
class EventDrivenClassifier:
"""
Real-time data classification system triggered by AWS events
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.s3_client = boto3.client('s3', region_name=region)
self.lambda_client = boto3.client('lambda', region_name=region)
self.eventbridge_client = boto3.client('events', region_name=region)
self.stepfunctions_client = boto3.client('stepfunctions', region_name=region)
self.comprehend_client = boto3.client('comprehend', region_name=region)
self.textract_client = boto3.client('textract', region_name=region)
self.dynamodb = boto3.resource('dynamodb', region_name=region)
# Classification rules
self.classification_rules = self._load_classification_rules()
# Real-time classification table
self.realtime_table = self.dynamodb.Table('realtime-classification-events')
def _load_classification_rules(self) -> List[ClassificationRule]:
"""
Load classification rules for pattern matching
"""
return [
ClassificationRule(
name="credit_card",
pattern=r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
classification_level="restricted",
confidence_weight=0.9,
compliance_frameworks=["PCI_DSS"],
description="Credit card number pattern"
),
ClassificationRule(
name="ssn",
pattern=r'\b\d{3}-\d{2}-\d{4}\b',
classification_level="restricted",
confidence_weight=0.95,
compliance_frameworks=["HIPAA", "GDPR"],
description="Social Security Number pattern"
),
ClassificationRule(
name="email",
pattern=r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
classification_level="internal",
confidence_weight=0.8,
compliance_frameworks=["GDPR"],
description="Email address pattern"
),
ClassificationRule(
name="phone",
pattern=r'\b(?:\+?1[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})\b',
classification_level="confidential",
confidence_weight=0.7,
compliance_frameworks=["GDPR"],
description="Phone number pattern"
),
ClassificationRule(
name="ip_address",
pattern=r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',
classification_level="internal",
confidence_weight=0.6,
compliance_frameworks=[],
description="IP address pattern"
),
ClassificationRule(
name="api_key",
pattern=r'\b[A-Za-z0-9]{32,}\b',
classification_level="restricted",
confidence_weight=0.85,
compliance_frameworks=[],
description="API key pattern"
)
]
def setup_event_driven_classification(self) -> Dict[str, Any]:
"""
Set up event-driven classification infrastructure
"""
setup_results = {
'eventbridge_rules_created': [],
'lambda_functions_deployed': [],
'step_functions_created': [],
'errors': []
}
try:
# Create EventBridge rules for S3 events
s3_rule_result = self._create_s3_event_rule()
setup_results['eventbridge_rules_created'].append(s3_rule_result)
# Create EventBridge rules for RDS events
rds_rule_result = self._create_rds_event_rule()
setup_results['eventbridge_rules_created'].append(rds_rule_result)
# Deploy classification Lambda functions
lambda_result = self._deploy_classification_lambda()
setup_results['lambda_functions_deployed'].append(lambda_result)
# Create Step Functions workflow
stepfunctions_result = self._create_classification_workflow()
setup_results['step_functions_created'].append(stepfunctions_result)
except Exception as e:
setup_results['errors'].append(f"Setup error: {str(e)}")
logger.error(f"Event-driven classification setup error: {str(e)}")
return setup_results
def _create_s3_event_rule(self) -> Dict[str, Any]:
"""
Create EventBridge rule for S3 object creation events
"""
try:
rule_name = 'S3ObjectCreatedClassification'
# Create EventBridge rule
self.eventbridge_client.put_rule(
Name=rule_name,
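EventPattern=json.dumps({
"source": ["aws.s3"],
# The eventSource, eventName, and requestParameters fields used here come
# from CloudTrail data events, which must be enabled for the bucket
"detail-type": ["AWS API Call via CloudTrail"],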
"detail": {
"eventSource": ["s3.amazonaws.com"],
"eventName": [
"PutObject",
"PostObject",
"CopyObject",
"CompleteMultipartUpload"
]
}
}),
State='ENABLED',
Description='Trigger classification on S3 object creation'
)
# Add Lambda target
self.eventbridge_client.put_targets(
Rule=rule_name,
Targets=[
{
'Id': '1',
'Arn': f'arn:aws:lambda:{self.region}:{self._get_account_id()}:function:realtime-classifier',
'InputTransformer': {
'InputPathsMap': {
'bucket': '$.detail.requestParameters.bucketName',
'key': '$.detail.requestParameters.key',
'eventName': '$.detail.eventName'
},
'InputTemplate': '{"bucket": "<bucket>", "key": "<key>", "eventName": "<eventName>", "source": "s3"}'
}
}
]
)
return {
'rule_name': rule_name,
'status': 'created',
'targets': 1
}
except Exception as e:
return {
'rule_name': 'S3ObjectCreatedClassification',
'status': 'failed',
'error': str(e)
}
def _create_rds_event_rule(self) -> Dict[str, Any]:
"""
Create EventBridge rule for RDS events
"""
try:
rule_name = 'RDSDataChangeClassification'
# Create EventBridge rule for RDS events
self.eventbridge_client.put_rule(
Name=rule_name,
EventPattern=json.dumps({
"source": ["aws.rds"],
"detail-type": ["RDS DB Instance Event", "RDS DB Cluster Event"],
"detail": {
"EventCategories": ["configuration change", "backup"]
}
}),
State='ENABLED',
Description='Trigger classification on RDS data changes'
)
# Add Lambda target
self.eventbridge_client.put_targets(
Rule=rule_name,
Targets=[
{
'Id': '1',
'Arn': f'arn:aws:lambda:{self.region}:{self._get_account_id()}:function:realtime-classifier',
'InputTransformer': {
'InputPathsMap': {
'sourceId': '$.detail.SourceId',
'eventCategories': '$.detail.EventCategories',
'message': '$.detail.Message'
},
'InputTemplate': '{"sourceId": "<sourceId>", "eventCategories": "<eventCategories>", "message": "<message>", "source": "rds"}'
}
}
]
)
return {
'rule_name': rule_name,
'status': 'created',
'targets': 1
}
except Exception as e:
return {
'rule_name': 'RDSDataChangeClassification',
'status': 'failed',
'error': str(e)
}
def _deploy_classification_lambda(self) -> Dict[str, Any]:
"""
Deploy Lambda function for real-time classification
"""
lambda_code = '''
import json
import boto3
import re
from datetime import datetime
def lambda_handler(event, context):
"""
Real-time classification Lambda function
"""
# Initialize clients
s3_client = boto3.client('s3')
comprehend_client = boto3.client('comprehend')
try:
source = event.get('source')
if source == 's3':
return classify_s3_object(event, s3_client, comprehend_client)
elif source == 'rds':
return classify_rds_event(event)
else:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Unknown event source'})
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def classify_s3_object(event, s3_client, comprehend_client):
"""
Classify S3 object content
"""
bucket = event['bucket']
key = event['key']
# Get object content (for text files)
try:
response = s3_client.get_object(Bucket=bucket, Key=key)
content = response['Body'].read().decode('utf-8')
# Perform classification
classification_result = perform_content_classification(content, comprehend_client)
# Apply tags based on classification
apply_s3_classification_tags(s3_client, bucket, key, classification_result)
return {
'statusCode': 200,
'body': json.dumps({
'bucket': bucket,
'key': key,
'classification': classification_result
})
}
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({'error': f'Classification failed: {str(e)}'})
}
def perform_content_classification(content, comprehend_client):
"""
Perform content-based classification
"""
classification_result = {
'level': 'public',
'confidence': 0.0,
'sensitive_data_types': [],
'compliance_frameworks': []
}
# Pattern-based classification
patterns = {
'credit_card': r'\\b(?:\\d{4}[-\\s]?){3}\\d{4}\\b',
'ssn': r'\\b\\d{3}-\\d{2}-\\d{4}\\b',
'email': r'\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}\\b',
'phone': r'\\b(?:\\+?1[-\\.\\s]?)?\\(?([0-9]{3})\\)?[-\\.\\s]?([0-9]{3})[-\\.\\s]?([0-9]{4})\\b'
}
max_level = 'public'
total_confidence = 0.0
count = 0
for data_type, pattern in patterns.items():
matches = re.findall(pattern, content)
if matches:
classification_result['sensitive_data_types'].append(data_type)
# Determine classification level
if data_type in ['credit_card', 'ssn']:
max_level = 'restricted'
total_confidence += 0.9
elif data_type in ['phone']:
if max_level not in ['restricted']:
max_level = 'confidential'
total_confidence += 0.7
elif data_type in ['email']:
if max_level not in ['restricted', 'confidential']:
max_level = 'internal'
total_confidence += 0.6
count += 1
classification_result['level'] = max_level
classification_result['confidence'] = total_confidence / count if count > 0 else 0.0
return classification_result
def apply_s3_classification_tags(s3_client, bucket, key, classification_result):
"""
Apply classification tags to S3 object
"""
tags = {
'DataClassification': classification_result['level'],
'ClassificationConfidence': str(classification_result['confidence']),
'ClassificationTimestamp': datetime.utcnow().isoformat(),
'AutoClassified': 'true'
}
if classification_result['sensitive_data_types']:
# Commas are not valid in S3 tag values, so join with "/"
tags['SensitiveDataTypes'] = '/'.join(classification_result['sensitive_data_types'])
# Apply tags to object
tag_set = [{'Key': k, 'Value': v} for k, v in tags.items()]
s3_client.put_object_tagging(
Bucket=bucket,
Key=key,
Tagging={'TagSet': tag_set}
)
def classify_rds_event(event):
"""
Handle RDS classification events
"""
return {
'statusCode': 200,
'body': json.dumps({
'message': 'RDS event processed',
'sourceId': event.get('sourceId'),
'eventCategories': event.get('eventCategories')
})
}
'''
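        # create_function expects the bytes of a .zip archive, not raw source,
        # so package the handler as index.py to match the Handler setting.
        import io
        import zipfile
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w') as archive:
            info = zipfile.ZipInfo('index.py')
            # Make the file world-readable so the Lambda runtime can import it
            info.external_attr = 0o644 << 16
            archive.writestr(info, lambda_code)
        zip_bytes = zip_buffer.getvalue()
        try:
            # Create Lambda function
            response = self.lambda_client.create_function(
                FunctionName='realtime-classifier',
                Runtime='python3.12',
                Role=f'arn:aws:iam::{self._get_account_id()}:role/lambda-classification-role',
                Handler='index.lambda_handler',
                Code={'ZipFile': zip_bytes},
                Description='Real-time data classification function',
                Timeout=300,
                MemorySize=512,
                Environment={
                    'Variables': {
                        'REGION': self.region
                    }
                },
                Tags={
                    'Purpose': 'DataClassification',
                    'Environment': 'Production'
                }
            )
            return {
                'function_name': 'realtime-classifier',
                'function_arn': response['FunctionArn'],
                'status': 'created'
            }
        except self.lambda_client.exceptions.ResourceConflictException:
            # Function already exists, update it
            self.lambda_client.update_function_code(
                FunctionName='realtime-classifier',
                ZipFile=zip_bytes
            )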
return {
'function_name': 'realtime-classifier',
'status': 'updated'
}
except Exception as e:
return {
'function_name': 'realtime-classifier',
'status': 'failed',
'error': str(e)
}
def _create_classification_workflow(self) -> Dict[str, Any]:
"""
Create Step Functions workflow for complex classification scenarios
"""
workflow_definition = {
"Comment": "Data classification workflow",
"StartAt": "ClassifyContent",
"States": {
"ClassifyContent": {
"Type": "Task",
"Resource": f"arn:aws:lambda:{self.region}:{self._get_account_id()}:function:realtime-classifier",
"Next": "EvaluateConfidence"
},
"EvaluateConfidence": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.classification.confidence",
"NumericGreaterThan": 0.8,
"Next": "ApplyClassification"
},
{
"Variable": "$.classification.confidence",
"NumericLessThan": 0.5,
"Next": "RequestHumanReview"
}
],
"Default": "ApplyClassificationWithMonitoring"
},
"ApplyClassification": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:s3:putObjectTagging",
"End": True
},
"ApplyClassificationWithMonitoring": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "ApplyTags",
"States": {
"ApplyTags": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:s3:putObjectTagging",
"End": True
}
}
},
{
"StartAt": "SetupMonitoring",
"States": {
"SetupMonitoring": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:cloudwatch:putMetricData",
"End": True
}
}
}
],
"End": True
},
"RequestHumanReview": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": f"arn:aws:sns:{self.region}:{self._get_account_id()}:classification-review",
"Message.$": "$"
},
"End": True
}
}
}
try:
response = self.stepfunctions_client.create_state_machine(
name='DataClassificationWorkflow',
definition=json.dumps(workflow_definition),
roleArn=f'arn:aws:iam::{self._get_account_id()}:role/stepfunctions-classification-role',
type='STANDARD',
tags=[
{
'key': 'Purpose',
'value': 'DataClassification'
}
]
)
return {
'state_machine_name': 'DataClassificationWorkflow',
'state_machine_arn': response['stateMachineArn'],
'status': 'created'
}
except Exception as e:
return {
'state_machine_name': 'DataClassificationWorkflow',
'status': 'failed',
'error': str(e)
}
def process_realtime_event(self, event: ClassificationEvent) -> Dict[str, Any]:
"""
Process real-time classification event
"""
try:
# Store event for tracking
self.realtime_table.put_item(
Item={
'event_id': f"{event.event_source}-{event.resource_arn}-{event.timestamp}",
'event_source': event.event_source,
'resource_arn': event.resource_arn,
'event_type': event.event_type,
'timestamp': event.timestamp,
'metadata': event.metadata,
'processing_status': 'received'
}
)
# Determine processing approach based on event source
if event.event_source == 's3':
return self._process_s3_event(event)
elif event.event_source == 'rds':
return self._process_rds_event(event)
elif event.event_source == 'dynamodb':
return self._process_dynamodb_event(event)
else:
return {
'status': 'unsupported',
'message': f'Event source {event.event_source} not supported'
}
except Exception as e:
logger.error(f"Error processing real-time event: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def _process_s3_event(self, event: ClassificationEvent) -> Dict[str, Any]:
"""
Process S3 object creation/modification event
"""
try:
# Extract bucket and key from resource ARN
# Split off only the five ARN prefix fields so ':' characters in the object key survive
resource = event.resource_arn.split(':', 5)[-1]
bucket_name, _, object_key = resource.partition('/')
# Get object metadata
response = self.s3_client.head_object(Bucket=bucket_name, Key=object_key)
content_type = response.get('ContentType', '')
content_length = response.get('ContentLength', 0)
# Determine if object should be classified
if self._should_classify_object(content_type, content_length):
# Start Step Functions workflow
workflow_input = {
'bucket': bucket_name,
'key': object_key,
'contentType': content_type,
'contentLength': content_length,
'eventTimestamp': event.timestamp
}
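# Execution names are limited to 80 characters and a restricted character set,
# so sanitize the name and truncate it before appending the timestamp
suffix = str(int(datetime.now().timestamp()))
raw_name = f'classify-{bucket_name}-{object_key}'
safe_name = ''.join(c if c.isascii() and (c.isalnum() or c in '-_') else '-' for c in raw_name)
self.stepfunctions_client.start_execution(
stateMachineArn=f'arn:aws:states:{self.region}:{self._get_account_id()}:stateMachine:DataClassificationWorkflow',
name=f'{safe_name[:80 - len(suffix) - 1]}-{suffix}',
input=json.dumps(workflow_input)
)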
return {
'status': 'classification_started',
'bucket': bucket_name,
'key': object_key,
'workflow_started': True
}
else:
return {
'status': 'skipped',
'reason': 'Object does not require classification',
'bucket': bucket_name,
'key': object_key
}
except Exception as e:
return {
'status': 'error',
'message': str(e)
}
def _process_rds_event(self, event: ClassificationEvent) -> Dict[str, Any]:
"""
Process RDS event for potential data classification
"""
# RDS events typically don't contain data content directly
# This would trigger metadata-based classification or schedule full scans
return {
'status': 'metadata_classification',
'message': 'RDS event processed for metadata classification'
}
def _process_dynamodb_event(self, event: ClassificationEvent) -> Dict[str, Any]:
"""
Process DynamoDB event for data classification
"""
# DynamoDB events can contain actual data changes
# This would analyze the changed data for sensitive content
return {
'status': 'content_classification',
'message': 'DynamoDB event processed for content classification'
}
def _should_classify_object(self, content_type: str, content_length: int) -> bool:
"""
Determine if an S3 object should be classified
"""
# Skip very large files or binary files that can't be easily analyzed
if content_length > 100 * 1024 * 1024: # 100MB limit
return False
# Only classify text-based content types
text_types = [
'text/',
'application/json',
'application/xml',
'application/csv',
'application/pdf'
]
return any(content_type.startswith(t) for t in text_types)
def _get_account_id(self) -> str:
"""Get AWS account ID"""
return boto3.client('sts').get_caller_identity()['Account']
def get_realtime_classification_stats(self, hours: int = 24) -> Dict[str, Any]:
"""
Get real-time classification statistics
"""
try:
# Calculate time range
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=hours)
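# Query events from the specified time range, following pagination
# (a single scan call returns at most 1 MB of data)
scan_kwargs = {
'FilterExpression': '#ts BETWEEN :start AND :end',
'ExpressionAttributeNames': {'#ts': 'timestamp'},
'ExpressionAttributeValues': {
':start': start_time.isoformat(),
':end': end_time.isoformat()
}
}
events = []
while True:
response = self.realtime_table.scan(**scan_kwargs)
events.extend(response['Items'])
if 'LastEvaluatedKey' not in response:
break
scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']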
# Calculate statistics
stats = {
'total_events': len(events),
'by_source': {},
'by_type': {},
'processing_status': {},
'hourly_distribution': {}
}
for event in events:
# Source distribution
source = event['event_source']
stats['by_source'][source] = stats['by_source'].get(source, 0) + 1
# Type distribution
event_type = event['event_type']
stats['by_type'][event_type] = stats['by_type'].get(event_type, 0) + 1
# Processing status
status = event.get('processing_status', 'unknown')
stats['processing_status'][status] = stats['processing_status'].get(status, 0) + 1
# Hourly distribution
event_hour = datetime.fromisoformat(event['timestamp']).strftime('%Y-%m-%d %H:00')
stats['hourly_distribution'][event_hour] = stats['hourly_distribution'].get(event_hour, 0) + 1
return stats
except Exception as e:
logger.error(f"Failed to get real-time classification stats: {str(e)}")
return {'error': str(e)}
# Example usage
if __name__ == "__main__":
# Initialize event-driven classifier
classifier = EventDrivenClassifier()
# Set up event-driven classification
print("Setting up event-driven classification...")
setup_result = classifier.setup_event_driven_classification()
print(f"Setup result: {setup_result}")
# Example event processing
sample_event = ClassificationEvent(
event_source='s3',
resource_arn='arn:aws:s3:::my-bucket/sensitive-data.txt',
event_type='ObjectCreated',
timestamp=datetime.utcnow().isoformat(),
metadata={'contentType': 'text/plain', 'size': 1024}
)
result = classifier.process_realtime_event(sample_event)
print(f"Event processing result: {result}")
# Get real-time statistics
stats = classifier.get_realtime_classification_stats(hours=24)
print(f"Real-time classification stats: {json.dumps(stats, indent=2)}")
Example 3: Multi-Service Classification Orchestration
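The template in this example includes a results processor that reads DynamoDB stream records and decides whether a classification needs a notification. As a rough sketch of that decision in isolation, the snippet below parses one stream record and applies the same thresholds. The names `parse_stream_record`, `NOTIFY_LEVELS`, and `LOW_CONFIDENCE` are hypothetical helpers introduced for illustration, and the sample record is made up.

```python
from typing import Any, Dict, Optional

# Assumed values, mirroring the thresholds used by the results processor in this example
NOTIFY_LEVELS = {"confidential", "restricted"}
LOW_CONFIDENCE = 0.7


def parse_stream_record(record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Extract classification fields from one DynamoDB stream record.

    Returns None for records that are not inserts or updates (for example REMOVE).
    """
    if record.get("eventName") not in ("INSERT", "MODIFY"):
        return None
    image = record["dynamodb"]["NewImage"]
    level = image["classification_level"]["S"]
    # Stream images encode numbers as strings under the "N" key
    confidence = float(image.get("confidence_score", {}).get("N", "0"))
    return {
        "resource_arn": image["resource_arn"]["S"],
        "classification_level": level,
        "confidence_score": confidence,
        # Notify on sensitive levels or when the classifier was unsure
        "notify": level in NOTIFY_LEVELS or confidence < LOW_CONFIDENCE,
    }


# Hypothetical record in the DynamoDB Streams JSON shape
sample_record = {
    "eventName": "INSERT",
    "dynamodb": {
        "NewImage": {
            "resource_arn": {"S": "arn:aws:s3:::example-bucket"},
            "classification_level": {"S": "internal"},
            "confidence_score": {"N": "0.65"},
        }
    },
}

parsed = parse_stream_record(sample_record)
print(parsed)
```

Keeping the parsing and the notify decision in a pure function like this makes the thresholds easy to unit test without deploying the stack.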
# classification-orchestration.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Multi-service automated data classification orchestration'
Parameters:
Environment:
Type: String
Default: 'prod'
AllowedValues: ['dev', 'staging', 'prod']
Description: 'Environment for deployment'
ClassificationSchedule:
Type: String
Default: 'rate(1 hour)'
Description: 'Schedule expression for automated classification jobs'
Resources:
# DynamoDB table for classification results
ClassificationResultsTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: !Sub '${Environment}-classification-results'
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: resource_arn
AttributeType: S
- AttributeName: classification_timestamp
AttributeType: S
- AttributeName: classification_level
AttributeType: S
KeySchema:
- AttributeName: resource_arn
KeyType: HASH
- AttributeName: classification_timestamp
KeyType: RANGE
GlobalSecondaryIndexes:
- IndexName: ClassificationLevelIndex
KeySchema:
- AttributeName: classification_level
KeyType: HASH
- AttributeName: classification_timestamp
KeyType: RANGE
Projection:
ProjectionType: ALL
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
TimeToLiveSpecification:
AttributeName: ttl
Enabled: true
Tags:
- Key: Environment
Value: !Ref Environment
- Key: Purpose
Value: DataClassification
# Lambda function for classification orchestration
ClassificationOrchestratorFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${Environment}-classification-orchestrator'
Runtime: python3.9
Handler: index.lambda_handler
Role: !GetAtt ClassificationOrchestratorRole.Arn
Timeout: 900
MemorySize: 1024
Environment:
Variables:
ENVIRONMENT: !Ref Environment
CLASSIFICATION_TABLE: !Ref ClassificationResultsTable
MACIE_REGION: !Ref AWS::Region
Code:
ZipFile: |
import boto3
import json
import os
from datetime import datetime, timedelta
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
Orchestrate multi-service data classification
"""
# Initialize clients
macie_client = boto3.client('macie2')
s3_client = boto3.client('s3')
rds_client = boto3.client('rds')
comprehend_client = boto3.client('comprehend')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['CLASSIFICATION_TABLE'])
try:
# Get list of resources to classify
resources_to_classify = discover_resources_for_classification()
results = {
'total_resources': len(resources_to_classify),
'classification_jobs_started': [],
'errors': []
}
for resource in resources_to_classify:
try:
if resource['type'] == 's3':
job_result = start_s3_classification(macie_client, resource)
results['classification_jobs_started'].append(job_result)
elif resource['type'] == 'rds':
job_result = start_rds_classification(rds_client, comprehend_client, resource)
results['classification_jobs_started'].append(job_result)
except Exception as e:
results['errors'].append({
'resource': resource,
'error': str(e)
})
return {
'statusCode': 200,
'body': json.dumps(results)
}
except Exception as e:
logger.error(f"Orchestration error: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def discover_resources_for_classification():
"""
Discover AWS resources that need classification
"""
s3_client = boto3.client('s3')
rds_client = boto3.client('rds')
resources = []
# Discover S3 buckets
try:
buckets_response = s3_client.list_buckets()
for bucket in buckets_response['Buckets']:
# Check if bucket needs classification
if should_classify_bucket(s3_client, bucket['Name']):
resources.append({
'type': 's3',
'name': bucket['Name'],
'arn': f"arn:aws:s3:::{bucket['Name']}",
'last_modified': bucket['CreationDate'].isoformat()
})
except Exception as e:
logger.error(f"Error discovering S3 buckets: {str(e)}")
# Discover RDS instances
try:
rds_response = rds_client.describe_db_instances()
for db_instance in rds_response['DBInstances']:
if should_classify_rds_instance(db_instance):
resources.append({
'type': 'rds',
'name': db_instance['DBInstanceIdentifier'],
'arn': db_instance['DBInstanceArn'],
'engine': db_instance['Engine']
})
except Exception as e:
logger.error(f"Error discovering RDS instances: {str(e)}")
return resources
def should_classify_bucket(s3_client, bucket_name):
"""
Determine if S3 bucket should be classified
"""
try:
# Check if bucket has classification tags
tags_response = s3_client.get_bucket_tagging(Bucket=bucket_name)
tags = {tag['Key']: tag['Value'] for tag in tags_response['TagSet']}
# Skip if already classified recently
if 'DataClassification' in tags and 'ClassificationTimestamp' in tags:
classification_time = datetime.fromisoformat(tags['ClassificationTimestamp'])
if datetime.utcnow() - classification_time < timedelta(days=7):
return False
return True
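except s3_client.exceptions.ClientError as e:
# An untagged bucket surfaces as a ClientError with code 'NoSuchTagSet'
if e.response.get('Error', {}).get('Code') == 'NoSuchTagSet':
return True
logger.error(f"Error checking bucket {bucket_name}: {str(e)}")
return False
except Exception as e:
logger.error(f"Error checking bucket {bucket_name}: {str(e)}")
return False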
def should_classify_rds_instance(db_instance):
"""
Determine if RDS instance should be classified
"""
# Check tags for recent classification
tags = {tag['Key']: tag['Value'] for tag in db_instance.get('TagList', [])}
if 'DataClassification' in tags and 'ClassificationTimestamp' in tags:
classification_time = datetime.fromisoformat(tags['ClassificationTimestamp'])
if datetime.utcnow() - classification_time < timedelta(days=30):
return False
return True
def start_s3_classification(macie_client, resource):
"""
Start Macie classification job for S3 bucket
"""
job_name = f"auto-classify-{resource['name']}-{int(datetime.utcnow().timestamp())}"
response = macie_client.create_classification_job(
name=job_name,
jobType='ONE_TIME',
s3JobDefinition={
'bucketCriteria': {
'includes': {
'and': [
{
'simpleCriterion': {
'comparator': 'EQ',
'key': 'S3_BUCKET_NAME',
'values': [resource['name']]
}
}
]
}
}
},
samplingPercentage=100
)
return {
'resource_type': 's3',
'resource_name': resource['name'],
'job_id': response['jobId'],
'job_name': job_name
}
def start_rds_classification(rds_client, comprehend_client, resource):
"""
Start RDS classification process
"""
# For RDS, we would typically:
# 1. Create a snapshot
# 2. Export data to S3
# 3. Run classification on exported data
# This is a simplified placeholder
return {
'resource_type': 'rds',
'resource_name': resource['name'],
'status': 'metadata_classification_started'
}
Tags:
- Key: Environment
Value: !Ref Environment
- Key: Purpose
Value: DataClassification
# IAM role for classification orchestrator
ClassificationOrchestratorRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub '${Environment}-classification-orchestrator-role'
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: ClassificationOrchestratorPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- macie2:*
- s3:ListAllMyBuckets
- s3:GetBucketTagging
- s3:PutBucketTagging
- s3:GetObject
- s3:ListBucket
- rds:DescribeDBInstances
- rds:DescribeDBClusters
- rds:ListTagsForResource
- rds:AddTagsToResource
- comprehend:*
- dynamodb:PutItem
- dynamodb:GetItem
- dynamodb:Query
- dynamodb:Scan
- dynamodb:UpdateItem
Resource: '*'
Tags:
- Key: Environment
Value: !Ref Environment
# EventBridge rule for scheduled classification
ClassificationScheduleRule:
Type: AWS::Events::Rule
Properties:
Name: !Sub '${Environment}-classification-schedule'
Description: 'Scheduled automated data classification'
ScheduleExpression: !Ref ClassificationSchedule
State: ENABLED
Targets:
- Arn: !GetAtt ClassificationOrchestratorFunction.Arn
Id: ClassificationOrchestratorTarget
Input: !Sub |
{
"source": "scheduled",
"environment": "${Environment}",
"timestamp": "{{aws.events.event.ingestion-time}}"
}
# Permission for EventBridge to invoke Lambda
ClassificationSchedulePermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref ClassificationOrchestratorFunction
Action: lambda:InvokeFunction
Principal: events.amazonaws.com
SourceArn: !GetAtt ClassificationScheduleRule.Arn
# SNS topic for classification notifications
ClassificationNotificationTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: !Sub '${Environment}-classification-notifications'
DisplayName: 'Data Classification Notifications'
Tags:
- Key: Environment
Value: !Ref Environment
- Key: Purpose
Value: DataClassification
# Lambda function for processing classification results
ClassificationResultsProcessor:
Type: AWS::Lambda::Function
Properties:
FunctionName: !Sub '${Environment}-classification-results-processor'
Runtime: python3.9
Handler: index.lambda_handler
Role: !GetAtt ClassificationResultsProcessorRole.Arn
Timeout: 300
Environment:
Variables:
ENVIRONMENT: !Ref Environment
NOTIFICATION_TOPIC: !Ref ClassificationNotificationTopic
Code:
ZipFile: |
import boto3
import json
import os
from datetime import datetime
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
Process classification results from DynamoDB stream
"""
sns_client = boto3.client('sns')
s3_client = boto3.client('s3')
try:
for record in event['Records']:
if record['eventName'] in ['INSERT', 'MODIFY']:
# Process new or updated classification result
classification_result = record['dynamodb']['NewImage']
# Extract classification details
resource_arn = classification_result['resource_arn']['S']
classification_level = classification_result['classification_level']['S']
confidence_score = float(classification_result.get('confidence_score', {}).get('N', '0'))
# Apply protection controls based on classification
apply_protection_controls(s3_client, resource_arn, classification_level)
# Send notification for high-risk classifications
if classification_level in ['confidential', 'restricted'] or confidence_score < 0.7:
send_classification_notification(
sns_client,
resource_arn,
classification_level,
confidence_score
)
return {
'statusCode': 200,
'body': json.dumps({'message': 'Classification results processed successfully'})
}
except Exception as e:
logger.error(f"Error processing classification results: {str(e)}")
return {
'statusCode': 500,
'body': json.dumps({'error': str(e)})
}
def apply_protection_controls(s3_client, resource_arn, classification_level):
"""
Apply protection controls based on classification level
"""
try:
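# These controls are S3 bucket settings, so skip any other resource type
if not resource_arn.startswith('arn:aws:s3:::'):
logger.info(f"Skipping protection controls for non-S3 resource {resource_arn}")
return
# Extract bucket name from ARN (drop any object key that follows the bucket)
bucket_name = resource_arn.split(':', 5)[-1].split('/')[0]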
# Define protection controls based on classification
protection_controls = {
'public': {'encryption': False, 'public_access': True},
'internal': {'encryption': True, 'public_access': False},
'confidential': {'encryption': True, 'public_access': False, 'versioning': True},
'restricted': {'encryption': True, 'public_access': False, 'versioning': True, 'mfa_delete': True}
}
controls = protection_controls.get(classification_level, protection_controls['internal'])
# Apply encryption if required
if controls.get('encryption', False):
s3_client.put_bucket_encryption(
Bucket=bucket_name,
ServerSideEncryptionConfiguration={
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'aws:kms' if classification_level == 'restricted' else 'AES256'
}
}
]
}
)
# Block public access if required
if not controls.get('public_access', False):
s3_client.put_public_access_block(
Bucket=bucket_name,
PublicAccessBlockConfiguration={
'BlockPublicAcls': True,
'IgnorePublicAcls': True,
'BlockPublicPolicy': True,
'RestrictPublicBuckets': True
}
)
# Enable versioning if required
if controls.get('versioning', False):
s3_client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={'Status': 'Enabled'}
)
logger.info(f"Applied protection controls to {bucket_name} for classification {classification_level}")
except Exception as e:
logger.error(f"Error applying protection controls: {str(e)}")
def send_classification_notification(sns_client, resource_arn, classification_level, confidence_score):
"""
Send notification for classification results requiring attention
"""
try:
message = {
'resource_arn': resource_arn,
'classification_level': classification_level,
'confidence_score': confidence_score,
'timestamp': datetime.utcnow().isoformat(),
'action_required': confidence_score < 0.7
}
sns_client.publish(
TopicArn=os.environ['NOTIFICATION_TOPIC'],
Message=json.dumps(message, indent=2),
Subject=f'Data Classification Alert: {classification_level} - {resource_arn}'
)
logger.info(f"Sent classification notification for {resource_arn}")
except Exception as e:
logger.error(f"Error sending notification: {str(e)}")
Tags:
- Key: Environment
Value: !Ref Environment
- Key: Purpose
Value: DataClassification
# IAM role for classification results processor
ClassificationResultsProcessorRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub '${Environment}-classification-results-processor-role'
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: ClassificationResultsProcessorPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- dynamodb:DescribeStream
- dynamodb:GetRecords
- dynamodb:GetShardIterator
- dynamodb:ListStreams
- s3:PutBucketEncryption
- s3:PutPublicAccessBlock
- s3:PutBucketVersioning
- s3:PutBucketTagging
- sns:Publish
Resource: '*'
Tags:
- Key: Environment
Value: !Ref Environment
# Event source mapping for DynamoDB stream
ClassificationResultsStreamMapping:
Type: AWS::Lambda::EventSourceMapping
Properties:
EventSourceArn: !GetAtt ClassificationResultsTable.StreamArn
FunctionName: !Ref ClassificationResultsProcessor
StartingPosition: LATEST
BatchSize: 10
MaximumBatchingWindowInSeconds: 5
# CloudWatch dashboard for classification monitoring
ClassificationDashboard:
Type: AWS::CloudWatch::Dashboard
Properties:
DashboardName: !Sub '${Environment}-data-classification-dashboard'
DashboardBody: !Sub |
{
"widgets": [
{
"type": "metric",
"x": 0,
"y": 0,
"width": 12,
"height": 6,
"properties": {
"metrics": [
[ "AWS/Lambda", "Invocations", "FunctionName", "${ClassificationOrchestratorFunction}" ],
[ ".", "Errors", ".", "." ],
[ ".", "Duration", ".", "." ]
],
"period": 300,
"stat": "Sum",
"region": "${AWS::Region}",
"title": "Classification Orchestrator Metrics"
}
},
{
"type": "metric",
"x": 12,
"y": 0,
"width": 12,
"height": 6,
"properties": {
"metrics": [
[ "AWS/Lambda", "Invocations", "FunctionName", "${ClassificationResultsProcessor}" ],
[ ".", "Errors", ".", "." ]
],
"period": 300,
"stat": "Sum",
"region": "${AWS::Region}",
"title": "Results Processor Metrics"
}
}
]
}
Outputs:
ClassificationTableName:
Description: 'Name of the classification results table'
Value: !Ref ClassificationResultsTable
Export:
Name: !Sub '${AWS::StackName}-ClassificationTable'
OrchestratorFunctionArn:
Description: 'ARN of the classification orchestrator function'
Value: !GetAtt ClassificationOrchestratorFunction.Arn
Export:
Name: !Sub '${AWS::StackName}-OrchestratorFunction'
NotificationTopicArn:
Description: 'ARN of the classification notification topic'
Value: !Ref ClassificationNotificationTopic
Export:
Name: !Sub '${AWS::StackName}-NotificationTopic'
DashboardURL:
Description: 'URL of the classification monitoring dashboard'
Value: !Sub 'https://${AWS::Region}.console.aws.amazon.com/cloudwatch/home?region=${AWS::Region}#dashboards:name=${Environment}-data-classification-dashboard'
Example 4: Machine Learning-Based Classification Pipeline
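The pipeline in this example mixes regex pattern detection with a text classifier by appending marker tokens such as `ssn_detected_1` to the content before vectorizing it. The snippet below is a minimal sketch of that augmentation step on its own, using a reduced pattern set. The helper names `count_pattern_hits` and `augment_with_pattern_tokens` and the sample text are illustrative assumptions, not part of the pipeline.

```python
import re
from typing import Dict

# Reduced, illustrative subset of the sensitive-data patterns
PATTERNS: Dict[str, str] = {
    "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
    "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    "aws_access_key": r"\bAKIA[0-9A-Z]{16}\b",
}


def count_pattern_hits(text: str, patterns: Dict[str, str] = PATTERNS) -> Dict[str, int]:
    """Count how many times each pattern matches in the text."""
    return {name: len(re.findall(pattern, text)) for name, pattern in patterns.items()}


def augment_with_pattern_tokens(text: str) -> str:
    """Append one marker token per matched pattern so a bag-of-words model can see it."""
    tokens = [
        f"{name}_detected_{count}"
        for name, count in count_pattern_hits(text).items()
        if count > 0
    ]
    return " ".join([text] + tokens)


# Made-up sample content
sample = "Contact jane.doe@example.com, SSN 123-45-6789."
hits = count_pattern_hits(sample)
augmented = augment_with_pattern_tokens(sample)
print(hits)
print(augmented)
```

Because the marker tokens contain only word characters, a word-level tokenizer should keep each one as a single feature. Broad patterns such as the bank account or API key expressions are likely to produce false positives, so it may be worth treating those hits as weak signals.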
# ml_classification_pipeline.py
import boto3
import json
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
import pickle
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class TrainingData:
content: str
classification_level: str
sensitive_data_types: List[str]
confidence_score: float
source: str
@dataclass
class ClassificationPrediction:
classification_level: str
confidence_score: float
feature_importance: Dict[str, float]
sensitive_patterns_detected: List[str]
recommendation: str
class MLClassificationPipeline:
"""
Machine Learning-based data classification pipeline
"""
def __init__(self, region: str = 'us-east-1'):
self.region = region
self.s3_client = boto3.client('s3', region_name=region)
self.sagemaker_client = boto3.client('sagemaker', region_name=region)
self.comprehend_client = boto3.client('comprehend', region_name=region)
self.dynamodb = boto3.resource('dynamodb', region_name=region)
# ML model components
self.vectorizer = None
self.classifier = None
self.model_version = None
self.feature_names = None
# Training data table
self.training_table = self.dynamodb.Table('ml-classification-training-data')
# Sensitive data patterns
self.sensitive_patterns = {
'credit_card': r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
'phone': r'\b(?:\+?1[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})\b',
'ip_address': r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',
'api_key': r'\b[A-Za-z0-9]{32,}\b',
'aws_access_key': r'\bAKIA[0-9A-Z]{16}\b',
'private_key': r'-----BEGIN [A-Z ]+PRIVATE KEY-----',
'passport': r'\b[A-Z]{1,2}[0-9]{6,9}\b',
'bank_account': r'\b[0-9]{8,17}\b'
}
def collect_training_data(self, days: int = 30) -> List[TrainingData]:
"""
Collect training data from various sources
"""
training_data = []
try:
# Collect data from Macie findings
macie_data = self._collect_macie_training_data(days)
training_data.extend(macie_data)
# Collect data from manual classifications
manual_data = self._collect_manual_training_data(days)
training_data.extend(manual_data)
# Collect data from existing classifications
existing_data = self._collect_existing_classification_data(days)
training_data.extend(existing_data)
logger.info(f"Collected {len(training_data)} training samples")
except Exception as e:
logger.error(f"Error collecting training data: {str(e)}")
return training_data
def _collect_macie_training_data(self, days: int) -> List[TrainingData]:
"""
Collect training data from Macie findings
"""
training_data = []
try:
macie_client = boto3.client('macie2', region_name=self.region)
# Get Macie findings from the last N days
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=days)
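# list_findings returns the IDs that match the criteria; get_findings then
# retrieves the details. The createdAt bounds are assumed to be epoch milliseconds.
epoch = datetime(1970, 1, 1)
finding_ids = macie_client.list_findings(
findingCriteria={
'criterion': {
'createdAt': {
'gte': int((start_date - epoch).total_seconds() * 1000),
'lte': int((end_date - epoch).total_seconds() * 1000)
}
}
}
).get('findingIds', [])
findings_response = (
macie_client.get_findings(findingIds=finding_ids)
if finding_ids else {'findings': []}
)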
for finding in findings_response['findings']:
# Extract content sample and classification details
sensitive_data = finding.get('classificationDetails', {}).get('result', {}).get('sensitiveData', [])
if sensitive_data:
# Determine classification level based on sensitive data types
classification_level = self._determine_classification_from_sensitive_data(sensitive_data)
sensitive_types = [item.get('category', '') for item in sensitive_data]
# Create training sample
training_sample = TrainingData(
content=self._extract_content_sample(finding),
classification_level=classification_level,
sensitive_data_types=sensitive_types,
confidence_score=0.9, # High confidence for Macie findings
source='macie'
)
training_data.append(training_sample)
except Exception as e:
logger.error(f"Error collecting Macie training data: {str(e)}")
return training_data
def _collect_manual_training_data(self, days: int) -> List[TrainingData]:
"""
Collect training data from manual classifications
"""
training_data = []
try:
# Query manual classification records from DynamoDB
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=days)
response = self.training_table.scan(
FilterExpression='#source = :source AND #timestamp BETWEEN :start AND :end',
ExpressionAttributeNames={
'#source': 'source',
'#timestamp': 'timestamp'
},
ExpressionAttributeValues={
':source': 'manual',
':start': start_date.isoformat(),
':end': end_date.isoformat()
}
)
for item in response['Items']:
training_sample = TrainingData(
content=item['content'],
classification_level=item['classification_level'],
sensitive_data_types=item.get('sensitive_data_types', []),
confidence_score=float(item.get('confidence_score', 1.0)),
source='manual'
)
training_data.append(training_sample)
except Exception as e:
logger.error(f"Error collecting manual training data: {str(e)}")
return training_data
def _collect_existing_classification_data(self, days: int) -> List[TrainingData]:
"""
Collect training data from existing automated classifications
"""
training_data = []
try:
# Query existing classification results
classification_table = self.dynamodb.Table('data-classification-results')
end_date = datetime.utcnow()
start_date = end_date - timedelta(days=days)
# DynamoDB compares values by type, so the numeric threshold must be a Decimal, not a string
from decimal import Decimal
response = classification_table.scan(
FilterExpression='classification_timestamp BETWEEN :start AND :end AND confidence_score > :min_confidence',
ExpressionAttributeValues={
':start': start_date.isoformat(),
':end': end_date.isoformat(),
':min_confidence': Decimal('0.8') # Only use high-confidence classifications
}
)
for item in response['Items']:
# Get content sample for this resource
content_sample = self._get_content_sample_for_resource(item['resource_arn'])
if content_sample:
training_sample = TrainingData(
content=content_sample,
classification_level=item['classification_level'],
sensitive_data_types=item.get('sensitive_data_types', []),
confidence_score=float(item['confidence_score']),
source='automated'
)
training_data.append(training_sample)
except Exception as e:
logger.error(f"Error collecting existing classification data: {str(e)}")
return training_data
def train_classification_model(self, training_data: List[TrainingData]) -> Dict[str, Any]:
"""
Train machine learning model for data classification
"""
try:
# Prepare training data
X, y = self._prepare_training_features(training_data)
if len(X) < 10:
return {
'status': 'insufficient_data',
'message': f'Need at least 10 training samples, got {len(X)}'
}
# Split data for training and validation
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# Train TF-IDF vectorizer
self.vectorizer = TfidfVectorizer(
max_features=10000,
ngram_range=(1, 3),
stop_words='english',
lowercase=True
)
X_train_vectorized = self.vectorizer.fit_transform(X_train)
X_test_vectorized = self.vectorizer.transform(X_test)
# Train Random Forest classifier
self.classifier = RandomForestClassifier(
n_estimators=100,
max_depth=10,
random_state=42,
class_weight='balanced'
)
self.classifier.fit(X_train_vectorized, y_train)
# Evaluate model
y_pred = self.classifier.predict(X_test_vectorized)
# Save the model first so the reported version matches the stored artifact
self._save_model()
# Generate evaluation metrics
evaluation_results = {
'classification_report': classification_report(y_test, y_pred, output_dict=True),
'confusion_matrix': confusion_matrix(y_test, y_pred).tolist(),
'feature_importance': self._get_feature_importance(),
'training_samples': len(training_data),
'model_version': self.model_version
}
logger.info(f"Model trained successfully with {len(training_data)} samples")
return {
'status': 'success',
'evaluation': evaluation_results
}
except Exception as e:
logger.error(f"Error training classification model: {str(e)}")
return {
'status': 'error',
'message': str(e)
}
def _prepare_training_features(self, training_data: List[TrainingData]) -> Tuple[List[str], List[str]]:
"""
Prepare features and labels for training
"""
X = [] # Features (text content)
y = [] # Labels (classification levels)
for sample in training_data:
# Extract features from content
features = self._extract_text_features(sample.content)
X.append(features)
y.append(sample.classification_level)
return X, y
def _extract_text_features(self, content: str) -> str:
"""
Extract text features for classification
"""
# Combine original content with pattern-based features
features = content
# Add pattern detection results as features
for pattern_name, pattern in self.sensitive_patterns.items():
matches = len(re.findall(pattern, content, re.IGNORECASE))
if matches > 0:
features += f" {pattern_name}_detected_{matches}"
# Add content-based features
features += f" content_length_{len(content)}"
features += f" word_count_{len(content.split())}"
features += f" numeric_ratio_{sum(c.isdigit() for c in content) / len(content) if content else 0:.2f}"
return features
def _get_feature_importance(self) -> Dict[str, float]:
"""
Get feature importance from trained model
"""
if not self.classifier or not self.vectorizer:
return {}
feature_names = self.vectorizer.get_feature_names_out()
importance_scores = self.classifier.feature_importances_
# Get top 20 most important features
feature_importance = dict(zip(feature_names, importance_scores))
sorted_features = sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)
return dict(sorted_features[:20])
def _save_model(self):
"""
Save trained model to S3
"""
try:
model_data = {
'vectorizer': self.vectorizer,
'classifier': self.classifier,
'model_version': datetime.utcnow().strftime('%Y%m%d_%H%M%S'),
'sensitive_patterns': self.sensitive_patterns
}
# Serialize model
model_bytes = pickle.dumps(model_data)
# Upload to S3
bucket_name = f'ml-classification-models-{self._get_account_id()}'
key = f'models/classification_model_{model_data["model_version"]}.pkl'
self.s3_client.put_object(
Bucket=bucket_name,
Key=key,
Body=model_bytes,
ServerSideEncryption='AES256'
)
# Update current model pointer
self.s3_client.put_object(
Bucket=bucket_name,
Key='models/current_model.txt',
Body=key.encode(),
ServerSideEncryption='AES256'
)
self.model_version = model_data["model_version"]
logger.info(f"Model saved to S3: {key}")
except Exception as e:
logger.error(f"Error saving model: {str(e)}")
    def load_model(self, model_version: Optional[str] = None) -> bool:
        """
        Load trained model from S3
        """
        try:
            bucket_name = f'ml-classification-models-{self._get_account_id()}'
            if model_version:
                key = f'models/classification_model_{model_version}.pkl'
            else:
                # Load current model
                current_model_response = self.s3_client.get_object(
                    Bucket=bucket_name,
                    Key='models/current_model.txt'
                )
                key = current_model_response['Body'].read().decode()
            # Download and deserialize model.
            # pickle.loads can execute arbitrary code, so only load objects
            # from a bucket whose write access is tightly controlled.
            model_response = self.s3_client.get_object(Bucket=bucket_name, Key=key)
            model_data = pickle.loads(model_response['Body'].read())
            self.vectorizer = model_data['vectorizer']
            self.classifier = model_data['classifier']
            self.model_version = model_data['model_version']
            self.sensitive_patterns = model_data['sensitive_patterns']
            logger.info(f"Model loaded successfully: {self.model_version}")
            return True
        except Exception as e:
            logger.error(f"Error loading model: {str(e)}")
            return False
    def classify_content(self, content: str) -> ClassificationPrediction:
        """
        Classify content using trained ML model
        """
        if self.classifier is None or self.vectorizer is None:
            raise ValueError("Model not loaded. Call load_model() first.")
        try:
            # Extract features
            features = self._extract_text_features(content)
            # Vectorize features
            features_vectorized = self.vectorizer.transform([features])
            # Make prediction
            prediction = self.classifier.predict(features_vectorized)[0]
            prediction_proba = self.classifier.predict_proba(features_vectorized)[0]
            # Get confidence score (probability of the most likely class)
            confidence_score = float(max(prediction_proba))
            # Detect sensitive patterns
            sensitive_patterns_detected = []
            for pattern_name, pattern in self.sensitive_patterns.items():
                if re.search(pattern, content, re.IGNORECASE):
                    sensitive_patterns_detected.append(pattern_name)
            # Get feature importance for this prediction
            feature_importance = self._get_prediction_feature_importance(features_vectorized)
            # Generate recommendation
            recommendation = self._generate_classification_recommendation(
                prediction, confidence_score, sensitive_patterns_detected
            )
            return ClassificationPrediction(
                classification_level=prediction,
                confidence_score=confidence_score,
                feature_importance=feature_importance,
                sensitive_patterns_detected=sensitive_patterns_detected,
                recommendation=recommendation
            )
        except Exception as e:
            logger.error(f"Error classifying content: {str(e)}")
            raise
    def _get_prediction_feature_importance(self, features_vectorized) -> Dict[str, float]:
        """
        Get feature importance for a specific prediction
        """
        try:
            # Get feature names and their values for this prediction
            feature_names = self.vectorizer.get_feature_names_out()
            feature_values = features_vectorized.toarray()[0]
            # Get model feature importance
            model_importance = self.classifier.feature_importances_
            # Calculate weighted importance for this prediction
            weighted_importance = {}
            for name, value, importance in zip(feature_names, feature_values, model_importance):
                if value > 0:  # Only include features present in this document
                    weighted_importance[name] = value * importance
            # Return top 10 features
            sorted_features = sorted(weighted_importance.items(), key=lambda x: x[1], reverse=True)
            return dict(sorted_features[:10])
        except Exception as e:
            logger.error(f"Error getting prediction feature importance: {str(e)}")
            return {}
    def _generate_classification_recommendation(self,
                                                classification: str,
                                                confidence: float,
                                                sensitive_patterns: List[str]) -> str:
        """
        Generate recommendation based on classification results
        """
        recommendations = []
        if confidence < 0.7:
            recommendations.append("Low confidence classification - consider human review")
        if classification in ['confidential', 'restricted'] and confidence > 0.8:
            recommendations.append("High-risk classification detected - apply strict access controls")
        if sensitive_patterns:
            recommendations.append(f"Sensitive data patterns detected: {', '.join(sensitive_patterns)}")
        if classification == 'restricted':
            recommendations.append("Restricted data requires encryption, MFA, and audit logging")
        elif classification == 'confidential':
            recommendations.append("Confidential data requires access controls and monitoring")
        elif classification == 'internal':
            recommendations.append("Internal data requires basic access controls")
        return '; '.join(recommendations) if recommendations else "Standard data handling procedures apply"
    def retrain_model_with_feedback(self, feedback_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Retrain model with user feedback
        """
        try:
            # Convert feedback to training data
            feedback_training_data = []
            for feedback in feedback_data:
                training_sample = TrainingData(
                    content=feedback['content'],
                    classification_level=feedback['correct_classification'],
                    sensitive_data_types=feedback.get('sensitive_data_types', []),
                    confidence_score=1.0,  # High confidence for human feedback
                    source='feedback'
                )
                feedback_training_data.append(training_sample)
            # Collect existing training data
            existing_data = self.collect_training_data(days=90)
            # Combine with feedback data
            all_training_data = existing_data + feedback_training_data
            # Retrain model
            return self.train_classification_model(all_training_data)
        except Exception as e:
            logger.error(f"Error retraining model with feedback: {str(e)}")
            return {
                'status': 'error',
                'message': str(e)
            }
    def _determine_classification_from_sensitive_data(self, sensitive_data: List[Dict[str, Any]]) -> str:
        """
        Determine classification level from Macie sensitive data findings
        """
        max_level = 'public'
        level_hierarchy = {'public': 0, 'internal': 1, 'confidential': 2, 'restricted': 3}
        for data_item in sensitive_data:
            category = data_item.get('category', '')
            if category in ['CREDIT_CARD_NUMBER', 'SSN', 'BANK_ACCOUNT_NUMBER', 'PASSPORT_NUMBER']:
                max_level = 'restricted'
            elif category in ['PERSON_NAME', 'ADDRESS', 'PHONE_NUMBER'] and level_hierarchy[max_level] < 2:
                max_level = 'confidential'
            elif category in ['EMAIL_ADDRESS'] and level_hierarchy[max_level] < 1:
                max_level = 'internal'
        return max_level
    def _extract_content_sample(self, finding: Dict[str, Any]) -> str:
        """
        Extract content sample from Macie finding
        """
        # This would extract actual content samples from the finding
        # For security reasons, we'll return a placeholder
        return f"Content sample from {finding.get('type', 'unknown')} finding"
    def _get_content_sample_for_resource(self, resource_arn: str) -> Optional[str]:
        """
        Get content sample for a resource ARN
        """
        try:
            # Extract the bucket name from an S3 ARN (arn:<partition>:s3:::<bucket>[/<key>]).
            # Checking the service field avoids matching ARNs that merely contain "s3".
            parts = resource_arn.split(':', 5)
            if len(parts) == 6 and parts[2] == 's3':
                bucket_name = parts[5].split('/')[0]
                # Get a sample object from the bucket
                objects_response = self.s3_client.list_objects_v2(
                    Bucket=bucket_name,
                    MaxKeys=1
                )
                if 'Contents' in objects_response:
                    key = objects_response['Contents'][0]['Key']
                    # Get object content (first 1KB)
                    response = self.s3_client.get_object(
                        Bucket=bucket_name,
                        Key=key,
                        Range='bytes=0-1023'
                    )
                    return response['Body'].read().decode('utf-8', errors='ignore')
        except Exception as e:
            logger.error(f"Error getting content sample for {resource_arn}: {str(e)}")
        return None
    def _get_account_id(self) -> str:
        """Get AWS account ID"""
        return boto3.client('sts').get_caller_identity()['Account']
# Example usage
if __name__ == "__main__":
    # Initialize ML classification pipeline
    pipeline = MLClassificationPipeline()
    # Collect training data
    print("Collecting training data...")
    training_data = pipeline.collect_training_data(days=30)
    print(f"Collected {len(training_data)} training samples")
    # Train model
    if len(training_data) >= 10:
        print("Training classification model...")
        training_result = pipeline.train_classification_model(training_data)
        print(f"Training result: {training_result}")
        # Test classification
        if training_result['status'] == 'success':
            test_content = "John Doe's credit card number is 4532-1234-5678-9012 and his SSN is 123-45-6789"
            prediction = pipeline.classify_content(test_content)
            print(f"Classification: {prediction.classification_level}")
            print(f"Confidence: {prediction.confidence_score:.2f}")
            print(f"Sensitive patterns: {prediction.sensitive_patterns_detected}")
            print(f"Recommendation: {prediction.recommendation}")
    else:
        print("Insufficient training data for model training")
Relevant AWS Services
Core Classification Services
- Amazon Macie: Automated sensitive data discovery and classification
- Amazon Comprehend: Natural language processing for content analysis
- Amazon Textract: Extract text from documents and images for classification
- Amazon Rekognition: Image and video content analysis
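As a rough illustration of how one of these services could feed a classification decision, the sketch below maps the entity types returned by Amazon Comprehend's `detect_pii_entities` call onto the four levels used in this guide. The mapping table, the `min_score` threshold, and the function name `classify_with_comprehend` are assumptions made for this example, not part of any AWS API.

```python
from typing import Any, Dict, List

# Assumed mapping from Comprehend PII entity types to this guide's levels.
PII_TYPE_LEVELS: Dict[str, str] = {
    "SSN": "restricted",
    "CREDIT_DEBIT_NUMBER": "restricted",
    "BANK_ACCOUNT_NUMBER": "restricted",
    "PASSPORT_NUMBER": "restricted",
    "NAME": "confidential",
    "ADDRESS": "confidential",
    "PHONE": "confidential",
    "EMAIL": "internal",
}
LEVEL_ORDER = ["public", "internal", "confidential", "restricted"]


def classify_with_comprehend(comprehend_client: Any, text: str,
                             min_score: float = 0.8) -> Dict[str, Any]:
    """Return the highest level implied by the PII entities found in text."""
    response = comprehend_client.detect_pii_entities(Text=text, LanguageCode="en")
    level = "public"
    detected: List[str] = []
    for entity in response.get("Entities", []):
        if entity["Score"] < min_score:
            continue  # Ignore low-confidence detections
        detected.append(entity["Type"])
        # Unmapped PII types default to "internal" (an assumption)
        candidate = PII_TYPE_LEVELS.get(entity["Type"], "internal")
        if LEVEL_ORDER.index(candidate) > LEVEL_ORDER.index(level):
            level = candidate
    return {"classification_level": level, "pii_types": sorted(set(detected))}
```

In a real deployment the client would typically be `boto3.client("comprehend")`. Passing it in as a parameter keeps the function easy to test with a stub.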
Event-Driven Services
- Amazon EventBridge: Event routing for real-time classification triggers
- AWS Lambda: Serverless functions for classification processing
- AWS Step Functions: Workflow orchestration for complex classification scenarios
- Amazon Kinesis: Real-time data streaming for classification
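To make the event-driven path more concrete, here is a minimal sketch of a Lambda handler that turns an S3 event notification into a list of objects to classify. The helper name `extract_classification_targets` and the shape of the return value are assumptions for illustration. The event fields it reads (`Records`, `eventSource`, `eventName`, `s3.bucket.name`, `s3.object.key`) follow the standard S3 notification format, in which object keys arrive URL-encoded.

```python
from typing import Any, Dict, List
from urllib.parse import unquote_plus


def extract_classification_targets(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect bucket/key pairs for newly created objects in an S3 event."""
    targets: List[Dict[str, Any]] = []
    for record in event.get("Records", []):
        if record.get("eventSource") != "aws:s3":
            continue  # Skip records from other event sources
        if not record.get("eventName", "").startswith("ObjectCreated:"):
            continue  # Only new or overwritten objects need classifying
        s3_info = record["s3"]
        targets.append({
            "bucket": s3_info["bucket"]["name"],
            # Keys in S3 notifications are URL-encoded (spaces arrive as '+')
            "key": unquote_plus(s3_info["object"]["key"]),
            "size": s3_info["object"].get("size", 0),
        })
    return targets


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Entry point: a real handler would fetch and classify each target here."""
    return {"objects_to_classify": extract_classification_targets(event)}
```

Keeping the event parsing separate from the classification call makes it straightforward to reuse the same parser behind an SQS queue or a Step Functions task.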
Machine Learning Services
- Amazon SageMaker: Custom ML model training and deployment
- AWS Batch: Large-scale batch processing for classification jobs
- Amazon Bedrock: Foundation models for advanced content analysis
Storage and Database Services
- Amazon S3: Object storage with event notifications
- Amazon DynamoDB: NoSQL database with streams for real-time processing
- Amazon RDS: Relational database with event notifications
- Amazon DocumentDB: Document database for unstructured data
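Once an object has been classified, the result is usually recorded as tags on the stored object. The sketch below builds a payload in the `{'TagSet': [...]}` shape accepted by the S3 `put_object_tagging` call. The tag keys (`DataClassification`, `ClassifiedBy`, `ModelVersion`, `SensitivePatterns`) and the function name are hypothetical choices for this example and should be replaced with the names in your own tagging standard.

```python
from typing import Dict, List

VALID_LEVELS = ("public", "internal", "confidential", "restricted")


def build_classification_tag_set(level: str,
                                 pattern_names: List[str],
                                 model_version: str) -> Dict[str, List[Dict[str, str]]]:
    """Build a Tagging payload describing an automated classification result."""
    if level not in VALID_LEVELS:
        raise ValueError(f"Unknown classification level: {level}")
    tags = {
        "DataClassification": level,
        "ClassifiedBy": "automated",
        "ModelVersion": model_version,
    }
    if pattern_names:
        # Joined with '+' rather than commas, since tag values accept only a
        # restricted character set; truncated to the 256-character value limit.
        tags["SensitivePatterns"] = "+".join(sorted(set(pattern_names)))[:256]
    return {"TagSet": [{"Key": k, "Value": v} for k, v in tags.items()]}
```

The result could then be passed as `s3_client.put_object_tagging(Bucket=..., Key=..., Tagging=tag_set)`. Note that this call replaces the object's existing tag set, so any tags that must survive need to be merged in first.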
Integration Services
- Amazon SNS: Notifications for classification events
- Amazon SQS: Message queuing for classification workflows
- AWS Systems Manager: Parameter Store for classification rules and configurations
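One possible way to keep classification rules outside the code is to store them as a JSON document in a Parameter Store parameter and load them at start-up. The sketch below assumes a parameter whose value is a JSON list of `{"name": ..., "pattern": ...}` objects. That layout and the function names are assumptions for this example. Only the `get_parameter` call and its `Parameter.Value` response field come from the Systems Manager API.

```python
import json
import re
from typing import Any, Dict, List


def load_classification_rules(ssm_client: Any, parameter_name: str) -> List[Dict[str, str]]:
    """Fetch regex rules stored as a JSON list in a Parameter Store parameter."""
    response = ssm_client.get_parameter(Name=parameter_name, WithDecryption=True)
    rules = json.loads(response["Parameter"]["Value"])
    for rule in rules:
        # Compile once up front so a malformed pattern fails at load time
        re.compile(rule["pattern"])
    return rules


def match_rules(rules: List[Dict[str, str]], content: str) -> List[str]:
    """Return the names of all rules whose pattern occurs in content."""
    return [
        rule["name"]
        for rule in rules
        if re.search(rule["pattern"], content, re.IGNORECASE)
    ]
```

Storing rules this way lets security teams adjust patterns without redeploying the classifier, at the cost of needing a validation step such as the compile check above.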
Benefits of Automated Classification
Operational Benefits
- Scalability: Handle large volumes of data automatically
- Consistency: Apply classification rules uniformly across all data
- Speed: Real-time classification as data is created or modified
- Cost Efficiency: Reduce manual effort and human error
Security Benefits
- Immediate Protection: Apply security controls as soon as data is classified
- Comprehensive Coverage: Classify all data assets, not just samples
- Continuous Monitoring: Ongoing classification as data changes
- Risk Reduction: Minimize exposure of unclassified sensitive data
Compliance Benefits
- Audit Trail: Complete record of classification decisions and changes
- Regulatory Compliance: Meet requirements for data identification and protection
- Policy Enforcement: Automatically enforce data handling policies
- Reporting: Generate compliance reports and metrics
Related Resources
- AWS Well-Architected Framework - Data Classification
- Amazon Macie User Guide
- Amazon Comprehend Developer Guide
- AWS Lambda Developer Guide
- Amazon EventBridge User Guide
- AWS Step Functions Developer Guide
- Amazon SageMaker Developer Guide