diff --git a/src/backend/base/langflow/services/tracing/service.py b/src/backend/base/langflow/services/tracing/service.py index f7ff6228b..9f02ca87d 100644 --- a/src/backend/base/langflow/services/tracing/service.py +++ b/src/backend/base/langflow/services/tracing/service.py @@ -60,35 +60,25 @@ class TracingService(Service): self.project_name: str | None = None self._tracers: dict[str, BaseTracer] = {} self._logs: dict[str, list[Log | dict[Any, Any]]] = defaultdict(list) - self.logs_queue: asyncio.Queue = asyncio.Queue() self.running = False - self.worker_task: asyncio.Task | None = None self.end_trace_tasks: set[asyncio.Task] = set() self.deactivated = self.settings_service.settings.deactivate_tracing self.session_id: str | None = None - async def log_worker(self) -> None: - while self.running or not self.logs_queue.empty(): - log_func, args = await self.logs_queue.get() - try: - await log_func(*args) - except Exception: # noqa: BLE001 - logger.exception("Error processing log") - finally: - self.logs_queue.task_done() - async def start(self) -> None: if self.running: return try: self.running = True - self.worker_task = asyncio.create_task(self.log_worker()) except Exception: # noqa: BLE001 logger.exception("Error starting tracing service") async def flush(self) -> None: + # This method is kept for API compatibility but now has minimal implementation try: - await self.logs_queue.join() + # Wait for any pending trace tasks to complete + if self.end_trace_tasks: + await asyncio.gather(*self.end_trace_tasks, return_exceptions=True) except Exception: # noqa: BLE001 logger.exception("Error flushing logs") @@ -96,13 +86,6 @@ class TracingService(Service): try: self.running = False await self.flush() - # check the qeue is empty - if not self.logs_queue.empty(): - await self.logs_queue.join() - if self.worker_task: - self.worker_task.cancel() - self.worker_task = None - except Exception: # noqa: BLE001 logger.exception("Error stopping tracing service")