fix: improve task management in _log_transaction_async to avoid database locks (#5896)

* refactor: Update _log_transaction_async to be asynchronous and improve task management

- Changed _log_transaction_async method from synchronous to asynchronous to allow proper handling of transaction logging.
- Added error handling for task cancellation and ensured that pending tasks are managed correctly.
- Updated calls to _log_transaction_async in the ComponentVertex class to await the asynchronous method, ensuring proper flow execution and error handling.

* refactor: Simplify _log_transaction_async method by removing redundant error handling

- Streamlined the _log_transaction_async method to enhance readability and maintainability.
- Removed unnecessary try-except blocks for task cancellation, as the async context already handles task management effectively.
- Ensured that completed tasks are awaited and cleared properly, improving overall task management.

* fix: Correctly handle vertex build response in asynchronous flow construction

- Moved the retrieval of the vertex build response to occur after awaiting the build task, ensuring proper handling of task completion.
- Improved error handling by maintaining the cancellation logic while ensuring the response is only accessed after the task is completed.
- This change enhances the reliability of the flow construction process in the chat API.

* fix: Improve task management in Vertex class by refining log transaction handling

- Updated the log transaction handling in the Vertex class to await a single task instead of gathering all tasks, enhancing efficiency.
- Removed the clearing of the task list, ensuring that only the most recent task is processed, which simplifies the task management logic.
- This change aims to improve the reliability and performance of asynchronous logging in the flow execution.

* refactor: Comment out log transaction handling in Vertex class for future review
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-01-23 20:03:10 -03:00 committed by GitHub
commit 55c9ee2eb2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 27 additions and 10 deletions

View file

@ -350,12 +350,12 @@ async def build_flow(
build_task = asyncio.create_task(_build_vertex(vertex_id, graph, event_manager))
try:
await build_task
vertex_build_response: VertexBuildResponse = build_task.result()
except asyncio.CancelledError as exc:
logger.exception(exc)
build_task.cancel()
return
vertex_build_response: VertexBuildResponse = build_task.result()
# send built event or error event
try:
vertex_build_response_json = vertex_build_response.model_dump_json()

View file

@ -15,7 +15,7 @@ from loguru import logger
from langflow.exceptions.component import ComponentBuildError
from langflow.graph.schema import INPUT_COMPONENTS, OUTPUT_COMPONENTS, InterfaceComponentTypes, ResultData
from langflow.graph.utils import UnbuiltObject, UnbuiltResult, log_transaction
from langflow.graph.utils import UnbuiltObject, UnbuiltResult
from langflow.interface import initialize
from langflow.interface.listing import lazy_load_dict
from langflow.schema.artifact import ArtifactType
@ -620,12 +620,29 @@ class Vertex:
async with self._lock:
return await self._get_result(requester, target_handle_name)
def _log_transaction_async(
async def _log_transaction_async(
self, flow_id: str | UUID, source: Vertex, status, target: Vertex | None = None, error=None
) -> None:
task = asyncio.create_task(log_transaction(flow_id, source, status, target, error))
self.log_transaction_tasks.add(task)
task.add_done_callback(self.log_transaction_tasks.discard)
"""Log a transaction asynchronously with proper task handling and cancellation.
Args:
flow_id: The ID of the flow
source: Source vertex
status: Transaction status
target: Optional target vertex
error: Optional error information
"""
# Commenting this out for now
# async with self._lock:
# if self.log_transaction_tasks:
# # Safely await and remove completed tasks
# task = self.log_transaction_tasks.pop()
# await task
# # Create and track new task
# task = asyncio.create_task(log_transaction(flow_id, source, status, target, error))
# self.log_transaction_tasks.add(task)
# task.add_done_callback(self.log_transaction_tasks.discard)
async def _get_result(
self,
@ -642,13 +659,13 @@ class Vertex:
flow_id = self.graph.flow_id
if not self.built:
if flow_id:
self._log_transaction_async(str(flow_id), source=self, target=requester, status="error")
await self._log_transaction_async(str(flow_id), source=self, target=requester, status="error")
msg = f"Component {self.display_name} has not been built yet"
raise ValueError(msg)
result = self.built_result if self.use_result else self.built_object
if flow_id:
self._log_transaction_async(str(flow_id), source=self, target=requester, status="success")
await self._log_transaction_async(str(flow_id), source=self, target=requester, status="success")
return result
async def _build_vertex_and_update_params(self, key, vertex: Vertex) -> None:

View file

@ -107,7 +107,7 @@ class ComponentVertex(Vertex):
default_value = requester.get_value_from_template_dict(edge.target_param)
if flow_id:
self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="error")
await self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="error")
if default_value is not UNDEFINED:
return default_value
msg = f"Component {self.display_name} has not been built yet"
@ -146,7 +146,7 @@ class ComponentVertex(Vertex):
msg = f"Result not found for {edge.source_handle.name} in {edge}"
raise ValueError(msg)
if flow_id:
self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="success")
await self._log_transaction_async(source=self, target=requester, flow_id=str(flow_id), status="success")
return result
def extract_messages_from_artifacts(self, artifacts: dict[str, Any]) -> list[dict]: