From 8ccdeb9dd541a0da24788f4b1728054300c6cdcb Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 23 Jun 2023 09:54:15 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix(chat.py):=20fix=20stream=5Fb?= =?UTF-8?q?uild=20function=20to=20return=20StreamData=20objects=20instead?= =?UTF-8?q?=20of=20strings=20=E2=9C=A8=20feat(chat.py):=20add=20progress?= =?UTF-8?q?=20information=20to=20the=20stream=5Fbuild=20function=20The=20s?= =?UTF-8?q?tream=5Fbuild=20function=20now=20returns=20StreamData=20objects?= =?UTF-8?q?=20instead=20of=20strings,=20which=20improves=20the=20readabili?= =?UTF-8?q?ty=20of=20the=20code.=20The=20function=20also=20now=20includes?= =?UTF-8?q?=20progress=20information=20in=20the=20response,=20which=20allo?= =?UTF-8?q?ws=20the=20client=20to=20track=20the=20progress=20of=20the=20bu?= =?UTF-8?q?ild=20process.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/api/v1/chat.py | 65 +++++++++++++++++--------- src/backend/langflow/api/v1/schemas.py | 8 ++++ 2 files changed, 51 insertions(+), 22 deletions(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index ae9fb8b30..a730758d2 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -1,4 +1,3 @@ -import json from fastapi import ( APIRouter, HTTPException, @@ -7,7 +6,7 @@ from fastapi import ( status, ) from fastapi.responses import StreamingResponse -from langflow.api.v1.schemas import BuiltResponse, InitResponse +from langflow.api.v1.schemas import BuiltResponse, InitResponse, StreamData from langflow.chat.manager import ChatManager from langflow.graph.graph.base import Graph @@ -39,14 +38,18 @@ async def init_build(graph_data: dict): try: flow_id = graph_data.get("id") + if flow_id is None: + raise ValueError("No ID provided") + # Check if already building + if flow_id in flow_data_store and flow_data_store[flow_id].get("building"): + return InitResponse(flowId=flow_id) + # Delete from cache if already exists if flow_id in chat_manager.in_memory_cache: with chat_manager.in_memory_cache._lock: chat_manager.in_memory_cache.delete(flow_id) logger.debug(f"Deleted flow {flow_id} from cache") - if flow_id is None: - raise ValueError("No ID provided") - flow_data_store[flow_id] = graph_data + flow_data_store[flow_id] = {"graph_data": graph_data, "building": False} return InitResponse(flowId=flow_id) except Exception as exc: @@ -76,26 +79,44 @@ async def stream_build(flow_id: str): """Stream the build process based on stored flow data.""" async def event_stream(flow_id): - final_response = json.dumps({"end_of_stream": True}) + final_response = {"end_of_stream": True} try: if flow_id not in flow_data_store: error_message = "Invalid session ID" - yield f"data: {json.dumps({'error': error_message})}\n\n" + yield str(StreamData(event="error", data={"error": error_message})) + return + + if flow_data_store[flow_id].get("building"): + error_message = "Already building" + yield str(StreamData(event="error", data={"error": error_message})) return graph_data = flow_data_store[flow_id].get("data") if not graph_data: error_message = "No data provided" - yield f"data: {json.dumps({'error': error_message})}\n\n" + yield str(StreamData(event="error", data={"error": error_message})) return logger.debug("Building langchain object") - graph = Graph.from_payload(graph_data) - for node in graph.generator_build(): + try: + # Some error could happen when building the graph + graph = Graph.from_payload(graph_data) + except Exception as exc: + logger.exception(exc) + error_message = str(exc) + yield str(StreamData(event="error", data={"error": error_message})) + return + + number_of_nodes = len(graph.nodes) + for i, vertex in enumerate(graph.generator_build(), 1): try: - node.build() - params = node._built_object_repr() + log_dict = { + "log": f"Building node {vertex.vertex_type}", + } + yield str(StreamData(event="log", data=log_dict)) + vertex.build() + params = vertex._built_object_repr() valid = True logger.debug( f"Building node {params[:50]}{'...' if len(params) > 50 else ''}" @@ -104,21 +125,21 @@ async def stream_build(flow_id: str): params = str(exc) valid = False - response = json.dumps( - { - "valid": valid, - "params": params, - "id": node.id, - } - ) - yield f"data: {response}\n\n" + response = { + "valid": valid, + "params": params, + "id": vertex.id, + "progress": round(i / number_of_nodes, 2), + } + + yield str(StreamData(event="message", data=response)) chat_manager.set_cache(flow_id, graph.build()) except Exception as exc: logger.error("Error while building the flow: %s", exc) - yield f"error: {json.dumps({'error': str(exc)})}\n\n" + yield str(StreamData(event="error", data={"error": str(exc)})) finally: - yield f"data: {final_response}\n\n" + yield str(StreamData(event="message", data=final_response)) try: return StreamingResponse(event_stream(flow_id), media_type="text/event-stream") diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 714f0df7f..263221d75 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -101,3 +101,11 @@ class InitResponse(BaseModel): class BuiltResponse(BaseModel): built: bool + + +class StreamData(BaseModel): + event: str + data: dict + + def __str__(self) -> str: + return f"event: {self.event}\ndata: {json.dumps(self.data)}\n\n"