From f79289f9668fe3fc42119d445ff14007a3483926 Mon Sep 17 00:00:00 2001 From: ogabrielluiz Date: Fri, 31 May 2024 09:38:05 -0300 Subject: [PATCH] feat: Add API endpoints for managing messages This commit adds new API endpoints for managing messages. It includes the ability to delete messages by their IDs, update a specific message, and delete all messages associated with a session. These changes are implemented in the `monitor.py`, `schema.py`, and `service.py` files. --- src/backend/base/langflow/api/v1/monitor.py | 38 +++++++++++++++++++ .../base/langflow/services/monitor/schema.py | 7 ++++ .../base/langflow/services/monitor/service.py | 19 ++++++++-- 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/src/backend/base/langflow/api/v1/monitor.py b/src/backend/base/langflow/api/v1/monitor.py index 05fee6f03..a6e7b51f2 100644 --- a/src/backend/base/langflow/api/v1/monitor.py +++ b/src/backend/base/langflow/api/v1/monitor.py @@ -4,6 +4,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query from langflow.services.deps import get_monitor_service from langflow.services.monitor.schema import ( + MessageModelRequest, MessageModelResponse, TransactionModelResponse, VertexBuildMapModel, @@ -66,6 +67,43 @@ async def get_messages( raise HTTPException(status_code=500, detail=str(e)) +@router.delete("/messages", status_code=204) +async def delete_messages( + message_ids: List[int], + monitor_service: MonitorService = Depends(get_monitor_service), +): + try: + monitor_service.delete_messages(message_ids=message_ids) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/messages/{message_id}", response_model=MessageModelResponse) +async def update_message( + message_id: str, + message: MessageModelRequest, + monitor_service: MonitorService = Depends(get_monitor_service), +): + try: + message_dict = message.model_dump(exclude_none=True) + df = monitor_service.update_message(message_id=message_id, **message_dict) + dicts = df.to_dict(orient="records") + return [MessageModelResponse(**d) for d in dicts] + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.delete("/messages/session/{session_id}", status_code=204) +async def delete_messages_session( + session_id: str, + monitor_service: MonitorService = Depends(get_monitor_service), +): + try: + monitor_service.delete_messages_session(session_id=session_id) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @router.get("/transactions", response_model=List[TransactionModelResponse]) async def get_transactions( source: Optional[str] = Query(None), diff --git a/src/backend/base/langflow/services/monitor/schema.py b/src/backend/base/langflow/services/monitor/schema.py index e7bc7a963..b3a9ce5c6 100644 --- a/src/backend/base/langflow/services/monitor/schema.py +++ b/src/backend/base/langflow/services/monitor/schema.py @@ -122,6 +122,13 @@ class MessageModelResponse(MessageModel): return v +class MessageModelRequest(MessageModel): + message: str = Field(default="") + sender: str = Field(default="") + sender_name: str = Field(default="") + session_id: str = Field(default="") + + class VertexBuildModel(BaseModel): index: Optional[int] = Field(default=None, alias="index", exclude=True) id: Optional[str] = Field(default=None, alias="id") diff --git a/src/backend/base/langflow/services/monitor/service.py b/src/backend/base/langflow/services/monitor/service.py index 9fce7dd59..b90806128 100644 --- a/src/backend/base/langflow/services/monitor/service.py +++ b/src/backend/base/langflow/services/monitor/service.py @@ -32,6 +32,10 @@ class MonitorService(Service): except Exception as e: logger.exception(f"Error initializing monitor service: {e}") + def exec_query(self, query: str): + with duckdb.connect(str(self.db_path)) as conn: + return conn.execute(query).df() + def to_df(self, table_name): return self.load_table_as_dataframe(table_name) @@ -98,11 +102,20 @@ class MonitorService(Service): with duckdb.connect(str(self.db_path)) as conn: conn.execute(query) - def delete_messages(self, session_id: str): + def delete_messages_session(self, session_id: str): query = f"DELETE FROM messages WHERE session_id = '{session_id}'" - with duckdb.connect(str(self.db_path)) as conn: - conn.execute(query) + return self.exec_query(query) + + def delete_messages(self, message_ids: list[int]): + query = f"DELETE FROM messages WHERE id IN ({','.join(str(message_ids))})" + + return self.exec_query(query) + + def update_message(self, message_id: int, **kwargs): + query = f"UPDATE messages SET {', '.join(f'{k} = {v}' for k, v in kwargs.items())} WHERE id = {message_id}" + + return self.exec_query(query) def add_message(self, message: MessageModel): self.add_row("messages", message)