From 806d815a100983f76ed2fa39aead2dfc1fa3b8de Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 12 Jun 2023 11:49:59 -0300 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(index.tsx):=20add=20support=20?= =?UTF-8?q?for=20streaming=20build=20process=20using=20EventSource=20The?= =?UTF-8?q?=20commented=20out=20code=20for=20the=20/build/{client=5Fid}=20?= =?UTF-8?q?endpoint=20has=20been=20removed=20as=20it=20is=20no=20longer=20?= =?UTF-8?q?needed.=20The=20new=20implementation=20uses=20the=20/build/init?= =?UTF-8?q?=20endpoint=20to=20initiate=20the=20build=20process=20and=20the?= =?UTF-8?q?n=20establishes=20an=20SSE=20connection=20using=20EventSource?= =?UTF-8?q?=20to=20stream=20the=20build=20process.=20This=20allows=20for?= =?UTF-8?q?=20a=20more=20efficient=20and=20responsive=20build=20process=20?= =?UTF-8?q?as=20the=20client=20can=20receive=20updates=20in=20real-time.?= =?UTF-8?q?=20=F0=9F=94=A5=20chore(chat.py,=20index.tsx):=20remove=20comme?= =?UTF-8?q?nted=20out=20code=20for=20/build/{client=5Fid}=20endpoint?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/api/v1/chat.py | 38 ------------- .../chatComponent/buildTrigger/index.tsx | 57 +++++++++---------- 2 files changed, 28 insertions(+), 67 deletions(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 0e0f09501..1353090d6 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -46,44 +46,6 @@ async def init_build(graph_data: dict): return JSONResponse(content={"flowId": flow_id}) -# @router.post("/build/{client_id}", response_class=StreamingResponse) -# async def stream_build(client_id: str, graph_data: dict): -# """Build langchain object from data_graph.""" - -# async def event_stream(graph_data): -# node_id = None -# try: -# graph_data = graph_data.get("data") -# if not graph_data: -# raise HTTPException(status_code=400, detail="No data provided") - -# logger.debug("Building langchain object") -# graph = Graph.from_payload(graph_data) -# for node_repr, node_id in graph.generator_build(): -# logger.debug( -# f"Building node {node_repr[:50]}{'...' if len(node_repr) > 50 else ''}" -# ) -# response = json.dumps( -# { -# "valid": True, -# "params": node_repr, -# "id": node_id, -# } -# ) -# yield f"data: {response}\n\n" # SSE format - -# chat_manager.set_cache(client_id, graph.build()) - -# except Exception as exc: -# logger.exception(exc) -# error_response = json.dumps( -# {"valid": False, "params": str(exc), "id": node_id} -# ) -# yield f"data: {error_response}\n\n" # SSE format - -# return StreamingResponse(event_stream(graph_data), media_type="text/event-stream") - - @router.get("/build/stream/{flow_id}", response_class=StreamingResponse) async def stream_build(flow_id: str): """Stream the build process based on stored flow data.""" diff --git a/src/frontend/src/components/chatComponent/buildTrigger/index.tsx b/src/frontend/src/components/chatComponent/buildTrigger/index.tsx index 88ceb5f4d..897a79c17 100644 --- a/src/frontend/src/components/chatComponent/buildTrigger/index.tsx +++ b/src/frontend/src/components/chatComponent/buildTrigger/index.tsx @@ -26,17 +26,15 @@ export default function BuildTrigger({ const { updateSSEData } = useSSE(); - const CHUNK_DELIMITER = "\n\n"; - async function handleBuild(flow: FlowType) { const minimumLoadingTime = 200; // in milliseconds const startTime = Date.now(); setIsBuilding(true); try { - const allChunksValid = await postDataToServer(`/build/${flow.id}`, flow); + const allNodesValid = await streamNodeData(`/build/init`, flow); await enforceMinimumLoadingTime(startTime, minimumLoadingTime); - setIsBuilt(allChunksValid); + setIsBuilt(allNodesValid); } catch (error) { console.error("Error:", error); } finally { @@ -44,40 +42,41 @@ export default function BuildTrigger({ } } - async function postDataToServer(apiUrl: string, flow: FlowType) { - let allChunksValid = true; + async function streamNodeData(apiUrl: string, flow: FlowType) { + // Step 1: Make a POST request to send the flow data and receive a unique session ID + const response = await axios.post(apiUrl, flow); + const { flowId } = response.data; - await axios({ - method: "post", - url: apiUrl, - data: { data: flow }, - headers: { "Content-Type": "application/json" }, - onDownloadProgress: (progressEvent) => { - const chunks = - progressEvent.event.currentTarget.responseText.split(CHUNK_DELIMITER); - chunks.forEach((chunk) => { - if (chunk === "") { - return; - } - const isValid = processChunk(chunk); - allChunksValid = allChunksValid && isValid; - }); - }, - }); + // Step 2: Use the session ID to establish an SSE connection using EventSource - return allChunksValid; + let allNodesValid = true; + apiUrl = `/build/stream/${flowId}`; + const eventSource = new EventSource(apiUrl); + + eventSource.onmessage = (event) => { + const parsedData = JSON.parse(event.data); + if (parsedData.end_of_stream) { + eventSource.close(); + return; + } + const isValid = processStreamResult(parsedData); + allNodesValid = allNodesValid && isValid; + }; + + eventSource.onerror = (error) => { + console.error("EventSource failed:", error); + eventSource.close(); + }; + return allNodesValid; } - function processChunk(chunk: string) { + function processStreamResult(parsedData) { // Process each chunk of data here // Parse the chunk and update the context - let parsedData = { valid: false, id: null }; try { - parsedData = JSON.parse(chunk.slice(6)); // Remove the "data: " part updateSSEData({ [parsedData.id]: parsedData }); } catch (err) { - console.log("Chunk is not valid JSON: ", chunk); - console.log("Error parsing chunk: ", err); + console.log("Error parsing stream data: ", err); } return parsedData.valid; }