refactor: Update MonitorService to support read-only mode in exec_query method

This commit is contained in:
ogabrielluiz 2024-06-13 10:39:51 -03:00
commit 18c4bebd57

View file

@ -1,6 +1,6 @@
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Union, List
from typing import TYPE_CHECKING, List, Optional, Union
import duckdb
from loguru import logger
@ -32,8 +32,8 @@ class MonitorService(Service):
except Exception as e:
logger.exception(f"Error initializing monitor service: {e}")
def exec_query(self, query: str):
with duckdb.connect(str(self.db_path)) as conn:
def exec_query(self, query: str, read_only: bool = False):
with duckdb.connect(str(self.db_path), read_only=read_only) as conn:
return conn.execute(query).df()
def to_df(self, table_name):
@ -55,7 +55,7 @@ class MonitorService(Service):
raise ValueError(f"Unknown table name: {table_name}")
# Connect to DuckDB and add the row
with duckdb.connect(str(self.db_path)) as conn:
with duckdb.connect(str(self.db_path), read_only=False) as conn:
add_row_to_table(conn, table_name, model, data)
def load_table_as_dataframe(self, table_name):
@ -89,7 +89,7 @@ class MonitorService(Service):
if order_by:
query += f" ORDER BY {order_by}"
with duckdb.connect(str(self.db_path)) as conn:
with duckdb.connect(str(self.db_path), read_only=True) as conn:
df = conn.execute(query).df()
return df.to_dict(orient="records")
@ -99,13 +99,13 @@ class MonitorService(Service):
if flow_id:
query += f" WHERE flow_id = '{flow_id}'"
with duckdb.connect(str(self.db_path)) as conn:
with duckdb.connect(str(self.db_path), read_only=False) as conn:
conn.execute(query)
def delete_messages_session(self, session_id: str):
query = f"DELETE FROM messages WHERE session_id = '{session_id}'"
return self.exec_query(query)
return self.exec_query(query, read_only=False)
def delete_messages(self, message_ids: Union[List[int], str]):
if isinstance(message_ids, list):
@ -119,14 +119,14 @@ class MonitorService(Service):
query = f"DELETE FROM messages WHERE index IN ({ids_str})"
return self.exec_query(query)
return self.exec_query(query, read_only=False)
def update_message(self, message_id: str, **kwargs):
query = (
f"""UPDATE messages SET {', '.join(f"{k} = '{v}'" for k, v in kwargs.items())} WHERE index = {message_id}"""
)
return self.exec_query(query)
return self.exec_query(query, read_only=False)
def add_message(self, message: MessageModel):
self.add_row("messages", message)
@ -162,7 +162,7 @@ class MonitorService(Service):
if limit is not None:
query += f" LIMIT {limit}"
with duckdb.connect(str(self.db_path)) as conn:
with duckdb.connect(str(self.db_path), read_only=True) as conn:
df = conn.execute(query).df()
return df
@ -193,7 +193,7 @@ class MonitorService(Service):
if order_by:
query += f" ORDER BY {order_by} DESC"
with duckdb.connect(str(self.db_path)) as conn:
with duckdb.connect(str(self.db_path), read_only=True) as conn:
df = conn.execute(query).df()
return df.to_dict(orient="records")