From 94df7be2dd0acdb664987ce0e0efd9aff56c9a12 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 21 Jun 2024 19:57:49 -0300 Subject: [PATCH] refactor: Update logs to outputs in CustomComponent and TracingService classes --- .../custom/custom_component/custom_component.py | 11 ++++++++--- .../base/langflow/services/tracing/schema.py | 7 +++++++ .../base/langflow/services/tracing/service.py | 16 ++++++++++++++++ 3 files changed, 31 insertions(+), 3 deletions(-) create mode 100644 src/backend/base/langflow/services/tracing/schema.py diff --git a/src/backend/base/langflow/custom/custom_component/custom_component.py b/src/backend/base/langflow/custom/custom_component/custom_component.py index ba60c91a2..16a4cecc3 100644 --- a/src/backend/base/langflow/custom/custom_component/custom_component.py +++ b/src/backend/base/langflow/custom/custom_component/custom_component.py @@ -16,6 +16,7 @@ from langflow.schema.message import Message from langflow.schema.schema import OutputLog from langflow.services.deps import get_storage_service, get_variable_service, session_scope from langflow.services.storage.service import StorageService +from langflow.services.tracing.schema import Log from langflow.type_extraction.type_extraction import ( extract_inner_type_from_generic_alias, extract_union_types_from_generic_alias, @@ -82,7 +83,8 @@ class CustomComponent(BaseComponent): status: Optional[Any] = None """The status of the component. This is displayed on the frontend. Defaults to None.""" _flows_data: Optional[List[Data]] = None - _logs: List[OutputLog] = [] + _outputs: List[OutputLog] = [] + _logs: List[Log] = [] tracing_service: Optional["TracingService"] = None def update_state(self, name: str, value: Any): @@ -481,12 +483,15 @@ class CustomComponent(BaseComponent): """ raise NotImplementedError - def log(self, message: LoggableType | list[LoggableType]): + def log(self, message: LoggableType | list[LoggableType], name: str | None = None): """ Logs a message. Args: message (LoggableType | list[LoggableType]): The message to log. """ - log = OutputLog(message=message, type=get_artifact_type(message)) + if name is None: + name = self.display_name + log = Log(message=message, type=get_artifact_type(message), name=name) self._logs.append(log) + self.tracing_service.add_log(trace_name=self.vertex.id, log=log) diff --git a/src/backend/base/langflow/services/tracing/schema.py b/src/backend/base/langflow/services/tracing/schema.py new file mode 100644 index 000000000..f6343914a --- /dev/null +++ b/src/backend/base/langflow/services/tracing/schema.py @@ -0,0 +1,7 @@ +from typing_extensions import TypedDict + + +class Log(TypedDict): + name: str + message: list | dict | str + type: str diff --git a/src/backend/base/langflow/services/tracing/service.py b/src/backend/base/langflow/services/tracing/service.py index c28d31e6f..accdcfa49 100644 --- a/src/backend/base/langflow/services/tracing/service.py +++ b/src/backend/base/langflow/services/tracing/service.py @@ -1,6 +1,7 @@ import os import traceback from contextlib import contextmanager +from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Dict from langchain.callbacks.tracers.langchain import wait_for_all_tracers @@ -8,6 +9,7 @@ from loguru import logger from langflow.schema.data import Data from langflow.services.base import Service +from langflow.services.tracing.schema import Log if TYPE_CHECKING: from langflow.services.monitor.service import MonitorService @@ -79,6 +81,15 @@ class TracingService(Service): self._end_all_traces(outputs, error) self._reset_io() + async def add_log(self, trace_name: str, log: Log): + for tracer in self._tracers.values(): + if not tracer.ready: + continue + try: + tracer.add_log(trace_name, log) + except Exception as e: + logger.error(f"Error adding log to trace {trace_name}: {e}") + @contextmanager def trace_context( self, trace_name: str, trace_type: str, inputs: Dict[str, Any] = None, metadata: Dict[str, Any] = None @@ -189,6 +200,11 @@ class LangSmithTracer: child.patch() else: child.post() + self._child_link[trace_name] = child.get_url() + + def add_log(self, trace_name: str, log: Log): + log_dict = {"name": log.name, "time": datetime.now(timezone.utc).isoformat(), "message": log.message} + self._children[trace_name].add_event(log_dict) def end(self, outputs: Dict[str, Any], error: str | None = None): self._run_tree.end(outputs=outputs, error=error)