fix: implemented build stop functionality on polling, call cancel endpoint on build cancel (#6815)

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Lucas Oliveira 2025-02-25 19:11:23 -03:00 committed by GitHub
commit 0134485d5d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 226 additions and 13 deletions

View file

@ -365,7 +365,7 @@ async def generate_flow_events(
try:
vertex_build_response: VertexBuildResponse = await _build_vertex(vertex_id, graph, event_manager)
except asyncio.CancelledError as exc:
logger.exception(exc)
logger.error(f"Build cancelled: {exc}")
raise
# send built event or error event
@ -445,6 +445,7 @@ async def cancel_flow_build(
Raises:
ValueError: If the job doesn't exist
asyncio.CancelledError: If the task cancellation failed
"""
# Get the event task and event manager for the job
_, _, event_task = queue_service.get_queue_data(job_id)
@ -460,13 +461,24 @@ async def cancel_flow_build(
# Store the task reference to check status after cleanup
task_before_cleanup = event_task
# Perform cleanup using the queue service
await queue_service.cleanup_job(job_id)
try:
# Perform cleanup using the queue service
await queue_service.cleanup_job(job_id)
except asyncio.CancelledError:
# Check if the task was actually cancelled
if task_before_cleanup.cancelled():
logger.info(f"Successfully cancelled flow build for job_id {job_id} (CancelledError caught)")
return True
# If the task wasn't cancelled, re-raise the exception
logger.error(f"CancelledError caught but task for job_id {job_id} was not cancelled")
raise
# Verify that the task was actually cancelled
# If no exception was raised, verify that the task was actually cancelled
# The task should be done (cancelled) after cleanup
if task_before_cleanup.cancelled():
logger.info(f"Successfully cancelled flow build for job_id {job_id}")
return True
# If we get here, the task wasn't cancelled properly
logger.error(f"Failed to cancel flow build for job_id {job_id}, task is still running")
return False

View file

@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
import time
import traceback
import uuid
@ -194,10 +195,12 @@ async def cancel_build(
return CancelFlowResponse(success=True, message="Flow build cancelled successfully")
# Cancellation was attempted but failed
return CancelFlowResponse(success=False, message="Failed to cancel flow build")
except asyncio.CancelledError:
# If CancelledError reaches here, it means the task was not successfully cancelled
logger.error(f"Failed to cancel flow build for job_id {job_id} (CancelledError caught)")
return CancelFlowResponse(success=False, message="Failed to cancel flow build")
except ValueError as exc:
# Job not found
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except Exception as exc:
# Any other unexpected error

View file

@ -175,6 +175,8 @@ class DatabaseService(Service):
cursor.execute(pragma)
except OperationalError:
logger.exception(f"Failed to set PRAGMA {pragma}")
except GeneratorExit:
logger.error(f"Failed to set PRAGMA {pragma}")
finally:
cursor.close()

View file

@ -193,3 +193,167 @@ async def test_build_flow_polling(client, json_memory_chatbot_no_llm, logged_in_
# Use the same consume_and_assert_stream function to verify the events
await consume_and_assert_stream(polling_response, job_id)
@pytest.mark.benchmark
async def test_cancel_build_unexpected_error(client, json_memory_chatbot_no_llm, logged_in_headers, monkeypatch):
"""Test handling of unexpected exceptions during flow build cancellation."""
# First create the flow
flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)
# Start the build and get job_id
build_response = await build_flow(client, flow_id, logged_in_headers)
job_id = build_response["job_id"]
assert job_id is not None
# Mock the cancel_flow_build function to raise an unexpected exception
import langflow.api.v1.chat
original_cancel_flow_build = langflow.api.v1.chat.cancel_flow_build
async def mock_cancel_flow_build_with_error(*_args, **_kwargs):
msg = "Unexpected error during cancellation"
raise RuntimeError(msg)
monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", mock_cancel_flow_build_with_error)
try:
# Try to cancel the build - should return 500 Internal Server Error
cancel_response = await client.post(f"api/v1/build/{job_id}/cancel", headers=logged_in_headers)
assert cancel_response.status_code == codes.INTERNAL_SERVER_ERROR
# Verify the error message
response_data = cancel_response.json()
assert "detail" in response_data
assert "Unexpected error during cancellation" in response_data["detail"]
finally:
# Restore the original function to avoid affecting other tests
monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", original_cancel_flow_build)
@pytest.mark.benchmark
async def test_cancel_build_success(client, json_memory_chatbot_no_llm, logged_in_headers, monkeypatch):
"""Test successful cancellation of a flow build."""
# First create the flow
flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)
# Start the build and get job_id
build_response = await build_flow(client, flow_id, logged_in_headers)
job_id = build_response["job_id"]
assert job_id is not None
# Mock the cancel_flow_build function to simulate a successful cancellation
import langflow.api.v1.chat
original_cancel_flow_build = langflow.api.v1.chat.cancel_flow_build
async def mock_successful_cancel_flow_build(*_args, **_kwargs):
return True # Return True to indicate successful cancellation
monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", mock_successful_cancel_flow_build)
try:
# Try to cancel the build (should return success)
cancel_response = await client.post(f"api/v1/build/{job_id}/cancel", headers=logged_in_headers)
assert cancel_response.status_code == codes.OK
# Verify the response structure indicates success
response_data = cancel_response.json()
assert "success" in response_data
assert "message" in response_data
assert response_data["success"] is True
assert "cancelled successfully" in response_data["message"].lower()
finally:
# Restore the original function to avoid affecting other tests
monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", original_cancel_flow_build)
@pytest.mark.benchmark
async def test_cancel_nonexistent_build(client, logged_in_headers):
"""Test cancelling a non-existent flow build."""
# Generate a random job_id that doesn't exist
invalid_job_id = str(uuid.uuid4())
# Try to cancel a non-existent build
response = await client.post(f"api/v1/build/{invalid_job_id}/cancel", headers=logged_in_headers)
assert response.status_code == codes.NOT_FOUND
assert "No queue found for job_id" in response.json()["detail"]
@pytest.mark.benchmark
async def test_cancel_build_failure(client, json_memory_chatbot_no_llm, logged_in_headers, monkeypatch):
"""Test handling of cancellation failure."""
# First create the flow
flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)
# Start the build and get job_id
build_response = await build_flow(client, flow_id, logged_in_headers)
job_id = build_response["job_id"]
assert job_id is not None
# Mock the cancel_flow_build function to simulate a failure
# The import path in monkeypatch should match exactly how it's imported in the application
import langflow.api.v1.chat
original_cancel_flow_build = langflow.api.v1.chat.cancel_flow_build
async def mock_cancel_flow_build(*_args, **_kwargs):
return False # Return False to indicate cancellation failure
monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", mock_cancel_flow_build)
try:
# Try to cancel the build (should return failure but success=False)
cancel_response = await client.post(f"api/v1/build/{job_id}/cancel", headers=logged_in_headers)
assert cancel_response.status_code == codes.OK
# Verify the response structure indicates failure
response_data = cancel_response.json()
assert "success" in response_data
assert "message" in response_data
assert response_data["success"] is False
assert "Failed to cancel" in response_data["message"]
finally:
# Restore the original function to avoid affecting other tests
monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", original_cancel_flow_build)
@pytest.mark.benchmark
async def test_cancel_build_with_cancelled_error(client, json_memory_chatbot_no_llm, logged_in_headers, monkeypatch):
"""Test handling of CancelledError during cancellation (should be treated as failure)."""
# First create the flow
flow_id = await create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)
# Start the build and get job_id
build_response = await build_flow(client, flow_id, logged_in_headers)
job_id = build_response["job_id"]
assert job_id is not None
# Mock the cancel_flow_build function to raise CancelledError
import asyncio
import langflow.api.v1.chat
original_cancel_flow_build = langflow.api.v1.chat.cancel_flow_build
async def mock_cancel_flow_build_with_cancelled_error(*_args, **_kwargs):
msg = "Task cancellation failed"
raise asyncio.CancelledError(msg)
monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", mock_cancel_flow_build_with_cancelled_error)
try:
# Try to cancel the build - should return failure when CancelledError is raised
# since our implementation treats CancelledError as a failed cancellation
cancel_response = await client.post(f"api/v1/build/{job_id}/cancel", headers=logged_in_headers)
assert cancel_response.status_code == codes.OK
# Verify the response structure indicates failure
response_data = cancel_response.json()
assert "success" in response_data
assert "message" in response_data
assert response_data["success"] is False
assert "failed to cancel" in response_data["message"].lower()
finally:
# Restore the original function to avoid affecting other tests
monkeypatch.setattr(langflow.api.v1.chat, "cancel_flow_build", original_cancel_flow_build)

View file

@ -237,6 +237,7 @@ export type StreamingRequestParams = {
body?: object;
onError?: (statusCode: number) => void;
onNetworkError?: (error: Error) => void;
buildController: AbortController;
};
async function performStreamingRequest({
@ -246,18 +247,18 @@ async function performStreamingRequest({
body,
onError,
onNetworkError,
buildController,
}: StreamingRequestParams) {
let headers = {
"Content-Type": "application/json",
// this flag is fundamental to ensure server stops tasks when client disconnects
Connection: "close",
};
const controller = new AbortController();
useFlowStore.getState().setBuildController(controller);
const params = {
method: method,
headers: headers,
signal: controller.signal,
signal: buildController.signal,
};
if (body) {
params["body"] = JSON.stringify(body);
@ -298,7 +299,7 @@ async function performStreamingRequest({
}
const shouldContinue = await onData(data);
if (!shouldContinue) {
controller.abort();
buildController.abort();
return;
}
} else {

View file

@ -1,3 +1,4 @@
import { MISSED_ERROR_ALERT } from "@/constants/alerts_constants";
import {
BASE_URL_API,
POLLING_INTERVAL,
@ -93,7 +94,7 @@ export async function updateVerticesOrder(
);
} catch (error: any) {
setErrorData({
title: "Oops! Looks like you missed something",
title: MISSED_ERROR_ALERT,
list: [error.response?.data?.detail ?? "Unknown Error"],
});
useFlowStore.getState().setIsBuilding(false);
@ -161,6 +162,7 @@ async function pollBuildEvents(
onGetOrderSuccess?: () => void;
onValidateNodes?: (nodes: string[]) => void;
},
abortController: AbortController,
): Promise<void> {
let isDone = false;
while (!isDone) {
@ -169,6 +171,7 @@ async function pollBuildEvents(
headers: {
"Content-Type": "application/json",
},
signal: abortController.signal, // Add abort signal to fetch
});
if (!response.ok) {
@ -184,13 +187,17 @@ async function pollBuildEvents(
// Process the event
const event = JSON.parse(data.event);
await onEvent(
const result = await onEvent(
event.event,
event.data,
buildResults,
verticesStartTimeMs,
callbacks,
);
if (!result) {
isDone = true;
abortController.abort();
}
// Check if this was the end event or if we got a null value
if (event.event === "end" || data.event === null) {
@ -278,6 +285,24 @@ export async function buildFlowVertices({
const { job_id } = await buildResponse.json();
const cancelBuildUrl = `${BASE_URL_API}build/${job_id}/cancel`;
// Get the buildController from flowStore
const buildController = new AbortController();
buildController.signal.addEventListener("abort", () => {
try {
fetch(cancelBuildUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
});
} catch (error) {
console.error("Error canceling build:", error);
}
});
useFlowStore.getState().setBuildController(buildController);
// Then stream the events
const eventsUrl = `${BASE_URL_API}build/${job_id}/events`;
const buildResults: Array<boolean> = [];
@ -314,6 +339,7 @@ export async function buildFlowVertices({
"Network error. Please check the connection to the server.",
]);
},
buildController,
});
} else {
const callbacks = {
@ -329,10 +355,15 @@ export async function buildFlowVertices({
buildResults,
verticesStartTimeMs,
callbacks,
buildController,
);
}
} catch (error) {
} catch (error: unknown) {
console.error("Build process error:", error);
if (error instanceof Error && error.name === "AbortError") {
onBuildStopped && onBuildStopped();
return;
}
onBuildError!("Error Building Flow", [
(error as Error).message || "An unexpected error occurred",
]);