diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index fbabacc3c..f0213e167 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -347,14 +347,11 @@ async def build_flow( client_consumed_queue: asyncio.Queue, event_manager: EventManager, ) -> None: - build_task = asyncio.create_task(_build_vertex(vertex_id, graph, event_manager)) try: - await build_task - vertex_build_response: VertexBuildResponse = build_task.result() + vertex_build_response: VertexBuildResponse = await _build_vertex(vertex_id, graph, event_manager) except asyncio.CancelledError as exc: logger.exception(exc) - build_task.cancel() - return + raise # send built event or error event try: @@ -370,12 +367,7 @@ async def build_flow( for next_vertex_id in vertex_build_response.next_vertices_ids: task = asyncio.create_task(build_vertices(next_vertex_id, graph, client_consumed_queue, event_manager)) tasks.append(task) - try: await asyncio.gather(*tasks) - except asyncio.CancelledError: - for task in tasks: - task.cancel() - return async def event_generator(event_manager: EventManager, client_consumed_queue: asyncio.Queue) -> None: try: @@ -397,9 +389,8 @@ async def build_flow( await asyncio.gather(*tasks) except asyncio.CancelledError: background_tasks.add_task(graph.end_all_traces) - for task in tasks: - task.cancel() - return + raise + except Exception as e: logger.error(f"Error building vertices: {e}") custom_component = graph.get_vertex(vertex_id).custom_component diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 1516111d3..5db1f8354 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -3,7 +3,6 @@ import base64 import json import logging import traceback -from contextlib import suppress from contextvars import ContextVar from typing import Annotated from urllib.parse import quote, unquote, urlparse @@ -266,8 +265,9 @@ async def handle_call_tool( return collected_results finally: progress_task.cancel() - with suppress(asyncio.CancelledError): - await progress_task + await asyncio.wait([progress_task]) + if not progress_task.cancelled() and (exc := progress_task.exception()) is not None: + raise exc except Exception as e: msg = f"Error in async session: {e}" logger.exception(msg) @@ -333,6 +333,7 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get logger.info("Client disconnected from SSE connection") except asyncio.CancelledError: logger.info("SSE connection was cancelled") + raise except Exception as e: msg = f"Error in MCP: {e!s}" logger.exception(msg)