fix: display errors when webhook background tasks fail (#7572)

* 📝 (base.py): improve naming convention for task names in Graph class
🔧 (base.py): add error logging functionality in _execute_tasks method
🔧 (utils.py): update data parameter type in log_vertex_build function
🔧 (index.tsx): remove unused setCurrentFlow function call in StoreCardComponent
🔧 (use-get-builds-polling-mutation.ts): add error handling and display logic for build failures
🔧 (flowSidebarComponent/index.tsx): make showLegacy prop optional in FlowSidebarComponentProps interface
🔧 (index.tsx): remove commented out code related to branding in FlowPage component

* [autofix.ci] apply automated fixes

* fix: update URL construction in CopyFieldAreaComponent to handle undefined endpointName

- Modified the URL construction logic to use currentFlow.id when endpointName is not provided, ensuring a valid URL is always generated.

* 🔧 (base.py): add error logging method for vertex build failures

- Introduced `_log_vertex_build_from_exception` method to handle and log exceptions during vertex building, improving error reporting and output structure.
- Updated `_execute_tasks` method to utilize the new logging method for better exception handling.

* 🐛 (use-get-builds-polling-mutation.ts): fix error display count to only show errors once per build failure instead of multiple times

* 📝 (embedding_model.py): improve formatting of the info message for the 'dimensions' input to enhance readability and clarity

* [autofix.ci] apply automated fixes

* fix: add webhook component handling in task execution

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Cristhian Zanforlin Lousa 2025-04-24 19:18:49 -03:00 committed by GitHub
commit eed7dee8bf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 114 additions and 62 deletions

View file

@@ -58,7 +58,8 @@ class EmbeddingModelComponent(LCEmbeddingsModel):
IntInput(
name="dimensions",
display_name="Dimensions",
info="The number of dimensions the resulting output embeddings should have.",
info="The number of dimensions the resulting output embeddings should have. "
"Only supported by certain models.",
advanced=True,
),
IntInput(name="chunk_size", display_name="Chunk Size", advanced=True, value=1000),

View file

@@ -7,6 +7,7 @@ import copy
import json
import queue
import threading
import traceback
import uuid
from collections import defaultdict, deque
from datetime import datetime, timezone
@@ -38,7 +39,7 @@ from langflow.graph.vertex.schema import NodeData, NodeTypeEnum
from langflow.graph.vertex.vertex_types import ComponentVertex, InterfaceVertex, StateVertex
from langflow.logging.logger import LogConfig, configure
from langflow.schema.dotdict import dotdict
from langflow.schema.schema import INPUT_FIELD_NAME, InputType
from langflow.schema.schema import INPUT_FIELD_NAME, InputType, OutputValue
from langflow.services.cache.utils import CacheMiss
from langflow.services.deps import get_chat_service, get_tracing_service
from langflow.utils.async_helpers import run_until_complete
@@ -1526,6 +1527,7 @@ class Graph:
event_manager: EventManager | None = None,
) -> Graph:
"""Processes the graph with vertices in each layer run in parallel."""
has_webhook_component = "webhook" in start_component_id.lower() if start_component_id else False
first_layer = self.sort_vertices(start_component_id=start_component_id)
vertex_task_run_count: dict[str, int] = {}
to_process = deque(first_layer)
@@ -1549,14 +1551,16 @@
set_cache=chat_service.set_cache,
event_manager=event_manager,
),
name=f"{vertex.display_name} Run {vertex_task_run_count.get(vertex_id, 0)}",
name=f"{vertex.id} Run {vertex_task_run_count.get(vertex_id, 0)}",
)
tasks.append(task)
vertex_task_run_count[vertex_id] = vertex_task_run_count.get(vertex_id, 0) + 1
logger.debug(f"Running layer {layer_index} with {len(tasks)} tasks, {current_batch}")
try:
next_runnable_vertices = await self._execute_tasks(tasks, lock=lock)
next_runnable_vertices = await self._execute_tasks(
tasks, lock=lock, has_webhook_component=has_webhook_component
)
except Exception:
logger.exception(f"Error executing tasks in layer {layer_index}")
raise
@@ -1595,16 +1599,81 @@
await set_cache_coro(data=self, lock=lock)
return next_runnable_vertices
async def _execute_tasks(self, tasks: list[asyncio.Task], lock: asyncio.Lock) -> list[str]:
"""Executes tasks in parallel, handling exceptions for each task."""
async def _log_vertex_build_from_exception(self, vertex_id: str, result: Exception) -> None:
    """Log a vertex build failure caused by an exception.

    Formats the exception into the error-output structure the frontend
    expects and records the failed build via ``log_vertex_build``.

    Args:
        vertex_id: The ID of the vertex that failed to build.
        result: The exception that caused the build failure.

    Returns:
        None

    Side effects:
        - Logs the exception details.
        - Builds an error output structure for the vertex.
        - Calls ``log_vertex_build`` to persist the failure record.
    """
    if isinstance(result, ComponentBuildError):
        # ComponentBuildError already carries a user-facing message and a
        # pre-formatted traceback.
        params = result.message
        tb = result.formatted_traceback
    else:
        from langflow.api.utils import format_exception_message

        # BUG FIX: traceback.format_exc() only reflects the exception
        # currently being handled; `result` was collected from
        # asyncio.gather(..., return_exceptions=True), so outside an active
        # `except` block format_exc() would yield "NoneType: None". Format
        # the gathered exception's own traceback instead.
        tb = "".join(traceback.format_exception(type(result), result, result.__traceback__))
        logger.exception("Error building Component")
        params = format_exception_message(result)
    message = {"errorMessage": params, "stackTrace": tb}
    vertex = self.get_vertex(vertex_id)
    # Attach the error to the vertex's first declared output; fall back to a
    # generic "output" label when the vertex declares none.
    output_label = vertex.outputs[0]["name"] if vertex.outputs else "output"
    outputs = {output_label: OutputValue(message=message, type="error")}
    result_data_response = {
        "results": {},
        "outputs": outputs,
        "logs": {},
        "message": {},
        "artifacts": {},
        "timedelta": None,
        "duration": None,
        "used_frozen_result": False,
    }
    await log_vertex_build(
        flow_id=self.flow_id or "",
        vertex_id=vertex_id or "errors",
        valid=False,
        params=params,
        data=result_data_response,
        artifacts={},
    )
