diff --git a/src/backend/base/langflow/components/embeddings/embedding_model.py b/src/backend/base/langflow/components/embeddings/embedding_model.py index 539a30f45..af35f23ea 100644 --- a/src/backend/base/langflow/components/embeddings/embedding_model.py +++ b/src/backend/base/langflow/components/embeddings/embedding_model.py @@ -58,7 +58,8 @@ class EmbeddingModelComponent(LCEmbeddingsModel): IntInput( name="dimensions", display_name="Dimensions", - info="The number of dimensions the resulting output embeddings should have.", + info="The number of dimensions the resulting output embeddings should have. " + "Only supported by certain models.", advanced=True, ), IntInput(name="chunk_size", display_name="Chunk Size", advanced=True, value=1000), diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 32a613a5d..4498143a9 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -7,6 +7,7 @@ import copy import json import queue import threading +import traceback import uuid from collections import defaultdict, deque from datetime import datetime, timezone @@ -38,7 +39,7 @@ from langflow.graph.vertex.schema import NodeData, NodeTypeEnum from langflow.graph.vertex.vertex_types import ComponentVertex, InterfaceVertex, StateVertex from langflow.logging.logger import LogConfig, configure from langflow.schema.dotdict import dotdict -from langflow.schema.schema import INPUT_FIELD_NAME, InputType +from langflow.schema.schema import INPUT_FIELD_NAME, InputType, OutputValue from langflow.services.cache.utils import CacheMiss from langflow.services.deps import get_chat_service, get_tracing_service from langflow.utils.async_helpers import run_until_complete @@ -1526,6 +1527,7 @@ class Graph: event_manager: EventManager | None = None, ) -> Graph: """Processes the graph with vertices in each layer run in parallel.""" + has_webhook_component = "webhook" in start_component_id.lower() if start_component_id else False first_layer = self.sort_vertices(start_component_id=start_component_id) vertex_task_run_count: dict[str, int] = {} to_process = deque(first_layer) @@ -1549,14 +1551,16 @@ class Graph: set_cache=chat_service.set_cache, event_manager=event_manager, ), - name=f"{vertex.display_name} Run {vertex_task_run_count.get(vertex_id, 0)}", + name=f"{vertex.id} Run {vertex_task_run_count.get(vertex_id, 0)}", ) tasks.append(task) vertex_task_run_count[vertex_id] = vertex_task_run_count.get(vertex_id, 0) + 1 logger.debug(f"Running layer {layer_index} with {len(tasks)} tasks, {current_batch}") try: - next_runnable_vertices = await self._execute_tasks(tasks, lock=lock) + next_runnable_vertices = await self._execute_tasks( + tasks, lock=lock, has_webhook_component=has_webhook_component + ) except Exception: logger.exception(f"Error executing tasks in layer {layer_index}") raise @@ -1595,16 +1599,81 @@ class Graph: await set_cache_coro(data=self, lock=lock) return next_runnable_vertices - async def _execute_tasks(self, tasks: list[asyncio.Task], lock: asyncio.Lock) -> list[str]: - """Executes tasks in parallel, handling exceptions for each task.""" + async def _log_vertex_build_from_exception(self, vertex_id: str, result: Exception) -> None: + """Log a vertex build failure caused by an exception. + + This method handles formatting and logging errors that occur during vertex building. + It creates appropriate error output structures and logs the build failure. + + Args: + vertex_id: The ID of the vertex that failed to build + result: The exception that caused the build failure + + Returns: + None + + Side effects: + - Logs the exception details + - Creates error output structures + - Calls log_vertex_build to record the failure + """ + if isinstance(result, ComponentBuildError): + params = result.message + tb = result.formatted_traceback + else: + from langflow.api.utils import format_exception_message + + tb = traceback.format_exc() + logger.exception("Error building Component") + + params = format_exception_message(result) + message = {"errorMessage": params, "stackTrace": tb} + vertex = self.get_vertex(vertex_id) + output_label = vertex.outputs[0]["name"] if vertex.outputs else "output" + outputs = {output_label: OutputValue(message=message, type="error")} + result_data_response = { + "results": {}, + "outputs": outputs, + "logs": {}, + "message": {}, + "artifacts": {}, + "timedelta": None, + "duration": None, + "used_frozen_result": False, + } + + await log_vertex_build( + flow_id=self.flow_id or "", + vertex_id=vertex_id or "errors", + valid=False, + params=params, + data=result_data_response, + artifacts={}, + ) + + async def _execute_tasks( + self, tasks: list[asyncio.Task], lock: asyncio.Lock, *, has_webhook_component: bool = False + ) -> list[str]: + """Executes tasks in parallel, handling exceptions for each task. + + Args: + tasks: List of tasks to execute + lock: Async lock for synchronization + has_webhook_component: Whether the graph has a webhook component + """ results = [] completed_tasks = await asyncio.gather(*tasks, return_exceptions=True) vertices: list[Vertex] = [] for i, result in enumerate(completed_tasks): task_name = tasks[i].get_name() + vertex_id = tasks[i].get_name().split(" ")[0] + if isinstance(result, Exception): logger.error(f"Task {task_name} failed with exception: {result}") + if has_webhook_component: + await self._log_vertex_build_from_exception(vertex_id, result) + # Cancel all remaining tasks for t in tasks[i + 1 :]: t.cancel() diff --git a/src/backend/base/langflow/graph/utils.py b/src/backend/base/langflow/graph/utils.py index 47ef56dce..6ac86da5a 100644 --- a/src/backend/base/langflow/graph/utils.py +++ b/src/backend/base/langflow/graph/utils.py @@ -149,7 +149,7 @@ async def log_vertex_build( vertex_id: str, valid: bool, params: Any, - data: ResultDataResponse, + data: ResultDataResponse | dict, artifacts: dict | None = None, ) -> None: try: diff --git a/src/frontend/src/components/common/storeCardComponent/index.tsx b/src/frontend/src/components/common/storeCardComponent/index.tsx index 1d70c7794..d3e3b17a0 100644 --- a/src/frontend/src/components/common/storeCardComponent/index.tsx +++ b/src/frontend/src/components/common/storeCardComponent/index.tsx @@ -196,45 +196,6 @@ export default function StoreCardComponent({
- {/* {playground && ( - - )} */}
state.setSuccessData); const currentFlow = useFlowStore((state) => state.currentFlow); const endpointName = currentFlow?.endpoint_name ?? ""; diff --git a/src/frontend/src/controllers/API/queries/_builds/use-get-builds-polling-mutation.ts b/src/frontend/src/controllers/API/queries/_builds/use-get-builds-polling-mutation.ts index 88b42f5ae..926fb9900 100644 --- a/src/frontend/src/controllers/API/queries/_builds/use-get-builds-polling-mutation.ts +++ b/src/frontend/src/controllers/API/queries/_builds/use-get-builds-polling-mutation.ts @@ -1,3 +1,4 @@ +import useAlertStore from "@/stores/alertStore"; import useFlowStore from "@/stores/flowStore"; import { useUtilityStore } from "@/stores/utilityStore"; import { useMutationFunctionType } from "@/types/api"; @@ -7,6 +8,9 @@ import { api } from "../../api"; import { getURL } from "../../helpers/constants"; import { UseRequestProcessor } from "../../services/request-processor"; +const ERROR_DISPLAY_INTERVAL = 10000; +const ERROR_DISPLAY_COUNT = 1; + interface PollingItem { interval: NodeJS.Timeout; timestamp: number; @@ -96,6 +100,10 @@ export const useGetBuildsMutation: useMutationFunctionType< const flowIdRef = useRef(null); const requestInProgressRef = useRef>({}); + const errorDisplayCountRef = useRef(0); + const timeoutIdsRef = useRef([]); + + const setErrorData = useAlertStore((state) => state.setErrorData); const getBuildsFn = async ( payload: IGetBuilds, @@ -115,6 +123,24 @@ export const useGetBuildsMutation: useMutationFunctionType< if (Object.keys(flowPool).length > 0) { setFlowPool(flowPool); } + + // Check for errors only if we haven't displayed them yet + if (errorDisplayCountRef.current === 0) { + Object.keys(flowPool).forEach((key) => { + const nodeBuild = flowPool[key]; + if (nodeBuild.length > 0 && nodeBuild[0]?.valid === false) { + const errorMessage = nodeBuild?.[0]?.params || "Unknown error"; + if (errorMessage) { + setErrorData({ + title: "Last build failed", + list: [errorMessage], + }); + errorDisplayCountRef.current = 1; + } + } + }); + } + return; } @@ -145,9 +171,9 @@ export const useGetBuildsMutation: useMutationFunctionType< const timestamp = Date.now(); const pollCallback = async () => { const data = await getBuildsFn(payload); - payload.onSuccess?.(data); + payload.onSuccess?.(data!); - if (payload.stopPollingOn?.(data)) { + if (payload.stopPollingOn?.(data!)) { PollingManager.stopPoll(payload.flowId); } }; @@ -164,8 +190,8 @@ export const useGetBuildsMutation: useMutationFunctionType< PollingManager.enqueuePolling(payload.flowId, pollingItem); return getBuildsFn(payload).then((data) => { - payload.onSuccess?.(data); - if (payload.stopPollingOn?.(data)) { + payload.onSuccess?.(data!); + if (payload.stopPollingOn?.(data!)) { PollingManager.stopPoll(payload.flowId); } }); @@ -176,6 +202,13 @@ export const useGetBuildsMutation: useMutationFunctionType< if (flowIdRef.current) { PollingManager.stopPoll(flowIdRef.current); } + // Clear all timeouts + timeoutIdsRef.current.forEach((timeoutId) => { + clearTimeout(timeoutId); + }); + timeoutIdsRef.current = []; + // Reset error display count when component unmounts + errorDisplayCountRef.current = 0; }; }, []); diff --git a/src/frontend/src/pages/FlowPage/index.tsx b/src/frontend/src/pages/FlowPage/index.tsx index eb0a9e1bf..96a1258b9 100644 --- a/src/frontend/src/pages/FlowPage/index.tsx +++ b/src/frontend/src/pages/FlowPage/index.tsx @@ -172,17 +172,6 @@ export default function FlowPage({ view }: { view?: boolean }): JSX.Element {
)} - {/* {ENABLE_BRANDING && version && ( - -
Langflow 🤝 DataStax
- -
⛓️ v{version}
-
- )} */}
{blocker.state === "blocked" && ( <>