From 4b71295caf1305fda682f1e770c458a6ae65690b Mon Sep 17 00:00:00 2001 From: ogabrielluiz Date: Tue, 18 Jun 2024 00:14:11 -0300 Subject: [PATCH] refactor: Update RedisCache to use AsyncInMemoryCache as fallback when Redis cache is not connected --- .../base/langflow/services/cache/factory.py | 2 +- .../base/langflow/services/cache/service.py | 33 +++++++++++-------- .../base/langflow/services/chat/service.py | 2 +- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/backend/base/langflow/services/cache/factory.py b/src/backend/base/langflow/services/cache/factory.py index b04eb6417..bca362e16 100644 --- a/src/backend/base/langflow/services/cache/factory.py +++ b/src/backend/base/langflow/services/cache/factory.py @@ -29,7 +29,7 @@ class CacheServiceFactory(ServiceFactory): logger.debug("Redis cache is connected") return redis_cache logger.warning("Redis cache is not connected, falling back to in-memory cache") - return ThreadingInMemoryCache() + return AsyncInMemoryCache() elif settings_service.settings.cache_type == "memory": return ThreadingInMemoryCache() diff --git a/src/backend/base/langflow/services/cache/service.py b/src/backend/base/langflow/services/cache/service.py index 2aa187b22..c23e7742d 100644 --- a/src/backend/base/langflow/services/cache/service.py +++ b/src/backend/base/langflow/services/cache/service.py @@ -243,10 +243,11 @@ class RedisCache(CacheService): try: self._client.ping() return True - except redis.exceptions.ConnectionError: + except redis.exceptions.ConnectionError as exc: + logger.error(f"RedisCache could not connect to the Redis server: {exc}") return False - def get(self, key): + async def get(self, key, lock=None): """ Retrieve an item from the cache. @@ -256,10 +257,12 @@ class RedisCache(CacheService): Returns: The value associated with the key, or None if the key is not found. """ - value = self._client.get(key) + if key is None: + return None + value = self._client.get(str(key)) return pickle.loads(value) if value else None - def set(self, key, value): + async def set(self, key, value, lock=None): """ Add an item to the cache. @@ -269,13 +272,13 @@ class RedisCache(CacheService): """ try: if pickled := pickle.dumps(value): - result = self._client.setex(key, self.expiration_time, pickled) + result = self._client.setex(str(key), self.expiration_time, pickled) if not result: raise ValueError("RedisCache could not set the value.") except TypeError as exc: raise TypeError("RedisCache only accepts values that can be pickled. ") from exc - def upsert(self, key, value): + async def upsert(self, key, value, lock=None): """ Inserts or updates a value in the cache. If the existing value and the new value are both dictionaries, they are merged. @@ -284,14 +287,16 @@ class RedisCache(CacheService): key: The key of the item. value: The value to insert or update. """ - existing_value = self.get(key) + if key is None: + return + existing_value = await self.get(key) if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict): existing_value.update(value) value = existing_value - self.set(key, value) + await self.set(key, value) - def delete(self, key): + async def delete(self, key, lock=None): """ Remove an item from the cache. @@ -300,7 +305,7 @@ class RedisCache(CacheService): """ self._client.delete(key) - def clear(self): + async def clear(self, lock=None): """ Clear all items from the cache. """ @@ -308,17 +313,17 @@ class RedisCache(CacheService): def __contains__(self, key): """Check if the key is in the cache.""" - return False if key is None else self._client.exists(key) + return False if key is None else self._client.exists(str(key)) - def __getitem__(self, key): + async def __getitem__(self, key): """Retrieve an item from the cache using the square bracket notation.""" return self.get(key) - def __setitem__(self, key, value): + async def __setitem__(self, key, value): """Add an item to the cache using the square bracket notation.""" self.set(key, value) - def __delitem__(self, key): + async def __delitem__(self, key): """Remove an item from the cache using the square bracket notation.""" self.delete(key) diff --git a/src/backend/base/langflow/services/chat/service.py b/src/backend/base/langflow/services/chat/service.py index 042a541a3..bb913e6e9 100644 --- a/src/backend/base/langflow/services/chat/service.py +++ b/src/backend/base/langflow/services/chat/service.py @@ -23,7 +23,7 @@ class ChatService(Service): "result": data, "type": type(data), } - await self.cache_service.upsert(key, result_dict, lock=lock or self._cache_locks[key]) + await self.cache_service.upsert(str(key), result_dict, lock=lock or self._cache_locks[key]) return key in self.cache_service async def get_cache(self, key: str, lock: Optional[asyncio.Lock] = None) -> Any: