From ccf9477b7f64d50da489350a263c49c2c87f9512 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 11 Jun 2023 18:08:01 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix(manager.py):=20add=20check?= =?UTF-8?q?=20for=20langchain=20object=20in=20process=5Fmessage=20to=20avo?= =?UTF-8?q?id=20errors=20=E2=9C=A8=20feat(manager.py):=20add=20build=20met?= =?UTF-8?q?hod=20to=20build=20langchain=20object=20and=20store=20it=20in?= =?UTF-8?q?=20an=20in-memory=20cache=20The=20`process=5Fmessage`=20method?= =?UTF-8?q?=20now=20checks=20if=20the=20langchain=20object=20has=20been=20?= =?UTF-8?q?built=20and=20stored=20in=20the=20in-memory=20cache=20before=20?= =?UTF-8?q?processing=20the=20message.=20If=20the=20object=20is=20not=20fo?= =?UTF-8?q?und,=20the=20connection=20is=20closed=20with=20an=20error=20mes?= =?UTF-8?q?sage.=20A=20new=20`build`=20method=20has=20been=20added=20to=20?= =?UTF-8?q?build=20the=20langchain=20object=20and=20store=20it=20in=20an?= =?UTF-8?q?=20in-memory=20cache.=20This=20method=20is=20called=20before=20?= =?UTF-8?q?processing=20any=20messages.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/chat/manager.py | 51 ++++++++++++++++++++++------ src/backend/langflow/chat/utils.py | 4 +-- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/backend/langflow/chat/manager.py b/src/backend/langflow/chat/manager.py index d24057b68..4169ecc92 100644 --- a/src/backend/langflow/chat/manager.py +++ b/src/backend/langflow/chat/manager.py @@ -10,7 +10,10 @@ from langflow.utils.logger import logger import asyncio import json -from typing import Dict, List +from typing import Any, Dict, List + +from langflow.cache.flow import InMemoryCache +from langflow.graph import Graph class ChatHistory(Subject): @@ -46,6 +49,7 @@ class ChatManager: self.chat_history = ChatHistory() self.cache_manager = cache_manager self.cache_manager.attach(self.update) + self.in_memory_cache = 
InMemoryCache() def on_chat_history_update(self): """Send the last chat message to the client.""" @@ -99,24 +103,30 @@ class ChatManager: websocket = self.active_connections[client_id] await websocket.send_json(message.dict()) - async def process_message(self, client_id: str, payload: Dict): + async def close_connection(self, client_id: str, code: int, reason: str): + if websocket := self.active_connections.get(client_id): + await websocket.close(code=code, reason=reason) + self.disconnect(client_id) + + async def process_message( + self, client_id: str, payload: Dict, langchain_object: Any + ): # Process the graph data and chat message chat_message = payload.pop("message", "") chat_message = ChatMessage(message=chat_message) self.chat_history.add_message(client_id, chat_message) - graph_data = payload + # graph_data = payload start_resp = ChatResponse(message=None, type="start", intermediate_steps="") await self.send_json(client_id, start_resp) - is_first_message = len(self.chat_history.get_history(client_id=client_id)) <= 1 + # is_first_message = len(self.chat_history.get_history(client_id=client_id)) <= 1 # Generate result and thought try: logger.debug("Generating result and thought") result, intermediate_steps = await process_graph( - graph_data=graph_data, - is_first_message=is_first_message, + langchain_object=langchain_object, chat_message=chat_message, websocket=self.active_connections[client_id], ) @@ -149,6 +159,17 @@ class ChatManager: await self.send_json(client_id, response) self.chat_history.add_message(client_id, response) + def build(self, client_id: str, graph_data: Dict) -> bool: + """ + Build the langchain object and set the streaming options, + then store it in the in-memory cache. 
+ """ + logger.debug("Building langchain object") + graph = Graph.from_payload(graph_data) + langchain_object = graph.build() + self.in_memory_cache.set(client_id, langchain_object) + return client_id in self.in_memory_cache + async def handle_websocket(self, client_id: str, websocket: WebSocket): await self.connect(client_id, websocket) @@ -169,16 +190,24 @@ class ChatManager: continue with self.cache_manager.set_client_id(client_id): - await self.process_message(client_id, payload) + if client_id not in self.in_memory_cache: + self.close_connection( + client_id=client_id, + code=status.WS_1011_INTERNAL_ERROR, + reason="Please, build the flow before sending messages", + ) + else: + langchain_object = self.in_memory_cache.get(client_id) + await self.process_message(client_id, payload, langchain_object) except Exception as e: # Handle any exceptions that might occur logger.exception(e) - # send a message to the client - await self.active_connections[client_id].close( - code=status.WS_1011_INTERNAL_ERROR, reason=str(e)[:120] + self.close_connection( + client_id=client_id, + code=status.WS_1011_INTERNAL_ERROR, + reason=str(e)[:120], ) - self.disconnect(client_id) finally: try: connection = self.active_connections.get(client_id) diff --git a/src/backend/langflow/chat/utils.py b/src/backend/langflow/chat/utils.py index 410a442be..9473cc133 100644 --- a/src/backend/langflow/chat/utils.py +++ b/src/backend/langflow/chat/utils.py @@ -12,12 +12,10 @@ from typing import Dict async def process_graph( - graph_data: Dict, - is_first_message: bool, + langchain_object, chat_message: ChatMessage, websocket: WebSocket, ): - langchain_object = load_or_build_langchain_object(graph_data, is_first_message) langchain_object = try_setting_streaming_options(langchain_object, websocket) logger.debug("Loaded langchain object")