From 68176511720e46d0df13cf01ed2dd98d03f7b37a Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 23 Feb 2024 19:14:12 -0300 Subject: [PATCH 1/7] Add flow_id parameter to Graph.from_payload() method --- src/backend/langflow/api/utils.py | 2 +- src/backend/langflow/graph/graph/base.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/backend/langflow/api/utils.py b/src/backend/langflow/api/utils.py index f92fb2986..bd71e6dcb 100644 --- a/src/backend/langflow/api/utils.py +++ b/src/backend/langflow/api/utils.py @@ -223,7 +223,7 @@ def build_and_cache_graph( flow: Flow = session.get(Flow, flow_id) if not flow or not flow.data: raise ValueError("Invalid flow ID") - other_graph = Graph.from_payload(flow.data) + other_graph = Graph.from_payload(flow.data, flow_id) if graph is None: graph = other_graph else: diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 6e2d60bea..3535c0e2f 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -26,6 +26,7 @@ class Graph: self, nodes: List[Dict], edges: List[Dict[str, str]], + flow_id: Optional[str] = None, ) -> None: self.inputs = [] self.outputs = [] @@ -34,6 +35,7 @@ class Graph: self.raw_graph_data = {"nodes": nodes, "edges": edges} self._runs = 0 self._updates = 0 + self.flow_id = flow_id self.top_level_vertices = [] for vertex in self._vertices: @@ -113,7 +115,7 @@ class Graph: return predecessor_map, successor_map @classmethod - def from_payload(cls, payload: Dict) -> "Graph": + def from_payload(cls, payload: Dict, flow_id: str) -> "Graph": """ Creates a graph from a payload. @@ -128,7 +130,7 @@ class Graph: try: vertices = payload["nodes"] edges = payload["edges"] - return cls(vertices, edges) + return cls(vertices, edges, flow_id) except KeyError as exc: logger.exception(exc) raise ValueError( From 4d48805e49e1393bcf5b27dfb438e7e95803120d Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 23 Feb 2024 19:14:48 -0300 Subject: [PATCH 2/7] Add SSE endpoint for streaming vertex build --- src/backend/langflow/api/v1/chat.py | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index e0742a31e..d04588cba 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -10,12 +10,14 @@ from fastapi import ( WebSocketException, status, ) +from fastapi.responses import StreamingResponse from loguru import logger from sqlmodel import Session from langflow.api.utils import build_and_cache_graph, format_elapsed_time from langflow.api.v1.schemas import ( ResultData, + StreamData, VertexBuildResponse, VerticesOrderResponse, ) @@ -204,3 +206,51 @@ async def build_vertex( logger.error(f"Error building vertex: {exc}") logger.exception(exc) raise HTTPException(status_code=500, detail=str(exc)) from exc + + +# Now onto an endpoint that is an SSE endpoint +# it will receive a component_id and a flow_id +# +@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse) +async def build_vertex_stream( + flow_id: str, + vertex_id: str, + chat_service: "ChatService" = Depends(get_chat_service), +): + """Build a vertex instead of the entire graph.""" + try: + + async def stream_vertex(): + try: + cache = chat_service.get_cache(flow_id) + if not cache: + # If there's no cache + raise ValueError(f"No cache found for {flow_id}.") + else: + graph = cache.get("result") + + vertex = graph.get_vertex(vertex_id) + if not vertex.pinned or not vertex._built: + stream_data = StreamData( + event="message", + data={"message": "Building vertex"}, + ) + yield str(stream_data) + + async for chunk in vertex.stream(): + stream_data = StreamData( + event="message", + data={"chunk": chunk}, + ) + yield str(stream_data) + else: + raise ValueError(f"No result found for vertex {vertex_id}") + + except Exception as exc: + yield str(StreamData(event="error", data={"error": str(exc)})) + + yield str(StreamData(event="close", data={"message": "Stream closed"})) + + return StreamingResponse(stream_vertex(), media_type="text/event-stream") + except Exception as exc: + raise HTTPException(status_code=500, detail="Error building vertex") from exc From a8e27bac6d5723bd3ad2f4d654225a8dd38a1fdd Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 23 Feb 2024 19:15:16 -0300 Subject: [PATCH 3/7] Refactor ContractEdge log_transaction method --- src/backend/langflow/graph/edge/base.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/backend/langflow/graph/edge/base.py b/src/backend/langflow/graph/edge/base.py index 3301ca936..99b24e2da 100644 --- a/src/backend/langflow/graph/edge/base.py +++ b/src/backend/langflow/graph/edge/base.py @@ -1,10 +1,11 @@ from typing import TYPE_CHECKING, Any, List, Optional +from loguru import logger +from pydantic import BaseModel, Field + from langflow.graph.edge.utils import build_clean_params from langflow.services.deps import get_monitor_service from langflow.services.monitor.utils import log_message -from loguru import logger -from pydantic import BaseModel, Field if TYPE_CHECKING: from langflow.graph.vertex.base import Vertex @@ -135,11 +136,11 @@ class ContractEdge(Edge): log_transaction(self, source, target, "success") # If the target vertex is a power component we log messages - if ( - target.vertex_type == "ChatOutput" - and isinstance(target.params.get("message"), str) - or isinstance(target.params.get("message"), dict) + if target.vertex_type == "ChatOutput" and ( + isinstance(target.params.get("message"), str) or isinstance(target.params.get("message"), dict) ): + if target.params.get("message") == "": + return self.result await log_message( sender=target.params.get("sender", ""), sender_name=target.params.get("sender_name", ""), @@ -168,3 +169,4 @@ def log_transaction(edge: ContractEdge, source: "Vertex", target: "Vertex", stat monitor_service.add_row(table_name="transactions", data=data) except Exception as e: logger.error(f"Error logging transaction: {e}") + logger.error(f"Error logging transaction: {e}") From ed66136f85594e18874bfa6b28a109c240a86d11 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 23 Feb 2024 19:15:43 -0300 Subject: [PATCH 4/7] Refactor ChatVertex to support message streaming --- src/backend/langflow/graph/vertex/types.py | 67 +++++++++++++++++++++- src/backend/langflow/utils/schemas.py | 1 + 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/src/backend/langflow/graph/vertex/types.py b/src/backend/langflow/graph/vertex/types.py index e80eef594..705b5d114 100644 --- a/src/backend/langflow/graph/vertex/types.py +++ b/src/backend/langflow/graph/vertex/types.py @@ -1,14 +1,16 @@ import ast import json -from typing import Callable, Dict, List, Optional, Union +from typing import AsyncIterator, Callable, Dict, Iterator, List, Optional, Union import yaml from langchain_core.messages import AIMessage +from loguru import logger from langflow.graph.utils import UnbuiltObject, flatten_list from langflow.graph.vertex.base import StatefulVertex, StatelessVertex from langflow.interface.utils import extract_input_variables_from_prompt from langflow.schema import Record +from langflow.services.monitor.utils import log_message from langflow.utils.schemas import ChatOutputResponse @@ -314,6 +316,20 @@ class ChatVertex(StatelessVertex): super().__init__(data, graph=graph, base_type="custom_components", is_task=True) self.steps = [self._build, self._run] + def build_stream_url(self): + return f"/api/v1/build/{self.graph.flow_id}/{self.id}/stream" + + async def _build(self, user_id=None): + """ + Initiate the build process. + """ + logger.debug(f"Building {self.vertex_type}") + await self._build_each_node_in_params_dict(user_id) + await self._get_and_instantiate_class(user_id) + self._validate_built_object() + + self._built = True + def _built_object_repr(self): if self.task_id and self.is_task: if task := self.get_task(): @@ -332,6 +348,8 @@ class ChatVertex(StatelessVertex): artifacts = None sender = self.params.get("sender", None) sender_name = self.params.get("sender_name", None) + message = self.params.get("message", None) + stream_url = None if isinstance(self._built_object, AIMessage): artifacts = ChatOutputResponse.from_message( self._built_object, @@ -345,8 +363,13 @@ class ChatVertex(StatelessVertex): message = dict_to_codeblock(self._built_object) elif isinstance(self._built_object, Record): message = self._built_object.text + elif isinstance(message, (AsyncIterator, Iterator)): + stream_url = self.build_stream_url() + message = "" elif not isinstance(self._built_object, str): message = str(self._built_object) + # if the message is a generator or iterator + # it means that it is a stream of messages else: message = self._built_object @@ -354,14 +377,56 @@ class ChatVertex(StatelessVertex): message=message, sender=sender, sender_name=sender_name, + stream_url=stream_url, ) if artifacts: self.artifacts = artifacts.model_dump() + if isinstance(self._built_object, (AsyncIterator, Iterator)): + if self.params["as_record"]: + self._built_object = Record(text=message, data=self.artifacts) + else: + self._built_object = message self._built_result = self._built_object else: await super()._run(*args, **kwargs) + async def stream(self): + iterator = self.params.get("message", None) + if not isinstance(iterator, (AsyncIterator, Iterator)): + raise ValueError("The message must be an iterator or an async iterator.") + is_async = isinstance(iterator, AsyncIterator) + complete_message = "" + if is_async: + async for message in iterator: + message = message.content if hasattr(message, "content") else message + message = message.text if hasattr(message, "text") else message + yield message + complete_message += message + else: + for message in iterator: + message = message.content if hasattr(message, "content") else message + message = message.text if hasattr(message, "text") else message + yield message + complete_message += message + self._built_object = Record(text=complete_message, data=self.artifacts) + self._built_result = complete_message + # Update artifacts with the message + # and remove the stream_url + self.artifacts = ChatOutputResponse( + message=complete_message, + sender=self.params.get("sender", ""), + sender_name=self.params.get("sender_name", ""), + ).model_dump() + + await log_message( + sender=self.params.get("sender", ""), + sender_name=self.params.get("sender_name", ""), + message=complete_message, + session_id=self.params.get("session_id", ""), + artifacts=self.artifacts, + ) + class RoutingVertex(StatelessVertex): def __init__(self, data: Dict, graph): diff --git a/src/backend/langflow/utils/schemas.py b/src/backend/langflow/utils/schemas.py index bcde2eb2f..8d0b2db12 100644 --- a/src/backend/langflow/utils/schemas.py +++ b/src/backend/langflow/utils/schemas.py @@ -10,6 +10,7 @@ class ChatOutputResponse(BaseModel): message: Union[str, List[Union[str, Dict]]] sender: Optional[str] = "Machine" sender_name: Optional[str] = "AI" + stream_url: Optional[str] = None @classmethod def from_message( From 1facc44cd08d576c5ff36fd96ec86fe22de6d076 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 23 Feb 2024 19:16:30 -0300 Subject: [PATCH 5/7] Update chat message and implement streaming functionality --- .../newChatView/chatMessage/index.tsx | 61 +++++++++++++++++-- .../src/components/newChatView/index.tsx | 51 +++++++++++++--- 2 files changed, 101 insertions(+), 11 deletions(-) diff --git a/src/frontend/src/components/newChatView/chatMessage/index.tsx b/src/frontend/src/components/newChatView/chatMessage/index.tsx index a3570c9aa..36fb3955e 100644 --- a/src/frontend/src/components/newChatView/chatMessage/index.tsx +++ b/src/frontend/src/components/newChatView/chatMessage/index.tsx @@ -17,12 +17,65 @@ export default function ChatMessage({ chat, lockChat, lastMessage, + updateChat, }: chatMessagePropsType): JSX.Element { const convert = new Convert({ newline: true }); const [hidden, setHidden] = useState(true); const template = chat.template; const [promptOpen, setPromptOpen] = useState(false); - const chat_message = chat.message.toString(); + const [streamUrl, setStreamUrl] = useState(chat.stream_url); + const [chatMessage, setChatMessage] = useState(chat.message.toString()); + const [isStreaming, setIsStreaming] = useState(false); + + // The idea now is that chat.stream_url MAY be a URL if we should stream the output of the chat + // probably the message is empty when we have a stream_url + // what we need is to update the chat_message with the SSE data + const streamChunks = (url: string) => { + setIsStreaming(true); // Streaming starts + return new Promise((resolve, reject) => { + const eventSource = new EventSource(url); + eventSource.onmessage = (event) => { + let parsedData = JSON.parse(event.data); + if (parsedData.chunk) { + setChatMessage((prev) => prev + parsedData.chunk); + } + }; + eventSource.onerror = (event) => { + reject(new Error("Streaming failed")); + setIsStreaming(false); + eventSource.close(); + }; + eventSource.addEventListener("close", (event) => { + setStreamUrl(null); // Update state to reflect the stream is closed + resolve(true); + setIsStreaming(false); + eventSource.close(); + }); + }); + }; + + useEffect(() => { + if (streamUrl && chat.message === "") { + streamChunks(streamUrl) + .then(() => { + if (updateChat) { + updateChat(chat, chatMessage, streamUrl); + } + }) + .catch((error) => { + console.error(error); + }); + } + }, [streamUrl]); + + useEffect(() => { + // This effect is specifically for calling updateChat after streaming ends + if (!isStreaming && streamUrl === null) { + if (updateChat) { + updateChat(chat, chatMessage, streamUrl); + } + } + }, [isStreaming]); useEffect(() => { const element = document.getElementById("last-chat-message"); @@ -103,7 +156,7 @@ export default function ChatMessage({ remarkPlugins={[remarkGfm, remarkMath]} rehypePlugins={[rehypeMathjax]} className="markdown prose min-w-full text-primary word-break-break-word - dark:prose-invert" +dark:prose-invert" components={{ pre({ node, ...props }) { return <>{props.children}; @@ -161,10 +214,10 @@ export default function ChatMessage({ }, }} > - {chat_message} + {chatMessage} ), - [chat.message, chat_message] + [chat.message, chatMessage] )} {chat.files && ( diff --git a/src/frontend/src/components/newChatView/index.tsx b/src/frontend/src/components/newChatView/index.tsx index f84402c9f..b0cec426a 100644 --- a/src/frontend/src/components/newChatView/index.tsx +++ b/src/frontend/src/components/newChatView/index.tsx @@ -1,4 +1,4 @@ -import { cloneDeep } from "lodash"; +import _, { cloneDeep } from "lodash"; import { useEffect, useRef, useState } from "react"; import IconComponent from "../../components/genericIconComponent"; import { deleteFlowPool } from "../../controllers/API"; @@ -19,7 +19,6 @@ import ChatMessage from "./chatMessage"; export default function newChatView(): JSX.Element { const [chatValue, setChatValue] = useState(""); - const [chatHistory, setChatHistory] = useState([]); const { flowPool, outputs, @@ -36,6 +35,7 @@ export default function newChatView(): JSX.Element { const [lockChat, setLockChat] = useState(false); const messagesRef = useRef(null); const isBuilding = useFlowStore((state) => state.isBuilding); + const [chatHistory, setChatHistory] = useState([]); const inputTypes = inputs.map((obj) => obj.type); const inputIds = inputs.map((obj) => obj.id); @@ -67,20 +67,30 @@ export default function newChatView(): JSX.Element { }); const chatMessages: ChatMessageType[] = chatOutputResponses .sort((a, b) => Date.parse(a.timestamp) - Date.parse(b.timestamp)) - .filter((output) => !!output.data.artifacts?.message) - .map((output) => { + // + .filter((output) => output.data.artifacts?.message !== null) + .map((output, index) => { try { - const { sender, message, sender_name } = output.data + const { sender, message, sender_name, stream_url } = output.data .artifacts as ChatOutputType; + const componentId = output.id + index; + const is_ai = sender === "Machine" || sender === null; - return { isSend: !is_ai, message: message, sender_name }; + return { + isSend: !is_ai, + message: message, + sender_name, + id: componentId, + stream_url: stream_url, + }; } catch (e) { console.error(e); return { isSend: false, message: "Error parsing message", sender_name: "Error", + id: output.id + index, }; } }); @@ -148,6 +158,32 @@ export default function newChatView(): JSX.Element { if (lockChat) setLockChat(false); } + function updateChat( + chat: ChatMessageType, + message: string, + stream_url: string | null + ) { + if (message === "") return; + console.log(`updateChat: ${message}`); + console.log("chatHistory:", chatHistory); + chat.message = message; + chat.stream_url = stream_url; + // chat is one of the chatHistory + setChatHistory((oldChatHistory) => { + const index = oldChatHistory.findIndex((ch) => ch.id === chat.id); + + if (index === -1) return oldChatHistory; + let newChatHistory = _.cloneDeep(oldChatHistory); + newChatHistory = [ + ...newChatHistory.slice(0, index), + chat, + ...newChatHistory.slice(index + 1), + ]; + console.log("newChatHistory:", newChatHistory); + return newChatHistory; + }); + } + return (
@@ -172,7 +208,8 @@ export default function newChatView(): JSX.Element { lockChat={lockChat} chat={chat} lastMessage={chatHistory.length - 1 === index ? true : false} - key={index} + key={`${chat.id}-${index}`} + updateChat={updateChat} /> )) ) : ( From 6079461f3452e91e5aa97827242e258def98bed0 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 23 Feb 2024 19:16:56 -0300 Subject: [PATCH 6/7] Fix status class for built nodes --- src/frontend/src/CustomNodes/GenericNode/index.tsx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/CustomNodes/GenericNode/index.tsx b/src/frontend/src/CustomNodes/GenericNode/index.tsx index fcbf12c80..c62abf6b9 100644 --- a/src/frontend/src/CustomNodes/GenericNode/index.tsx +++ b/src/frontend/src/CustomNodes/GenericNode/index.tsx @@ -174,6 +174,8 @@ export default function GenericNode({ return "green-status"; } else if (!isValid && buildStatus === BuildStatus.INACTIVE) { return "gray-status"; + } else if (!validationStatus && buildStatus === BuildStatus.BUILT) { + return "green-status"; } else if (!isValid && buildStatus === BuildStatus.BUILT) { return "red-status"; } else if (!validationStatus) { @@ -202,7 +204,6 @@ export default function GenericNode({ let isInvalid = validationStatus && !validationStatus.valid; if (buildStatus === BuildStatus.INACTIVE && isInvalid) { // INACTIVE should have its own class - // different from BUILT and TO_BUILD return "inactive-status"; } if (buildStatus === BuildStatus.BUILT && isInvalid) { From 927b5c59d4180790763e9801c04aa132a5ffd021 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 23 Feb 2024 19:17:34 -0300 Subject: [PATCH 7/7] Add id and stream_url to ChatMessageType and ChatOutputType --- src/frontend/src/types/chat/index.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/frontend/src/types/chat/index.ts b/src/frontend/src/types/chat/index.ts index b393b57cd..3914dd2dc 100644 --- a/src/frontend/src/types/chat/index.ts +++ b/src/frontend/src/types/chat/index.ts @@ -9,6 +9,8 @@ export type ChatMessageType = { files?: Array<{ data: string; type: string; data_type: string }>; prompt?: string; chatKey?: string; + id?: string; + stream_url?: string | null; sender_name?: string; }; @@ -16,6 +18,7 @@ export type ChatOutputType = { message: string; sender: string; sender_name: string; + stream_url?: string; }; export type chatInputType = {