refactor: Update logging functionality to support asynchronous processing

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-06-21 22:01:47 -03:00
commit cdcd70464a
3 changed files with 15 additions and 10 deletions

View file

@@ -118,7 +118,7 @@ class Component(CustomComponent):
async def build_results(self):
inputs = self.get_trace_as_inputs()
metadata = self.get_trace_as_metadata()
with self._tracing_service.trace_context(
async with self._tracing_service.trace_context(
f"{self.display_name} ({self.vertex.id})", self.trace_type, inputs, metadata
):
_results, _artifacts = await self._build_results()

View file

@@ -12,7 +12,7 @@ from langflow.helpers.flow import list_flows, load_flow, run_flow
from langflow.schema import Data
from langflow.schema.artifact import get_artifact_type
from langflow.schema.dotdict import dotdict
from langflow.schema.message import Message
from langflow.schema.log import LoggableType
from langflow.schema.schema import OutputLog
from langflow.services.deps import get_storage_service, get_variable_service, session_scope
from langflow.services.storage.service import StorageService
@@ -30,9 +30,6 @@ if TYPE_CHECKING:
from langflow.services.tracing.service import TracingService
LoggableType = Union[str, dict, list, int, float, bool, None, Data, Message]
class CustomComponent(BaseComponent):
"""
Represents a custom component in Langflow.
@@ -491,7 +488,10 @@ class CustomComponent(BaseComponent):
message (LoggableType | list[LoggableType]): The message to log.
"""
if name is None:
name = self.display_name
name = self.display_name if self.display_name else self.__class__.__name__
if hasattr(message, "model_dump"):
message = message.model_dump()
log = Log(message=message, type=get_artifact_type(message), name=name)
self._logs.append(log)
self._tracing_service.add_log(trace_name=self.vertex.id, log=log)
if self.vertex:
self._tracing_service.add_log(trace_name=self.vertex.id, log=log)

View file

@@ -26,6 +26,7 @@ from langflow.services.monitor.utils import log_transaction
if TYPE_CHECKING:
from langflow.graph.schema import ResultData
from langflow.services.tracing.service import TracingService
class Graph:
@@ -86,7 +87,7 @@ class Graph:
self.define_vertices_lists()
self.state_manager = GraphStateManager()
try:
self.tracing_service = get_tracing_service()
self.tracing_service: "TracingService" | None = get_tracing_service()
except Exception as exc:
logger.error(f"Error getting tracing service: {exc}")
self.tracing_service = None
@@ -226,7 +227,8 @@
for vertex in self.vertices:
self.state_manager.subscribe(run_id, vertex.update_graph_state)
self._run_id = run_id
self.tracing_service.set_run_id(run_id)
if self.tracing_service:
self.tracing_service.set_run_id(run_id)
def set_run_name(self):
# Given a flow name, flow_id
@@ -236,7 +238,9 @@
self.set_run_id()
self.tracing_service.set_run_name(name)
self.tracing_service.initialize_tracers()
async def initialize_run(self):
await self.tracing_service.initialize_tracers()
async def end_all_traces(self, outputs: dict[str, Any] | None = None, error: str | None = None):
if not self.tracing_service:
@@ -926,6 +930,7 @@ class Graph:
run_id = uuid.uuid4()
self.set_run_id(run_id)
self.set_run_name()
await self.initialize_run()
lock = chat_service._cache_locks[self.run_id]
while to_process:
current_batch = list(to_process) # Copy current deque items to a list