From b32485a6fa5ccbbe7cce1a7c71c2e1799079ea5f Mon Sep 17 00:00:00 2001
From: Gabriel Luiz Freitas Almeida
Date: Wed, 7 Feb 2024 14:06:56 -0300
Subject: [PATCH] Fix pydantic field serialization and error handling

---
 .../langflow/services/monitor/schema.py       | 27 ++++++++++++++++++-
 .../langflow/services/monitor/service.py      |  3 ++-
 .../langflow/services/monitor/utils.py        | 15 ++++++++---
 3 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/src/backend/langflow/services/monitor/schema.py b/src/backend/langflow/services/monitor/schema.py
index e9c380a10..1cb6cecff 100644
--- a/src/backend/langflow/services/monitor/schema.py
+++ b/src/backend/langflow/services/monitor/schema.py
@@ -2,7 +2,7 @@ import json
 from datetime import datetime
 from typing import Any, Optional
 
-from pydantic import BaseModel, Field, validator
+from pydantic import BaseModel, Field, field_serializer, validator
 
 
 class TransactionModel(BaseModel):
@@ -60,6 +60,13 @@ class VertexBuildModel(BaseModel):
         from_attributes = True
         populate_by_name = True
 
+    @field_serializer("data", "artifacts")
+    def serialize_dict(v):
+        if isinstance(v, dict):
+            # map_dict = to_map(v)
+            return json.dumps(v)
+        return v
+
     @validator("params", pre=True)
     def validate_params(cls, v):
         if isinstance(v, str):
@@ -82,6 +89,24 @@ class VertexBuildModel(BaseModel):
         return v
 
 
+# create a function that turns dicts into a
+# dict like this:
+# my_map_dict = {
+#     "key": [
+#         1, 2, 3
+#     ],
+#     "value": [
+#         "one", "two", "three"
+#     ]
+# }
+# so map has a "key" and a "value" list
+# containing the keys and values of the dict
+def to_map(value: dict):
+    keys = list(value.keys())
+    values = list(value.values())
+    return {"key": keys, "value": values}
+
+
 class VertexBuildMapModel(BaseModel):
     vertex_builds: dict[str, list[VertexBuildModel]]
 
diff --git a/src/backend/langflow/services/monitor/service.py b/src/backend/langflow/services/monitor/service.py
index efa9e0ec1..f32b3807b 100644
--- a/src/backend/langflow/services/monitor/service.py
+++ b/src/backend/langflow/services/monitor/service.py
@@ -29,7 +29,7 @@ class MonitorService(Service):
         try:
             self.ensure_tables_exist()
         except Exception as e:
-            logger.error(f"Error initializing monitor service: {e}")
+            logger.exception(f"Error initializing monitor service: {e}")
 
     def to_df(self, table_name):
         return self.load_table_as_dataframe(table_name)
@@ -145,3 +145,4 @@ class MonitorService(Service):
 
         df = conn.execute(query).df()
         return df.to_dict(orient="records")
+        return df.to_dict(orient="records")
diff --git a/src/backend/langflow/services/monitor/utils.py b/src/backend/langflow/services/monitor/utils.py
index f7c4261ac..9bd32e243 100644
--- a/src/backend/langflow/services/monitor/utils.py
+++ b/src/backend/langflow/services/monitor/utils.py
@@ -1,11 +1,10 @@
 from typing import TYPE_CHECKING, Any, Dict, Optional, Type
 
 import duckdb
+from langflow.services.deps import get_monitor_service
 from loguru import logger
 from pydantic import BaseModel
 
-from langflow.services.deps import get_monitor_service
-
 if TYPE_CHECKING:
     from langflow.api.v1.schemas import ResultDict
 
@@ -35,7 +34,7 @@ def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict:
         elif field_info.__name__ == "bool":
             sql_type = "BOOLEAN"
         elif field_info.__name__ == "dict":
-            sql_type = "JSON"
+            sql_type = "VARCHAR"
         elif field_info.__name__ == "Any":
             sql_type = "VARCHAR"
         else:
@@ -91,7 +90,14 @@ def add_row_to_table(
     insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({values_placeholders})"
 
     # Execute the insert statement
-    conn.execute(insert_sql, values)
+    try:
+        conn.execute(insert_sql, values)
+    except Exception as e:
+        # Log values types
+        for key, value in validated_dict.items():
+            logger.error(f"{key}: {type(value)}")
+
+        logger.error(f"Error adding row to table: {e}")
 
 
 async def log_message(
@@ -131,6 +137,7 @@ async def log_vertex_build(
 ):
     try:
         monitor_service = get_monitor_service()
+
        row = {
            "flow_id": flow_id,
            "id": vertex_id,