Fix pydantic field serialization and error handling

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-02-07 14:06:56 -03:00
commit b32485a6fa
3 changed files with 39 additions and 6 deletions

View file

@@ -2,7 +2,7 @@ import json
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, Field, validator
from pydantic import BaseModel, Field, field_serializer, validator
class TransactionModel(BaseModel):
@@ -60,6 +60,13 @@ class VertexBuildModel(BaseModel):
from_attributes = True
populate_by_name = True
@field_serializer("data", "artifacts")
def serialize_dict(v):
    """Serialize dict-valued fields to a JSON string; other values pass through unchanged."""
    return json.dumps(v) if isinstance(v, dict) else v
@validator("params", pre=True)
def validate_params(cls, v):
if isinstance(v, str):
@@ -82,6 +89,24 @@ class VertexBuildModel(BaseModel):
return v
# create a function that turns dicts into a
# dict like this:
# my_map_dict = {
# "key": [
# 1, 2, 3
# ],
# "value": [
# "one", "two", "three"
# ]
# }
# so map has a "key" and a "value" list
# containing the keys and values of the dict
def to_map(value: dict):
    """Split *value* into parallel lists of its keys and values.

    Returns a dict of the form {"key": [k1, k2, ...], "value": [v1, v2, ...]},
    preserving the dict's insertion order.
    """
    return {"key": list(value), "value": list(value.values())}
class VertexBuildMapModel(BaseModel):
    """Container grouping VertexBuildModel records by a string key.

    NOTE(review): keys are presumably vertex IDs — confirm against callers.
    """
    # Each key maps to the ordered list of builds recorded under it.
    vertex_builds: dict[str, list[VertexBuildModel]]

View file

@@ -29,7 +29,7 @@ class MonitorService(Service):
try:
self.ensure_tables_exist()
except Exception as e:
logger.error(f"Error initializing monitor service: {e}")
logger.exception(f"Error initializing monitor service: {e}")
def to_df(self, table_name):
    """Return the contents of *table_name* as produced by load_table_as_dataframe.

    Thin convenience wrapper; presumably yields a pandas DataFrame — confirm
    against load_table_as_dataframe's implementation.
    """
    return self.load_table_as_dataframe(table_name)
@@ -145,3 +145,4 @@ class MonitorService(Service):
df = conn.execute(query).df()
return df.to_dict(orient="records")
return df.to_dict(orient="records")

View file

@@ -1,11 +1,10 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Type
import duckdb
from langflow.services.deps import get_monitor_service
from loguru import logger
from pydantic import BaseModel
from langflow.services.deps import get_monitor_service
if TYPE_CHECKING:
from langflow.api.v1.schemas import ResultDict
@@ -35,7 +34,7 @@ def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict:
elif field_info.__name__ == "bool":
sql_type = "BOOLEAN"
elif field_info.__name__ == "dict":
sql_type = "JSON"
sql_type = "VARCHAR"
elif field_info.__name__ == "Any":
sql_type = "VARCHAR"
else:
@@ -91,7 +90,14 @@
insert_sql = f"INSERT INTO {table_name} ({columns}) VALUES ({values_placeholders})"
# Execute the insert statement
conn.execute(insert_sql, values)
try:
conn.execute(insert_sql, values)
except Exception as e:
# Log values types
for key, value in validated_dict.items():
logger.error(f"{key}: {type(value)}")
logger.error(f"Error adding row to table: {e}")
async def log_message(
@@ -131,6 +137,7 @@ async def log_vertex_build(
):
try:
monitor_service = get_monitor_service()
row = {
"flow_id": flow_id,
"id": vertex_id,