From e18b55042e75143fb7c326b8641e24fee592551e Mon Sep 17 00:00:00 2001 From: Jordan Frazier <122494242+jordanrfrazier@users.noreply.github.com> Date: Wed, 30 Apr 2025 14:51:47 -0700 Subject: [PATCH] fix: transaction logging of pandas dataframes (#7716) * Fix transaction logging of pandas dataframes * cleaner class check * ruff * ruff * [autofix.ci] apply automated fixes * update serialization logic to skip json format * adds a custom run response to serialize dfs too? * just use runresponse * [autofix.ci] apply automated fixes * remove customrunresponse * Fix fastapi type * [autofix.ci] apply automated fixes --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- src/backend/base/langflow/api/v1/endpoints.py | 2 +- src/backend/base/langflow/graph/utils.py | 24 +++++++++++--- .../langflow/serialization/serialization.py | 33 ++++++++++++------- 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index 533f2d64f..8de2cfc84 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -267,7 +267,7 @@ async def run_flow_generator( await event_manager.queue.put((None, None, time.time)) -@router.post("/run/{flow_id_or_name}", response_model_exclude_none=True) # noqa: RUF100, FAST003 +@router.post("/run/{flow_id_or_name}", response_model=None, response_model_exclude_none=True) async def simplified_run_flow( *, background_tasks: BackgroundTasks, diff --git a/src/backend/base/langflow/graph/utils.py b/src/backend/base/langflow/graph/utils.py index 6ac86da5a..d8c767b7a 100644 --- a/src/backend/base/langflow/graph/utils.py +++ b/src/backend/base/langflow/graph/utils.py @@ -1,11 +1,11 @@ from __future__ import annotations -import json from collections.abc import Generator from enum import Enum from typing import TYPE_CHECKING, Any from uuid import UUID +import pandas as pd from loguru import logger from langflow.interface.utils import extract_input_variables_from_prompt @@ -124,12 +124,26 @@ async def log_transaction( else: return inputs = _vertex_to_primitive_dict(source) + + # Convert the result to a serializable format + if source.result: + try: + result_dict = source.result.model_dump() + for key, value in result_dict.items(): + if isinstance(value, pd.DataFrame): + result_dict[key] = value.to_dict() + outputs = result_dict + except Exception as e: # noqa: BLE001 + logger.warning(f"Error serializing result: {e!s}") + outputs = None + else: + outputs = None + transaction = TransactionBase( vertex_id=source.id, target_id=target.id if target else None, inputs=inputs, - # ugly hack to get the model dump with weird datatypes - outputs=json.loads(source.result.model_dump_json()) if source.result else None, + outputs=outputs, status=status, error=error, flow_id=flow_id if isinstance(flow_id, UUID) else UUID(flow_id), @@ -139,8 +153,8 @@ async def log_transaction( inserted = await crud_log_transaction(session, transaction) if inserted: logger.debug(f"Logged transaction: {inserted.id}") - except Exception: # noqa: BLE001 - logger.error("Error logging transaction") + except Exception as exc: # noqa: BLE001 + logger.error(f"Error logging transaction: {exc!s}") async def log_vertex_build( diff --git a/src/backend/base/langflow/serialization/serialization.py b/src/backend/base/langflow/serialization/serialization.py index c5f22de80..276b38d25 100644 --- a/src/backend/base/langflow/serialization/serialization.py +++ b/src/backend/base/langflow/serialization/serialization.py @@ -140,18 +140,27 @@ def _is_numpy_type(obj: Any) -> bool: def _serialize_numpy_type(obj: Any, max_length: int | None, max_items: int | None) -> Any: """Serialize numpy types.""" - if np.issubdtype(obj.dtype, np.number) and hasattr(obj, "item"): - return obj.item() - if np.issubdtype(obj.dtype, np.bool_): - return bool(obj) - if np.issubdtype(obj.dtype, np.complexfloating): - return complex(cast("complex", obj)) - if np.issubdtype(obj.dtype, np.str_): - return _serialize_str(str(obj), max_length, max_items) - if np.issubdtype(obj.dtype, np.bytes_) and hasattr(obj, "tobytes"): - return _serialize_bytes(obj.tobytes(), max_length, max_items) - if np.issubdtype(obj.dtype, np.object_) and hasattr(obj, "item"): - return _serialize_instance(obj.item(), max_length, max_items) + try: + # For single-element arrays + if obj.size == 1 and hasattr(obj, "item"): + return obj.item() + + # For multi-element arrays + if np.issubdtype(obj.dtype, np.number): + return obj.tolist() # Convert to Python list + if np.issubdtype(obj.dtype, np.bool_): + return bool(obj) + if np.issubdtype(obj.dtype, np.complexfloating): + return complex(cast("complex", obj)) + if np.issubdtype(obj.dtype, np.str_): + return _serialize_str(str(obj), max_length, max_items) + if np.issubdtype(obj.dtype, np.bytes_) and hasattr(obj, "tobytes"): + return _serialize_bytes(obj.tobytes(), max_length, max_items) + if np.issubdtype(obj.dtype, np.object_) and hasattr(obj, "item"): + return _serialize_instance(obj.item(), max_length, max_items) + except Exception as e: # noqa: BLE001 + logger.debug(f"Cannot serialize numpy array: {e!s}") + return UNSERIALIZABLE_SENTINEL return UNSERIALIZABLE_SENTINEL