diff --git a/src/frontend/src/components/newChatView/chatMessage/index.tsx b/src/frontend/src/components/newChatView/chatMessage/index.tsx index 266444d8b..c2819a3a1 100644 --- a/src/frontend/src/components/newChatView/chatMessage/index.tsx +++ b/src/frontend/src/components/newChatView/chatMessage/index.tsx @@ -1,5 +1,5 @@ import Convert from "ansi-to-html"; -import { useEffect, useMemo, useState } from "react"; +import { useEffect, useMemo, useState,useRef } from "react"; import Markdown from "react-markdown"; import rehypeMathjax from "rehype-mathjax"; import remarkGfm from "remark-gfm"; @@ -12,6 +12,7 @@ import IconComponent from "../../../components/genericIconComponent"; import { chatMessagePropsType } from "../../../types/components"; import { classNames } from "../../../utils/utils"; import FileCard from "../fileComponent"; +import useFlowStore from "../../../stores/flowStore"; export default function ChatMessage({ chat, @@ -29,6 +30,9 @@ export default function ChatMessage({ const chatMessageString = chat.message ? chat.message.toString() : ""; const [chatMessage, setChatMessage] = useState(chatMessageString); const [isStreaming, setIsStreaming] = useState(false); + const eventSource = useRef(undefined); + const updateFlowPool = useFlowStore((state) => state.updateFlowPool); + // The idea now is that chat.stream_url MAY be a URL if we should stream the output of the chat // probably the message is empty when we have a stream_url @@ -36,49 +40,48 @@ export default function ChatMessage({ const streamChunks = (url: string) => { setIsStreaming(true); // Streaming starts return new Promise((resolve, reject) => { - const eventSource = new EventSource(url); - eventSource.onmessage = (event) => { + eventSource.current = new EventSource(url); + eventSource.current.onmessage = (event) => { let parsedData = JSON.parse(event.data); if (parsedData.chunk) { setChatMessage((prev) => prev + parsedData.chunk); } }; - eventSource.onerror = (event) => { + eventSource.current.onerror = (event) => { + setIsStreaming(false); + eventSource.current?.close(); + setStreamUrl(undefined); reject(new Error("Streaming failed")); - setIsStreaming(false); - eventSource.close(); }; - eventSource.addEventListener("close", (event) => { - setStreamUrl(null); // Update state to reflect the stream is closed - resolve(true); + eventSource.current.addEventListener("close", (event) => { + setStreamUrl(undefined); // Update state to reflect the stream is closed + eventSource.current?.close(); setIsStreaming(false); - eventSource.close(); + resolve(true); }); }); }; + + useEffect(() => { - if (streamUrl && chat.message === "") { + console.log(streamUrl) + if (streamUrl&& !isStreaming) { streamChunks(streamUrl) .then(() => { if (updateChat) { - updateChat(chat, chatMessage, streamUrl); + console.log("rodou") + updateChat(chat, chatMessage); } }) .catch((error) => { console.error(error); }); } - }, [streamUrl]); - - useEffect(() => { - // This effect is specifically for calling updateChat after streaming ends - if (!isStreaming && streamUrl) { - if (updateChat) { - updateChat(chat, chatMessage, streamUrl); - } + return () => { + eventSource.current?.close(); } - }, [isStreaming]); + }, [streamUrl,chatMessage]); useEffect(() => { const element = document.getElementById("last-chat-message"); diff --git a/src/frontend/src/components/newChatView/index.tsx b/src/frontend/src/components/newChatView/index.tsx index 86b8ace6a..b1b41b8fc 100644 --- a/src/frontend/src/components/newChatView/index.tsx +++ b/src/frontend/src/components/newChatView/index.tsx @@ -34,6 +34,7 @@ export default function NewChatView({ const inputIds = inputs.map((obj) => obj.id); const outputIds = outputs.map((obj) => obj.id); const outputTypes = outputs.map((obj) => obj.type); + const updateFlowPool = useFlowStore((state)=>state.updateFlowPool) useEffect(() => { if (!outputTypes.includes("ChatOutput")) { @@ -67,14 +68,12 @@ export default function NewChatView({ const { sender, message, sender_name, stream_url } = output.data .artifacts as ChatOutputType; - const componentId = output.id + index; - const is_ai = sender === "Machine" || sender === null; return { isSend: !is_ai, message: message, sender_name, - id: componentId, + componentId: output.id, stream_url: stream_url, }; } catch (e) { @@ -83,7 +82,7 @@ export default function NewChatView({ isSend: false, message: "Error parsing message", sender_name: "Error", - id: output.id + index, + componentId: output.id, }; } }); @@ -120,27 +119,25 @@ export default function NewChatView({ function updateChat( chat: ChatMessageType, message: string, - stream_url: string | null - ) { - if (message === "") return; - console.log(`updateChat: ${message}`); - console.log("chatHistory:", chatHistory); - chat.message = message; - chat.stream_url = stream_url; + stream_url?: string + ) { + if (message === "") return; + chat.message = message; + console.log(message) // chat is one of the chatHistory - setChatHistory((oldChatHistory) => { - const index = oldChatHistory.findIndex((ch) => ch.id === chat.id); - - if (index === -1) return oldChatHistory; - let newChatHistory = _.cloneDeep(oldChatHistory); - newChatHistory = [ - ...newChatHistory.slice(0, index), - chat, - ...newChatHistory.slice(index + 1), - ]; - console.log("newChatHistory:", newChatHistory); - return newChatHistory; - }); + updateFlowPool(chat.componentId,{message,sender_name:chat.sender_name??"Bot",sender:"Machine"}) + // setChatHistory((oldChatHistory) => { + // const index = oldChatHistory.findIndex((ch) => ch.id === chat.id); + // if (index === -1) return oldChatHistory; + // let newChatHistory = _.cloneDeep(oldChatHistory); + // newChatHistory = [ + // ...newChatHistory.slice(0, index), + // chat, + // ...newChatHistory.slice(index + 1), + // ]; + // console.log("newChatHistory:", newChatHistory); + // return newChatHistory; + // }); } return ( @@ -167,7 +164,7 @@ export default function NewChatView({ lockChat={lockChat} chat={chat} lastMessage={chatHistory.length - 1 === index ? true : false} - key={`${chat.id}-${index}`} + key={`${chat.componentId}-${index}`} updateChat={updateChat} /> )) diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index 351997aaf..f9a211e86 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -51,7 +51,7 @@ const useFlowStore = create((set, get) => ({ setFlowPool: (flowPool) => { set({ flowPool }); }, - addDataToFlowPool: (data: any, nodeId: string) => { + addDataToFlowPool: (data: FlowPoolObjectType, nodeId: string) => { let newFlowPool = cloneDeep({ ...get().flowPool }); if (!newFlowPool[nodeId]) newFlowPool[nodeId] = [data]; else { @@ -416,12 +416,13 @@ const useFlowStore = create((set, get) => ({ } function handleBuildUpdate( vertexBuildData: VertexBuildTypeAPI, - status: BuildStatus + status: BuildStatus, + buildId:string ) { if (vertexBuildData && vertexBuildData.inactive_vertices) { get().removeFromVerticesBuild(vertexBuildData.inactive_vertices); } - get().addDataToFlowPool(vertexBuildData, vertexBuildData.id); + get().addDataToFlowPool({...vertexBuildData,buildId}, vertexBuildData.id); useFlowStore.getState().updateBuildStatus([vertexBuildData.id], status); } await updateFlowInDatabase({ diff --git a/src/frontend/src/types/chat/index.ts b/src/frontend/src/types/chat/index.ts index 3914dd2dc..e24c6e891 100644 --- a/src/frontend/src/types/chat/index.ts +++ b/src/frontend/src/types/chat/index.ts @@ -9,7 +9,7 @@ export type ChatMessageType = { files?: Array<{ data: string; type: string; data_type: string }>; prompt?: string; chatKey?: string; - id?: string; + componentId: string; stream_url?: string | null; sender_name?: string; }; diff --git a/src/frontend/src/types/components/index.ts b/src/frontend/src/types/components/index.ts index e7767eabb..b36b5d44c 100644 --- a/src/frontend/src/types/components/index.ts +++ b/src/frontend/src/types/components/index.ts @@ -527,7 +527,7 @@ export type chatMessagePropsType = { updateChat: ( chat: ChatMessageType, message: string, - stream_url: string + stream_url?: string ) => void; }; @@ -632,9 +632,9 @@ export type validationStatusType = { id: string; data: object | any; params: string; - progress: number; + progress?: number; valid: boolean; - duration: string; + duration?: string; }; export type ApiKey = { diff --git a/src/frontend/src/types/zustand/flow/index.ts b/src/frontend/src/types/zustand/flow/index.ts index 8b6093b51..c506b034f 100644 --- a/src/frontend/src/types/zustand/flow/index.ts +++ b/src/frontend/src/types/zustand/flow/index.ts @@ -18,6 +18,7 @@ export type ChatOutputType = { message: string; sender: string; sender_name: string; + stream_url?: string; }; export type FlowPoolObjectType = { @@ -25,9 +26,10 @@ export type FlowPoolObjectType = { valid: boolean; params: any; data: { artifacts: any | ChatOutputType | chatInputType; results: any | ChatOutputType | chatInputType }; - duration: string; - progress: number; + duration?: string; + progress?: number; id: string; + buildId: string; }; export type FlowPoolType = { @@ -40,7 +42,7 @@ export type FlowStoreType = { outputs: Array<{ type: string; id: string }>; hasIO: boolean; setFlowPool: (flowPool: FlowPoolType) => void; - addDataToFlowPool: (data: any, nodeId: string) => void; + addDataToFlowPool: (data: FlowPoolObjectType, nodeId: string) => void; CleanFlowPool: () => void; isBuilding: boolean; isPending: boolean; diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index 862472ee7..3acc3d972 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -9,7 +9,7 @@ type BuildVerticesParams = { flowId: string; // Assuming FlowType is the type for your flow nodeId?: string | null; // Assuming nodeId is of type string, and it's optional onGetOrderSuccess?: () => void; - onBuildUpdate?: (data: VertexBuildTypeAPI, status: BuildStatus) => void; // Replace any with the actual type if it's not any + onBuildUpdate?: (data: VertexBuildTypeAPI, status: BuildStatus,buildId:string) => void; // Replace any with the actual type if it's not any onBuildComplete?: (allNodesValid: boolean) => void; onBuildError?: (title, list, idList: string[]) => void; onBuildStart?: (idList: string[]) => void; @@ -48,7 +48,7 @@ export async function buildVertices({ let orderResponse; try { orderResponse = await getVerticesOrder(flowId, nodeId); - } catch (error) { + } catch (error:any) { console.log(error); setErrorData({ title: "Oops! Looks like you missed something", @@ -59,6 +59,7 @@ export async function buildVertices({ } if (onGetOrderSuccess) onGetOrderSuccess(); let verticesOrder: Array> = orderResponse.data.ids; + const runId = orderResponse.data.run_id; let vertices_layers: Array> = []; let stop = false; if (validateNodes) { @@ -102,14 +103,14 @@ export async function buildVertices({ onBuildUpdate ) { // If it is, skip building and set the state to inactive - onBuildUpdate(getInactiveVertexData(id), BuildStatus.INACTIVE); + onBuildUpdate(getInactiveVertexData(id), BuildStatus.INACTIVE,runId); buildResults.push(false); continue; } await buildVertex({ flowId, id, - onBuildUpdate, + onBuildUpdate:(data: VertexBuildTypeAPI, status: BuildStatus) => {if(onBuildUpdate) onBuildUpdate(data, status,runId)}, onBuildError, verticesIds, buildResults,