async def _execute_tasks(
    self, tasks: list[asyncio.Task], lock: asyncio.Lock, *, has_webhook_component: bool = False
) -> list[str]:
    """Executes tasks in parallel, handling exceptions for each task.

    Args:
        tasks: List of tasks to execute
        lock: Async lock for synchronization
        has_webhook_component: Whether the graph has a webhook component.
            When True, a task failure is also recorded as a vertex build
            failure so webhook runs surface their errors.
    """
    results = []
    # return_exceptions=True means failures come back as Exception objects in
    # the results list instead of propagating out of gather().
    completed_tasks = await asyncio.gather(*tasks, return_exceptions=True)
    vertices: list[Vertex] = []
    for i, result in enumerate(completed_tasks):
        task_name = tasks[i].get_name()
        # Relies on the task-naming convention "<vertex.id> Run <n>" used
        # where these tasks are created, so the first token is the vertex id.
        vertex_id = tasks[i].get_name().split(" ")[0]
        if isinstance(result, Exception):
            logger.error(f"Task {task_name} failed with exception: {result}")
            if has_webhook_component:
                # Persist the failure so webhook-triggered builds report
                # their errors instead of failing silently in the background.
                await self._log_vertex_build_from_exception(vertex_id, result)
            # Cancel all remaining tasks
            for t in tasks[i + 1 :]:
                t.cancel()

View file

@@ -149,7 +149,7 @@ async def log_vertex_build(
vertex_id: str,
valid: bool,
params: Any,
data: ResultDataResponse,
data: ResultDataResponse | dict,
artifacts: dict | None = None,
) -> None:
try:

View file

