diff --git a/src/frontend/src/components/newChatView/chatMessage/index.tsx b/src/frontend/src/components/newChatView/chatMessage/index.tsx index a3570c9aa..36fb3955e 100644 --- a/src/frontend/src/components/newChatView/chatMessage/index.tsx +++ b/src/frontend/src/components/newChatView/chatMessage/index.tsx @@ -17,12 +17,65 @@ export default function ChatMessage({ chat, lockChat, lastMessage, + updateChat, }: chatMessagePropsType): JSX.Element { const convert = new Convert({ newline: true }); const [hidden, setHidden] = useState(true); const template = chat.template; const [promptOpen, setPromptOpen] = useState(false); - const chat_message = chat.message.toString(); + const [streamUrl, setStreamUrl] = useState(chat.stream_url); + const [chatMessage, setChatMessage] = useState(chat.message.toString()); + const [isStreaming, setIsStreaming] = useState(false); + + // The idea now is that chat.stream_url MAY be a URL if we should stream the output of the chat + // probably the message is empty when we have a stream_url + // what we need is to update the chat_message with the SSE data + const streamChunks = (url: string) => { + setIsStreaming(true); // Streaming starts + return new Promise((resolve, reject) => { + const eventSource = new EventSource(url); + eventSource.onmessage = (event) => { + let parsedData = JSON.parse(event.data); + if (parsedData.chunk) { + setChatMessage((prev) => prev + parsedData.chunk); + } + }; + eventSource.onerror = (event) => { + reject(new Error("Streaming failed")); + setIsStreaming(false); + eventSource.close(); + }; + eventSource.addEventListener("close", (event) => { + setStreamUrl(null); // Update state to reflect the stream is closed + resolve(true); + setIsStreaming(false); + eventSource.close(); + }); + }); + }; + + useEffect(() => { + if (streamUrl && chat.message === "") { + streamChunks(streamUrl) + .then(() => { + if (updateChat) { + updateChat(chat, chatMessage, streamUrl); + } + }) + .catch((error) => { + console.error(error); + }); + } + }, [streamUrl]); + + useEffect(() => { + // This effect is specifically for calling updateChat after streaming ends + if (!isStreaming && streamUrl === null) { + if (updateChat) { + updateChat(chat, chatMessage, streamUrl); + } + } + }, [isStreaming]); useEffect(() => { const element = document.getElementById("last-chat-message"); @@ -103,7 +156,7 @@ export default function ChatMessage({ remarkPlugins={[remarkGfm, remarkMath]} rehypePlugins={[rehypeMathjax]} className="markdown prose min-w-full text-primary word-break-break-word - dark:prose-invert" +dark:prose-invert" components={{ pre({ node, ...props }) { return <>{props.children}; @@ -161,10 +214,10 @@ export default function ChatMessage({ }, }} > - {chat_message} + {chatMessage} ), - [chat.message, chat_message] + [chat.message, chatMessage] )} {chat.files && ( diff --git a/src/frontend/src/components/newChatView/index.tsx b/src/frontend/src/components/newChatView/index.tsx index f84402c9f..b0cec426a 100644 --- a/src/frontend/src/components/newChatView/index.tsx +++ b/src/frontend/src/components/newChatView/index.tsx @@ -1,4 +1,4 @@ -import { cloneDeep } from "lodash"; +import _, { cloneDeep } from "lodash"; import { useEffect, useRef, useState } from "react"; import IconComponent from "../../components/genericIconComponent"; import { deleteFlowPool } from "../../controllers/API"; @@ -19,7 +19,6 @@ import ChatMessage from "./chatMessage"; export default function newChatView(): JSX.Element { const [chatValue, setChatValue] = useState(""); - const [chatHistory, setChatHistory] = useState([]); const { flowPool, outputs, @@ -36,6 +35,7 @@ export default function newChatView(): JSX.Element { const [lockChat, setLockChat] = useState(false); const messagesRef = useRef(null); const isBuilding = useFlowStore((state) => state.isBuilding); + const [chatHistory, setChatHistory] = useState([]); const inputTypes = inputs.map((obj) => obj.type); const inputIds = inputs.map((obj) => obj.id); @@ -67,20 +67,30 @@ export default function newChatView(): JSX.Element { }); const chatMessages: ChatMessageType[] = chatOutputResponses .sort((a, b) => Date.parse(a.timestamp) - Date.parse(b.timestamp)) - .filter((output) => !!output.data.artifacts?.message) - .map((output) => { + // + .filter((output) => output.data.artifacts?.message !== null) + .map((output, index) => { try { - const { sender, message, sender_name } = output.data + const { sender, message, sender_name, stream_url } = output.data .artifacts as ChatOutputType; + const componentId = output.id + index; + const is_ai = sender === "Machine" || sender === null; - return { isSend: !is_ai, message: message, sender_name }; + return { + isSend: !is_ai, + message: message, + sender_name, + id: componentId, + stream_url: stream_url, + }; } catch (e) { console.error(e); return { isSend: false, message: "Error parsing message", sender_name: "Error", + id: output.id + index, }; } }); @@ -148,6 +158,32 @@ export default function newChatView(): JSX.Element { if (lockChat) setLockChat(false); } + function updateChat( + chat: ChatMessageType, + message: string, + stream_url: string | null + ) { + if (message === "") return; + console.log(`updateChat: ${message}`); + console.log("chatHistory:", chatHistory); + chat.message = message; + chat.stream_url = stream_url; + // chat is one of the chatHistory + setChatHistory((oldChatHistory) => { + const index = oldChatHistory.findIndex((ch) => ch.id === chat.id); + + if (index === -1) return oldChatHistory; + let newChatHistory = _.cloneDeep(oldChatHistory); + newChatHistory = [ + ...newChatHistory.slice(0, index), + chat, + ...newChatHistory.slice(index + 1), + ]; + console.log("newChatHistory:", newChatHistory); + return newChatHistory; + }); + } + return (
@@ -172,7 +208,8 @@ export default function newChatView(): JSX.Element { lockChat={lockChat} chat={chat} lastMessage={chatHistory.length - 1 === index ? true : false} - key={index} + key={`${chat.id}-${index}`} + updateChat={updateChat} /> )) ) : (