diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 0e0f09501..1353090d6 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -46,44 +46,6 @@ async def init_build(graph_data: dict): return JSONResponse(content={"flowId": flow_id}) -# @router.post("/build/{client_id}", response_class=StreamingResponse) -# async def stream_build(client_id: str, graph_data: dict): -# """Build langchain object from data_graph.""" - -# async def event_stream(graph_data): -# node_id = None -# try: -# graph_data = graph_data.get("data") -# if not graph_data: -# raise HTTPException(status_code=400, detail="No data provided") - -# logger.debug("Building langchain object") -# graph = Graph.from_payload(graph_data) -# for node_repr, node_id in graph.generator_build(): -# logger.debug( -# f"Building node {node_repr[:50]}{'...' if len(node_repr) > 50 else ''}" -# ) -# response = json.dumps( -# { -# "valid": True, -# "params": node_repr, -# "id": node_id, -# } -# ) -# yield f"data: {response}\n\n" # SSE format - -# chat_manager.set_cache(client_id, graph.build()) - -# except Exception as exc: -# logger.exception(exc) -# error_response = json.dumps( -# {"valid": False, "params": str(exc), "id": node_id} -# ) -# yield f"data: {error_response}\n\n" # SSE format - -# return StreamingResponse(event_stream(graph_data), media_type="text/event-stream") - - @router.get("/build/stream/{flow_id}", response_class=StreamingResponse) async def stream_build(flow_id: str): """Stream the build process based on stored flow data.""" diff --git a/src/frontend/src/components/chatComponent/buildTrigger/index.tsx b/src/frontend/src/components/chatComponent/buildTrigger/index.tsx index 88ceb5f4d..897a79c17 100644 --- a/src/frontend/src/components/chatComponent/buildTrigger/index.tsx +++ b/src/frontend/src/components/chatComponent/buildTrigger/index.tsx @@ -26,17 +26,15 @@ export default function BuildTrigger({ const { updateSSEData } = useSSE(); - const CHUNK_DELIMITER = "\n\n"; - async function handleBuild(flow: FlowType) { const minimumLoadingTime = 200; // in milliseconds const startTime = Date.now(); setIsBuilding(true); try { - const allChunksValid = await postDataToServer(`/build/${flow.id}`, flow); + const allNodesValid = await streamNodeData(`/build/init`, flow); await enforceMinimumLoadingTime(startTime, minimumLoadingTime); - setIsBuilt(allChunksValid); + setIsBuilt(allNodesValid); } catch (error) { console.error("Error:", error); } finally { @@ -44,40 +42,41 @@ export default function BuildTrigger({ } } - async function postDataToServer(apiUrl: string, flow: FlowType) { - let allChunksValid = true; + async function streamNodeData(apiUrl: string, flow: FlowType) { + // Step 1: Make a POST request to send the flow data and receive a unique session ID + const response = await axios.post(apiUrl, flow); + const { flowId } = response.data; - await axios({ - method: "post", - url: apiUrl, - data: { data: flow }, - headers: { "Content-Type": "application/json" }, - onDownloadProgress: (progressEvent) => { - const chunks = - progressEvent.event.currentTarget.responseText.split(CHUNK_DELIMITER); - chunks.forEach((chunk) => { - if (chunk === "") { - return; - } - const isValid = processChunk(chunk); - allChunksValid = allChunksValid && isValid; - }); - }, - }); + // Step 2: Use the session ID to establish an SSE connection using EventSource - return allChunksValid; + let allNodesValid = true; + apiUrl = `/build/stream/${flowId}`; + const eventSource = new EventSource(apiUrl); + + eventSource.onmessage = (event) => { + const parsedData = JSON.parse(event.data); + if (parsedData.end_of_stream) { + eventSource.close(); + return; + } + const isValid = processStreamResult(parsedData); + allNodesValid = allNodesValid && isValid; + }; + + eventSource.onerror = (error) => { + console.error("EventSource failed:", error); + eventSource.close(); + }; + return allNodesValid; } - function processChunk(chunk: string) { + function processStreamResult(parsedData) { // Process each chunk of data here // Parse the chunk and update the context - let parsedData = { valid: false, id: null }; try { - parsedData = JSON.parse(chunk.slice(6)); // Remove the "data: " part updateSSEData({ [parsedData.id]: parsedData }); } catch (err) { - console.log("Chunk is not valid JSON: ", chunk); - console.log("Error parsing chunk: ", err); + console.log("Error parsing stream data: ", err); } return parsedData.valid; }