Refactor MonitorService class and add new method for adding messages

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-02-21 16:01:23 -03:00
commit 09061a41b9

View file

@ -1,11 +1,18 @@
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Optional, Union
import duckdb
from langflow.services.base import Service
from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel
from langflow.services.monitor.utils import add_row_to_table, drop_and_create_table_if_schema_mismatch
from langflow.services.monitor.schema import (
MessageModel,
TransactionModel,
VertexBuildModel,
)
from langflow.services.monitor.utils import (
add_row_to_table,
drop_and_create_table_if_schema_mismatch,
)
from loguru import logger
from platformdirs import user_cache_dir
@ -36,9 +43,15 @@ class MonitorService(Service):
def ensure_tables_exist(self):
for table_name, model in self.table_map.items():
drop_and_create_table_if_schema_mismatch(str(self.db_path), table_name, model)
drop_and_create_table_if_schema_mismatch(
str(self.db_path), table_name, model
)
def add_row(self, table_name: str, data: dict):
def add_row(
self,
table_name: str,
data: Union[dict, TransactionModel, MessageModel, VertexBuildModel],
):
# Make sure the model passed matches the table
model = self.table_map.get(table_name)
@ -93,17 +106,21 @@ class MonitorService(Service):
with duckdb.connect(str(self.db_path)) as conn:
conn.execute(query)
def add_message(self, message: MessageModel):
self.add_row("messages", message)
def get_messages(
self,
sender_type: Optional[str] = None,
sender: Optional[str] = None,
sender_name: Optional[str] = None,
session_id: Optional[str] = None,
order_by: Optional[str] = "timestamp",
limit: Optional[int] = None,
):
query = "SELECT sender_name, sender_type, session_id, message, artifacts, timestamp FROM messages"
query = "SELECT sender_name, sender, session_id, message, artifacts, timestamp FROM messages"
conditions = []
if sender_type:
conditions.append(f"sender_type = '{sender_type}'")
if sender:
conditions.append(f"sender = '{sender}'")
if sender_name:
conditions.append(f"sender_name = '{sender_name}'")
if session_id:
@ -115,10 +132,13 @@ class MonitorService(Service):
if order_by:
query += f" ORDER BY {order_by}"
if limit is not None:
query += f" LIMIT {limit}"
with duckdb.connect(str(self.db_path)) as conn:
df = conn.execute(query).df()
return df.to_dict(orient="records")
return df
def get_transactions(
self,