feat: Add flow build cancellation endpoint and functionality (#6813)

* feat: Add flow build cancellation endpoint and support

Implement new functionality to cancel ongoing flow build jobs:
- Add `cancel_flow_build` function in build.py to handle job cancellation
- Create new `/build/{job_id}/cancel` endpoint in chat.py
- Introduce `CancelFlowResponse` schema for cancellation response
- Implement robust error handling and logging for job cancellation process

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-02-25 14:17:12 -03:00 committed by GitHub
commit d02b4573e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 84 additions and 2 deletions

View file

@ -426,3 +426,47 @@ async def generate_flow_events(
raise
event_manager.on_end(data={})
await event_manager.queue.put((None, None, time.time()))
async def cancel_flow_build(
*,
job_id: str,
queue_service: JobQueueService,
) -> bool:
"""Cancel an ongoing flow build job.
Args:
job_id: The unique identifier of the job to cancel
queue_service: The service managing job queues
Returns:
True if the job was successfully canceled or doesn't need cancellation
False if the cancellation failed
Raises:
ValueError: If the job doesn't exist
"""
# Get the event task and event manager for the job
_, _, event_task = queue_service.get_queue_data(job_id)
if event_task is None:
logger.warning(f"No event task found for job_id {job_id}")
return True # Nothing to cancel is still a success
if event_task.done():
logger.info(f"Task for job_id {job_id} is already completed")
return True # Nothing to cancel is still a success
# Store the task reference to check status after cleanup
task_before_cleanup = event_task
# Perform cleanup using the queue service
await queue_service.cleanup_job(job_id)
# Verify that the task was actually cancelled
# The task should be done (cancelled) after cleanup
if task_before_cleanup.cancelled():
logger.info(f"Successfully cancelled flow build for job_id {job_id}")
return True
logger.error(f"Failed to cancel flow build for job_id {job_id}, task is still running")
return False

View file

@ -5,11 +5,12 @@ import traceback
import uuid
from typing import TYPE_CHECKING, Annotated
from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException
from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, status
from fastapi.responses import StreamingResponse
from loguru import logger
from langflow.api.build import (
cancel_flow_build,
get_flow_events_response,
start_flow_build,
)
@ -25,6 +26,7 @@ from langflow.api.utils import (
parse_exception,
)
from langflow.api.v1.schemas import (
CancelFlowResponse,
FlowDataRequest,
InputValueRequest,
ResultDataResponse,
@ -177,6 +179,32 @@ async def get_build_events(
)
@router.post("/build/{job_id}/cancel", response_model=CancelFlowResponse)
async def cancel_build(
job_id: str,
queue_service: Annotated[JobQueueService, Depends(get_queue_service)],
):
"""Cancel a specific build job."""
try:
# Cancel the flow build and check if it was successful
cancellation_success = await cancel_flow_build(job_id=job_id, queue_service=queue_service)
if cancellation_success:
# Cancellation succeeded or wasn't needed
return CancelFlowResponse(success=True, message="Flow build cancelled successfully")
# Cancellation was attempted but failed
return CancelFlowResponse(success=False, message="Failed to cancel flow build")
except ValueError as exc:
# Job not found
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
except Exception as exc:
# Any other unexpected error
logger.exception(f"Error cancelling flow build for job_id {job_id}: {exc}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(exc)) from exc
@router.post("/build/{flow_id}/vertices/{vertex_id}", deprecated=True)
async def build_vertex(
*,

View file

@ -377,3 +377,10 @@ class ConfigResponse(BaseModel):
health_check_max_retries: int
max_file_size_upload: int
event_delivery: Literal["polling", "streaming"]
class CancelFlowResponse(BaseModel):
"""Response model for flow build cancellation."""
success: bool
message: str