🔨 chore(base.py): add upsert method to BaseCacheManager to provide a common interface for inserting or updating cache items

 feat(manager.py): add upsert method to InMemoryCache and RedisCache to support inserting or updating cache items
🐛 fix(manager.py): fix client_id generation in handle_websocket method to avoid overwriting existing cache items
🔨 chore(manager.py): refactor handle_websocket method to use upsert method and improve readability
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-14 12:25:14 -03:00
commit e8d3a5df6d
3 changed files with 64 additions and 4 deletions

View file

@ -32,6 +32,16 @@ class BaseCacheManager(abc.ABC, Service):
value: The value to cache.
"""
@abc.abstractmethod
def upsert(self, key, value):
"""
Add an item to the cache if it doesn't exist, or update it if it does.
Args:
key: The key of the item.
value: The value to cache.
"""
@abc.abstractmethod
def delete(self, key):
"""

View file

@ -91,6 +91,27 @@ class InMemoryCache(BaseCacheManager):
self._cache.popitem(last=False)
self._cache[key] = {"value": value, "time": time.time()}
def upsert(self, key, value):
"""
Inserts or updates a value in the cache.
If the existing value and the new value are both dictionaries, they are merged.
Args:
key: The key of the item.
value: The value to insert or update.
"""
with self._lock:
existing_value = self.get(key)
if (
existing_value is not None
and isinstance(existing_value, dict)
and isinstance(value, dict)
):
existing_value.update(value)
value = existing_value
self.set(key, value)
def get_or_set(self, key, value):
"""
Retrieve an item from the cache. If the item does not exist, set it with the provided value.
@ -220,6 +241,26 @@ class RedisCache(BaseCacheManager):
"""
self._client.setex(key, self.expiration_time, pickle.dumps(value))
def upsert(self, key, value):
"""
Inserts or updates a value in the cache.
If the existing value and the new value are both dictionaries, they are merged.
Args:
key: The key of the item.
value: The value to insert or update.
"""
existing_value = self.get(key)
if (
existing_value is not None
and isinstance(existing_value, dict)
and isinstance(value, dict)
):
existing_value.update(value)
value = existing_value
self.set(key, value)
def delete(self, key):
"""
Remove an item from the cache.

View file

@ -175,8 +175,11 @@ class ChatManager(Service):
# client_id is the flow id but that already exists in the cache
# so we need to change it to something else
client_id = f"{client_id}_chat" if "_chat" not in client_id else client_id
self.cache_manager.set(client_id, langchain_object)
result_dict = {
"result": langchain_object.result,
"type": type(langchain_object),
}
self.cache_manager.upsert(client_id, result_dict)
return client_id in self.cache_manager
async def handle_websocket(self, client_id: str, websocket: WebSocket):
@ -199,9 +202,15 @@ class ChatManager(Service):
continue
with self.chat_cache.set_client_id(client_id):
langchain_object = self.cache_manager.get(f"{client_id}_chat")
await self.process_message(client_id, payload, langchain_object)
if langchain_object := self.cache_manager.get(client_id).get(
"result"
):
await self.process_message(client_id, payload, langchain_object)
else:
raise RuntimeError(
f"Could not find a LangChain object for client_id {client_id}"
)
except Exception as exc:
# Handle any exceptions that might occur
logger.error(f"Error handling websocket: {exc}")