fix: transaction logging of pandas dataframes (#7716)

* Fix transaction logging of pandas dataframes

* cleaner class check

* ruff

* ruff

* [autofix.ci] apply automated fixes

* update serialization logic to skip json format

* adds a custom run response to serialize dfs too?

* just use runresponse

* [autofix.ci] apply automated fixes

* remove customrunresponse

* Fix fastapi type

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Jordan Frazier 2025-04-30 14:51:47 -07:00 committed by GitHub
commit e18b55042e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 41 additions and 18 deletions

View file

@ -267,7 +267,7 @@ async def run_flow_generator(
await event_manager.queue.put((None, None, time.time))
@router.post("/run/{flow_id_or_name}", response_model_exclude_none=True) # noqa: RUF100, FAST003
@router.post("/run/{flow_id_or_name}", response_model=None, response_model_exclude_none=True)
async def simplified_run_flow(
*,
background_tasks: BackgroundTasks,

View file

@ -1,11 +1,11 @@
from __future__ import annotations
import json
from collections.abc import Generator
from enum import Enum
from typing import TYPE_CHECKING, Any
from uuid import UUID
import pandas as pd
from loguru import logger
from langflow.interface.utils import extract_input_variables_from_prompt
@ -124,12 +124,26 @@ async def log_transaction(
else:
return
inputs = _vertex_to_primitive_dict(source)
# Convert the result to a serializable format
if source.result:
try:
result_dict = source.result.model_dump()
for key, value in result_dict.items():
if isinstance(value, pd.DataFrame):
result_dict[key] = value.to_dict()
outputs = result_dict
except Exception as e: # noqa: BLE001
logger.warning(f"Error serializing result: {e!s}")
outputs = None
else:
outputs = None
transaction = TransactionBase(
vertex_id=source.id,
target_id=target.id if target else None,
inputs=inputs,
# ugly hack to get the model dump with weird datatypes
outputs=json.loads(source.result.model_dump_json()) if source.result else None,
outputs=outputs,
status=status,
error=error,
flow_id=flow_id if isinstance(flow_id, UUID) else UUID(flow_id),
@ -139,8 +153,8 @@ async def log_transaction(
inserted = await crud_log_transaction(session, transaction)
if inserted:
logger.debug(f"Logged transaction: {inserted.id}")
except Exception: # noqa: BLE001
logger.error("Error logging transaction")
except Exception as exc: # noqa: BLE001
logger.error(f"Error logging transaction: {exc!s}")
async def log_vertex_build(

View file

@ -140,18 +140,27 @@ def _is_numpy_type(obj: Any) -> bool:
def _serialize_numpy_type(obj: Any, max_length: int | None, max_items: int | None) -> Any:
"""Serialize numpy types."""
if np.issubdtype(obj.dtype, np.number) and hasattr(obj, "item"):
return obj.item()
if np.issubdtype(obj.dtype, np.bool_):
return bool(obj)
if np.issubdtype(obj.dtype, np.complexfloating):
return complex(cast("complex", obj))
if np.issubdtype(obj.dtype, np.str_):
return _serialize_str(str(obj), max_length, max_items)
if np.issubdtype(obj.dtype, np.bytes_) and hasattr(obj, "tobytes"):
return _serialize_bytes(obj.tobytes(), max_length, max_items)
if np.issubdtype(obj.dtype, np.object_) and hasattr(obj, "item"):
return _serialize_instance(obj.item(), max_length, max_items)
try:
# For single-element arrays
if obj.size == 1 and hasattr(obj, "item"):
return obj.item()
# For multi-element arrays
if np.issubdtype(obj.dtype, np.number):
return obj.tolist() # Convert to Python list
if np.issubdtype(obj.dtype, np.bool_):
return bool(obj)
if np.issubdtype(obj.dtype, np.complexfloating):
return complex(cast("complex", obj))
if np.issubdtype(obj.dtype, np.str_):
return _serialize_str(str(obj), max_length, max_items)
if np.issubdtype(obj.dtype, np.bytes_) and hasattr(obj, "tobytes"):
return _serialize_bytes(obj.tobytes(), max_length, max_items)
if np.issubdtype(obj.dtype, np.object_) and hasattr(obj, "item"):
return _serialize_instance(obj.item(), max_length, max_items)
except Exception as e: # noqa: BLE001
logger.debug(f"Cannot serialize numpy array: {e!s}")
return UNSERIALIZABLE_SENTINEL
return UNSERIALIZABLE_SENTINEL