refactor: remove async queue from tracing service (#6999)
This commit is contained in:
parent
e99d5948e1
commit
f748ea9bc8
1 changed files with 4 additions and 21 deletions
|
|
@ -60,35 +60,25 @@ class TracingService(Service):
|
|||
self.project_name: str | None = None
|
||||
self._tracers: dict[str, BaseTracer] = {}
|
||||
self._logs: dict[str, list[Log | dict[Any, Any]]] = defaultdict(list)
|
||||
self.logs_queue: asyncio.Queue = asyncio.Queue()
|
||||
self.running = False
|
||||
self.worker_task: asyncio.Task | None = None
|
||||
self.end_trace_tasks: set[asyncio.Task] = set()
|
||||
self.deactivated = self.settings_service.settings.deactivate_tracing
|
||||
self.session_id: str | None = None
|
||||
|
||||
async def log_worker(self) -> None:
|
||||
while self.running or not self.logs_queue.empty():
|
||||
log_func, args = await self.logs_queue.get()
|
||||
try:
|
||||
await log_func(*args)
|
||||
except Exception: # noqa: BLE001
|
||||
logger.exception("Error processing log")
|
||||
finally:
|
||||
self.logs_queue.task_done()
|
||||
|
||||
async def start(self) -> None:
|
||||
if self.running:
|
||||
return
|
||||
try:
|
||||
self.running = True
|
||||
self.worker_task = asyncio.create_task(self.log_worker())
|
||||
except Exception: # noqa: BLE001
|
||||
logger.exception("Error starting tracing service")
|
||||
|
||||
async def flush(self) -> None:
|
||||
# This method is kept for API compatibility but now has minimal implementation
|
||||
try:
|
||||
await self.logs_queue.join()
|
||||
# Wait for any pending trace tasks to complete
|
||||
if self.end_trace_tasks:
|
||||
await asyncio.gather(*self.end_trace_tasks, return_exceptions=True)
|
||||
except Exception: # noqa: BLE001
|
||||
logger.exception("Error flushing logs")
|
||||
|
||||
|
|
@ -96,13 +86,6 @@ class TracingService(Service):
|
|||
try:
|
||||
self.running = False
|
||||
await self.flush()
|
||||
# check the qeue is empty
|
||||
if not self.logs_queue.empty():
|
||||
await self.logs_queue.join()
|
||||
if self.worker_task:
|
||||
self.worker_task.cancel()
|
||||
self.worker_task = None
|
||||
|
||||
except Exception: # noqa: BLE001
|
||||
logger.exception("Error stopping tracing service")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue