diff --git a/src/backend/langflow/services/cache/base.py b/src/backend/langflow/services/cache/base.py index 80aec1408..f40cd6663 100644 --- a/src/backend/langflow/services/cache/base.py +++ b/src/backend/langflow/services/cache/base.py @@ -32,6 +32,16 @@ class BaseCacheManager(abc.ABC, Service): value: The value to cache. """ + @abc.abstractmethod + def upsert(self, key, value): + """ + Add an item to the cache if it doesn't exist, or update it if it does. + + Args: + key: The key of the item. + value: The value to cache. + """ + @abc.abstractmethod def delete(self, key): """ diff --git a/src/backend/langflow/services/cache/manager.py b/src/backend/langflow/services/cache/manager.py index 6243c2507..89ef7ebcf 100644 --- a/src/backend/langflow/services/cache/manager.py +++ b/src/backend/langflow/services/cache/manager.py @@ -91,6 +91,27 @@ class InMemoryCache(BaseCacheManager): self._cache.popitem(last=False) self._cache[key] = {"value": value, "time": time.time()} + def upsert(self, key, value): + """ + Inserts or updates a value in the cache. + If the existing value and the new value are both dictionaries, they are merged. + + Args: + key: The key of the item. + value: The value to insert or update. + """ + with self._lock: + existing_value = self.get(key) + if ( + existing_value is not None + and isinstance(existing_value, dict) + and isinstance(value, dict) + ): + existing_value.update(value) + value = existing_value + + self.set(key, value) + def get_or_set(self, key, value): """ Retrieve an item from the cache. If the item does not exist, set it with the provided value. @@ -220,6 +241,26 @@ class RedisCache(BaseCacheManager): """ self._client.setex(key, self.expiration_time, pickle.dumps(value)) + def upsert(self, key, value): + """ + Inserts or updates a value in the cache. + If the existing value and the new value are both dictionaries, they are merged. + + Args: + key: The key of the item. + value: The value to insert or update. + """ + existing_value = self.get(key) + if ( + existing_value is not None + and isinstance(existing_value, dict) + and isinstance(value, dict) + ): + existing_value.update(value) + value = existing_value + + self.set(key, value) + def delete(self, key): """ Remove an item from the cache. diff --git a/src/backend/langflow/services/chat/manager.py b/src/backend/langflow/services/chat/manager.py index 76790cbb4..e4b3895c2 100644 --- a/src/backend/langflow/services/chat/manager.py +++ b/src/backend/langflow/services/chat/manager.py @@ -175,8 +175,11 @@ class ChatManager(Service): # client_id is the flow id but that already exists in the cache # so we need to change it to something else - client_id = f"{client_id}_chat" if "_chat" not in client_id else client_id - self.cache_manager.set(client_id, langchain_object) + result_dict = { + "result": langchain_object.result, + "type": type(langchain_object), + } + self.cache_manager.upsert(client_id, result_dict) return client_id in self.cache_manager async def handle_websocket(self, client_id: str, websocket: WebSocket): @@ -199,9 +202,15 @@ class ChatManager(Service): continue with self.chat_cache.set_client_id(client_id): - langchain_object = self.cache_manager.get(f"{client_id}_chat") - await self.process_message(client_id, payload, langchain_object) + if langchain_object := self.cache_manager.get(client_id).get( + "result" + ): + await self.process_message(client_id, payload, langchain_object) + else: + raise RuntimeError( + f"Could not find a LangChain object for client_id {client_id}" + ) except Exception as exc: # Handle any exceptions that might occur logger.error(f"Error handling websocket: {exc}")