diff --git a/src/backend/langflow/services/monitor/schema.py b/src/backend/langflow/services/monitor/schema.py index 620484829..bd48ebfa6 100644 --- a/src/backend/langflow/services/monitor/schema.py +++ b/src/backend/langflow/services/monitor/schema.py @@ -1,6 +1,6 @@ import json from datetime import datetime -from typing import Optional +from typing import Any, Optional from pydantic import BaseModel, Field, validator @@ -44,3 +44,38 @@ class MessageModel(BaseModel): if isinstance(v, str): return json.loads(v) return v + + +class VertexBuildModel(BaseModel): + index: Optional[int] = Field(default=None, alias="index") + id: Optional[str] = Field(default=None, alias="id") + flow_id: str + valid: bool + params: Any + data: dict + artifacts: dict + + class Config: + from_attributes = True + populate_by_name = True + + @validator("params", pre=True) + def validate_params(cls, v): + if isinstance(v, str): + try: + return json.loads(v) + except json.JSONDecodeError: + return v + return v + + @validator("data", pre=True) + def validate_data(cls, v): + if isinstance(v, str): + return json.loads(v) + return v + + @validator("artifacts", pre=True) + def validate_artifacts(cls, v): + if isinstance(v, str): + return json.loads(v) + return v diff --git a/src/backend/langflow/services/monitor/service.py b/src/backend/langflow/services/monitor/service.py index 4a4d6d5b1..9923e5f51 100644 --- a/src/backend/langflow/services/monitor/service.py +++ b/src/backend/langflow/services/monitor/service.py @@ -1,14 +1,15 @@ from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional import duckdb -from langflow.services.base import Service -from langflow.services.monitor.schema import MessageModel, TransactionModel -from langflow.services.monitor.utils import add_row_to_table, drop_and_create_table_if_schema_mismatch from loguru import logger from platformdirs import user_cache_dir +from langflow.services.base import Service +from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel +from langflow.services.monitor.utils import add_row_to_table, drop_and_create_table_if_schema_mismatch + if TYPE_CHECKING: from langflow.services.settings.manager import SettingsService @@ -20,6 +21,12 @@ class MonitorService(Service): self.settings_service = settings_service self.base_cache_dir = Path(user_cache_dir("langflow")) self.db_path = self.base_cache_dir / "monitor.duckdb" + self.table_map = { + "transactions": TransactionModel, + "messages": MessageModel, + "vertex_builds": VertexBuildModel, + } + try: self.ensure_tables_exist() except Exception as e: @@ -29,16 +36,14 @@ class MonitorService(Service): return self.load_table_as_dataframe(table_name) def ensure_tables_exist(self): - drop_and_create_table_if_schema_mismatch(str(self.db_path), "transactions", TransactionModel) - drop_and_create_table_if_schema_mismatch(str(self.db_path), "messages", MessageModel) + for table_name, model in self.table_map.items(): + drop_and_create_table_if_schema_mismatch(str(self.db_path), table_name, model) def add_row(self, table_name: str, data: dict): # Make sure the model passed matches the table - if table_name == "transactions": - model = TransactionModel - elif table_name == "messages": - model = MessageModel - else: + + model = self.table_map.get(table_name) + if model is None: raise ValueError(f"Unknown table name: {table_name}") # Connect to DuckDB and add the row @@ -52,3 +57,23 @@ class MonitorService(Service): @staticmethod def get_timestamp(): return datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + def get_vertex_builds( + self, flow_id: Optional[str] = None, vertex_id: Optional[str] = None, valid: Optional[bool] = None + ): + query = "SELECT * FROM vertex_builds" + conditions = [] + if flow_id: + conditions.append(f"flow_id = '{flow_id}'") + if vertex_id: + conditions.append(f"vertex_id = '{vertex_id}'") + if valid is not None: # Check for None because valid is a boolean + conditions.append(f"valid = {valid}") + + if conditions: + query += " WHERE " + " AND ".join(conditions) + + with duckdb.connect(str(self.db_path)) as conn: + df = conn.execute(query).df() + + return df.to_dict(orient="records")