fix: Implement cleanup grace period for job queue tasks (#7250)

* feat: implement cleanup grace period for job queue tasks

* Enhance JobQueueService to include a CLEANUP_GRACE_PERIOD, allowing tasks marked for cleanup to persist for a specified duration before actual removal.
* Update internal data structure to track cleanup timestamps for each job.
* Modify cleanup logic to check task status and manage cleanup timing effectively, ensuring resources are released appropriately after a grace period.
* Improve documentation to clarify the new cleanup mechanism and its purpose.

* fix: update queue data retrieval to include additional return value
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-03-26 10:43:10 -03:00 committed by GitHub
commit 1218246ba1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 53 additions and 21 deletions

View file

@ -88,7 +88,7 @@ async def get_flow_events_response(
):
"""Get events for a specific build job, either as a stream or single event."""
try:
main_queue, event_manager, event_task = queue_service.get_queue_data(job_id)
main_queue, event_manager, event_task, _ = queue_service.get_queue_data(job_id)
if stream:
if event_task is None:
raise HTTPException(status_code=404, detail="No event task found for job")
@ -463,7 +463,7 @@ async def cancel_flow_build(
asyncio.CancelledError: If the task cancellation failed
"""
# Get the event task and event manager for the job
_, _, event_task = queue_service.get_queue_data(job_id)
_, _, event_task, _ = queue_service.get_queue_data(job_id)
if event_task is None:
logger.warning(f"No event task found for job_id {job_id}")

View file

@ -18,15 +18,27 @@ class JobQueueService(Service):
- Safely clean up resources by cancelling active tasks and emptying queues.
- Automatically perform periodic cleanup of inactive or completed job queues.
The cleanup process follows a two-phase approach:
1. When a task is cancelled or fails, it is marked for cleanup by setting a timestamp
2. The actual cleanup only occurs after CLEANUP_GRACE_PERIOD seconds have elapsed
since the task was marked
Attributes:
name (str): Unique identifier for the service.
_queues (dict[str, tuple[asyncio.Queue, EventManager, asyncio.Task | None]]):
_queues (dict[str, tuple[asyncio.Queue, EventManager, asyncio.Task | None, float | None]]):
Dictionary mapping job IDs to a tuple containing:
* The job's asyncio.Queue instance.
* The associated EventManager instance.
* The asyncio.Task processing the job (if any).
* The cleanup timestamp (if any).
_cleanup_task (asyncio.Task | None): Background task for periodic cleanup.
_closed (bool): Flag indicating whether the service is currently active.
CLEANUP_GRACE_PERIOD (int): Number of seconds to wait after a task is marked for cleanup
before actually removing it. This grace period allows for:
* Pending operations to complete
* Related systems to finish their work
* Inspection or recovery if needed
Default is 300 seconds (5 minutes).
Example:
service = JobQueueService()
@ -45,12 +57,15 @@ class JobQueueService(Service):
"""Initialize the JobQueueService.
Sets up the internal registry for job queues, initializes the cleanup task, and sets the service state
to active.
to active. The service includes a grace period mechanism for cleanup, where tasks marked for removal
(due to cancellation or failure) will persist in the system for CLEANUP_GRACE_PERIOD seconds before
being actually removed.
"""
self._queues: dict[str, tuple[asyncio.Queue, EventManager, asyncio.Task | None]] = {}
self._queues: dict[str, tuple[asyncio.Queue, EventManager, asyncio.Task | None, float | None]] = {}
self._cleanup_task: asyncio.Task | None = None
self._closed = False
self.ready = False
self.CLEANUP_GRACE_PERIOD = 300 # 5 minutes before cleaning up marked tasks
def is_started(self) -> bool:
"""Check if the JobQueueService has started.
@ -126,7 +141,7 @@ class JobQueueService(Service):
event_manager = create_default_event_manager(main_queue)
# Register the queue without an active task.
self._queues[job_id] = (main_queue, event_manager, None)
self._queues[job_id] = (main_queue, event_manager, None, None)
logger.debug(f"Queue and event manager successfully created for job_id {job_id}")
return main_queue, event_manager
@ -153,7 +168,7 @@ class JobQueueService(Service):
logger.error(msg)
raise RuntimeError(msg)
main_queue, event_manager, existing_task = self._queues[job_id]
main_queue, event_manager, existing_task, _ = self._queues[job_id]
if existing_task and not existing_task.done():
logger.debug(f"Existing task for job_id {job_id} detected; cancelling it.")
@ -161,18 +176,19 @@ class JobQueueService(Service):
# Initiate the new asynchronous task.
task = asyncio.create_task(task_coro)
self._queues[job_id] = (main_queue, event_manager, task)
self._queues[job_id] = (main_queue, event_manager, task, None)
logger.debug(f"New task started for job_id {job_id}")
def get_queue_data(self, job_id: str) -> tuple[asyncio.Queue, EventManager, asyncio.Task | None]:
def get_queue_data(self, job_id: str) -> tuple[asyncio.Queue, EventManager, asyncio.Task | None, float | None]:
"""Retrieve the complete data structure associated with a job's queue.
Args:
job_id (str): Unique identifier for the job.
Returns:
tuple[asyncio.Queue, EventManager, asyncio.Task | None]:
A tuple containing the job's main queue, its linked event manager, and the associated task (if any).
tuple[asyncio.Queue, EventManager, asyncio.Task | None, float | None]:
A tuple containing the job's main queue, its linked event manager, the associated task (if any),
and the cleanup timestamp (if any).
"""
if job_id not in self._queues:
msg = f"No queue found for job_id {job_id}"
@ -203,7 +219,7 @@ class JobQueueService(Service):
return
logger.info(f"Commencing cleanup for job_id {job_id}")
main_queue, _event_manager, task = self._queues[job_id]
main_queue, _event_manager, task, _ = self._queues[job_id]
# Cancel the associated task if it is still running.
if task and not task.done():
@ -250,14 +266,30 @@ class JobQueueService(Service):
logger.error(f"Exception encountered during periodic cleanup: {exc}")
async def _cleanup_old_queues(self) -> None:
"""Scan all registered job queues and clean up those with inactive tasks.
"""Scan all registered job queues and clean up those with completed or failed tasks."""
current_time = asyncio.get_running_loop().time()
For each job:
- Check whether the associated task is either complete or cancelled.
- If so, execute the cleanup_job method to release the job's resources.
"""
for job_id in list(self._queues.keys()):
_, _, task = self._queues[job_id]
if task and task.done():
logger.debug(f"Job queue for job_id {job_id} marked for cleanup.")
await self.cleanup_job(job_id)
_, _, task, cleanup_time = self._queues[job_id]
if task:
logger.debug(
f"Queue {job_id} status - Done: {task.done()}, "
f"Cancelled: {task.cancelled()}, "
f"Has exception: {task.exception() is not None if task.done() else 'N/A'}"
)
# Check if task should be marked for cleanup
if task and (task.cancelled() or (task.done() and task.exception() is not None)):
if cleanup_time is None:
# Mark for cleanup by setting the timestamp
self._queues[job_id] = (
self._queues[job_id][0],
self._queues[job_id][1],
self._queues[job_id][2],
current_time,
)
logger.debug(f"Job queue for job_id {job_id} marked for cleanup - Task cancelled or failed")
elif current_time - cleanup_time >= self.CLEANUP_GRACE_PERIOD:
# Enough time has passed, perform the actual cleanup
logger.debug(f"Cleaning up job_id {job_id} after grace period")
await self.cleanup_job(job_id)