📝 (service.py): add async tracing

📝 (log.py): add LoggableType to define the type of log messages in the schema
📝 (schema.py): import LoggableType from log.py to use in defining the type of log messages in the Log class
📝 (service.py): add asyncio functionality to log_worker method for processing logs asynchronously
📝 (service.py): add start, flush, and stop methods to manage the logging worker task asynchronously
📝 (service.py): change initialize_tracers method to be asynchronous and start the logging worker
📝 (service.py): add async functionality to end method to stop the logging worker
📝 (service.py): change add_log method to be asynchronous and use asyncio to start the logging worker if not running
📝 (service.py): change contextmanager to asynccontextmanager for trace_context method to handle asynchronous operations
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-06-21 22:01:33 -03:00
commit c11c18d719
3 changed files with 48 additions and 5 deletions

View file

@ -0,0 +1,5 @@
from typing import Union
from pydantic import BaseModel
LoggableType = Union[str, dict, list, int, float, bool, None, BaseModel]

View file

@ -1,7 +1,9 @@
from typing_extensions import TypedDict
from langflow.schema.log import LoggableType
class Log(TypedDict):
name: str
message: list | dict | str
message: LoggableType
type: str

View file

@ -1,6 +1,7 @@
import asyncio
import os
import traceback
from contextlib import contextmanager
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict
@ -29,12 +30,41 @@ class TracingService(Service):
self.run_id = None
self.project_name = None
self._tracers: dict[str, LangSmithTracer] = {}
self.logs_queue = asyncio.Queue()
self.running = False
self.worker_task = None
async def log_worker(self):
while self.running or not self.logs_queue.empty():
log_func, args = await self.logs_queue.get()
try:
await log_func(*args)
except Exception as e:
logger.error(f"Error processing log: {e}")
finally:
self.logs_queue.task_done()
async def start(self):
if not self.running:
self.running = True
self.worker_task = asyncio.create_task(self.log_worker())
async def flush(self):
await self.logs_queue.join()
async def stop(self):
self.running = False
await self.flush()
self.worker_task.cancel()
if self.worker_task:
await self.worker_task
def _reset_io(self):
self.inputs = {}
self.outputs = {}
def initialize_tracers(self):
async def initialize_tracers(self):
await self.start()
self._initialize_langsmith_tracer()
def _initialize_langsmith_tracer(self):
@ -80,8 +110,9 @@ class TracingService(Service):
async def end(self, outputs: dict[str, Any] | None = None, error: str | None = None):
self._end_all_traces(outputs, error)
self._reset_io()
await self.stop()
async def add_log(self, trace_name: str, log: Log):
async def _add_log(self, trace_name: str, log: Log):
for tracer in self._tracers.values():
if not tracer.ready:
continue
@ -90,7 +121,12 @@ class TracingService(Service):
except Exception as e:
logger.error(f"Error adding log to trace {trace_name}: {e}")
@contextmanager
def add_log(self, trace_name: str, log: Log):
if not self.running:
asyncio.run(self.start())
self.logs_queue.put_nowait((self._add_log, (trace_name, log)))
@asynccontextmanager
def trace_context(
self, trace_name: str, trace_type: str, inputs: Dict[str, Any] = None, metadata: Dict[str, Any] = None
):