refactor: Update logs to outputs in CustomComponent and TracingService classes
This commit is contained in:
parent
e19d9c90ea
commit
94df7be2dd
3 changed files with 31 additions and 3 deletions
|
|
@ -16,6 +16,7 @@ from langflow.schema.message import Message
|
|||
from langflow.schema.schema import OutputLog
|
||||
from langflow.services.deps import get_storage_service, get_variable_service, session_scope
|
||||
from langflow.services.storage.service import StorageService
|
||||
from langflow.services.tracing.schema import Log
|
||||
from langflow.type_extraction.type_extraction import (
|
||||
extract_inner_type_from_generic_alias,
|
||||
extract_union_types_from_generic_alias,
|
||||
|
|
@ -82,7 +83,8 @@ class CustomComponent(BaseComponent):
|
|||
status: Optional[Any] = None
|
||||
"""The status of the component. This is displayed on the frontend. Defaults to None."""
|
||||
_flows_data: Optional[List[Data]] = None
|
||||
_logs: List[OutputLog] = []
|
||||
_outputs: List[OutputLog] = []
|
||||
_logs: List[Log] = []
|
||||
tracing_service: Optional["TracingService"] = None
|
||||
|
||||
def update_state(self, name: str, value: Any):
|
||||
|
|
@ -481,12 +483,15 @@ class CustomComponent(BaseComponent):
|
|||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def log(self, message: LoggableType | list[LoggableType]):
|
||||
def log(self, message: LoggableType | list[LoggableType], name: str | None = None):
|
||||
"""
|
||||
Logs a message.
|
||||
|
||||
Args:
|
||||
message (LoggableType | list[LoggableType]): The message to log.
|
||||
"""
|
||||
log = OutputLog(message=message, type=get_artifact_type(message))
|
||||
if name is None:
|
||||
name = self.display_name
|
||||
log = Log(message=message, type=get_artifact_type(message), name=name)
|
||||
self._logs.append(log)
|
||||
self.tracing_service.add_log(trace_name=self.vertex.id, log=log)
|
||||
|
|
|
|||
7
src/backend/base/langflow/services/tracing/schema.py
Normal file
7
src/backend/base/langflow/services/tracing/schema.py
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
from typing_extensions import TypedDict
|
||||
|
||||
|
||||
class Log(TypedDict):
|
||||
name: str
|
||||
message: list | dict | str
|
||||
type: str
|
||||
|
|
@ -1,6 +1,7 @@
|
|||
import os
|
||||
import traceback
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime, timezone
|
||||
from typing import TYPE_CHECKING, Any, Dict
|
||||
|
||||
from langchain.callbacks.tracers.langchain import wait_for_all_tracers
|
||||
|
|
@ -8,6 +9,7 @@ from loguru import logger
|
|||
|
||||
from langflow.schema.data import Data
|
||||
from langflow.services.base import Service
|
||||
from langflow.services.tracing.schema import Log
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from langflow.services.monitor.service import MonitorService
|
||||
|
|
@ -79,6 +81,15 @@ class TracingService(Service):
|
|||
self._end_all_traces(outputs, error)
|
||||
self._reset_io()
|
||||
|
||||
async def add_log(self, trace_name: str, log: Log):
|
||||
for tracer in self._tracers.values():
|
||||
if not tracer.ready:
|
||||
continue
|
||||
try:
|
||||
tracer.add_log(trace_name, log)
|
||||
except Exception as e:
|
||||
logger.error(f"Error adding log to trace {trace_name}: {e}")
|
||||
|
||||
@contextmanager
|
||||
def trace_context(
|
||||
self, trace_name: str, trace_type: str, inputs: Dict[str, Any] = None, metadata: Dict[str, Any] = None
|
||||
|
|
@ -189,6 +200,11 @@ class LangSmithTracer:
|
|||
child.patch()
|
||||
else:
|
||||
child.post()
|
||||
self._child_link[trace_name] = child.get_url()
|
||||
|
||||
def add_log(self, trace_name: str, log: Log):
|
||||
log_dict = {"name": log.name, "time": datetime.now(timezone.utc).isoformat(), "message": log.message}
|
||||
self._children[trace_name].add_event(log_dict)
|
||||
|
||||
def end(self, outputs: Dict[str, Any], error: str | None = None):
|
||||
self._run_tree.end(outputs=outputs, error=error)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue