🔧 chore(chat.py): add SSE support to post_build endpoint

 feat(chat.py): add SSE support to post_build endpoint to stream build progress to the client
The post_build endpoint now returns a StreamingResponse object that streams Server-Sent Events (SSE) to the client. This allows the client to receive build progress updates in real-time. The event_stream function is responsible for generating the SSE events and is called by the StreamingResponse object. The SSE events contain information about the build progress, including whether the build was successful or not, the parameters used to build each node, and the node ID. The chat_manager.set_cache method is called to cache the built graph object.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-06-12 07:50:00 -03:00
commit 273f452f7b

View file

@ -1,3 +1,4 @@
import json
from fastapi import (
APIRouter,
HTTPException,
@ -6,8 +7,10 @@ from fastapi import (
WebSocketException,
status,
)
from fastapi.responses import StreamingResponse
from langflow.chat.manager import ChatManager
from langflow.graph.graph.base import Graph
from langflow.utils.logger import logger
router = APIRouter()
@ -27,12 +30,34 @@ async def websocket_endpoint(client_id: str, websocket: WebSocket):
await websocket.close(code=status.WS_1000_NORMAL_CLOSURE, reason=str(exc))
@router.post("/build/{client_id}")
@router.post("/build/{client_id}", response_class=StreamingResponse)
async def post_build(client_id: str, graph_data: dict):
"""Build langchain object from data_graph."""
try:
if chat_manager.build(client_id, graph_data.get("data")):
return {"message": "Build successful"}
except Exception as exc:
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc
async def event_stream(graph_data):
try:
graph_data = graph_data.get("data")
if not graph_data:
raise HTTPException(status_code=400, detail="No data provided")
logger.debug("Building langchain object")
graph = Graph.from_payload(graph_data)
for node in graph.generator_build():
logger.debug(f"Building node {node.name}")
response = json.dumps(
{
"valid": True,
"params": str(node._built_object_repr()),
"id": node.id,
}
)
yield f"data: {response}\n\n" # SSE format
chat_manager.set_cache(client_id, graph.build())
except Exception as exc:
logger.exception(exc)
error_response = json.dumps({"error": str(exc)})
yield f"data: {error_response}\n\n" # SSE format
return StreamingResponse(event_stream(graph_data), media_type="text/event-stream")