Skip to content
REL11

REL11-BP05 - Use static stability to prevent bimodal behavior

REL11-BP05: Use static stability to prevent bimodal behavior

Static stability ensures your system behaves consistently regardless of the state of its dependencies. Avoid architectures that behave differently during normal operations versus failure scenarios. Design systems that can continue operating with cached data, default configurations, or degraded functionality when dependencies are unavailable.

Implementation Steps

1. Identify Dependencies

Map all external dependencies and their impact on system behavior.

2. Design Fallback Mechanisms

Implement fallback strategies that maintain consistent behavior during dependency failures.

3. Cache Critical Data

Store essential data locally to avoid dependency on external services.

4. Use Default Configurations

Define safe default values that allow continued operation.

5. Implement Graceful Degradation

Design systems to reduce functionality rather than fail completely.

Detailed Implementation

{% raw %}

View code
import boto3
import json
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Union, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import threading
import sqlite3
import redis
from functools import wraps
import hashlib

class DependencyState(Enum):
    """Health state recorded per dependency in ``StaticStabilitySystem.dependency_states``."""
    # Set by call_external_service after a successful call; the only state
    # that counts towards the availability ratio in _determine_operation_mode.
    AVAILABLE = "available"
    # Not assigned anywhere in the code visible in this example.
    DEGRADED = "degraded"
    # Set by call_external_service after a failed call; later calls for that
    # dependency go straight to the fallback while this state is recorded.
    UNAVAILABLE = "unavailable"
    # Initial state given to every dependency by _initialize_dependencies.
    UNKNOWN = "unknown"

class FallbackStrategy(Enum):
    """What ``_execute_fallback`` returns when a dependency call cannot be made."""
    # Look up a cached result for the call; use the dependency's
    # fallback_data when nothing is cached.
    CACHED_DATA = "cached_data"
    # Return the dependency's fallback_data as-is.
    DEFAULT_VALUE = "default_value"
    # Delegate to _execute_degraded_function (not shown in this example).
    DEGRADED_FUNCTION = "degraded_function"
    # Log that the operation was skipped and return None.
    SKIP_OPERATION = "skip_operation"
    # Return the dependency's fallback_data (handled the same way as
    # DEFAULT_VALUE by _execute_fallback).
    STATIC_RESPONSE = "static_response"

class OperationMode(Enum):
    """Service-wide mode chosen by ``_determine_operation_mode``."""
    # At least 80% of configured dependencies are AVAILABLE (or none are configured).
    NORMAL = "normal"
    # At least 50% but less than 80% are AVAILABLE.
    DEGRADED = "degraded"
    # Less than 50% are AVAILABLE, or the mode could not be determined.
    EMERGENCY = "emergency"

@dataclass
class Dependency:
    """Static description of one external dependency and how to fall back from it."""
    # Lookup key: indexes dependency_states, circuit_breakers and fallback_cache.
    name: str
    # Kind of service (main() uses 'http'); not read by the code visible here.
    service_type: str
    # Service address; not read by the code visible here.
    endpoint: str
    # Passed to _execute_with_timeout by the decorator.
    # Presumably seconds -- TODO confirm against _execute_with_timeout.
    timeout: int
    # Not read by the code visible here; presumably the retry budget -- TODO confirm.
    retry_count: int
    # Strategy used by _execute_fallback unless the caller overrides it.
    fallback_strategy: FallbackStrategy
    # Value returned for DEFAULT_VALUE / STATIC_RESPONSE, and the default
    # when CACHED_DATA finds no cached entry.
    fallback_data: Any
    # Not read by the code visible here; presumably used by health
    # monitoring -- TODO confirm units and consumer.
    health_check_interval: int
    # Number of recorded failures at which _record_failure opens the breaker.
    circuit_breaker_threshold: int

@dataclass
class StaticStabilityConfig:
    """Settings consumed by ``StaticStabilitySystem.__init__``."""
    # Reported back by get_system_status.
    service_name: str
    # Every dependency the service may call; one circuit breaker is created for each.
    dependencies: List[Dependency]
    # Initial value of StaticStabilitySystem.current_mode.
    default_operation_mode: OperationMode
    # Default time-to-live, in seconds, used by set_cached_data when no ttl is given.
    cache_ttl: int
    # When true, __init__ calls _start_health_monitoring.
    health_check_enabled: bool
    # Not read by the code visible here -- TODO confirm where it is consumed.
    graceful_degradation_enabled: bool

class StaticStabilitySystem:
    def __init__(self, config: StaticStabilityConfig):
        """Build the in-memory state for one service and start monitoring.

        Creates the per-dependency state and circuit breakers, the local
        cache, an optional Redis client, and (when
        ``config.health_check_enabled`` is true) starts health monitoring.

        Args:
            config: Service settings, including the list of dependencies.
        """
        self.config = config
        self.service_name = config.service_name

        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

        # Dependency state tracking
        self.dependency_states: Dict[str, DependencyState] = {}
        self.circuit_breakers: Dict[str, Dict[str, Any]] = {}
        self.fallback_cache: Dict[str, Any] = {}

        # Operation mode
        self.current_mode = config.default_operation_mode
        self.mode_lock = threading.Lock()

        # Initialize local cache
        self.local_cache = {}
        self.cache_lock = threading.Lock()

        # Initialize Redis for distributed caching (optional).
        # redis.Redis() only builds a client object; no connection is made
        # until the first command is sent. Ping once here so an unreachable
        # server is detected now and redis_available reflects reality instead
        # of always being True.
        self.redis_client = None
        self.redis_available = False
        try:
            client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
            client.ping()
        except Exception:
            # Best effort: Redis is optional, so any failure just disables it.
            self.logger.warning("Redis not available, using local cache only")
        else:
            self.redis_client = client
            self.redis_available = True

        # Initialize dependencies
        self._initialize_dependencies()

        # Start health monitoring
        if config.health_check_enabled:
            self._start_health_monitoring()

    def _initialize_dependencies(self) -> None:
        """Initialize dependency tracking and circuit breakers"""
        try:
            for dependency in self.config.dependencies:
                # Initialize dependency state
                self.dependency_states[dependency.name] = DependencyState.UNKNOWN
                
                # Initialize circuit breaker
                self.circuit_breakers[dependency.name] = {
                    'failure_count': 0,
                    'last_failure_time': None,
                    'state': 'closed',  # closed, open, half-open
                    'threshold': dependency.circuit_breaker_threshold,
                    'timeout': 60  # seconds before trying half-open
                }
                
                # Initialize fallback cache
                if dependency.fallback_strategy == FallbackStrategy.CACHED_DATA:
                    self.fallback_cache[dependency.name] = dependency.fallback_data
            
            self.logger.info(f"Initialized {len(self.config.dependencies)} dependencies")
            
        except Exception as e:
            self.logger.error(f"Dependency initialization failed: {str(e)}")

    def static_stability_decorator(self, dependency_name: str, fallback_strategy: FallbackStrategy = None):
        """Decorator to add static stability to functions"""
        def decorator(func: Callable) -> Callable:
            @wraps(func)
            def wrapper(*args, **kwargs):
                try:
                    # Check dependency state
                    dependency_state = self.dependency_states.get(dependency_name, DependencyState.UNKNOWN)
                    
                    # Check circuit breaker
                    if self._is_circuit_breaker_open(dependency_name):
                        return self._execute_fallback(dependency_name, func.__name__, args, kwargs, fallback_strategy)
                    
                    # Execute function with timeout
                    dependency = self._get_dependency(dependency_name)
                    if dependency:
                        result = self._execute_with_timeout(func, dependency.timeout, *args, **kwargs)
                        
                        # Update circuit breaker on success
                        self._record_success(dependency_name)
                        
                        # Cache successful result
                        self._cache_result(dependency_name, func.__name__, result, args, kwargs)
                        
                        return result
                    else:
                        return func(*args, **kwargs)
                        
                except Exception as e:
                    # Record failure
                    self._record_failure(dependency_name)
                    
                    # Execute fallback
                    return self._execute_fallback(dependency_name, func.__name__, args, kwargs, fallback_strategy)
            
            return wrapper
        return decorator

    def get_cached_data(self, cache_key: str, default_value: Any = None) -> Any:
        """Get data from cache with static stability"""
        try:
            # Try Redis first if available
            if self.redis_available:
                try:
                    cached_data = self.redis_client.get(cache_key)
                    if cached_data:
                        return json.loads(cached_data)
                except Exception as e:
                    self.logger.warning(f"Redis cache access failed: {str(e)}")
            
            # Fall back to local cache
            with self.cache_lock:
                cached_data = self.local_cache.get(cache_key)
                if cached_data:
                    # Check if cache is still valid
                    if cached_data.get('expires_at', 0) > time.time():
                        return cached_data['data']
            
            # Return default value if no cache available
            return default_value
            
        except Exception as e:
            self.logger.error(f"Cache access failed: {str(e)}")
            return default_value

    def set_cached_data(self, cache_key: str, data: Any, ttl: int = None) -> bool:
        """Set data in cache with static stability"""
        try:
            ttl = ttl or self.config.cache_ttl
            expires_at = time.time() + ttl
            
            cache_entry = {
                'data': data,
                'expires_at': expires_at,
                'created_at': time.time()
            }
            
            # Try Redis first if available
            if self.redis_available:
                try:
                    self.redis_client.setex(cache_key, ttl, json.dumps(data))
                except Exception as e:
                    self.logger.warning(f"Redis cache write failed: {str(e)}")
            
            # Always update local cache as fallback
            with self.cache_lock:
                self.local_cache[cache_key] = cache_entry
            
            return True
            
        except Exception as e:
            self.logger.error(f"Cache write failed: {str(e)}")
            return False

    def call_external_service(self, service_name: str, operation: str, **kwargs) -> Any:
        """Call a declared dependency, returning a fallback instead of raising.

        The call is skipped in favour of the service fallback when the
        dependency's circuit breaker is open or its recorded state is
        UNAVAILABLE. A successful call resets the breaker, marks the
        dependency AVAILABLE and caches the result. A failed call is logged,
        counted against the breaker, marks the dependency UNAVAILABLE and
        returns the service fallback.

        Args:
            service_name: Name of a dependency from the configuration.
            operation: Operation to invoke on that dependency.
            **kwargs: Arguments forwarded to the service call.

        Returns:
            The service result, a fallback value, or the default response
            when the dependency is unknown or the fallback itself fails.
        """
        try:
            dependency = self._get_dependency(service_name)
            if not dependency:
                # Caught below and turned into the default response.
                raise ValueError(f"Unknown dependency: {service_name}")

            # Check circuit breaker
            if self._is_circuit_breaker_open(service_name):
                return self._execute_service_fallback(dependency, operation, **kwargs)

            # Check dependency state
            # NOTE(review): nothing in this method moves a dependency out of
            # UNAVAILABLE; presumably health monitoring does -- confirm.
            dependency_state = self.dependency_states.get(service_name, DependencyState.UNKNOWN)

            if dependency_state == DependencyState.UNAVAILABLE:
                return self._execute_service_fallback(dependency, operation, **kwargs)

            # Attempt service call
            try:
                result = self._make_service_call(dependency, operation, **kwargs)

                # Record success
                self._record_success(service_name)
                self.dependency_states[service_name] = DependencyState.AVAILABLE

                # Cache result
                cache_key = self._generate_cache_key(service_name, operation, **kwargs)
                self.set_cached_data(cache_key, result)

                return result

            except Exception as e:
                # Log before falling back so the failure is not lost.
                self.logger.warning(
                    f"Service call {service_name}.{operation} failed, using fallback: {str(e)}"
                )

                # Record failure
                self._record_failure(service_name)
                self.dependency_states[service_name] = DependencyState.UNAVAILABLE

                # Execute fallback
                return self._execute_service_fallback(dependency, operation, **kwargs)

        except Exception as e:
            self.logger.error(f"External service call failed: {str(e)}")
            return self._get_default_response(service_name, operation)

    def get_configuration(self, config_key: str, default_value: Any = None) -> Any:
        """Get configuration with static stability"""
        try:
            # Try to get from external configuration service
            config_dependency = self._get_dependency('configuration_service')
            
            if config_dependency and not self._is_circuit_breaker_open('configuration_service'):
                try:
                    config_value = self._fetch_configuration(config_key)
                    if config_value is not None:
                        # Cache the configuration
                        cache_key = f"config:{config_key}"
                        self.set_cached_data(cache_key, config_value, ttl=3600)
                        return config_value
                except Exception as e:
                    self.logger.warning(f"Configuration fetch failed: {str(e)}")
            
            # Fall back to cached configuration
            cache_key = f"config:{config_key}"
            cached_config = self.get_cached_data(cache_key)
            if cached_config is not None:
                return cached_config
            
            # Fall back to default value
            return default_value
            
        except Exception as e:
            self.logger.error(f"Configuration retrieval failed: {str(e)}")
            return default_value

    def execute_business_logic(self, operation: str, **kwargs) -> Dict[str, Any]:
        """Run a business operation in the mode the dependencies currently allow.

        The operation mode is recomputed from the dependency states on every
        call, stored in ``current_mode``, and reported in the result, so the
        'mode' field always matches the code path that actually ran.

        Args:
            operation: Name of the operation to run.
            **kwargs: Arguments forwarded to the operation handler.

        Returns:
            A dict with 'success', 'data', 'mode' and 'degraded_features'.
            On failure it holds 'success' False, 'error', 'mode', and the
            default response as 'data'.
        """
        try:
            # Determine operation mode based on dependency states
            operation_mode = self._determine_operation_mode()

            # Publish the mode so get_system_status and the error path below
            # report what is really in effect.
            with self.mode_lock:
                self.current_mode = operation_mode

            result = {
                'success': False,
                'data': None,
                'mode': operation_mode.value,
                'degraded_features': []
            }

            if operation_mode == OperationMode.NORMAL:
                # Execute full functionality
                result['data'] = self._execute_normal_operation(operation, **kwargs)
                result['success'] = True

            elif operation_mode == OperationMode.DEGRADED:
                # Execute with reduced functionality
                result['data'] = self._execute_degraded_operation(operation, **kwargs)
                result['success'] = True
                result['degraded_features'] = self._get_degraded_features()

            elif operation_mode == OperationMode.EMERGENCY:
                # Execute minimal functionality
                result['data'] = self._execute_emergency_operation(operation, **kwargs)
                result['success'] = True
                result['degraded_features'] = ['all_non_essential_features']

            return result

        except Exception as e:
            self.logger.error(f"Business logic execution failed: {str(e)}")
            return {
                'success': False,
                'error': str(e),
                'mode': self.current_mode.value,
                'data': self._get_default_response('business_logic', operation)
            }

    def _execute_fallback(self, dependency_name: str, function_name: str, args: tuple, 
                         kwargs: dict, fallback_strategy: FallbackStrategy = None) -> Any:
        """Execute fallback strategy"""
        try:
            dependency = self._get_dependency(dependency_name)
            if not dependency:
                return None
            
            strategy = fallback_strategy or dependency.fallback_strategy
            
            if strategy == FallbackStrategy.CACHED_DATA:
                cache_key = self._generate_cache_key(dependency_name, function_name, *args, **kwargs)
                return self.get_cached_data(cache_key, dependency.fallback_data)
                
            elif strategy == FallbackStrategy.DEFAULT_VALUE:
                return dependency.fallback_data
                
            elif strategy == FallbackStrategy.DEGRADED_FUNCTION:
                return self._execute_degraded_function(dependency_name, function_name, args, kwargs)
                
            elif strategy == FallbackStrategy.SKIP_OPERATION:
                self.logger.info(f"Skipping operation {function_name} for dependency {dependency_name}")
                return None
                
            elif strategy == FallbackStrategy.STATIC_RESPONSE:
                return dependency.fallback_data
            
            return None
            
        except Exception as e:
            self.logger.error(f"Fallback execution failed: {str(e)}")
            return None

    def _determine_operation_mode(self) -> OperationMode:
        """Derive the operation mode from the share of AVAILABLE dependencies.

        Returns:
            NORMAL when no dependencies are configured or at least 80% are
            AVAILABLE, DEGRADED for at least 50%, EMERGENCY otherwise or
            when the calculation fails.
        """
        try:
            configured_total = len(self.config.dependencies)
            healthy = sum(
                1
                for recorded_state in self.dependency_states.values()
                if recorded_state == DependencyState.AVAILABLE
            )

            # Nothing to depend on means nothing can be degraded.
            if not configured_total:
                return OperationMode.NORMAL

            healthy_share = healthy / configured_total
            if healthy_share >= 0.8:
                return OperationMode.NORMAL
            if healthy_share >= 0.5:
                return OperationMode.DEGRADED
            return OperationMode.EMERGENCY

        except Exception as e:
            self.logger.error(f"Operation mode determination failed: {str(e)}")
            return OperationMode.EMERGENCY

    def _execute_normal_operation(self, operation: str, **kwargs) -> Any:
        """Execute operation in normal mode"""
        try:
            # Full functionality available
            if operation == 'user_authentication':
                return self._authenticate_user_full(**kwargs)
            elif operation == 'data_processing':
                return self._process_data_full(**kwargs)
            elif operation == 'recommendation_engine':
                return self._generate_recommendations_full(**kwargs)
            else:
                return {'status': 'completed', 'mode': 'normal'}
                
        except Exception as e:
            self.logger.error(f"Normal operation failed: {str(e)}")
            raise

    def _execute_degraded_operation(self, operation: str, **kwargs) -> Any:
        """Execute operation in degraded mode"""
        try:
            # Reduced functionality
            if operation == 'user_authentication':
                return self._authenticate_user_cached(**kwargs)
            elif operation == 'data_processing':
                return self._process_data_basic(**kwargs)
            elif operation == 'recommendation_engine':
                return self._generate_recommendations_cached(**kwargs)
            else:
                return {'status': 'completed', 'mode': 'degraded'}
                
        except Exception as e:
            self.logger.error(f"Degraded operation failed: {str(e)}")
            raise

    def _execute_emergency_operation(self, operation: str, **kwargs) -> Any:
        """Execute operation in emergency mode"""
        try:
            # Minimal functionality
            if operation == 'user_authentication':
                return self._authenticate_user_basic(**kwargs)
            elif operation == 'data_processing':
                return self._process_data_minimal(**kwargs)
            elif operation == 'recommendation_engine':
                return self._generate_recommendations_default(**kwargs)
            else:
                return {'status': 'completed', 'mode': 'emergency'}
                
        except Exception as e:
            self.logger.error(f"Emergency operation failed: {str(e)}")
            return {'status': 'failed', 'mode': 'emergency'}

    def _authenticate_user_full(self, **kwargs) -> Dict[str, Any]:
        """Full user authentication with all features"""
        user_id = kwargs.get('user_id')
        
        # Call external authentication service
        auth_result = self.call_external_service('auth_service', 'authenticate', user_id=user_id)
        
        # Get user profile
        profile = self.call_external_service('profile_service', 'get_profile', user_id=user_id)
        
        # Get permissions
        permissions = self.call_external_service('permission_service', 'get_permissions', user_id=user_id)
        
        return {
            'authenticated': auth_result.get('valid', False),
            'user_profile': profile,
            'permissions': permissions,
            'features_available': ['all']
        }

    def _authenticate_user_cached(self, **kwargs) -> Dict[str, Any]:
        """Degraded user authentication using cached data"""
        user_id = kwargs.get('user_id')
        
        # Try cached authentication
        cache_key = f"auth:{user_id}"
        cached_auth = self.get_cached_data(cache_key)
        
        if cached_auth:
            return {
                'authenticated': True,
                'user_profile': cached_auth.get('profile', {}),
                'permissions': cached_auth.get('permissions', ['basic']),
                'features_available': ['basic'],
                'note': 'Using cached authentication data'
            }
        
        return {
            'authenticated': False,
            'error': 'Authentication service unavailable and no cached data',
            'features_available': ['guest']
        }

    def _authenticate_user_basic(self, **kwargs) -> Dict[str, Any]:
        """Basic user authentication with minimal features"""
        return {
            'authenticated': True,
            'user_profile': {'id': kwargs.get('user_id'), 'name': 'Guest User'},
            'permissions': ['read'],
            'features_available': ['basic_read_only'],
            'note': 'Emergency mode - basic access only'
        }

    def _is_circuit_breaker_open(self, dependency_name: str) -> bool:
        """Check if circuit breaker is open for dependency"""
        try:
            cb = self.circuit_breakers.get(dependency_name, {})
            
            if cb.get('state') == 'open':
                # Check if timeout has passed for half-open attempt
                if cb.get('last_failure_time'):
                    time_since_failure = time.time() - cb['last_failure_time']
                    if time_since_failure > cb.get('timeout', 60):
                        cb['state'] = 'half-open'
                        return False
                return True
            
            return False
            
        except Exception as e:
            self.logger.error(f"Circuit breaker check failed: {str(e)}")
            return False

    def _record_success(self, dependency_name: str) -> None:
        """Record successful dependency call"""
        try:
            cb = self.circuit_breakers.get(dependency_name, {})
            cb['failure_count'] = 0
            cb['state'] = 'closed'
            cb['last_failure_time'] = None
            
        except Exception as e:
            self.logger.error(f"Success recording failed: {str(e)}")

    def _record_failure(self, dependency_name: str) -> None:
        """Record failed dependency call"""
        try:
            cb = self.circuit_breakers.get(dependency_name, {})
            cb['failure_count'] = cb.get('failure_count', 0) + 1
            cb['last_failure_time'] = time.time()
            
            if cb['failure_count'] >= cb.get('threshold', 5):
                cb['state'] = 'open'
                self.logger.warning(f"Circuit breaker opened for {dependency_name}")
            
        except Exception as e:
            self.logger.error(f"Failure recording failed: {str(e)}")

    def _get_dependency(self, name: str) -> Optional[Dependency]:
        """Get dependency configuration by name"""
        for dependency in self.config.dependencies:
            if dependency.name == name:
                return dependency
        return None

    def _generate_cache_key(self, *args, **kwargs) -> str:
        """Generate cache key from arguments"""
        key_data = f"{args}:{sorted(kwargs.items())}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get_system_status(self) -> Dict[str, Any]:
        """Get current system status"""
        try:
            status = {
                'service_name': self.service_name,
                'current_mode': self.current_mode.value,
                'dependencies': {},
                'circuit_breakers': {},
                'cache_stats': {
                    'local_cache_size': len(self.local_cache),
                    'redis_available': self.redis_available
                }
            }
            
            # Dependency states
            for name, state in self.dependency_states.items():
                status['dependencies'][name] = state.value
            
            # Circuit breaker states
            for name, cb in self.circuit_breakers.items():
                status['circuit_breakers'][name] = {
                    'state': cb.get('state', 'unknown'),
                    'failure_count': cb.get('failure_count', 0)
                }
            
            return status
            
        except Exception as e:
            self.logger.error(f"Status retrieval failed: {str(e)}")
            return {'error': str(e)}

