Refactor MonitorService queries and add order_by parameter

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-01-31 14:58:36 -03:00
commit cb74e02032

View file

@ -59,9 +59,13 @@ class MonitorService(Service):
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def get_vertex_builds(
self, flow_id: Optional[str] = None, vertex_id: Optional[str] = None, valid: Optional[bool] = None
self,
flow_id: Optional[str] = None,
vertex_id: Optional[str] = None,
valid: Optional[bool] = None,
order_by: Optional[str] = "timestamp",
):
query = "SELECT * FROM vertex_builds"
query = "SELECT flow_id, vertex_id, valid, params, data, artifacts, timestamp FROM vertex_builds"
conditions = []
if flow_id:
conditions.append(f"flow_id = '{flow_id}'")
@ -73,6 +77,62 @@ class MonitorService(Service):
if conditions:
query += " WHERE " + " AND ".join(conditions)
if order_by:
query += f" ORDER BY {order_by}"
with duckdb.connect(str(self.db_path)) as conn:
df = conn.execute(query).df()
return df.to_dict(orient="records")
def get_messages(
self,
sender_type: Optional[str] = None,
sender_name: Optional[str] = None,
session_id: Optional[str] = None,
order_by: Optional[str] = "timestamp",
):
query = "SELECT sender_name, sender_type, session_id, message, artifacts, timestamp FROM messages"
conditions = []
if sender_type:
conditions.append(f"sender_type = '{sender_type}'")
if sender_name:
conditions.append(f"sender_name = '{sender_name}'")
if session_id:
conditions.append(f"session_id = '{session_id}'")
if conditions:
query += " WHERE " + " AND ".join(conditions)
if order_by:
query += f" ORDER BY {order_by}"
with duckdb.connect(str(self.db_path)) as conn:
df = conn.execute(query).df()
return df.to_dict(orient="records")
def get_transactions(
self,
source: Optional[str] = None,
target: Optional[str] = None,
status: Optional[str] = None,
order_by: Optional[str] = "timestamp",
):
query = "SELECT source, target, target_args, status, error, timestamp FROM transactions"
conditions = []
if source:
conditions.append(f"source = '{source}'")
if target:
conditions.append(f"target = '{target}'")
if status:
conditions.append(f"status = '{status}'")
if conditions:
query += " WHERE " + " AND ".join(conditions)
if order_by:
query += f" ORDER BY {order_by}"
with duckdb.connect(str(self.db_path)) as conn:
df = conn.execute(query).df()