diff --git a/wavefront/server/apps/floware/floware/channels.py b/wavefront/server/apps/floware/floware/channels.py index a8b5be4f..b826f99f 100644 --- a/wavefront/server/apps/floware/floware/channels.py +++ b/wavefront/server/apps/floware/floware/channels.py @@ -18,6 +18,11 @@ async def start_redis_listener( queue = asyncio.Queue() pubsub = cache_manager.subscribe(channels=[REDIS_API_SERVICE_UPDATES_CHANNEL]) + + if pubsub is None: + logger.info('Cache is disabled — skipping Redis pubsub listener') + return + logger.info('Subscribed to Redis channel: %s', REDIS_API_SERVICE_UPDATES_CHANNEL) # Capture the running loop from the main thread diff --git a/wavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.py b/wavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.py index 14501e24..8e7a646e 100644 --- a/wavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.py +++ b/wavefront/server/modules/db_repo_module/db_repo_module/cache/cache_manager.py @@ -32,6 +32,10 @@ def __init__( self.initial_backoff = initial_backoff self.max_backoff = max_backoff + self.cache_enabled = ( + os.getenv('WAVEFRONT_CACHE_ENABLED', 'true').lower() == 'true' + ) + self.pool = self._create_connection_pool( connection_timeout=connection_timeout, socket_timeout=socket_timeout, @@ -50,6 +54,12 @@ def __init__( logger.error('Server will not start without Redis connectivity') raise RuntimeError(f'Redis connection test failed: {e}') from e + if not self.cache_enabled: + logger.info( + 'Cache get/set operations are disabled via WAVEFRONT_CACHE_ENABLED; ' + 'pub/sub remains active' + ) + def _create_connection_pool( self, connection_timeout: int, @@ -100,6 +110,9 @@ def add( expiry: int = 3600, nx: bool = False, ) -> bool: + if not self.cache_enabled: + return False + try: logger.info(f'Adding key: {key} to cache with expiry: {expiry} seconds') return bool( @@ -115,6 +128,9 @@ def add( retry=retry_if_exception_type((RedisError, ConnectionError, TimeoutError)), ) def get_str(self, key: str, default: Any = None) -> Optional[str]: + if not self.cache_enabled: + return default + try: value = self.redis.get(f'{self.namespace}/{key}') return value if value is not None else default @@ -124,10 +140,16 @@ def get_str(self, key: str, default: Any = None) -> Optional[str]: raise def get_int(self, key: str, default: int = 0) -> int: + if not self.cache_enabled: + return default + value = self.get_str(key, default) return int(value) if value is not None else default def remove(self, key: str) -> bool: + if not self.cache_enabled: + return False + try: return bool(self.redis.delete(f'{self.namespace}/{key}')) except (RedisError, ConnectionError, TimeoutError) as e: @@ -136,6 +158,9 @@ def remove(self, key: str) -> bool: def invalidate_query(self, pattern: str) -> int: """Remove all keys matching the given pattern""" + if not self.cache_enabled: + return 0 + try: # Get all keys matching the pattern search_pattern = f'{self.namespace}/{pattern}' @@ -170,6 +195,7 @@ def publish(self, channel: str, message: str) -> int: Raises: RedisError: If publishing fails """ + try: full_channel = f'{self.namespace}/{channel}' logger.info(f'Publishing message to channel: {full_channel}') @@ -203,6 +229,7 @@ def subscribe( if message['type'] == 'message': print(f"Received: {message['data']}") """ + try: pubsub = self.redis.pubsub()