fix: make sure data is json serializable (#4269)

* Refactor `recursive_serialize_or_str` function into a separate module

- Moved the `recursive_serialize_or_str` function from `schema.py` to a new `serialize.py` module for better modularity and reusability.
- Updated imports in `data.py`, `artifact.py`, and `schema.py` to reflect the new location of the `recursive_serialize_or_str` function.
- Enhanced the `recursive_serialize_or_str` function to handle `datetime` objects by converting them to ISO format.

* Enhance data serialization with recursive handling in `to_json` method

---------

Co-authored-by: Edwin Jose <edwin.jose@datastax.com>
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-10-24 13:47:02 -03:00 committed by GitHub
commit 5be41ae13e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 48 additions and 39 deletions

View file

@ -7,7 +7,7 @@ from pydantic import BaseModel
from langflow.schema.data import Data
from langflow.schema.message import Message
from langflow.schema.schema import recursive_serialize_or_str
from langflow.schema.serialize import recursive_serialize_or_str
class ArtifactType(str, Enum):

View file

@ -8,6 +8,7 @@ from langchain_core.prompts.image import ImagePromptTemplate
from loguru import logger
from pydantic import BaseModel, model_serializer, model_validator
from langflow.schema.serialize import recursive_serialize_or_str
from langflow.utils.constants import MESSAGE_SENDER_AI, MESSAGE_SENDER_USER
if TYPE_CHECKING:
@ -199,6 +200,7 @@ class Data(BaseModel):
# return a JSON string representation of the Data attributes
try:
data = {k: v.to_json() if hasattr(v, "to_json") else v for k, v in self.data.items()}
data = recursive_serialize_or_str(data)
return json.dumps(data, indent=4)
except Exception: # noqa: BLE001
logger.opt(exception=True).debug("Error converting Data to JSON")

View file

@ -1,14 +1,13 @@
from collections.abc import AsyncIterator, Generator, Iterator
from collections.abc import Generator
from enum import Enum
from typing import Literal
from loguru import logger
from pydantic import BaseModel
from pydantic.v1 import BaseModel as BaseModelV1
from typing_extensions import TypedDict
from langflow.schema.data import Data
from langflow.schema.message import Message
from langflow.schema.serialize import recursive_serialize_or_str
INPUT_FIELD_NAME = "input_value"
@ -113,38 +112,3 @@ def build_output_logs(vertex, result) -> dict:
outputs |= {name: OutputValue(message=message, type=_type).model_dump()}
return outputs
def recursive_serialize_or_str(obj):
    """Recursively convert ``obj`` into a structure that ``json.dumps`` accepts.

    Rules, checked in order:

    * ``None``, ``str``, ``bool``, ``int`` and ``float`` are returned unchanged.
    * ``dict`` values and ``list`` items are converted recursively
      (dict keys are kept as they are).
    * pydantic model *classes* are replaced by their ``repr``.
    * pydantic model instances (v1 or v2) are dumped to a dict whose values
      are converted recursively.
    * iterators and generators are replaced by ``"Unconsumed Stream"``.
    * objects exposing ``dict()`` or ``model_dump()`` are dumped and converted
      recursively.
    * anything else is replaced by ``str(obj)``.

    Any exception raised while converting is logged at debug level and
    ``str(obj)`` is returned instead.
    """
    try:
        # JSON-native scalars must keep their type; stringifying them would
        # turn numbers and booleans into strings in the final JSON document.
        if obj is None or isinstance(obj, (str, bool, int, float)):
            return obj
        if isinstance(obj, dict):
            return {k: recursive_serialize_or_str(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [recursive_serialize_or_str(v) for v in obj]
        # A model *class* also has `dict`/`model_dump` attributes, so it has to
        # be recognised before the duck-typed branches below try to call them
        # without an instance.
        if isinstance(obj, type) and issubclass(obj, BaseModel | BaseModelV1):
            return repr(obj)
        if isinstance(obj, BaseModel | BaseModelV1):
            # v2 models expose model_dump(); v1 models only have dict().
            obj_dict = obj.model_dump() if hasattr(obj, "model_dump") else obj.dict()
            return {k: recursive_serialize_or_str(v) for k, v in obj_dict.items()}
        if isinstance(obj, AsyncIterator | Generator | Iterator):
            # list(obj) would consume the stream, and str(obj) only yields
            # something like '<generator object ... at 0x33e9ec770>', which
            # carries a memory address and no useful data. Use a placeholder.
            return "Unconsumed Stream"
        if hasattr(obj, "dict"):
            return {k: recursive_serialize_or_str(v) for k, v in obj.dict().items()}
        if hasattr(obj, "model_dump"):
            return {k: recursive_serialize_or_str(v) for k, v in obj.model_dump().items()}
        return str(obj)
    except Exception:  # noqa: BLE001
        logger.debug(f"Cannot serialize object {obj}")
        return str(obj)

View file

@ -0,0 +1,43 @@
from collections.abc import AsyncIterator, Generator, Iterator
from datetime import datetime
from loguru import logger
from pydantic import BaseModel
from pydantic.v1 import BaseModel as BaseModelV1
def recursive_serialize_or_str(obj):
    """Recursively convert ``obj`` into a structure that ``json.dumps`` accepts.

    Rules, checked in order:

    * ``None``, ``str``, ``bool``, ``int`` and ``float`` are returned unchanged.
    * ``datetime`` is replaced by its ISO 8601 string (``obj.isoformat()``).
    * ``dict`` values and ``list`` items are converted recursively
      (dict keys are kept as they are).
    * pydantic model *classes* are replaced by their ``repr``.
    * pydantic model instances (v1 or v2) are dumped to a dict whose values
      are converted recursively.
    * iterators and generators are replaced by ``"Unconsumed Stream"``.
    * objects exposing ``dict()`` or ``model_dump()`` are dumped and converted
      recursively.
    * anything else is replaced by ``str(obj)``.

    Any exception raised while converting is logged at debug level and
    ``str(obj)`` is returned instead.
    """
    try:
        # JSON-native scalars must keep their type; stringifying them would
        # turn numbers and booleans into strings in the final JSON document.
        if obj is None or isinstance(obj, (str, bool, int, float)):
            return obj
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, dict):
            return {k: recursive_serialize_or_str(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [recursive_serialize_or_str(v) for v in obj]
        # A model *class* also has `dict`/`model_dump` attributes, so it has to
        # be recognised before the duck-typed branches below try to call them
        # without an instance.
        if isinstance(obj, type) and issubclass(obj, BaseModel | BaseModelV1):
            return repr(obj)
        if isinstance(obj, BaseModel | BaseModelV1):
            # v2 models expose model_dump(); v1 models only have dict().
            obj_dict = obj.model_dump() if hasattr(obj, "model_dump") else obj.dict()
            return {k: recursive_serialize_or_str(v) for k, v in obj_dict.items()}
        if isinstance(obj, AsyncIterator | Generator | Iterator):
            # list(obj) would consume the stream, and str(obj) only yields
            # something like '<generator object ... at 0x33e9ec770>', which
            # carries a memory address and no useful data. Use a placeholder.
            return "Unconsumed Stream"
        if hasattr(obj, "dict"):
            return {k: recursive_serialize_or_str(v) for k, v in obj.dict().items()}
        if hasattr(obj, "model_dump"):
            return {k: recursive_serialize_or_str(v) for k, v in obj.model_dump().items()}
        return str(obj)
    except Exception:  # noqa: BLE001
        logger.debug(f"Cannot serialize object {obj}")
        return str(obj)