From 1218246ba15dbcdca9fa8087ff2f5e8c81b68988 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Mar 2025 10:43:10 -0300 Subject: [PATCH] fix: Implement cleanup grace period for job queue tasks (#7250) * feat: implement cleanup grace period for job queue tasks * Enhance JobQueueService to include a CLEANUP_GRACE_PERIOD, allowing tasks marked for cleanup to persist for a specified duration before actual removal. * Update internal data structure to track cleanup timestamps for each job. * Modify cleanup logic to check task status and manage cleanup timing effectively, ensuring resources are released appropriately after a grace period. * Improve documentation to clarify the new cleanup mechanism and its purpose. * fix: update queue data retrieval to include additional return value --- src/backend/base/langflow/api/build.py | 4 +- .../langflow/services/job_queue/service.py | 70 ++++++++++++++----- 2 files changed, 53 insertions(+), 21 deletions(-) diff --git a/src/backend/base/langflow/api/build.py b/src/backend/base/langflow/api/build.py index a68b538c0..6d74fa0f7 100644 --- a/src/backend/base/langflow/api/build.py +++ b/src/backend/base/langflow/api/build.py @@ -88,7 +88,7 @@ async def get_flow_events_response( ): """Get events for a specific build job, either as a stream or single event.""" try: - main_queue, event_manager, event_task = queue_service.get_queue_data(job_id) + main_queue, event_manager, event_task, _ = queue_service.get_queue_data(job_id) if stream: if event_task is None: raise HTTPException(status_code=404, detail="No event task found for job") @@ -463,7 +463,7 @@ async def cancel_flow_build( asyncio.CancelledError: If the task cancellation failed """ # Get the event task and event manager for the job - _, _, event_task = queue_service.get_queue_data(job_id) + _, _, event_task, _ = queue_service.get_queue_data(job_id) if event_task is None: logger.warning(f"No event task found for job_id {job_id}") diff --git a/src/backend/base/langflow/services/job_queue/service.py b/src/backend/base/langflow/services/job_queue/service.py index 3d2a437a3..1c39e4e04 100644 --- a/src/backend/base/langflow/services/job_queue/service.py +++ b/src/backend/base/langflow/services/job_queue/service.py @@ -18,15 +18,27 @@ class JobQueueService(Service): - Safely clean up resources by cancelling active tasks and emptying queues. - Automatically perform periodic cleanup of inactive or completed job queues. + The cleanup process follows a two-phase approach: + 1. When a task is cancelled or fails, it is marked for cleanup by setting a timestamp + 2. The actual cleanup only occurs after CLEANUP_GRACE_PERIOD seconds have elapsed + since the task was marked + Attributes: name (str): Unique identifier for the service. - _queues (dict[str, tuple[asyncio.Queue, EventManager, asyncio.Task | None]]): + _queues (dict[str, tuple[asyncio.Queue, EventManager, asyncio.Task | None, float | None]]): Dictionary mapping job IDs to a tuple containing: * The job's asyncio.Queue instance. * The associated EventManager instance. * The asyncio.Task processing the job (if any). + * The cleanup timestamp (if any). _cleanup_task (asyncio.Task | None): Background task for periodic cleanup. _closed (bool): Flag indicating whether the service is currently active. + CLEANUP_GRACE_PERIOD (int): Number of seconds to wait after a task is marked for cleanup + before actually removing it. This grace period allows for: + * Pending operations to complete + * Related systems to finish their work + * Inspection or recovery if needed + Default is 300 seconds (5 minutes). Example: service = JobQueueService() @@ -45,12 +57,15 @@ class JobQueueService(Service): """Initialize the JobQueueService. Sets up the internal registry for job queues, initializes the cleanup task, and sets the service state - to active. + to active. The service includes a grace period mechanism for cleanup, where tasks marked for removal + (due to cancellation or failure) will persist in the system for CLEANUP_GRACE_PERIOD seconds before + being actually removed. """ - self._queues: dict[str, tuple[asyncio.Queue, EventManager, asyncio.Task | None]] = {} + self._queues: dict[str, tuple[asyncio.Queue, EventManager, asyncio.Task | None, float | None]] = {} self._cleanup_task: asyncio.Task | None = None self._closed = False self.ready = False + self.CLEANUP_GRACE_PERIOD = 300 # 5 minutes before cleaning up marked tasks def is_started(self) -> bool: """Check if the JobQueueService has started. @@ -126,7 +141,7 @@ class JobQueueService(Service): event_manager = create_default_event_manager(main_queue) # Register the queue without an active task. - self._queues[job_id] = (main_queue, event_manager, None) + self._queues[job_id] = (main_queue, event_manager, None, None) logger.debug(f"Queue and event manager successfully created for job_id {job_id}") return main_queue, event_manager @@ -153,7 +168,7 @@ class JobQueueService(Service): logger.error(msg) raise RuntimeError(msg) - main_queue, event_manager, existing_task = self._queues[job_id] + main_queue, event_manager, existing_task, _ = self._queues[job_id] if existing_task and not existing_task.done(): logger.debug(f"Existing task for job_id {job_id} detected; cancelling it.") @@ -161,18 +176,19 @@ class JobQueueService(Service): # Initiate the new asynchronous task. task = asyncio.create_task(task_coro) - self._queues[job_id] = (main_queue, event_manager, task) + self._queues[job_id] = (main_queue, event_manager, task, None) logger.debug(f"New task started for job_id {job_id}") - def get_queue_data(self, job_id: str) -> tuple[asyncio.Queue, EventManager, asyncio.Task | None]: + def get_queue_data(self, job_id: str) -> tuple[asyncio.Queue, EventManager, asyncio.Task | None, float | None]: """Retrieve the complete data structure associated with a job's queue. Args: job_id (str): Unique identifier for the job. Returns: - tuple[asyncio.Queue, EventManager, asyncio.Task | None]: - A tuple containing the job's main queue, its linked event manager, and the associated task (if any). + tuple[asyncio.Queue, EventManager, asyncio.Task | None, float | None]: + A tuple containing the job's main queue, its linked event manager, the associated task (if any), + and the cleanup timestamp (if any). """ if job_id not in self._queues: msg = f"No queue found for job_id {job_id}" @@ -203,7 +219,7 @@ class JobQueueService(Service): return logger.info(f"Commencing cleanup for job_id {job_id}") - main_queue, _event_manager, task = self._queues[job_id] + main_queue, _event_manager, task, _ = self._queues[job_id] # Cancel the associated task if it is still running. if task and not task.done(): @@ -250,14 +266,30 @@ class JobQueueService(Service): logger.error(f"Exception encountered during periodic cleanup: {exc}") async def _cleanup_old_queues(self) -> None: - """Scan all registered job queues and clean up those with inactive tasks. + """Scan all registered job queues and clean up those with completed or failed tasks.""" + current_time = asyncio.get_running_loop().time() - For each job: - - Check whether the associated task is either complete or cancelled. - - If so, execute the cleanup_job method to release the job's resources. - """ for job_id in list(self._queues.keys()): - _, _, task = self._queues[job_id] - if task and task.done(): - logger.debug(f"Job queue for job_id {job_id} marked for cleanup.") - await self.cleanup_job(job_id) + _, _, task, cleanup_time = self._queues[job_id] + if task: + logger.debug( + f"Queue {job_id} status - Done: {task.done()}, " + f"Cancelled: {task.cancelled()}, " + f"Has exception: {task.exception() is not None if task.done() else 'N/A'}" + ) + + # Check if task should be marked for cleanup + if task and (task.cancelled() or (task.done() and task.exception() is not None)): + if cleanup_time is None: + # Mark for cleanup by setting the timestamp + self._queues[job_id] = ( + self._queues[job_id][0], + self._queues[job_id][1], + self._queues[job_id][2], + current_time, + ) + logger.debug(f"Job queue for job_id {job_id} marked for cleanup - Task cancelled or failed") + elif current_time - cleanup_time >= self.CLEANUP_GRACE_PERIOD: + # Enough time has passed, perform the actual cleanup + logger.debug(f"Cleaning up job_id {job_id} after grace period") + await self.cleanup_job(job_id)