diff --git a/src/backend/base/langflow/custom/custom_component/custom_component.py b/src/backend/base/langflow/custom/custom_component/custom_component.py index 24df42894..ee8192496 100644 --- a/src/backend/base/langflow/custom/custom_component/custom_component.py +++ b/src/backend/base/langflow/custom/custom_component/custom_component.py @@ -83,7 +83,7 @@ class CustomComponent(BaseComponent): _flows_data: Optional[List[Data]] = None _outputs: List[OutputLog] = [] _logs: List[Log] = [] - _tracing_service: "TracingService" + tracing_service: Optional["TracingService"] = None def update_state(self, name: str, value: Any): if not self.vertex: @@ -488,14 +488,14 @@ class CustomComponent(BaseComponent): Args: message (LoggableType | list[LoggableType]): The message to log. """ - if name is None: - name = self.display_name if self.display_name else self.__class__.__name__ - if hasattr(message, "model_dump") and isinstance(message, BaseModel): - message = message.model_dump() + if name is None and self.display_name: + name = self.display_name + else: + name = self.__class__.__name__ log = Log(message=message, type=get_artifact_type(message), name=name) self._logs.append(log) - if self.vertex: - self._tracing_service.add_log(trace_name=self.vertex.id, log=log) + if self.tracing_service and self.vertex: + self.tracing_service.add_log(trace_name=self.vertex.id, log=log) def post_code_processing(self, new_build_config: dict, current_build_config: dict): """ diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index fa40c725f..9500bcc50 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -18,8 +18,12 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: from langflow.schema.message import Message from langflow.services.database.models.message import MessageTable - monitor_service = get_monitor_service() - messages_df = monitor_service.get_messages() + try: + monitor_service = get_monitor_service() + messages_df = monitor_service.get_messages() + except Exception as e: + logger.error(f"Error retrieving messages from monitor service: {e}") + return False if messages_df.empty: logger.info("No messages to migrate.") diff --git a/src/backend/base/langflow/services/tracing/base.py b/src/backend/base/langflow/services/tracing/base.py new file mode 100644 index 000000000..2ec2eea26 --- /dev/null +++ b/src/backend/base/langflow/services/tracing/base.py @@ -0,0 +1,33 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict +from uuid import UUID + + +class BaseTracer(ABC): + @abstractmethod + def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID): + raise NotImplementedError + + @abstractmethod + def ready(self): + raise NotImplementedError + + @abstractmethod + def add_trace( + self, trace_name: str, trace_type: str, inputs: Dict[str, Any], metadata: Dict[str, Any] | None = None + ): + raise NotImplementedError + + @abstractmethod + def end_trace(self, trace_name: str, outputs: Dict[str, Any] | None = None, error: str | None = None): + raise NotImplementedError + + @abstractmethod + def end( + self, + inputs: dict[str, Any], + outputs: Dict[str, Any], + error: str | None = None, + metadata: dict[str, Any] | None = None, + ): + raise NotImplementedError diff --git a/src/backend/base/langflow/services/tracing/service.py b/src/backend/base/langflow/services/tracing/service.py index 95795c988..4bdf17fa8 100644 --- a/src/backend/base/langflow/services/tracing/service.py +++ b/src/backend/base/langflow/services/tracing/service.py @@ -12,6 +12,7 @@ from loguru import logger from langflow.schema.data import Data from langflow.services.base import Service +from langflow.services.tracing.base import BaseTracer from langflow.services.tracing.schema import Log if TYPE_CHECKING: @@ -180,7 +181,7 @@ class TracingService(Service): self.outputs_metadata[trace_name] |= output_metadata or {} -class LangSmithTracer: +class LangSmithTracer(BaseTracer): def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID): from langsmith.run_trees import RunTree @@ -292,7 +293,7 @@ class LangSmithTracer: inputs: dict[str, Any], outputs: Dict[str, Any], error: str | None = None, - metadata: Optional[dict[str, Any]] = None, + metadata: dict[str, Any] | None = None, ): self._run_tree.add_metadata({"inputs": inputs, "metadata": metadata or {}}) self._run_tree.end(outputs=outputs, error=error) diff --git a/src/backend/base/langflow/services/tracing/utils.py b/src/backend/base/langflow/services/tracing/utils.py new file mode 100644 index 000000000..a90c94c99 --- /dev/null +++ b/src/backend/base/langflow/services/tracing/utils.py @@ -0,0 +1,34 @@ +from typing import Any, Dict + +from langflow.schema.data import Data + + +def convert_to_langchain_type(value): + from langflow.schema.message import Message + + if isinstance(value, dict): + for key, _value in value.copy().items(): + _value = convert_to_langchain_type(_value) + value[key] = _value + elif isinstance(value, list): + value = [convert_to_langchain_type(v) for v in value] + elif isinstance(value, Message): + if "prompt" in value: + value = value.load_lc_prompt() + elif value.sender: + value = value.to_lc_message() + else: + value = value.to_lc_document() + elif isinstance(value, Data): + if "text" in value.data: + value = value.to_lc_document() + else: + value = value.data + return value + + +def convert_to_langchain_types(io_dict: Dict[str, Any]): + converted = {} + for key, value in io_dict.items(): + converted[key] = convert_to_langchain_type(value) + return converted