feat(index.tsx): add support for streaming build process using EventSource

The commented out code for the /build/{client_id} endpoint has been removed as it is no longer needed. The new implementation uses the /build/init endpoint to initiate the build process and then establishes an SSE connection using EventSource to stream the build process. This allows for a more efficient and responsive build process as the client can receive updates in real-time.
🔥 chore(chat.py, index.tsx): remove commented out code for /build/{client_id} endpoint
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-06-12 11:49:59 -03:00
commit 806d815a10
2 changed files with 28 additions and 67 deletions

View file

@ -46,44 +46,6 @@ async def init_build(graph_data: dict):
return JSONResponse(content={"flowId": flow_id})
# @router.post("/build/{client_id}", response_class=StreamingResponse)
# async def stream_build(client_id: str, graph_data: dict):
# """Build langchain object from data_graph."""
# async def event_stream(graph_data):
# node_id = None
# try:
# graph_data = graph_data.get("data")
# if not graph_data:
# raise HTTPException(status_code=400, detail="No data provided")
# logger.debug("Building langchain object")
# graph = Graph.from_payload(graph_data)
# for node_repr, node_id in graph.generator_build():
# logger.debug(
# f"Building node {node_repr[:50]}{'...' if len(node_repr) > 50 else ''}"
# )
# response = json.dumps(
# {
# "valid": True,
# "params": node_repr,
# "id": node_id,
# }
# )
# yield f"data: {response}\n\n" # SSE format
# chat_manager.set_cache(client_id, graph.build())
# except Exception as exc:
# logger.exception(exc)
# error_response = json.dumps(
# {"valid": False, "params": str(exc), "id": node_id}
# )
# yield f"data: {error_response}\n\n" # SSE format
# return StreamingResponse(event_stream(graph_data), media_type="text/event-stream")
@router.get("/build/stream/{flow_id}", response_class=StreamingResponse)
async def stream_build(flow_id: str):
"""Stream the build process based on stored flow data."""

View file

@ -26,17 +26,15 @@ export default function BuildTrigger({
const { updateSSEData } = useSSE();
const CHUNK_DELIMITER = "\n\n";
async function handleBuild(flow: FlowType) {
const minimumLoadingTime = 200; // in milliseconds
const startTime = Date.now();
setIsBuilding(true);
try {
const allChunksValid = await postDataToServer(`/build/${flow.id}`, flow);
const allNodesValid = await streamNodeData(`/build/init`, flow);
await enforceMinimumLoadingTime(startTime, minimumLoadingTime);
setIsBuilt(allChunksValid);
setIsBuilt(allNodesValid);
} catch (error) {
console.error("Error:", error);
} finally {
@ -44,40 +42,41 @@ export default function BuildTrigger({
}
}
async function postDataToServer(apiUrl: string, flow: FlowType) {
let allChunksValid = true;
async function streamNodeData(apiUrl: string, flow: FlowType) {
// Step 1: Make a POST request to send the flow data and receive a unique session ID
const response = await axios.post(apiUrl, flow);
const { flowId } = response.data;
await axios({
method: "post",
url: apiUrl,
data: { data: flow },
headers: { "Content-Type": "application/json" },
onDownloadProgress: (progressEvent) => {
const chunks =
progressEvent.event.currentTarget.responseText.split(CHUNK_DELIMITER);
chunks.forEach((chunk) => {
if (chunk === "") {
return;
}
const isValid = processChunk(chunk);
allChunksValid = allChunksValid && isValid;
});
},
});
// Step 2: Use the session ID to establish an SSE connection using EventSource
return allChunksValid;
let allNodesValid = true;
apiUrl = `/build/stream/${flowId}`;
const eventSource = new EventSource(apiUrl);
eventSource.onmessage = (event) => {
const parsedData = JSON.parse(event.data);
if (parsedData.end_of_stream) {
eventSource.close();
return;
}
const isValid = processStreamResult(parsedData);
allNodesValid = allNodesValid && isValid;
};
eventSource.onerror = (error) => {
console.error("EventSource failed:", error);
eventSource.close();
};
return allNodesValid;
}
function processChunk(chunk: string) {
function processStreamResult(parsedData) {
// Process each chunk of data here
// Parse the chunk and update the context
let parsedData = { valid: false, id: null };
try {
parsedData = JSON.parse(chunk.slice(6)); // Remove the "data: " part
updateSSEData({ [parsedData.id]: parsedData });
} catch (err) {
console.log("Chunk is not valid JSON: ", chunk);
console.log("Error parsing chunk: ", err);
console.log("Error parsing stream data: ", err);
}
return parsedData.valid;
}