From d02b4573e508d5264a07b7391e3b221eb13463bc Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 25 Feb 2025 14:17:12 -0300 Subject: [PATCH] feat: Add flow build cancellation endpoint and functionality (#6813) * feat: Add flow build cancellation endpoint and support Implement new functionality to cancel ongoing flow build jobs: - Add `cancel_flow_build` function in build.py to handle job cancellation - Create new `/build/{job_id}/cancel` endpoint in chat.py - Introduce `CancelFlowResponse` schema for cancellation response - Implement robust error handling and logging for job cancellation process * [autofix.ci] apply automated fixes --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- src/backend/base/langflow/api/build.py | 44 +++++++++++++++++++++ src/backend/base/langflow/api/v1/chat.py | 30 +++++++++++++- src/backend/base/langflow/api/v1/schemas.py | 7 ++++ uv.lock | 5 ++- 4 files changed, 84 insertions(+), 2 deletions(-) diff --git a/src/backend/base/langflow/api/build.py b/src/backend/base/langflow/api/build.py index 384a74bdd..139925c83 100644 --- a/src/backend/base/langflow/api/build.py +++ b/src/backend/base/langflow/api/build.py @@ -426,3 +426,47 @@ async def generate_flow_events( raise event_manager.on_end(data={}) await event_manager.queue.put((None, None, time.time())) + + +async def cancel_flow_build( + *, + job_id: str, + queue_service: JobQueueService, +) -> bool: + """Cancel an ongoing flow build job. + + Args: + job_id: The unique identifier of the job to cancel + queue_service: The service managing job queues + + Returns: + True if the job was successfully canceled or doesn't need cancellation + False if the cancellation failed + + Raises: + ValueError: If the job doesn't exist + """ + # Get the event task and event manager for the job + _, _, event_task = queue_service.get_queue_data(job_id) + + if event_task is None: + logger.warning(f"No event task found for job_id {job_id}") + return True # Nothing to cancel is still a success + + if event_task.done(): + logger.info(f"Task for job_id {job_id} is already completed") + return True # Nothing to cancel is still a success + + # Store the task reference to check status after cleanup + task_before_cleanup = event_task + + # Perform cleanup using the queue service + await queue_service.cleanup_job(job_id) + + # Verify that the task was actually cancelled + # The task should be done (cancelled) after cleanup + if task_before_cleanup.cancelled(): + logger.info(f"Successfully cancelled flow build for job_id {job_id}") + return True + logger.error(f"Failed to cancel flow build for job_id {job_id}, task is still running") + return False diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index d9c88908e..5bcd7db64 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -5,11 +5,12 @@ import traceback import uuid from typing import TYPE_CHECKING, Annotated -from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException +from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, status from fastapi.responses import StreamingResponse from loguru import logger from langflow.api.build import ( + cancel_flow_build, get_flow_events_response, start_flow_build, ) @@ -25,6 +26,7 @@ from langflow.api.utils import ( parse_exception, ) from langflow.api.v1.schemas import ( + CancelFlowResponse, FlowDataRequest, InputValueRequest, ResultDataResponse, @@ -177,6 +179,32 @@ async def get_build_events( ) +@router.post("/build/{job_id}/cancel", response_model=CancelFlowResponse) +async def cancel_build( + job_id: str, + queue_service: Annotated[JobQueueService, Depends(get_queue_service)], +): + """Cancel a specific build job.""" + try: + # Cancel the flow build and check if it was successful + cancellation_success = await cancel_flow_build(job_id=job_id, queue_service=queue_service) + + if cancellation_success: + # Cancellation succeeded or wasn't needed + return CancelFlowResponse(success=True, message="Flow build cancelled successfully") + # Cancellation was attempted but failed + return CancelFlowResponse(success=False, message="Failed to cancel flow build") + + except ValueError as exc: + # Job not found + + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc + except Exception as exc: + # Any other unexpected error + logger.exception(f"Error cancelling flow build for job_id {job_id}: {exc}") + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc + + @router.post("/build/{flow_id}/vertices/{vertex_id}", deprecated=True) async def build_vertex( *, diff --git a/src/backend/base/langflow/api/v1/schemas.py b/src/backend/base/langflow/api/v1/schemas.py index a1a58f5b4..8ff419d39 100644 --- a/src/backend/base/langflow/api/v1/schemas.py +++ b/src/backend/base/langflow/api/v1/schemas.py @@ -377,3 +377,10 @@ class ConfigResponse(BaseModel): health_check_max_retries: int max_file_size_upload: int event_delivery: Literal["polling", "streaming"] + + +class CancelFlowResponse(BaseModel): + """Response model for flow build cancellation.""" + + success: bool + message: str diff --git a/uv.lock b/uv.lock index d575f7236..a997c68e2 100644 --- a/uv.lock +++ b/uv.lock @@ -1,4 +1,5 @@ version = 1 +revision = 1 requires-python = ">=3.10, <3.14" resolution-markers = [ "python_full_version >= '3.13'", @@ -4491,7 +4492,7 @@ requires-dist = [ { name = "pymongo", specifier = "==4.10.1" }, { name = "python-pptx", marker = "extra == 'nv-ingest'", specifier = "==0.6.23" }, { name = "pytube", specifier = "==15.0.0" }, - { name = "pywin32", marker = "sys_platform == 'win32'", specifier = ">=307,<308" }, + { name = "pywin32", marker = "sys_platform == 'win32'", specifier = "==307" }, { name = "qdrant-client", specifier = "==1.9.2" }, { name = "qianfan", specifier = "==0.3.5" }, { name = "ragstack-ai-knowledge-store", specifier = "==0.2.1" }, @@ -4513,6 +4514,7 @@ requires-dist = [ { name = "youtube-transcript-api", specifier = "==0.6.3" }, { name = "zep-python", specifier = "==2.0.2" }, ] +provides-extras = ["deploy", "couchbase", "cassio", "local", "clickhouse-connect", "nv-ingest"] [package.metadata.requires-dev] dev = [ @@ -4763,6 +4765,7 @@ requires-dist = [ { name = "uvicorn", specifier = ">=0.30.0,<1.0.0" }, { name = "validators", specifier = ">=0.34.0" }, ] +provides-extras = ["deploy", "local", "all"] [package.metadata.requires-dev] dev = [