@@ -196,45 +196,6 @@ export default function StoreCardComponent({
<CardFooter>
<div className="z-50 flex w-full items-center justify-between gap-2">
<div className="flex w-full flex-wrap items-end justify-end gap-2">
{/* {playground && (
<Button
disabled={loadingPlayground || !authorized}
key={data.id}
tabIndex={-1}
variant="outline"
size="sm"
className="z-50 gap-2 whitespace-nowrap"
data-testid={"playground-flow-button-" + data.id}
onClick={(e) => {
e.preventDefault();
e.stopPropagation();
setLoadingPlayground(true);
getFlowData().then((res) => {
if (!hasPlayground(res)) {
setErrorData({
title: "Error",
list: ["This flow doesn't have a playground."],
});
setLoadingPlayground(false);
return;
}
setCurrentFlow(res);
setOpenPlayground(true);
setLoadingPlayground(false);
});
}}
>
{!loadingPlayground ? (
<IconComponent
name="BotMessageSquareIcon"
className="h-4 w-4 select-none"
/>
) : (
<Loading className="h-4 w-4 text-medium-indigo" />
)}
Playground
</Button>
)} */}
<div className="flex gap-0.5">
<ShadTooltip
content={authorized ? "Like" : "Please review your API key."}

View file

@@ -62,7 +62,6 @@ export default function CopyFieldAreaComponent({
const [isFocused, setIsFocused] = useState(false);
const [isCopied, setIsCopied] = useState(false);
const isValueToReplace = value === BACKEND_URL || value === MCP_SSE_VALUE;
const setSuccessData = useAlertStore((state) => state.setSuccessData);
const currentFlow = useFlowStore((state) => state.currentFlow);
const endpointName = currentFlow?.endpoint_name ?? "";

View file

@@ -1,3 +1,4 @@
import useAlertStore from "@/stores/alertStore";
import useFlowStore from "@/stores/flowStore";
import { useUtilityStore } from "@/stores/utilityStore";
import { useMutationFunctionType } from "@/types/api";
@@ -7,6 +8,9 @@ import { api } from "../../api";
import { getURL } from "../../helpers/constants";
import { UseRequestProcessor } from "../../services/request-processor";
const ERROR_DISPLAY_INTERVAL = 10000;
const ERROR_DISPLAY_COUNT = 1;
interface PollingItem {
interval: NodeJS.Timeout;
timestamp: number;
@@ -96,6 +100,10 @@ export const useGetBuildsMutation: useMutationFunctionType<
const flowIdRef = useRef<string | null>(null);
const requestInProgressRef = useRef<Record<string, boolean>>({});
const errorDisplayCountRef = useRef<number>(0);
const timeoutIdsRef = useRef<number[]>([]);
const setErrorData = useAlertStore((state) => state.setErrorData);
const getBuildsFn = async (
payload: IGetBuilds,
@@ -115,6 +123,24 @@
if (Object.keys(flowPool).length > 0) {
setFlowPool(flowPool);
}
// Check for errors only if we haven't displayed them yet
if (errorDisplayCountRef.current === 0) {
Object.keys(flowPool).forEach((key) => {
const nodeBuild = flowPool[key];
if (nodeBuild.length > 0 && nodeBuild[0]?.valid === false) {
const errorMessage = nodeBuild?.[0]?.params || "Unknown error";
if (errorMessage) {
setErrorData({
title: "Last build failed",
list: [errorMessage],
});
errorDisplayCountRef.current = 1;
}
}
});
}
return;
}
@@ -145,9 +171,9 @@
const timestamp = Date.now();
const pollCallback = async () => {
const data = await getBuildsFn(payload);
payload.onSuccess?.(data);
payload.onSuccess?.(data!);
if (payload.stopPollingOn?.(data)) {
if (payload.stopPollingOn?.(data!)) {
PollingManager.stopPoll(payload.flowId);
}
};
@@ -164,8 +190,8 @@
PollingManager.enqueuePolling(payload.flowId, pollingItem);
return getBuildsFn(payload).then((data) => {
payload.onSuccess?.(data);
if (payload.stopPollingOn?.(data)) {
payload.onSuccess?.(data!);
if (payload.stopPollingOn?.(data!)) {
PollingManager.stopPoll(payload.flowId);
}
});
@@ -176,6 +202,13 @@
if (flowIdRef.current) {
PollingManager.stopPoll(flowIdRef.current);
}
// Clear all timeouts
timeoutIdsRef.current.forEach((timeoutId) => {
clearTimeout(timeoutId);
});
timeoutIdsRef.current = [];
// Reset error display count when component unmounts
errorDisplayCountRef.current = 0;
};
}, []);

View file

@@ -172,17 +172,6 @@ export default function FlowPage({ view }: { view?: boolean }): JSX.Element {
</SidebarProvider>
</div>
)}
{/* {ENABLE_BRANDING && version && (
<a
target={"_blank"}
href="https://medium.com/logspace/langflow-datastax-better-together-1b7462cebc4d"
className="langflow-page-icon"
>
<div className="mt-1">Langflow 🤝 DataStax</div>
<div className={version ? "mt-2" : "mt-1"}> v{version}</div>
</a>
)} */}
</div>
{blocker.state === "blocked" && (
<>