From c11c18d7193c69fb8049457fb37ec5d26c3e2bf8 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 21 Jun 2024 22:01:33 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=9D=20(service.py):=20add=20async=20tr?= =?UTF-8?q?acing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 📝 (log.py): add LoggableType to define the type of log messages in the schema 📝 (schema.py): import LoggableType from log.py to use in defining the type of log messages in the Log class 📝 (service.py): add asyncio functionality to log_worker method for processing logs asynchronously 📝 (service.py): add start, flush, and stop methods to manage the logging worker task asynchronously 📝 (service.py): change initialize_tracers method to be asynchronous and start the logging worker 📝 (service.py): add async functionality to end method to stop the logging worker 📝 (service.py): change add_log method to be asynchronous and use asyncio to start the logging worker if not running 📝 (service.py): change contextmanager to asynccontextmanager for trace_context method to handle asynchronous operations --- src/backend/base/langflow/schema/log.py | 5 +++ .../base/langflow/services/tracing/schema.py | 4 +- .../base/langflow/services/tracing/service.py | 44 +++++++++++++++++-- 3 files changed, 48 insertions(+), 5 deletions(-) create mode 100644 src/backend/base/langflow/schema/log.py diff --git a/src/backend/base/langflow/schema/log.py b/src/backend/base/langflow/schema/log.py new file mode 100644 index 000000000..771ff0670 --- /dev/null +++ b/src/backend/base/langflow/schema/log.py @@ -0,0 +1,5 @@ +from typing import Union + +from pydantic import BaseModel + +LoggableType = Union[str, dict, list, int, float, bool, None, BaseModel] diff --git a/src/backend/base/langflow/services/tracing/schema.py b/src/backend/base/langflow/services/tracing/schema.py index f6343914a..7519c974f 100644 --- a/src/backend/base/langflow/services/tracing/schema.py +++ b/src/backend/base/langflow/services/tracing/schema.py @@ -1,7 +1,9 @@ from typing_extensions import TypedDict +from langflow.schema.log import LoggableType + class Log(TypedDict): name: str - message: list | dict | str + message: LoggableType type: str diff --git a/src/backend/base/langflow/services/tracing/service.py b/src/backend/base/langflow/services/tracing/service.py index accdcfa49..e9761b039 100644 --- a/src/backend/base/langflow/services/tracing/service.py +++ b/src/backend/base/langflow/services/tracing/service.py @@ -1,6 +1,7 @@ +import asyncio import os import traceback -from contextlib import contextmanager +from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Dict @@ -29,12 +30,41 @@ class TracingService(Service): self.run_id = None self.project_name = None self._tracers: dict[str, LangSmithTracer] = {} + self.logs_queue = asyncio.Queue() + self.running = False + self.worker_task = None + + async def log_worker(self): + while self.running or not self.logs_queue.empty(): + log_func, args = await self.logs_queue.get() + try: + await log_func(*args) + except Exception as e: + logger.error(f"Error processing log: {e}") + finally: + self.logs_queue.task_done() + + async def start(self): + if not self.running: + self.running = True + self.worker_task = asyncio.create_task(self.log_worker()) + + async def flush(self): + await self.logs_queue.join() + + async def stop(self): + self.running = False + await self.flush() + self.worker_task.cancel() + if self.worker_task: + await self.worker_task def _reset_io(self): self.inputs = {} self.outputs = {} - def initialize_tracers(self): + async def initialize_tracers(self): + await self.start() self._initialize_langsmith_tracer() def _initialize_langsmith_tracer(self): @@ -80,8 +110,9 @@ class TracingService(Service): async def end(self, outputs: dict[str, Any] | None = None, error: str | None = None): self._end_all_traces(outputs, error) self._reset_io() + await self.stop() - async def add_log(self, trace_name: str, log: Log): + async def _add_log(self, trace_name: str, log: Log): for tracer in self._tracers.values(): if not tracer.ready: continue @@ -90,7 +121,12 @@ class TracingService(Service): except Exception as e: logger.error(f"Error adding log to trace {trace_name}: {e}") - @contextmanager + def add_log(self, trace_name: str, log: Log): + if not self.running: + asyncio.run(self.start()) + self.logs_queue.put_nowait((self._add_log, (trace_name, log))) + + @asynccontextmanager def trace_context( self, trace_name: str, trace_type: str, inputs: Dict[str, Any] = None, metadata: Dict[str, Any] = None ):