Add SSE endpoint for streaming vertex build

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-02-23 19:14:48 -03:00
commit 4d48805e49

View file

@ -10,12 +10,14 @@ from fastapi import (
WebSocketException,
status,
)
from fastapi.responses import StreamingResponse
from loguru import logger
from sqlmodel import Session
from langflow.api.utils import build_and_cache_graph, format_elapsed_time
from langflow.api.v1.schemas import (
ResultData,
StreamData,
VertexBuildResponse,
VerticesOrderResponse,
)
@ -204,3 +206,51 @@ async def build_vertex(
logger.error(f"Error building vertex: {exc}")
logger.exception(exc)
raise HTTPException(status_code=500, detail=str(exc)) from exc
# Now onto an endpoint that is an SSE endpoint
# it will receive a component_id and a flow_id
#
@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
async def build_vertex_stream(
flow_id: str,
vertex_id: str,
chat_service: "ChatService" = Depends(get_chat_service),
):
"""Build a vertex instead of the entire graph."""
try:
async def stream_vertex():
try:
cache = chat_service.get_cache(flow_id)
if not cache:
# If there's no cache
raise ValueError(f"No cache found for {flow_id}.")
else:
graph = cache.get("result")
vertex = graph.get_vertex(vertex_id)
if not vertex.pinned or not vertex._built:
stream_data = StreamData(
event="message",
data={"message": "Building vertex"},
)
yield str(stream_data)
async for chunk in vertex.stream():
stream_data = StreamData(
event="message",
data={"chunk": chunk},
)
yield str(stream_data)
else:
raise ValueError(f"No result found for vertex {vertex_id}")
except Exception as exc:
yield str(StreamData(event="error", data={"error": str(exc)}))
yield str(StreamData(event="close", data={"message": "Stream closed"}))
return StreamingResponse(stream_vertex(), media_type="text/event-stream")
except Exception as exc:
raise HTTPException(status_code=500, detail="Error building vertex") from exc