diff --git a/src/backend/base/langflow/schema/log.py b/src/backend/base/langflow/schema/log.py new file mode 100644 index 000000000..771ff0670 --- /dev/null +++ b/src/backend/base/langflow/schema/log.py @@ -0,0 +1,5 @@ +from typing import Union + +from pydantic import BaseModel + +LoggableType = Union[str, dict, list, int, float, bool, None, BaseModel] diff --git a/src/backend/base/langflow/services/tracing/schema.py b/src/backend/base/langflow/services/tracing/schema.py index f6343914a..7519c974f 100644 --- a/src/backend/base/langflow/services/tracing/schema.py +++ b/src/backend/base/langflow/services/tracing/schema.py @@ -1,7 +1,9 @@ from typing_extensions import TypedDict +from langflow.schema.log import LoggableType + class Log(TypedDict): name: str - message: list | dict | str + message: LoggableType type: str diff --git a/src/backend/base/langflow/services/tracing/service.py b/src/backend/base/langflow/services/tracing/service.py index accdcfa49..e9761b039 100644 --- a/src/backend/base/langflow/services/tracing/service.py +++ b/src/backend/base/langflow/services/tracing/service.py @@ -1,6 +1,7 @@ +import asyncio import os import traceback -from contextlib import contextmanager +from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Dict @@ -29,12 +30,41 @@ class TracingService(Service): self.run_id = None self.project_name = None self._tracers: dict[str, LangSmithTracer] = {} + self.logs_queue = asyncio.Queue() + self.running = False + self.worker_task = None + + async def log_worker(self): + while self.running or not self.logs_queue.empty(): + log_func, args = await self.logs_queue.get() + try: + await log_func(*args) + except Exception as e: + logger.error(f"Error processing log: {e}") + finally: + self.logs_queue.task_done() + + async def start(self): + if not self.running: + self.running = True + self.worker_task = asyncio.create_task(self.log_worker()) + + async def flush(self): + await self.logs_queue.join() + + async def stop(self): + self.running = False + await self.flush() + self.worker_task.cancel() + if self.worker_task: + await self.worker_task def _reset_io(self): self.inputs = {} self.outputs = {} - def initialize_tracers(self): + async def initialize_tracers(self): + await self.start() self._initialize_langsmith_tracer() def _initialize_langsmith_tracer(self): @@ -80,8 +110,9 @@ class TracingService(Service): async def end(self, outputs: dict[str, Any] | None = None, error: str | None = None): self._end_all_traces(outputs, error) self._reset_io() + await self.stop() - async def add_log(self, trace_name: str, log: Log): + async def _add_log(self, trace_name: str, log: Log): for tracer in self._tracers.values(): if not tracer.ready: continue @@ -90,7 +121,12 @@ class TracingService(Service): except Exception as e: logger.error(f"Error adding log to trace {trace_name}: {e}") - @contextmanager + def add_log(self, trace_name: str, log: Log): + if not self.running: + asyncio.run(self.start()) + self.logs_queue.put_nowait((self._add_log, (trace_name, log))) + + @asynccontextmanager def trace_context( self, trace_name: str, trace_type: str, inputs: Dict[str, Any] = None, metadata: Dict[str, Any] = None ):