fix(frontend): add useRef hook to manage eventSource in ChatMessage component

feat(frontend): add support for process.env.PORT environment variable in server.ts
feat(frontend): add updateFlowPool function to NewChatView component
feat(frontend): add buildId parameter to addDataToFlowPool function in flowStore
feat(frontend): add stream_url property to ChatOutputType in flow types
This commit is contained in:
anovazzi1 2024-02-27 18:29:57 -03:00
commit df07cf413b
7 changed files with 64 additions and 60 deletions

View file

@ -1,5 +1,5 @@
import Convert from "ansi-to-html";
import { useEffect, useMemo, useState } from "react";
import { useEffect, useMemo, useState,useRef } from "react";
import Markdown from "react-markdown";
import rehypeMathjax from "rehype-mathjax";
import remarkGfm from "remark-gfm";
@ -12,6 +12,7 @@ import IconComponent from "../../../components/genericIconComponent";
import { chatMessagePropsType } from "../../../types/components";
import { classNames } from "../../../utils/utils";
import FileCard from "../fileComponent";
import useFlowStore from "../../../stores/flowStore";
export default function ChatMessage({
chat,
@ -29,6 +30,9 @@ export default function ChatMessage({
const chatMessageString = chat.message ? chat.message.toString() : "";
const [chatMessage, setChatMessage] = useState(chatMessageString);
const [isStreaming, setIsStreaming] = useState(false);
const eventSource = useRef<EventSource | undefined>(undefined);
const updateFlowPool = useFlowStore((state) => state.updateFlowPool);
// The idea now is that chat.stream_url MAY be a URL if we should stream the output of the chat
// probably the message is empty when we have a stream_url
@ -36,49 +40,48 @@ export default function ChatMessage({
const streamChunks = (url: string) => {
setIsStreaming(true); // Streaming starts
return new Promise<boolean>((resolve, reject) => {
const eventSource = new EventSource(url);
eventSource.onmessage = (event) => {
eventSource.current = new EventSource(url);
eventSource.current.onmessage = (event) => {
let parsedData = JSON.parse(event.data);
if (parsedData.chunk) {
setChatMessage((prev) => prev + parsedData.chunk);
}
};
eventSource.onerror = (event) => {
eventSource.current.onerror = (event) => {
setIsStreaming(false);
eventSource.current?.close();
setStreamUrl(undefined);
reject(new Error("Streaming failed"));
setIsStreaming(false);
eventSource.close();
};
eventSource.addEventListener("close", (event) => {
setStreamUrl(null); // Update state to reflect the stream is closed
resolve(true);
eventSource.current.addEventListener("close", (event) => {
setStreamUrl(undefined); // Update state to reflect the stream is closed
eventSource.current?.close();
setIsStreaming(false);
eventSource.close();
resolve(true);
});
});
};
useEffect(() => {
if (streamUrl && chat.message === "") {
console.log(streamUrl)
if (streamUrl&& !isStreaming) {
streamChunks(streamUrl)
.then(() => {
if (updateChat) {
updateChat(chat, chatMessage, streamUrl);
console.log("rodou")
updateChat(chat, chatMessage);
}
})
.catch((error) => {
console.error(error);
});
}
}, [streamUrl]);
useEffect(() => {
// This effect is specifically for calling updateChat after streaming ends
if (!isStreaming && streamUrl) {
if (updateChat) {
updateChat(chat, chatMessage, streamUrl);
}
return () => {
eventSource.current?.close();
}
}, [isStreaming]);
}, [streamUrl,chatMessage]);
useEffect(() => {
const element = document.getElementById("last-chat-message");

View file

@ -34,6 +34,7 @@ export default function NewChatView({
const inputIds = inputs.map((obj) => obj.id);
const outputIds = outputs.map((obj) => obj.id);
const outputTypes = outputs.map((obj) => obj.type);
const updateFlowPool = useFlowStore((state)=>state.updateFlowPool)
useEffect(() => {
if (!outputTypes.includes("ChatOutput")) {
@ -67,14 +68,12 @@ export default function NewChatView({
const { sender, message, sender_name, stream_url } = output.data
.artifacts as ChatOutputType;
const componentId = output.id + index;
const is_ai = sender === "Machine" || sender === null;
return {
isSend: !is_ai,
message: message,
sender_name,
id: componentId,
componentId: output.id,
stream_url: stream_url,
};
} catch (e) {
@ -83,7 +82,7 @@ export default function NewChatView({
isSend: false,
message: "Error parsing message",
sender_name: "Error",
id: output.id + index,
componentId: output.id,
};
}
});
@ -120,27 +119,25 @@ export default function NewChatView({
function updateChat(
chat: ChatMessageType,
message: string,
stream_url: string | null
) {
if (message === "") return;
console.log(`updateChat: ${message}`);
console.log("chatHistory:", chatHistory);
chat.message = message;
chat.stream_url = stream_url;
stream_url?: string
) {
if (message === "") return;
chat.message = message;
console.log(message)
// chat is one of the chatHistory
setChatHistory((oldChatHistory) => {
const index = oldChatHistory.findIndex((ch) => ch.id === chat.id);
if (index === -1) return oldChatHistory;
let newChatHistory = _.cloneDeep(oldChatHistory);
newChatHistory = [
...newChatHistory.slice(0, index),
chat,
...newChatHistory.slice(index + 1),
];
console.log("newChatHistory:", newChatHistory);
return newChatHistory;
});
updateFlowPool(chat.componentId,{message,sender_name:chat.sender_name??"Bot",sender:"Machine"})
// setChatHistory((oldChatHistory) => {
// const index = oldChatHistory.findIndex((ch) => ch.id === chat.id);
// if (index === -1) return oldChatHistory;
// let newChatHistory = _.cloneDeep(oldChatHistory);
// newChatHistory = [
// ...newChatHistory.slice(0, index),
// chat,
// ...newChatHistory.slice(index + 1),
// ];
// console.log("newChatHistory:", newChatHistory);
// return newChatHistory;
// });
}
return (
@ -167,7 +164,7 @@ export default function NewChatView({
lockChat={lockChat}
chat={chat}
lastMessage={chatHistory.length - 1 === index ? true : false}
key={`${chat.id}-${index}`}
key={`${chat.componentId}-${index}`}
updateChat={updateChat}
/>
))

View file

@ -51,7 +51,7 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
setFlowPool: (flowPool) => {
set({ flowPool });
},
addDataToFlowPool: (data: any, nodeId: string) => {
addDataToFlowPool: (data: FlowPoolObjectType, nodeId: string) => {
let newFlowPool = cloneDeep({ ...get().flowPool });
if (!newFlowPool[nodeId]) newFlowPool[nodeId] = [data];
else {
@ -416,12 +416,13 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
}
function handleBuildUpdate(
vertexBuildData: VertexBuildTypeAPI,
status: BuildStatus
status: BuildStatus,
buildId:string
) {
if (vertexBuildData && vertexBuildData.inactive_vertices) {
get().removeFromVerticesBuild(vertexBuildData.inactive_vertices);
}
get().addDataToFlowPool(vertexBuildData, vertexBuildData.id);
get().addDataToFlowPool({...vertexBuildData,buildId}, vertexBuildData.id);
useFlowStore.getState().updateBuildStatus([vertexBuildData.id], status);
}
await updateFlowInDatabase({

View file

@ -9,7 +9,7 @@ export type ChatMessageType = {
files?: Array<{ data: string; type: string; data_type: string }>;
prompt?: string;
chatKey?: string;
id?: string;
componentId: string;
stream_url?: string | null;
sender_name?: string;
};

View file

@ -527,7 +527,7 @@ export type chatMessagePropsType = {
updateChat: (
chat: ChatMessageType,
message: string,
stream_url: string
stream_url?: string
) => void;
};
@ -632,9 +632,9 @@ export type validationStatusType = {
id: string;
data: object | any;
params: string;
progress: number;
progress?: number;
valid: boolean;
duration: string;
duration?: string;
};
export type ApiKey = {

View file

@ -18,6 +18,7 @@ export type ChatOutputType = {
message: string;
sender: string;
sender_name: string;
stream_url?: string;
};
export type FlowPoolObjectType = {
@ -25,9 +26,10 @@ export type FlowPoolObjectType = {
valid: boolean;
params: any;
data: { artifacts: any | ChatOutputType | chatInputType; results: any | ChatOutputType | chatInputType };
duration: string;
progress: number;
duration?: string;
progress?: number;
id: string;
buildId: string;
};
export type FlowPoolType = {
@ -40,7 +42,7 @@ export type FlowStoreType = {
outputs: Array<{ type: string; id: string }>;
hasIO: boolean;
setFlowPool: (flowPool: FlowPoolType) => void;
addDataToFlowPool: (data: any, nodeId: string) => void;
addDataToFlowPool: (data: FlowPoolObjectType, nodeId: string) => void;
CleanFlowPool: () => void;
isBuilding: boolean;
isPending: boolean;

View file

@ -9,7 +9,7 @@ type BuildVerticesParams = {
flowId: string; // Assuming FlowType is the type for your flow
nodeId?: string | null; // Assuming nodeId is of type string, and it's optional
onGetOrderSuccess?: () => void;
onBuildUpdate?: (data: VertexBuildTypeAPI, status: BuildStatus) => void; // Replace any with the actual type if it's not any
onBuildUpdate?: (data: VertexBuildTypeAPI, status: BuildStatus,buildId:string) => void; // Replace any with the actual type if it's not any
onBuildComplete?: (allNodesValid: boolean) => void;
onBuildError?: (title, list, idList: string[]) => void;
onBuildStart?: (idList: string[]) => void;
@ -48,7 +48,7 @@ export async function buildVertices({
let orderResponse;
try {
orderResponse = await getVerticesOrder(flowId, nodeId);
} catch (error) {
} catch (error:any) {
console.log(error);
setErrorData({
title: "Oops! Looks like you missed something",
@ -59,6 +59,7 @@ export async function buildVertices({
}
if (onGetOrderSuccess) onGetOrderSuccess();
let verticesOrder: Array<Array<string>> = orderResponse.data.ids;
const runId = orderResponse.data.run_id;
let vertices_layers: Array<Array<string>> = [];
let stop = false;
if (validateNodes) {
@ -102,14 +103,14 @@ export async function buildVertices({
onBuildUpdate
) {
// If it is, skip building and set the state to inactive
onBuildUpdate(getInactiveVertexData(id), BuildStatus.INACTIVE);
onBuildUpdate(getInactiveVertexData(id), BuildStatus.INACTIVE,runId);
buildResults.push(false);
continue;
}
await buildVertex({
flowId,
id,
onBuildUpdate,
onBuildUpdate:(data: VertexBuildTypeAPI, status: BuildStatus) => {if(onBuildUpdate) onBuildUpdate(data, status,runId)},
onBuildError,
verticesIds,
buildResults,