diff --git a/src/backend/base/langflow/api/build.py b/src/backend/base/langflow/api/build.py index 139925c83..ef05da8b4 100644 --- a/src/backend/base/langflow/api/build.py +++ b/src/backend/base/langflow/api/build.py @@ -365,7 +365,7 @@ async def generate_flow_events( try: vertex_build_response: VertexBuildResponse = await _build_vertex(vertex_id, graph, event_manager) except asyncio.CancelledError as exc: - logger.exception(exc) + logger.error(f"Build cancelled: {exc}") raise # send built event or error event @@ -445,6 +445,7 @@ async def cancel_flow_build( Raises: ValueError: If the job doesn't exist + asyncio.CancelledError: If the task cancellation failed """ # Get the event task and event manager for the job _, _, event_task = queue_service.get_queue_data(job_id) @@ -460,13 +461,24 @@ async def cancel_flow_build( # Store the task reference to check status after cleanup task_before_cleanup = event_task - # Perform cleanup using the queue service - await queue_service.cleanup_job(job_id) + try: + # Perform cleanup using the queue service + await queue_service.cleanup_job(job_id) + except asyncio.CancelledError: + # Check if the task was actually cancelled + if task_before_cleanup.cancelled(): + logger.info(f"Successfully cancelled flow build for job_id {job_id} (CancelledError caught)") + return True + # If the task wasn't cancelled, re-raise the exception + logger.error(f"CancelledError caught but task for job_id {job_id} was not cancelled") + raise - # Verify that the task was actually cancelled + # If no exception was raised, verify that the task was actually cancelled # The task should be done (cancelled) after cleanup if task_before_cleanup.cancelled(): logger.info(f"Successfully cancelled flow build for job_id {job_id}") return True + + # If we get here, the task wasn't cancelled properly logger.error(f"Failed to cancel flow build for job_id {job_id}, task is still running") return False diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 5bcd7db64..db4bd7ce3 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import time import traceback import uuid @@ -194,10 +195,12 @@ async def cancel_build( return CancelFlowResponse(success=True, message="Flow build cancelled successfully") # Cancellation was attempted but failed return CancelFlowResponse(success=False, message="Failed to cancel flow build") - + except asyncio.CancelledError: + # If CancelledError reaches here, it means the task was not successfully cancelled + logger.error(f"Failed to cancel flow build for job_id {job_id} (CancelledError caught)") + return CancelFlowResponse(success=False, message="Failed to cancel flow build") except ValueError as exc: # Job not found - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc except Exception as exc: # Any other unexpected error diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index 2facacf7a..f06e88ff2 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -175,6 +175,8 @@ class DatabaseService(Service): cursor.execute(pragma) except OperationalError: logger.exception(f"Failed to set PRAGMA {pragma}") + except GeneratorExit: + logger.error(f"Failed to set PRAGMA {pragma}") finally: cursor.close() diff --git a/src/backend/tests/unit/test_chat_endpoint.py b/src/backend/tests/unit/test_chat_endpoint.py index 5a629245b..bedce5661 100644 --- a/src/backend/tests/unit/test_chat_endpoint.py +++ b/src/backend/tests/unit/test_chat_endpoint.py @@ -193,3 +193,167 @@ async def test_build_flow_polling(client, json_memory_chatbot_no_llm, logged_in_ # Use the same consume_and_assert_stream function to verify the events await consume_and_assert_stream(polling_response, job_id) + + +@pytest.mark.benchmark +async def test_cancel_build_unexpected_error(client, json_memory_chatbot_no_llm, logged_in_headers, monkeypatch): + """Test handling of unexpected exceptions during flow build cancellation.""" + # First create the flow + flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) + + # Start the build and get job_id + build_response = await build_flow(client, flow_id, logged_in_headers) + job_id = build_response["job_id"] + assert job_id is not None + + # Mock the cancel_flow_build function to raise an unexpected exception + import langflow.api.v1.chat + + original_cancel_flow_build = langflow.api.v1.chat.cancel_flow_build + + async def mock_cancel_flow_build_with_error(*_args, **_kwargs): + msg = "Unexpected error during cancellation" + raise RuntimeError(msg) + + monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", mock_cancel_flow_build_with_error) + + try: + # Try to cancel the build - should return 500 Internal Server Error + cancel_response = await client.post(f"api/v1/build/{job_id}/cancel", headers=logged_in_headers) + assert cancel_response.status_code == codes.INTERNAL_SERVER_ERROR + + # Verify the error message + response_data = cancel_response.json() + assert "detail" in response_data + assert "Unexpected error during cancellation" in response_data["detail"] + finally: + # Restore the original function to avoid affecting other tests + monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", original_cancel_flow_build) + + +@pytest.mark.benchmark +async def test_cancel_build_success(client, json_memory_chatbot_no_llm, logged_in_headers, monkeypatch): + """Test successful cancellation of a flow build.""" + # First create the flow + flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) + + # Start the build and get job_id + build_response = await build_flow(client, flow_id, logged_in_headers) + job_id = build_response["job_id"] + assert job_id is not None + + # Mock the cancel_flow_build function to simulate a successful cancellation + import langflow.api.v1.chat + + original_cancel_flow_build = langflow.api.v1.chat.cancel_flow_build + + async def mock_successful_cancel_flow_build(*_args, **_kwargs): + return True # Return True to indicate successful cancellation + + monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", mock_successful_cancel_flow_build) + + try: + # Try to cancel the build (should return success) + cancel_response = await client.post(f"api/v1/build/{job_id}/cancel", headers=logged_in_headers) + assert cancel_response.status_code == codes.OK + + # Verify the response structure indicates success + response_data = cancel_response.json() + assert "success" in response_data + assert "message" in response_data + assert response_data["success"] is True + assert "cancelled successfully" in response_data["message"].lower() + finally: + # Restore the original function to avoid affecting other tests + monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", original_cancel_flow_build) + + +@pytest.mark.benchmark +async def test_cancel_nonexistent_build(client, logged_in_headers): + """Test cancelling a non-existent flow build.""" + # Generate a random job_id that doesn't exist + invalid_job_id = str(uuid.uuid4()) + + # Try to cancel a non-existent build + response = await client.post(f"api/v1/build/{invalid_job_id}/cancel", headers=logged_in_headers) + assert response.status_code == codes.NOT_FOUND + assert "No queue found for job_id" in response.json()["detail"] + + +@pytest.mark.benchmark +async def test_cancel_build_failure(client, json_memory_chatbot_no_llm, logged_in_headers, monkeypatch): + """Test handling of cancellation failure.""" + # First create the flow + flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) + + # Start the build and get job_id + build_response = await build_flow(client, flow_id, logged_in_headers) + job_id = build_response["job_id"] + assert job_id is not None + + # Mock the cancel_flow_build function to simulate a failure + # The import path in monkeypatch should match exactly how it's imported in the application + import langflow.api.v1.chat + + original_cancel_flow_build = langflow.api.v1.chat.cancel_flow_build + + async def mock_cancel_flow_build(*_args, **_kwargs): + return False # Return False to indicate cancellation failure + + monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", mock_cancel_flow_build) + + try: + # Try to cancel the build (should return failure but success=False) + cancel_response = await client.post(f"api/v1/build/{job_id}/cancel", headers=logged_in_headers) + assert cancel_response.status_code == codes.OK + + # Verify the response structure indicates failure + response_data = cancel_response.json() + assert "success" in response_data + assert "message" in response_data + assert response_data["success"] is False + assert "Failed to cancel" in response_data["message"] + finally: + # Restore the original function to avoid affecting other tests + monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", original_cancel_flow_build) + + +@pytest.mark.benchmark +async def test_cancel_build_with_cancelled_error(client, json_memory_chatbot_no_llm, logged_in_headers, monkeypatch): + """Test handling of CancelledError during cancellation (should be treated as failure).""" + # First create the flow + flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers) + + # Start the build and get job_id + build_response = await build_flow(client, flow_id, logged_in_headers) + job_id = build_response["job_id"] + assert job_id is not None + + # Mock the cancel_flow_build function to raise CancelledError + import asyncio + + import langflow.api.v1.chat + + original_cancel_flow_build = langflow.api.v1.chat.cancel_flow_build + + async def mock_cancel_flow_build_with_cancelled_error(*_args, **_kwargs): + msg = "Task cancellation failed" + raise asyncio.CancelledError(msg) + + monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", mock_cancel_flow_build_with_cancelled_error) + + try: + # Try to cancel the build - should return failure when CancelledError is raised + # since our implementation treats CancelledError as a failed cancellation + cancel_response = await client.post(f"api/v1/build/{job_id}/cancel", headers=logged_in_headers) + assert cancel_response.status_code == codes.OK + + # Verify the response structure indicates failure + response_data = cancel_response.json() + assert "success" in response_data + assert "message" in response_data + assert response_data["success"] is False + assert "failed to cancel" in response_data["message"].lower() + finally: + # Restore the original function to avoid affecting other tests + monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", original_cancel_flow_build) diff --git a/src/frontend/src/controllers/API/api.tsx b/src/frontend/src/controllers/API/api.tsx index c7fb37a11..9d95c1ff2 100644 --- a/src/frontend/src/controllers/API/api.tsx +++ b/src/frontend/src/controllers/API/api.tsx @@ -237,6 +237,7 @@ export type StreamingRequestParams = { body?: object; onError?: (statusCode: number) => void; onNetworkError?: (error: Error) => void; + buildController: AbortController; }; async function performStreamingRequest({ @@ -246,18 +247,18 @@ async function performStreamingRequest({ body, onError, onNetworkError, + buildController, }: StreamingRequestParams) { let headers = { "Content-Type": "application/json", // this flag is fundamental to ensure server stops tasks when client disconnects Connection: "close", }; - const controller = new AbortController(); - useFlowStore.getState().setBuildController(controller); + const params = { method: method, headers: headers, - signal: controller.signal, + signal: buildController.signal, }; if (body) { params["body"] = JSON.stringify(body); @@ -298,7 +299,7 @@ async function performStreamingRequest({ } const shouldContinue = await onData(data); if (!shouldContinue) { - controller.abort(); + buildController.abort(); return; } } else { diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index eed7e1540..44e5c1460 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -1,3 +1,4 @@ +import { MISSED_ERROR_ALERT } from "@/constants/alerts_constants"; import { BASE_URL_API, POLLING_INTERVAL, @@ -93,7 +94,7 @@ export async function updateVerticesOrder( ); } catch (error: any) { setErrorData({ - title: "Oops! Looks like you missed something", + title: MISSED_ERROR_ALERT, list: [error.response?.data?.detail ?? "Unknown Error"], }); useFlowStore.getState().setIsBuilding(false); @@ -161,6 +162,7 @@ async function pollBuildEvents( onGetOrderSuccess?: () => void; onValidateNodes?: (nodes: string[]) => void; }, + abortController: AbortController, ): Promise { let isDone = false; while (!isDone) { @@ -169,6 +171,7 @@ async function pollBuildEvents( headers: { "Content-Type": "application/json", }, + signal: abortController.signal, // Add abort signal to fetch }); if (!response.ok) { @@ -184,13 +187,17 @@ async function pollBuildEvents( // Process the event const event = JSON.parse(data.event); - await onEvent( + const result = await onEvent( event.event, event.data, buildResults, verticesStartTimeMs, callbacks, ); + if (!result) { + isDone = true; + abortController.abort(); + } // Check if this was the end event or if we got a null value if (event.event === "end" || data.event === null) { @@ -278,6 +285,24 @@ export async function buildFlowVertices({ const { job_id } = await buildResponse.json(); + const cancelBuildUrl = `${BASE_URL_API}build/${job_id}/cancel`; + + // Get the buildController from flowStore + const buildController = new AbortController(); + buildController.signal.addEventListener("abort", () => { + try { + fetch(cancelBuildUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + }); + } catch (error) { + console.error("Error canceling build:", error); + } + }); + useFlowStore.getState().setBuildController(buildController); + // Then stream the events const eventsUrl = `${BASE_URL_API}build/${job_id}/events`; const buildResults: Array = []; @@ -314,6 +339,7 @@ export async function buildFlowVertices({ "Network error. Please check the connection to the server.", ]); }, + buildController, }); } else { const callbacks = { @@ -329,10 +355,15 @@ export async function buildFlowVertices({ buildResults, verticesStartTimeMs, callbacks, + buildController, ); } - } catch (error) { + } catch (error: unknown) { console.error("Build process error:", error); + if (error instanceof Error && error.name === "AbortError") { + onBuildStopped && onBuildStopped(); + return; + } onBuildError!("Error Building Flow", [ (error as Error).message || "An unexpected error occurred", ]);