From 273f452f7b40cceacdef27fd0cf16a0f875c2bf9 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 12 Jun 2023 07:50:00 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A7=20chore(chat.py):=20add=20SSE=20su?= =?UTF-8?q?pport=20to=20post=5Fbuild=20endpoint=20=E2=9C=A8=20feat(chat.py?= =?UTF-8?q?):=20add=20SSE=20support=20to=20post=5Fbuild=20endpoint=20to=20?= =?UTF-8?q?stream=20build=20progress=20to=20the=20client=20The=20post=5Fbu?= =?UTF-8?q?ild=20endpoint=20now=20returns=20a=20StreamingResponse=20object?= =?UTF-8?q?=20that=20streams=20Server-Sent=20Events=20(SSE)=20to=20the=20c?= =?UTF-8?q?lient.=20This=20allows=20the=20client=20to=20receive=20build=20?= =?UTF-8?q?progress=20updates=20in=20real-time.=20The=20event=5Fstream=20f?= =?UTF-8?q?unction=20is=20responsible=20for=20generating=20the=20SSE=20eve?= =?UTF-8?q?nts=20and=20is=20called=20by=20the=20StreamingResponse=20object?= =?UTF-8?q?.=20The=20SSE=20events=20contain=20information=20about=20the=20?= =?UTF-8?q?build=20progress,=20including=20whether=20the=20build=20was=20s?= =?UTF-8?q?uccessful=20or=20not,=20the=20parameters=20used=20to=20build=20?= =?UTF-8?q?each=20node,=20and=20the=20node=20ID.=20The=20chat=5Fmanager.se?= =?UTF-8?q?t=5Fcache=20method=20is=20called=20to=20cache=20the=20built=20g?= =?UTF-8?q?raph=20object.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/api/v1/chat.py | 39 +++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 9619d5f30..2c085c187 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -1,3 +1,4 @@ +import json from fastapi import ( APIRouter, HTTPException, @@ -6,8 +7,10 @@ from fastapi import ( WebSocketException, status, ) +from fastapi.responses import StreamingResponse from langflow.chat.manager import ChatManager +from langflow.graph.graph.base import Graph from langflow.utils.logger import logger router = APIRouter() @@ -27,12 +30,34 @@ async def websocket_endpoint(client_id: str, websocket: WebSocket): await websocket.close(code=status.WS_1000_NORMAL_CLOSURE, reason=str(exc)) -@router.post("/build/{client_id}") +@router.post("/build/{client_id}", response_class=StreamingResponse) async def post_build(client_id: str, graph_data: dict): """Build langchain object from data_graph.""" - try: - if chat_manager.build(client_id, graph_data.get("data")): - return {"message": "Build successful"} - except Exception as exc: - logger.exception(exc) - raise HTTPException(status_code=500, detail=str(exc)) from exc + + async def event_stream(graph_data): + try: + graph_data = graph_data.get("data") + if not graph_data: + raise HTTPException(status_code=400, detail="No data provided") + + logger.debug("Building langchain object") + graph = Graph.from_payload(graph_data) + for node in graph.generator_build(): + logger.debug(f"Building node {node.name}") + response = json.dumps( + { + "valid": True, + "params": str(node._built_object_repr()), + "id": node.id, + } + ) + yield f"data: {response}\n\n" # SSE format + + chat_manager.set_cache(client_id, graph.build()) + + except Exception as exc: + logger.exception(exc) + error_response = json.dumps({"error": str(exc)}) + yield f"data: {error_response}\n\n" # SSE format + + return StreamingResponse(event_stream(graph_data), media_type="text/event-stream")