fix: serialize array output and logs (#3040)

* refactor: improve recursive serialization function

Refactor the `recursive_serialize_or_str` function in the `schema.py` file to improve its readability and maintainability. The function now uses a try-except block to handle exceptions and returns a string representation of the object if an exception occurs. This ensures that the function always returns a string, preventing any unexpected errors. Additionally, the function now includes additional checks for different object types, such as dictionaries, lists, and instances of `BaseModel`. These checks ensure that the function correctly serializes complex objects and avoids any potential issues. Overall, this refactoring improves the code quality and reliability of the `recursive_serialize_or_str` function.

* feat(artifact.py): add support for recursive serialization of items in ARRAY artifact type to ensure consistent data handling

* feat(schema.py): add support for serializing arrays in build_output_logs function to handle LogType.ARRAY case
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-07-29 17:46:07 -03:00 committed by GitHub
commit 4e9367badf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 48 additions and 22 deletions

View file

@ -1,5 +1,5 @@
import inspect
from typing import Any, AsyncIterator, Callable, ClassVar, Generator, Iterator, List, Optional, Union
from typing import Any, Callable, ClassVar, List, Optional, Union
from uuid import UUID
import yaml
@ -15,26 +15,6 @@ from langflow.template.field.base import UNDEFINED, Output
from .custom_component import CustomComponent
def recursive_serialize_or_str(obj):
try:
if isinstance(obj, dict):
return {k: recursive_serialize_or_str(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [recursive_serialize_or_str(v) for v in obj]
elif isinstance(obj, BaseModel):
return {k: recursive_serialize_or_str(v) for k, v in obj.model_dump().items()}
elif isinstance(obj, (AsyncIterator, Generator, Iterator)):
# contain memory addresses
# without consuming the iterator
# return list(obj) consumes the iterator
# return f"{obj}" this generates '<generator object BaseChatModel.stream at 0x33e9ec770>'
# it is not useful
return "Unconsumed Stream"
return str(obj)
except Exception:
return str(obj)
class Component(CustomComponent):
inputs: List[InputTypes] = []
outputs: List[Output] = []

View file

@ -6,6 +6,7 @@ from pydantic import BaseModel
from langflow.schema import Data
from langflow.schema.message import Message
from langflow.schema.schema import recursive_serialize_or_str
class ArtifactType(str, Enum):
@ -52,6 +53,16 @@ def get_artifact_type(value, build_result=None) -> str:
def post_process_raw(raw, artifact_type: str):
if artifact_type == ArtifactType.STREAM.value:
raw = ""
elif artifact_type == ArtifactType.ARRAY.value:
_raw = []
for item in raw:
if hasattr(item, "dict"):
_raw.append(recursive_serialize_or_str(item))
elif hasattr(item, "model_dump"):
_raw.append(recursive_serialize_or_str(item))
else:
_raw.append(str(item))
raw = _raw
elif artifact_type == ArtifactType.UNKNOWN.value and raw is not None:
if isinstance(raw, (BaseModel, dict)):
try:

View file

@ -1,5 +1,5 @@
from enum import Enum
from typing import Generator, Literal, Union
from typing import AsyncIterator, Generator, Iterator, Literal, Union
from pydantic import BaseModel
from typing_extensions import TypedDict
@ -104,7 +104,42 @@ def build_output_logs(vertex, result) -> dict:
case LogType.UNKNOWN:
message = ""
case LogType.ARRAY:
message = [recursive_serialize_or_str(item) for item in message]
name = output.get("name", f"output_{index}")
outputs |= {name: OutputValue(message=message, type=_type).model_dump()}
return outputs
def recursive_serialize_or_str(obj):
try:
if isinstance(obj, dict):
return {k: recursive_serialize_or_str(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [recursive_serialize_or_str(v) for v in obj]
elif isinstance(obj, BaseModel):
if hasattr(obj, "model_dump"):
obj_dict = obj.model_dump()
elif hasattr(obj, "dict"):
obj_dict = obj.dict() # type: ignore
return {k: recursive_serialize_or_str(v) for k, v in obj_dict.items()}
elif isinstance(obj, (AsyncIterator, Generator, Iterator)):
# contain memory addresses
# without consuming the iterator
# return list(obj) consumes the iterator
# return f"{obj}" this generates '<generator object BaseChatModel.stream at 0x33e9ec770>'
# it is not useful
return "Unconsumed Stream"
elif hasattr(obj, "dict"):
return {k: recursive_serialize_or_str(v) for k, v in obj.dict().items()}
elif hasattr(obj, "model_dump"):
return {k: recursive_serialize_or_str(v) for k, v in obj.model_dump().items()}
elif issubclass(obj, BaseModel):
# This a type BaseModel and not an instance of it
return repr(obj)
return str(obj)
except Exception:
return str(obj)