Refactor MonitorService and add VertexBuildModel

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-01-30 23:06:31 -03:00
commit 21dfd013bf
2 changed files with 72 additions and 12 deletions

View file

@ -1,6 +1,6 @@
import json
from datetime import datetime
from typing import Optional
from typing import Any, Optional
from pydantic import BaseModel, Field, validator
@ -44,3 +44,38 @@ class MessageModel(BaseModel):
if isinstance(v, str):
return json.loads(v)
return v
class VertexBuildModel(BaseModel):
index: Optional[int] = Field(default=None, alias="index")
id: Optional[str] = Field(default=None, alias="id")
flow_id: str
valid: bool
params: Any
data: dict
artifacts: dict
class Config:
from_attributes = True
populate_by_name = True
@validator("params", pre=True)
def validate_params(cls, v):
if isinstance(v, str):
try:
return json.loads(v)
except json.JSONDecodeError:
return v
return v
@validator("data", pre=True)
def validate_data(cls, v):
if isinstance(v, str):
return json.loads(v)
return v
@validator("artifacts", pre=True)
def validate_artifacts(cls, v):
if isinstance(v, str):
return json.loads(v)
return v

View file

@ -1,14 +1,15 @@
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
import duckdb
from langflow.services.base import Service
from langflow.services.monitor.schema import MessageModel, TransactionModel
from langflow.services.monitor.utils import add_row_to_table, drop_and_create_table_if_schema_mismatch
from loguru import logger
from platformdirs import user_cache_dir
from langflow.services.base import Service
from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel
from langflow.services.monitor.utils import add_row_to_table, drop_and_create_table_if_schema_mismatch
if TYPE_CHECKING:
from langflow.services.settings.manager import SettingsService
@ -20,6 +21,12 @@ class MonitorService(Service):
self.settings_service = settings_service
self.base_cache_dir = Path(user_cache_dir("langflow"))
self.db_path = self.base_cache_dir / "monitor.duckdb"
self.table_map = {
"transactions": TransactionModel,
"messages": MessageModel,
"vertex_builds": VertexBuildModel,
}
try:
self.ensure_tables_exist()
except Exception as e:
@ -29,16 +36,14 @@ class MonitorService(Service):
return self.load_table_as_dataframe(table_name)
def ensure_tables_exist(self):
drop_and_create_table_if_schema_mismatch(str(self.db_path), "transactions", TransactionModel)
drop_and_create_table_if_schema_mismatch(str(self.db_path), "messages", MessageModel)
for table_name, model in self.table_map.items():
drop_and_create_table_if_schema_mismatch(str(self.db_path), table_name, model)
def add_row(self, table_name: str, data: dict):
# Make sure the model passed matches the table
if table_name == "transactions":
model = TransactionModel
elif table_name == "messages":
model = MessageModel
else:
model = self.table_map.get(table_name)
if model is None:
raise ValueError(f"Unknown table name: {table_name}")
# Connect to DuckDB and add the row
@ -52,3 +57,23 @@ class MonitorService(Service):
@staticmethod
def get_timestamp():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def get_vertex_builds(
self, flow_id: Optional[str] = None, vertex_id: Optional[str] = None, valid: Optional[bool] = None
):
query = "SELECT * FROM vertex_builds"
conditions = []
if flow_id:
conditions.append(f"flow_id = '{flow_id}'")
if vertex_id:
conditions.append(f"vertex_id = '{vertex_id}'")
if valid is not None: # Check for None because valid is a boolean
conditions.append(f"valid = {valid}")
if conditions:
query += " WHERE " + " AND ".join(conditions)
with duckdb.connect(str(self.db_path)) as conn:
df = conn.execute(query).df()
return df.to_dict(orient="records")