diff --git a/src/backend/base/langflow/api/build.py b/src/backend/base/langflow/api/build.py index 6d74fa0f7..dde85cdc8 100644 --- a/src/backend/base/langflow/api/build.py +++ b/src/backend/base/langflow/api/build.py @@ -34,7 +34,7 @@ from langflow.schema.message import ErrorMessage from langflow.schema.schema import OutputValue from langflow.services.database.models.flow import Flow from langflow.services.deps import get_chat_service, get_telemetry_service, session_scope -from langflow.services.job_queue.service import JobQueueService +from langflow.services.job_queue.service import JobQueueNotFoundError, JobQueueService from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload @@ -91,6 +91,7 @@ async def get_flow_events_response( main_queue, event_manager, event_task, _ = queue_service.get_queue_data(job_id) if stream: if event_task is None: + logger.error(f"No event task found for job {job_id}") raise HTTPException(status_code=404, detail="No event task found for job") return await create_flow_response( queue=main_queue, @@ -99,17 +100,30 @@ async def get_flow_events_response( ) # Polling mode - get exactly one event - _, value, _ = await main_queue.get() - if value is None: - # End of stream, trigger end event - if event_task is not None: - event_task.cancel() - event_manager.on_end(data={}) + try: + _, value, _ = await main_queue.get() + if value is None: + # End of stream, trigger end event + if event_task is not None: + event_task.cancel() + event_manager.on_end(data={}) - return JSONResponse({"event": value.decode("utf-8") if value else None}) + return JSONResponse({"event": value.decode("utf-8") if value else None}) + except asyncio.CancelledError as exc: + logger.info(f"Event polling was cancelled for job {job_id}") + raise HTTPException(status_code=499, detail="Event polling was cancelled") from exc + except asyncio.TimeoutError as exc: + logger.warning(f"Timeout while waiting for events for job {job_id}") + raise HTTPException(status_code=408, detail="Timeout while waiting for events") from exc - except ValueError as exc: - raise HTTPException(status_code=404, detail=str(exc)) from exc + except JobQueueNotFoundError as exc: + logger.error(f"Job not found: {job_id}. Error: {exc!s}") + raise HTTPException(status_code=404, detail=f"Job not found: {exc!s}") from exc + except Exception as exc: + if isinstance(exc, HTTPException): + raise + logger.exception(f"Unexpected error processing flow events for job {job_id}") + raise HTTPException(status_code=500, detail=f"Unexpected error: {exc!s}") from exc async def create_flow_response( diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 42e0ebce2..7f21116e2 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -58,7 +58,7 @@ from langflow.services.deps import ( get_telemetry_service, session_scope, ) -from langflow.services.job_queue.service import JobQueueService +from langflow.services.job_queue.service import JobQueueNotFoundError, JobQueueService from langflow.services.telemetry.schema import ComponentPayload, PlaygroundPayload if TYPE_CHECKING: @@ -233,6 +233,9 @@ async def cancel_build( except ValueError as exc: # Job not found raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc + except JobQueueNotFoundError as exc: + logger.error(f"Job not found: {job_id}. Error: {exc!s}") + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Job not found: {exc!s}") from exc except Exception as exc: # Any other unexpected error logger.exception(f"Error cancelling flow build for job_id {job_id}: {exc}") diff --git a/src/backend/base/langflow/services/job_queue/service.py b/src/backend/base/langflow/services/job_queue/service.py index 1c39e4e04..f62cd7320 100644 --- a/src/backend/base/langflow/services/job_queue/service.py +++ b/src/backend/base/langflow/services/job_queue/service.py @@ -8,6 +8,14 @@ from langflow.events.event_manager import EventManager, create_default_event_man from langflow.services.base import Service +class JobQueueNotFoundError(Exception): + """Exception raised when a job queue is not found.""" + + def __init__(self, job_id: str) -> None: + self.job_id = job_id + super().__init__(f"Job queue not found for job_id: {job_id}") + + class JobQueueService(Service): """Asynchronous service for managing job-specific queues and their associated tasks. @@ -57,9 +65,7 @@ class JobQueueService(Service): """Initialize the JobQueueService. Sets up the internal registry for job queues, initializes the cleanup task, and sets the service state - to active. The service includes a grace period mechanism for cleanup, where tasks marked for removal - (due to cancellation or failure) will persist in the system for CLEANUP_GRACE_PERIOD seconds before - being actually removed. + to active. """ self._queues: dict[str, tuple[asyncio.Queue, EventManager, asyncio.Task | None, float | None]] = {} self._cleanup_task: asyncio.Task | None = None @@ -189,18 +195,19 @@ class JobQueueService(Service): tuple[asyncio.Queue, EventManager, asyncio.Task | None, float | None]: A tuple containing the job's main queue, its linked event manager, the associated task (if any), and the cleanup timestamp (if any). - """ - if job_id not in self._queues: - msg = f"No queue found for job_id {job_id}" - logger.error(msg) - raise ValueError(msg) + Raises: + JobQueueNotFoundError: If the job_id is not found. + RuntimeError: If the service is closed. + """ if self._closed: - msg = "Queue service is closed" - logger.error(msg) + msg = f"Queue service is closed for job_id: {job_id}" raise RuntimeError(msg) - return self._queues[job_id] + try: + return self._queues[job_id] + except KeyError as exc: + raise JobQueueNotFoundError(job_id) from exc async def cleanup_job(self, job_id: str) -> None: """Clean up and release resources for a specific job. diff --git a/src/backend/tests/unit/test_chat_endpoint.py b/src/backend/tests/unit/test_chat_endpoint.py index bedce5661..d9e39d4d0 100644 --- a/src/backend/tests/unit/test_chat_endpoint.py +++ b/src/backend/tests/unit/test_chat_endpoint.py @@ -99,7 +99,7 @@ async def test_build_flow_invalid_job_id(client, logged_in_headers): invalid_job_id = str(uuid.uuid4()) response = await get_build_events(client, invalid_job_id, logged_in_headers) assert response.status_code == codes.NOT_FOUND - assert "No queue found for job_id" in response.json()["detail"] + assert "Job not found" in response.json()["detail"] @pytest.mark.benchmark @@ -277,7 +277,7 @@ async def test_cancel_nonexistent_build(client, logged_in_headers): # Try to cancel a non-existent build response = await client.post(f"api/v1/build/{invalid_job_id}/cancel", headers=logged_in_headers) assert response.status_code == codes.NOT_FOUND - assert "No queue found for job_id" in response.json()["detail"] + assert "Job not found" in response.json()["detail"] @pytest.mark.benchmark diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index 766a6f0e0..1a101adc4 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -184,7 +184,11 @@ async function pollBuildEvents( }); if (!response.ok) { - throw new Error("Error polling build events"); + const errorData = await response.json().catch(() => ({})); + throw new Error( + errorData.detail || + "Langflow was not able to connect to the server. Please make sure your connection is working properly.", + ); } const data = await response.json(); @@ -362,7 +366,7 @@ export async function buildFlowVertices({ onGetOrderSuccess, onValidateNodes, }; - return pollBuildEvents( + return await pollBuildEvents( eventsUrl, buildResults, verticesStartTimeMs, @@ -377,7 +381,8 @@ export async function buildFlowVertices({ return; } onBuildError!("Error Building Flow", [ - (error as Error).message || "An unexpected error occurred", + (error as Error).message || + "Langflow was not able to connect to the server. Please make sure your connection is working properly.", ]); throw error; }