diff --git a/src/backend/base/langflow/api/utils.py b/src/backend/base/langflow/api/utils.py index 27576e971..0500917a3 100644 --- a/src/backend/base/langflow/api/utils.py +++ b/src/backend/base/langflow/api/utils.py @@ -301,3 +301,10 @@ def get_top_level_vertices(graph, vertices_ids): else: top_level_vertices.append(vertex_id) return top_level_vertices + + +def parse_exception(exc): + """Parse the exception message.""" + if hasattr(exc, "body"): + return exc.body["message"] + return str(exc) diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 46ad91c59..996f301e6 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -12,6 +12,7 @@ from langflow.api.utils import ( format_elapsed_time, format_exception_message, get_top_level_vertices, + parse_exception, ) from langflow.api.v1.schemas import ( InputValueRequest, @@ -314,7 +315,10 @@ async def build_vertex_stream( except Exception as exc: logger.exception(f"Error building vertex: {exc}") - yield str(StreamData(event="error", data={"error": str(exc)})) + exc_message = parse_exception(exc) + if exc_message == "The message must be an iterator or an async iterator.": + exc_message = "This stream has already been closed." + yield str(StreamData(event="error", data={"error": exc_message})) finally: logger.debug("Closing stream") yield str(StreamData(event="close", data={"message": "Stream closed"})) diff --git a/src/frontend/src/components/newChatView/chatMessage/index.tsx b/src/frontend/src/components/newChatView/chatMessage/index.tsx index 760a9d140..fb815b931 100644 --- a/src/frontend/src/components/newChatView/chatMessage/index.tsx +++ b/src/frontend/src/components/newChatView/chatMessage/index.tsx @@ -9,6 +9,7 @@ import Robot from "../../../assets/robot.png"; import SanitizedHTMLWrapper from "../../../components/SanitizedHTMLWrapper"; import CodeTabsComponent from "../../../components/codeTabsComponent"; import IconComponent from "../../../components/genericIconComponent"; +import useAlertStore from "../../../stores/alertStore"; import useFlowStore from "../../../stores/flowStore"; import { chatMessagePropsType } from "../../../types/components"; import { classNames } from "../../../utils/utils"; @@ -33,6 +34,7 @@ export default function ChatMessage({ const [isStreaming, setIsStreaming] = useState(false); const eventSource = useRef(undefined); const updateFlowPool = useFlowStore((state) => state.updateFlowPool); + const setErrorData = useAlertStore((state) => state.setErrorData); const chatMessageRef = useRef(chatMessage); // Sync ref with state @@ -57,7 +59,17 @@ export default function ChatMessage({ setIsStreaming(false); eventSource.current?.close(); setStreamUrl(undefined); - reject(new Error("Streaming failed")); + // property data is not available in the event object + // so check if the event object has a data property + if (event.data) { + let parsedData = JSON.parse(event.data); + if (parsedData.error) { + reject(new Error(parsedData.error)); + } else + reject(new Error("An error occurred while streaming the output")); + } else { + reject(new Error("An error occurred while streaming the output")); + } }; eventSource.current.addEventListener("close", (event) => { setStreamUrl(undefined); // Update state to reflect the stream is closed @@ -79,7 +91,10 @@ export default function ChatMessage({ } }) .catch((error) => { - console.error(error); + setErrorData({ + title: "Streaming Error", + list: [error.message], + }); setLockChat(false); }); }