Add BaseTracer and refactor convert_to_langchain_types (#2402)
This PR adds a BaseTracer class and refactors the serialization logic in convert_to_langchain_types.
This commit is contained in:
commit
43f6ff0996
5 changed files with 83 additions and 11 deletions
|
|
@ -83,7 +83,7 @@ class CustomComponent(BaseComponent):
|
|||
_flows_data: Optional[List[Data]] = None
|
||||
_outputs: List[OutputLog] = []
|
||||
_logs: List[Log] = []
|
||||
_tracing_service: "TracingService"
|
||||
tracing_service: Optional["TracingService"] = None
|
||||
|
||||
def update_state(self, name: str, value: Any):
|
||||
if not self.vertex:
|
||||
|
|
@ -488,14 +488,14 @@ class CustomComponent(BaseComponent):
|
|||
Args:
|
||||
message (LoggableType | list[LoggableType]): The message to log.
|
||||
"""
|
||||
if name is None:
|
||||
name = self.display_name if self.display_name else self.__class__.__name__
|
||||
if hasattr(message, "model_dump") and isinstance(message, BaseModel):
|
||||
message = message.model_dump()
|
||||
if name is None and self.display_name:
|
||||
name = self.display_name
|
||||
else:
|
||||
name = self.__class__.__name__
|
||||
log = Log(message=message, type=get_artifact_type(message), name=name)
|
||||
self._logs.append(log)
|
||||
if self.vertex:
|
||||
self._tracing_service.add_log(trace_name=self.vertex.id, log=log)
|
||||
if self.tracing_service and self.vertex:
|
||||
self.tracing_service.add_log(trace_name=self.vertex.id, log=log)
|
||||
|
||||
def post_code_processing(self, new_build_config: dict, current_build_config: dict):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -18,8 +18,12 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool:
|
|||
from langflow.schema.message import Message
|
||||
from langflow.services.database.models.message import MessageTable
|
||||
|
||||
monitor_service = get_monitor_service()
|
||||
messages_df = monitor_service.get_messages()
|
||||
try:
|
||||
monitor_service = get_monitor_service()
|
||||
messages_df = monitor_service.get_messages()
|
||||
except Exception as e:
|
||||
logger.error(f"Error retrieving messages from monitor service: {e}")
|
||||
return False
|
||||
|
||||
if messages_df.empty:
|
||||
logger.info("No messages to migrate.")
|
||||
|
|
|
|||
33
src/backend/base/langflow/services/tracing/base.py
Normal file
33
src/backend/base/langflow/services/tracing/base.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Dict
|
||||
from uuid import UUID
|
||||
|
||||
|
||||
class BaseTracer(ABC):
|
||||
@abstractmethod
|
||||
def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def ready(self):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def add_trace(
|
||||
self, trace_name: str, trace_type: str, inputs: Dict[str, Any], metadata: Dict[str, Any] | None = None
|
||||
):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def end_trace(self, trace_name: str, outputs: Dict[str, Any] | None = None, error: str | None = None):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def end(
|
||||
self,
|
||||
inputs: dict[str, Any],
|
||||
outputs: Dict[str, Any],
|
||||
error: str | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
):
|
||||
raise NotImplementedError
|
||||
|
|
@ -12,6 +12,7 @@ from loguru import logger
|
|||
|
||||
from langflow.schema.data import Data
|
||||
from langflow.services.base import Service
|
||||
from langflow.services.tracing.base import BaseTracer
|
||||
from langflow.services.tracing.schema import Log
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
|
@ -180,7 +181,7 @@ class TracingService(Service):
|
|||
self.outputs_metadata[trace_name] |= output_metadata or {}
|
||||
|
||||
|
||||
class LangSmithTracer:
|
||||
class LangSmithTracer(BaseTracer):
|
||||
def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID):
|
||||
from langsmith.run_trees import RunTree
|
||||
|
||||
|
|
@ -292,7 +293,7 @@ class LangSmithTracer:
|
|||
inputs: dict[str, Any],
|
||||
outputs: Dict[str, Any],
|
||||
error: str | None = None,
|
||||
metadata: Optional[dict[str, Any]] = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
):
|
||||
self._run_tree.add_metadata({"inputs": inputs, "metadata": metadata or {}})
|
||||
self._run_tree.end(outputs=outputs, error=error)
|
||||
|
|
|
|||
34
src/backend/base/langflow/services/tracing/utils.py
Normal file
34
src/backend/base/langflow/services/tracing/utils.py
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
from typing import Any, Dict
|
||||
|
||||
from langflow.schema.data import Data
|
||||
|
||||
|
||||
def convert_to_langchain_type(value):
|
||||
from langflow.schema.message import Message
|
||||
|
||||
if isinstance(value, dict):
|
||||
for key, _value in value.copy().items():
|
||||
_value = convert_to_langchain_type(_value)
|
||||
value[key] = _value
|
||||
elif isinstance(value, list):
|
||||
value = [convert_to_langchain_type(v) for v in value]
|
||||
elif isinstance(value, Message):
|
||||
if "prompt" in value:
|
||||
value = value.load_lc_prompt()
|
||||
elif value.sender:
|
||||
value = value.to_lc_message()
|
||||
else:
|
||||
value = value.to_lc_document()
|
||||
elif isinstance(value, Data):
|
||||
if "text" in value.data:
|
||||
value = value.to_lc_document()
|
||||
else:
|
||||
value = value.data
|
||||
return value
|
||||
|
||||
|
||||
def convert_to_langchain_types(io_dict: Dict[str, Any]):
|
||||
converted = {}
|
||||
for key, value in io_dict.items():
|
||||
converted[key] = convert_to_langchain_type(value)
|
||||
return converted
|
||||
Loading…
Add table
Add a link
Reference in a new issue