diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 00204fcaf..09c985c3a 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -350,12 +350,12 @@ async def build_flow( build_task = asyncio.create_task(_build_vertex(vertex_id, graph, event_manager)) try: await build_task + vertex_build_response: VertexBuildResponse = build_task.result() except asyncio.CancelledError as exc: logger.exception(exc) build_task.cancel() return - vertex_build_response: VertexBuildResponse = build_task.result() # send built event or error event try: vertex_build_response_json = vertex_build_response.model_dump_json() diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index f65958285..08e8f441e 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -15,7 +15,7 @@ from loguru import logger from langflow.exceptions.component import ComponentBuildError from langflow.graph.schema import INPUT_COMPONENTS, OUTPUT_COMPONENTS, InterfaceComponentTypes, ResultData -from langflow.graph.utils import UnbuiltObject, UnbuiltResult, log_transaction +from langflow.graph.utils import UnbuiltObject, UnbuiltResult from langflow.interface import initialize from langflow.interface.listing import lazy_load_dict from langflow.schema.artifact import ArtifactType @@ -620,12 +620,29 @@ class Vertex: async with self._lock: return await self._get_result(requester, target_handle_name) - def _log_transaction_async( + async def _log_transaction_async( self, flow_id: str | UUID, source: Vertex, status, target: Vertex | None = None, error=None ) -> None: - task = asyncio.create_task(log_transaction(flow_id, source, status, target, error)) - self.log_transaction_tasks.add(task) - task.add_done_callback(self.log_transaction_tasks.discard) + """Log a transaction asynchronously with proper task handling and cancellation. + + Args: + flow_id: The ID of the flow + source: Source vertex + status: Transaction status + target: Optional target vertex + error: Optional error information + """ + # Commenting this out for now + # async with self._lock: + # if self.log_transaction_tasks: + # # Safely await and remove completed tasks + # task = self.log_transaction_tasks.pop() + # await task + + # # Create and track new task + # task = asyncio.create_task(log_transaction(flow_id, source, status, target, error)) + # self.log_transaction_tasks.add(task) + # task.add_done_callback(self.log_transaction_tasks.discard) async def _get_result( self, @@ -642,13 +659,13 @@ class Vertex: flow_id = self.graph.flow_id if not self.built: if flow_id: - self._log_transaction_async(str(flow_id), source=self, target=requester, status="error") + await self._log_transaction_async(str(flow_id), source=self, target=requester, status="error") msg = f"Component {self.display_name} has not been built yet" raise ValueError(msg) result = self.built_result if self.use_result else self.built_object if flow_id: - self._log_transaction_async(str(flow_id), source=self, target=requester, status="success") + await self._log_transaction_async(str(flow_id), source=self, target=requester, status="success") return result async def _build_vertex_and_update_params(self, key, vertex: Vertex) -> None: diff --git a/src/backend/base/langflow/graph/vertex/vertex_types.py b/src/backend/base/langflow/graph/vertex/vertex_types.py index 038daea23..45412da0b 100644 --- a/src/backend/base/langflow/graph/vertex/vertex_types.py +++ b/src/backend/base/langflow/graph/vertex/vertex_types.py @@ -107,7 +107,7 @@ class ComponentVertex(Vertex): default_value = requester.get_value_from_template_dict(edge.target_param) if flow_id: - self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="error") + await self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="error") if default_value is not UNDEFINED: return default_value msg = f"Component {self.display_name} has not been built yet" @@ -146,7 +146,7 @@ class ComponentVertex(Vertex): msg = f"Result not found for {edge.source_handle.name} in {edge}" raise ValueError(msg) if flow_id: - self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="success") + await self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="success") return result def extract_messages_from_artifacts(self, artifacts: dict[str, Any]) -> list[dict]: