From 64bf97ff849d9751959a310bc119c2890402bb38 Mon Sep 17 00:00:00 2001 From: Gabriel Almeida Date: Thu, 4 May 2023 18:24:12 -0300 Subject: [PATCH 1/3] chore(base.py): add type ignore comment to clear_cache function fix(base.py): raise ValueError if content is None in save_binary_file function feat(nodes.py): add memory field to TimeTravelGuideChainNode test(test_chains_template.py): add test for memory field in TimeTravelGuideChainNode --- src/backend/langflow/cache/base.py | 5 +++-- src/backend/langflow/template/nodes.py | 7 +++++++ tests/test_chains_template.py | 11 +++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/backend/langflow/cache/base.py b/src/backend/langflow/cache/base.py index 73439e9dd..3d667b8b4 100644 --- a/src/backend/langflow/cache/base.py +++ b/src/backend/langflow/cache/base.py @@ -47,7 +47,7 @@ def memoize_dict(maxsize=128): def clear_cache(): cache.clear() - wrapper.clear_cache = clear_cache + wrapper.clear_cache = clear_cache # type: ignore return wrapper return decorator @@ -119,7 +119,8 @@ def save_binary_file(content: str, file_name: str, accepted_types: list[str]) -> # Get the destination folder cache_path = Path(tempfile.gettempdir()) / PREFIX - + if content is None: + raise ValueError("Please, reload the file in the loader.") data = content.split(",")[1] decoded_bytes = base64.b64decode(data) diff --git a/src/backend/langflow/template/nodes.py b/src/backend/langflow/template/nodes.py index 15e3d2d85..61a41589b 100644 --- a/src/backend/langflow/template/nodes.py +++ b/src/backend/langflow/template/nodes.py @@ -150,6 +150,13 @@ class TimeTravelGuideChainNode(FrontendNode): multiline=False, name="llm", ), + TemplateField( + field_type="BaseChatMemory", + required=False, + show=True, + name="memory", + advanced=False, + ), ], ) description: str = "Time travel guide chain to be used in the flow." diff --git a/tests/test_chains_template.py b/tests/test_chains_template.py index 13bef2e9c..33af32e57 100644 --- a/tests/test_chains_template.py +++ b/tests/test_chains_template.py @@ -435,5 +435,16 @@ def test_time_travel_guide_chain(client: TestClient): "list": False, "advanced": False, } + assert template["memory"] == { + "required": False, + "placeholder": "", + "show": True, + "multiline": False, + "password": False, + "name": "memory", + "type": "BaseChatMemory", + "list": False, + "advanced": False, + } assert chain["description"] == "Time travel guide chain to be used in the flow." From 474e14efaf98b6ddcac97d42812aa4a3f1aadf27 Mon Sep 17 00:00:00 2001 From: Gabriel Almeida Date: Fri, 5 May 2023 11:55:25 -0300 Subject: [PATCH 2/3] refactor(langflow): replace langchain_object.run with langchain_object.acall in get_result_and_steps function feat(langflow): add support for streaming intermediate steps to the client via websockets --- src/backend/langflow/api/callback.py | 3 ++- src/backend/langflow/api/chat.py | 8 ++------ src/backend/langflow/api/chat_manager.py | 15 ++++++++------- src/backend/langflow/interface/run.py | 11 +++++++---- src/backend/langflow/interface/utils.py | 11 ++--------- 5 files changed, 21 insertions(+), 27 deletions(-) diff --git a/src/backend/langflow/api/callback.py b/src/backend/langflow/api/callback.py index baab596a4..a6590673e 100644 --- a/src/backend/langflow/api/callback.py +++ b/src/backend/langflow/api/callback.py @@ -1,6 +1,7 @@ +import asyncio from typing import Any -from langchain.callbacks.base import AsyncCallbackHandler +from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler from langflow.api.schemas import ChatResponse diff --git a/src/backend/langflow/api/chat.py b/src/backend/langflow/api/chat.py index e25d0d2f1..48a195179 100644 --- a/src/backend/langflow/api/chat.py +++ b/src/backend/langflow/api/chat.py @@ -10,9 +10,5 @@ chat_manager = ChatManager() @router.websocket("/chat/{client_id}") async def websocket_endpoint(client_id: str, websocket: WebSocket): """Websocket endpoint for chat.""" - try: - await chat_manager.handle_websocket(client_id, websocket) - except Exception as e: - # Log stack trace - logger.exception(e) - raise e + + await chat_manager.handle_websocket(client_id, websocket) diff --git a/src/backend/langflow/api/chat_manager.py b/src/backend/langflow/api/chat_manager.py index 2dab12e34..eff60cadd 100644 --- a/src/backend/langflow/api/chat_manager.py +++ b/src/backend/langflow/api/chat_manager.py @@ -5,6 +5,7 @@ from typing import Dict, List from fastapi import WebSocket +from langflow.api.callback import StreamingLLMCallbackHandler from langflow.api.schemas import ChatMessage, ChatResponse, FileResponse from langflow.cache import cache_manager from langflow.cache.manager import Subject @@ -175,12 +176,11 @@ class ChatManager: # Handle any exceptions that might occur logger.exception(e) # send a message to the client - await self.send_message(client_id, str(e)) - raise e + await self.active_connections[client_id].close(code=1000, reason=str(e)) finally: - await self.active_connections[client_id].close( - code=1000, reason="Client disconnected" - ) + # await self.active_connections[client_id].close( + # code=1000, reason="Client disconnected" + # ) self.disconnect(client_id) @@ -203,8 +203,9 @@ async def process_graph( # Generate result and thought try: logger.debug("Generating result and thought") - result, intermediate_steps = get_result_and_steps( - langchain_object, chat_message.message or "" + stream_handler = StreamingLLMCallbackHandler(websocket) + result, intermediate_steps = await get_result_and_steps( + langchain_object, chat_message.message or "", callbacks=[stream_handler] ) logger.debug("Generated result and intermediate_steps") return result, intermediate_steps diff --git a/src/backend/langflow/interface/run.py b/src/backend/langflow/interface/run.py index 68639785c..a8d55d3be 100644 --- a/src/backend/langflow/interface/run.py +++ b/src/backend/langflow/interface/run.py @@ -185,8 +185,11 @@ def fix_memory_inputs(langchain_object): update_memory_keys(langchain_object, possible_new_mem_key) -def get_result_and_steps(langchain_object, message: str): +async def get_result_and_steps(langchain_object, message: str, callbacks=None): """Get result and thought from extracted json""" + + if callbacks is None: + callbacks = [] try: if hasattr(langchain_object, "verbose"): langchain_object.verbose = True @@ -206,17 +209,17 @@ def get_result_and_steps(langchain_object, message: str): # https://github.com/hwchase17/langchain/issues/2068 # Deactivating until we have a frontend solution # to display intermediate steps - langchain_object.return_intermediate_steps = False + langchain_object.return_intermediate_steps = True fix_memory_inputs(langchain_object) with io.StringIO() as output_buffer, contextlib.redirect_stdout(output_buffer): try: - output = langchain_object(chat_input) + output = await langchain_object.acall(chat_input, callbacks=callbacks) except ValueError as exc: # make the error message more informative logger.debug(f"Error: {str(exc)}") - output = langchain_object.run(chat_input) + output = langchain_object.run(chat_input, callbacks=callbacks) intermediate_steps = ( output.get("intermediate_steps", []) if isinstance(output, dict) else [] diff --git a/src/backend/langflow/interface/utils.py b/src/backend/langflow/interface/utils.py index e8b3e417e..7368fda3e 100644 --- a/src/backend/langflow/interface/utils.py +++ b/src/backend/langflow/interface/utils.py @@ -4,13 +4,9 @@ import os from io import BytesIO import yaml -from langchain.callbacks.manager import AsyncCallbackManager -from langchain.chat_models import AzureChatOpenAI, ChatOpenAI -from langchain.llms import AzureOpenAI, OpenAI +from langchain.base_language import BaseLanguageModel from PIL.Image import Image -from langflow.api.callback import StreamingLLMCallbackHandler - def load_file_into_dict(file_path: str) -> dict: if not os.path.exists(file_path): @@ -48,10 +44,7 @@ def try_setting_streaming_options(langchain_object, websocket): langchain_object.llm_chain, "llm" ): llm = langchain_object.llm_chain.llm - if isinstance(llm, (OpenAI, ChatOpenAI, AzureOpenAI, AzureChatOpenAI)): + if isinstance(llm, BaseLanguageModel): llm.streaming = bool(hasattr(llm, "streaming")) - stream_handler = StreamingLLMCallbackHandler(websocket) - stream_manager = AsyncCallbackManager([stream_handler]) - llm.callback_manager = stream_manager return langchain_object From c032e2a0d12600dabb74ec5a9c5d498a846d30b3 Mon Sep 17 00:00:00 2001 From: Gabriel Almeida Date: Fri, 5 May 2023 12:22:51 -0300 Subject: [PATCH 3/3] refactor(chat_manager.py): remove unused variable 'result' and set message to empty string --- src/backend/langflow/api/chat_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/langflow/api/chat_manager.py b/src/backend/langflow/api/chat_manager.py index eff60cadd..280f51b3a 100644 --- a/src/backend/langflow/api/chat_manager.py +++ b/src/backend/langflow/api/chat_manager.py @@ -144,7 +144,7 @@ class ChatManager: break response = ChatResponse( - message=result or "", + message="", intermediate_steps=intermediate_steps.strip(), type="end", files=file_responses,