feat: Add API endpoints for managing messages

This commit adds new API endpoints for managing messages. It includes the ability to delete messages by their IDs, update a specific message, and delete all messages associated with a session. These changes are implemented in the `monitor.py`, `schema.py`, and `service.py` files.
This commit is contained in:
ogabrielluiz 2024-05-31 09:38:05 -03:00
commit f79289f966
3 changed files with 61 additions and 3 deletions

View file

@ -4,6 +4,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query
from langflow.services.deps import get_monitor_service
from langflow.services.monitor.schema import (
MessageModelRequest,
MessageModelResponse,
TransactionModelResponse,
VertexBuildMapModel,
@ -66,6 +67,43 @@ async def get_messages(
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/messages", status_code=204)
async def delete_messages(
message_ids: List[int],
monitor_service: MonitorService = Depends(get_monitor_service),
):
try:
monitor_service.delete_messages(message_ids=message_ids)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/messages/{message_id}", response_model=MessageModelResponse)
async def update_message(
message_id: str,
message: MessageModelRequest,
monitor_service: MonitorService = Depends(get_monitor_service),
):
try:
message_dict = message.model_dump(exclude_none=True)
df = monitor_service.update_message(message_id=message_id, **message_dict)
dicts = df.to_dict(orient="records")
return [MessageModelResponse(**d) for d in dicts]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/messages/session/{session_id}", status_code=204)
async def delete_messages_session(
session_id: str,
monitor_service: MonitorService = Depends(get_monitor_service),
):
try:
monitor_service.delete_messages_session(session_id=session_id)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/transactions", response_model=List[TransactionModelResponse])
async def get_transactions(
source: Optional[str] = Query(None),

View file

@ -122,6 +122,13 @@ class MessageModelResponse(MessageModel):
return v
class MessageModelRequest(MessageModel):
message: str = Field(default="")
sender: str = Field(default="")
sender_name: str = Field(default="")
session_id: str = Field(default="")
class VertexBuildModel(BaseModel):
index: Optional[int] = Field(default=None, alias="index", exclude=True)
id: Optional[str] = Field(default=None, alias="id")

View file

@ -32,6 +32,10 @@ class MonitorService(Service):
except Exception as e:
logger.exception(f"Error initializing monitor service: {e}")
def exec_query(self, query: str):
with duckdb.connect(str(self.db_path)) as conn:
return conn.execute(query).df()
def to_df(self, table_name):
return self.load_table_as_dataframe(table_name)
@ -98,11 +102,20 @@ class MonitorService(Service):
with duckdb.connect(str(self.db_path)) as conn:
conn.execute(query)
def delete_messages(self, session_id: str):
def delete_messages_session(self, session_id: str):
query = f"DELETE FROM messages WHERE session_id = '{session_id}'"
with duckdb.connect(str(self.db_path)) as conn:
conn.execute(query)
return self.exec_query(query)
def delete_messages(self, message_ids: list[int]):
query = f"DELETE FROM messages WHERE id IN ({','.join(str(message_ids))})"
return self.exec_query(query)
def update_message(self, message_id: int, **kwargs):
query = f"UPDATE messages SET {', '.join(f'{k} = {v}' for k, v in kwargs.items())} WHERE id = {message_id}"
return self.exec_query(query)
def add_message(self, message: MessageModel):
self.add_row("messages", message)