diff --git a/src/backend/langflow/services/monitor/service.py b/src/backend/langflow/services/monitor/service.py index f32b3807b..a8ca44a7b 100644 --- a/src/backend/langflow/services/monitor/service.py +++ b/src/backend/langflow/services/monitor/service.py @@ -1,11 +1,18 @@ from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Union import duckdb from langflow.services.base import Service -from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel -from langflow.services.monitor.utils import add_row_to_table, drop_and_create_table_if_schema_mismatch +from langflow.services.monitor.schema import ( + MessageModel, + TransactionModel, + VertexBuildModel, +) +from langflow.services.monitor.utils import ( + add_row_to_table, + drop_and_create_table_if_schema_mismatch, +) from loguru import logger from platformdirs import user_cache_dir @@ -36,9 +43,15 @@ class MonitorService(Service): def ensure_tables_exist(self): for table_name, model in self.table_map.items(): - drop_and_create_table_if_schema_mismatch(str(self.db_path), table_name, model) + drop_and_create_table_if_schema_mismatch( + str(self.db_path), table_name, model + ) - def add_row(self, table_name: str, data: dict): + def add_row( + self, + table_name: str, + data: Union[dict, TransactionModel, MessageModel, VertexBuildModel], + ): # Make sure the model passed matches the table model = self.table_map.get(table_name) @@ -93,17 +106,21 @@ class MonitorService(Service): with duckdb.connect(str(self.db_path)) as conn: conn.execute(query) + def add_message(self, message: MessageModel): + self.add_row("messages", message) + def get_messages( self, - sender_type: Optional[str] = None, + sender: Optional[str] = None, sender_name: Optional[str] = None, session_id: Optional[str] = None, order_by: Optional[str] = "timestamp", + limit: Optional[int] = None, ): - query = "SELECT sender_name, sender_type, session_id, message, artifacts, timestamp FROM messages" + query = "SELECT sender_name, sender, session_id, message, artifacts, timestamp FROM messages" conditions = [] - if sender_type: - conditions.append(f"sender_type = '{sender_type}'") + if sender: + conditions.append(f"sender = '{sender}'") if sender_name: conditions.append(f"sender_name = '{sender_name}'") if session_id: @@ -115,10 +132,13 @@ class MonitorService(Service): if order_by: query += f" ORDER BY {order_by}" + if limit is not None: + query += f" LIMIT {limit}" + with duckdb.connect(str(self.db_path)) as conn: df = conn.execute(query).df() - return df.to_dict(orient="records") + return df def get_transactions( self,