# Example usage
def main():
    """Walk through the example: configure, decorate, execute, report.

    Builds a two-dependency configuration, creates the stability system,
    calls a decorated function, runs one business operation, and prints
    the resulting system status.
    """
    # Authentication falls back to cached results, then to a guest answer.
    auth_service = Dependency(
        name='auth_service',
        service_type='http',
        endpoint='https://auth.example.com',
        timeout=5,
        retry_count=2,
        fallback_strategy=FallbackStrategy.CACHED_DATA,
        fallback_data={'authenticated': False, 'permissions': ['guest']},
        health_check_interval=30,
        circuit_breaker_threshold=5,
    )

    # Profiles fall back to a fixed guest profile.
    profile_service = Dependency(
        name='profile_service',
        service_type='http',
        endpoint='https://profile.example.com',
        timeout=3,
        retry_count=1,
        fallback_strategy=FallbackStrategy.DEFAULT_VALUE,
        fallback_data={'name': 'Guest User', 'preferences': {}},
        health_check_interval=60,
        circuit_breaker_threshold=3,
    )

    config = StaticStabilityConfig(
        service_name='user_service',
        dependencies=[auth_service, profile_service],
        default_operation_mode=OperationMode.NORMAL,
        cache_ttl=300,
        health_check_enabled=True,
        graceful_degradation_enabled=True,
    )

    stability_system = StaticStabilitySystem(config)
    print("Static stability system initialized")

    # Guard a function with the auth_service breaker and fallback.
    @stability_system.static_stability_decorator('auth_service')
    def authenticate_user(user_id: str) -> Dict[str, Any]:
        # This would normally call external auth service
        return {'user_id': user_id, 'authenticated': True}

    print(f"Authentication result: {authenticate_user('user123')}")

    # Run one business operation through the mode-aware entry point.
    business_result = stability_system.execute_business_logic(
        'user_authentication',
        user_id='user123',
    )
    print(f"Business logic result: {json.dumps(business_result, indent=2)}")

    # Report dependency, breaker and cache state.
    print(f"System status: {json.dumps(stability_system.get_system_status(), indent=2)}")

# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

{% endraw %}

AWS Services

Primary Services

  • Amazon ElastiCache: Distributed caching for consistent data access
  • Amazon DynamoDB: NoSQL database with consistent performance
  • AWS Lambda: Stateless compute with built-in fault tolerance
  • Amazon S3: Highly available object storage for static content

Supporting Services

  • Amazon CloudWatch: Monitoring without dependency on external services
  • AWS Systems Manager Parameter Store: Configuration management with caching
  • Amazon SQS: Asynchronous messaging with built-in redundancy
  • AWS AppConfig: Feature flag management with local caching

Benefits

  • Consistent Behavior: System operates predictably regardless of dependency state
  • Reduced Cascading Failures: Prevents dependency failures from causing system-wide outages
  • Improved User Experience: Graceful degradation maintains core functionality
  • Operational Simplicity: Eliminates bimodal behavior that complicates troubleshooting
  • Higher Availability: System remains operational even when dependencies fail