diff --git a/src/backend/base/langflow/custom/custom_component/component.py b/src/backend/base/langflow/custom/custom_component/component.py index 17b054e26..d6d6a5af1 100644 --- a/src/backend/base/langflow/custom/custom_component/component.py +++ b/src/backend/base/langflow/custom/custom_component/component.py @@ -118,7 +118,7 @@ class Component(CustomComponent): async def build_results(self): inputs = self.get_trace_as_inputs() metadata = self.get_trace_as_metadata() - with self._tracing_service.trace_context( + async with self._tracing_service.trace_context( f"{self.display_name} ({self.vertex.id})", self.trace_type, inputs, metadata ): _results, _artifacts = await self._build_results() diff --git a/src/backend/base/langflow/custom/custom_component/custom_component.py b/src/backend/base/langflow/custom/custom_component/custom_component.py index acdbd0803..a0a7813cd 100644 --- a/src/backend/base/langflow/custom/custom_component/custom_component.py +++ b/src/backend/base/langflow/custom/custom_component/custom_component.py @@ -12,7 +12,7 @@ from langflow.helpers.flow import list_flows, load_flow, run_flow from langflow.schema import Data from langflow.schema.artifact import get_artifact_type from langflow.schema.dotdict import dotdict -from langflow.schema.message import Message +from langflow.schema.log import LoggableType from langflow.schema.schema import OutputLog from langflow.services.deps import get_storage_service, get_variable_service, session_scope from langflow.services.storage.service import StorageService @@ -30,9 +30,6 @@ if TYPE_CHECKING: from langflow.services.tracing.service import TracingService -LoggableType = Union[str, dict, list, int, float, bool, None, Data, Message] - - class CustomComponent(BaseComponent): """ Represents a custom component in Langflow. @@ -491,7 +488,10 @@ class CustomComponent(BaseComponent): message (LoggableType | list[LoggableType]): The message to log. """ if name is None: - name = self.display_name + name = self.display_name if self.display_name else self.__class__.__name__ + if hasattr(message, "model_dump"): + message = message.model_dump() log = Log(message=message, type=get_artifact_type(message), name=name) self._logs.append(log) - self._tracing_service.add_log(trace_name=self.vertex.id, log=log) + if self.vertex: + self._tracing_service.add_log(trace_name=self.vertex.id, log=log) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 47c64edc2..b8fa6936b 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -26,6 +26,7 @@ from langflow.services.monitor.utils import log_transaction if TYPE_CHECKING: from langflow.graph.schema import ResultData + from langflow.services.tracing.service import TracingService class Graph: @@ -86,7 +87,7 @@ class Graph: self.define_vertices_lists() self.state_manager = GraphStateManager() try: - self.tracing_service = get_tracing_service() + self.tracing_service: "TracingService" | None = get_tracing_service() except Exception as exc: logger.error(f"Error getting tracing service: {exc}") self.tracing_service = None @@ -226,7 +227,8 @@ class Graph: for vertex in self.vertices: self.state_manager.subscribe(run_id, vertex.update_graph_state) self._run_id = run_id - self.tracing_service.set_run_id(run_id) + if self.tracing_service: + self.tracing_service.set_run_id(run_id) def set_run_name(self): # Given a flow name, flow_id @@ -236,7 +238,9 @@ class Graph: self.set_run_id() self.tracing_service.set_run_name(name) - self.tracing_service.initialize_tracers() + + async def initialize_run(self): + await self.tracing_service.initialize_tracers() async def end_all_traces(self, outputs: dict[str, Any] | None = None, error: str | None = None): if not self.tracing_service: @@ -926,6 +930,7 @@ class Graph: run_id = uuid.uuid4() self.set_run_id(run_id) self.set_run_name() + await self.initialize_run() lock = chat_service._cache_locks[self.run_id] while to_process: current_batch = list(to_process) # Copy current deque items to a list