fix: Fix some swallowed CancelledError (#6625)

Fix some swallowed CancelledError

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Christophe Bornet 2025-02-17 15:29:41 +01:00 committed by GitHub
commit 23244fe8c2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 8 additions and 16 deletions

View file

@ -347,14 +347,11 @@ async def build_flow(
client_consumed_queue: asyncio.Queue,
event_manager: EventManager,
) -> None:
build_task = asyncio.create_task(_build_vertex(vertex_id, graph, event_manager))
try:
await build_task
vertex_build_response: VertexBuildResponse = build_task.result()
vertex_build_response: VertexBuildResponse = await _build_vertex(vertex_id, graph, event_manager)
except asyncio.CancelledError as exc:
logger.exception(exc)
build_task.cancel()
return
raise
# send built event or error event
try:
@ -370,12 +367,7 @@ async def build_flow(
for next_vertex_id in vertex_build_response.next_vertices_ids:
task = asyncio.create_task(build_vertices(next_vertex_id, graph, client_consumed_queue, event_manager))
tasks.append(task)
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
for task in tasks:
task.cancel()
return
async def event_generator(event_manager: EventManager, client_consumed_queue: asyncio.Queue) -> None:
try:
@ -397,9 +389,8 @@ async def build_flow(
await asyncio.gather(*tasks)
except asyncio.CancelledError:
background_tasks.add_task(graph.end_all_traces)
for task in tasks:
task.cancel()
return
raise
except Exception as e:
logger.error(f"Error building vertices: {e}")
custom_component = graph.get_vertex(vertex_id).custom_component

View file

@ -3,7 +3,6 @@ import base64
import json
import logging
import traceback
from contextlib import suppress
from contextvars import ContextVar
from typing import Annotated
from urllib.parse import quote, unquote, urlparse
@ -266,8 +265,9 @@ async def handle_call_tool(
return collected_results
finally:
progress_task.cancel()
with suppress(asyncio.CancelledError):
await progress_task
await asyncio.wait([progress_task])
if not progress_task.cancelled() and (exc := progress_task.exception()) is not None:
raise exc
except Exception as e:
msg = f"Error in async session: {e}"
logger.exception(msg)
@ -333,6 +333,7 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get
logger.info("Client disconnected from SSE connection")
except asyncio.CancelledError:
logger.info("SSE connection was cancelled")
raise
except Exception as e:
msg = f"Error in MCP: {e!s}"
logger.exception(msg)