diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 8d754975b..2220bbeb8 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -1,4 +1,3 @@ -import json from fastapi import ( APIRouter, HTTPException, @@ -7,7 +6,7 @@ from fastapi import ( status, ) from fastapi.responses import StreamingResponse -from langflow.api.v1.schemas import BuiltResponse, InitResponse +from langflow.api.v1.schemas import BuiltResponse, InitResponse, StreamData from langflow.chat.manager import ChatManager from langflow.graph.graph.base import Graph @@ -71,18 +70,18 @@ async def stream_build(flow_id: str): """Stream the build process based on stored flow data.""" async def event_stream(flow_id): - final_response = json.dumps({"end_of_stream": True}) + final_response = {"end_of_stream": True} try: if flow_id not in flow_data_store: error_message = "Invalid session ID" - yield f"error: {json.dumps({'error': error_message})}\n\n" + yield str(StreamData(event="error", data={"error": error_message})) return graph_data = flow_data_store[flow_id].get("data") if not graph_data: error_message = "No data provided" - yield f"data: {json.dumps({'error': error_message})}\n\n" + yield str(StreamData(event="error", data={"error": error_message})) return logger.debug("Building langchain object") @@ -90,9 +89,9 @@ async def stream_build(flow_id: str): # Some error could happen when building the graph graph = Graph.from_payload(graph_data) except Exception as exc: - logger.error(exc) + logger.exception(exc) error_message = str(exc) - yield f"error: {json.dumps({'error': error_message})}\n\n" + yield str(StreamData(event="error", data={"error": error_message})) return number_of_nodes = len(graph.nodes) @@ -100,9 +99,8 @@ async def stream_build(flow_id: str): try: log_dict = { "log": f"Building node {vertex.vertex_type}", - "progress": round(i / number_of_nodes, 2), } - yield f"data: {json.dumps(log_dict)}\n\n" + yield str(StreamData(event="log", data=log_dict)) vertex.build() params = vertex._built_object_repr() valid = True @@ -113,21 +111,21 @@ async def stream_build(flow_id: str): params = str(exc) valid = False - response = json.dumps( - { - "valid": valid, - "params": params, - "id": vertex.id, - } - ) - yield f"data: {response}\n\n" + response = { + "valid": valid, + "params": params, + "id": vertex.id, + "progress": round(i / number_of_nodes, 2), + } + + yield str(StreamData(event="node", data=response)) chat_manager.set_cache(flow_id, graph.build()) except Exception as exc: logger.error("Error while building the flow: %s", exc) - yield f"error: {json.dumps({'error': str(exc)})}\n\n" + yield str(StreamData(event="error", data={"error": str(exc)})) finally: - yield f"data: {final_response}\n\n" + yield str(StreamData(event="end", data=final_response)) try: return StreamingResponse(event_stream(flow_id), media_type="text/event-stream")