From ee4ace8bfe661480bcd93d28334008c47e4556e4 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 12:04:00 -0300 Subject: [PATCH] feat: Migrate messages from monitor service to database This commit migrates messages from the monitor service to the database. It adds a new function `migrate_messages_from_monitor_service_to_database` in the `utils.py` file, which retrieves messages from the monitor service, adds them to the database, and deletes them from the monitor service. This migration ensures that messages are stored in the database for better reliability and retrieval. --- src/backend/base/langflow/memory.py | 23 +++--- .../langflow/services/database/service.py | 21 +++-- .../base/langflow/services/database/utils.py | 18 ++++ .../base/langflow/services/monitor/schema.py | 82 +++++++++++++++---- .../base/langflow/services/monitor/service.py | 64 +++++++++++---- 5 files changed, 158 insertions(+), 50 deletions(-) diff --git a/src/backend/base/langflow/memory.py b/src/backend/base/langflow/memory.py index d1793740d..28052525e 100644 --- a/src/backend/base/langflow/memory.py +++ b/src/backend/base/langflow/memory.py @@ -1,5 +1,4 @@ import warnings -from typing import List, Optional from uuid import UUID from loguru import logger @@ -8,17 +7,18 @@ from sqlmodel import Session, col, select from langflow.schema.message import Message from langflow.services.database.models.message.model import MessageRead, MessageTable +from langflow.services.database.utils import migrate_messages_from_monitor_service_to_database from langflow.services.deps import session_scope def get_messages( - sender: Optional[str] = None, - sender_name: Optional[str] = None, - session_id: Optional[str] = None, - order_by: Optional[str] = "timestamp", - order: Optional[str] = "DESC", - flow_id: Optional[UUID] = None, - limit: Optional[int] = None, + sender: str | None = None, + sender_name: str | None = None, + session_id: str | None = None, + order_by: str | None = "timestamp", + order: str | None = "DESC", + flow_id: UUID | None = None, + limit: int | None = None, ): """ Retrieves messages from the monitor service based on the provided filters. @@ -33,6 +33,7 @@ def get_messages( Returns: List[Data]: A list of Data objects representing the retrieved messages. """ + migrate_messages_from_monitor_service_to_database() messages_read: list[Message] = [] with session_scope() as session: stmt = select(MessageTable) @@ -58,7 +59,7 @@ def get_messages( return messages_read -def add_messages(messages: Message | list[Message], flow_id: Optional[str] = None): +def add_messages(messages: Message | list[Message], flow_id: str | None = None): """ Add a message to the monitor service. """ @@ -111,8 +112,8 @@ def delete_messages(session_id: str): def store_message( message: Message, - flow_id: Optional[str] = None, -) -> List[Message]: + flow_id: str | None = None, +) -> list[Message]: """ Stores a message in the memory. diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index ceeaf3e38..ef9f0e8c8 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -6,22 +6,24 @@ from typing import TYPE_CHECKING import sqlalchemy as sa from alembic import command, util from alembic.config import Config -from langflow.services.base import Service -from langflow.services.database import models # noqa -from langflow.services.database.models.user.crud import get_user_by_username -from langflow.services.database.utils import Result, TableResults -from langflow.services.deps import get_settings_service -from langflow.services.utils import teardown_superuser from loguru import logger from sqlalchemy import event, inspect from sqlalchemy.engine import Engine from sqlalchemy.exc import OperationalError from sqlmodel import Session, SQLModel, create_engine, select, text +from langflow.services.base import Service +from langflow.services.database import models # noqa +from langflow.services.database.models.user.crud import get_user_by_username +from langflow.services.database.utils import Result, TableResults, migrate_messages_from_monitor_service_to_database +from langflow.services.deps import get_settings_service +from langflow.services.utils import teardown_superuser + if TYPE_CHECKING: - from langflow.services.settings.service import SettingsService from sqlalchemy.engine import Engine + from langflow.services.settings.service import SettingsService + class DatabaseService(Service): name = "database_service" @@ -205,6 +207,11 @@ class DatabaseService(Service): logger.error(f"AutogenerateDiffsDetected: {exc}") if not fix: raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}") + try: + migrate_messages_from_monitor_service_to_database(session) + except Exception as exc: + logger.error(f"Error migrating messages from monitor service to database: {exc}") + raise RuntimeError("Error migrating messages from monitor service to database") from exc if fix: self.try_downgrade_upgrade_until_success(alembic_cfg) diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index cf2c92cb3..f285bc631 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -6,10 +6,28 @@ from alembic.util.exc import CommandError from loguru import logger from sqlmodel import Session, text +from langflow.services.deps import get_monitor_service + if TYPE_CHECKING: from langflow.services.database.service import DatabaseService +def migrate_messages_from_monitor_service_to_database(session): + from langflow.schema.message import Message + from langflow.services.database.models.message import MessageTable + + monitor_service = get_monitor_service() + messages_df = monitor_service.get_messages() + if not messages_df.empty: + messages_ids = [] + for message in messages_df.to_dict(orient="records"): + messages_ids.append(message["index"]) + message = Message(**message) + session.add(MessageTable.from_message(message)) + session.commit() + monitor_service.delete_messages(messages_ids) + + def initialize_database(fix_migration: bool = False): logger.debug("Initializing database") from langflow.services.deps import get_db_service diff --git a/src/backend/base/langflow/services/monitor/schema.py b/src/backend/base/langflow/services/monitor/schema.py index eeea846a1..939c34397 100644 --- a/src/backend/base/langflow/services/monitor/schema.py +++ b/src/backend/base/langflow/services/monitor/schema.py @@ -28,15 +28,15 @@ class DefaultModel(BaseModel): class TransactionModel(DefaultModel): - index: Optional[int] = Field(default=None) - timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp") + index: int | None = Field(default=None) + timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp") vertex_id: str target_id: str | None = None inputs: dict - outputs: Optional[dict] = None + outputs: dict | None = None status: str - error: Optional[str] = None - flow_id: Optional[str] = Field(default=None, alias="flow_id") + error: str | None = None + flow_id: str | None = Field(default=None, alias="flow_id") # validate target_args in case it is a JSON @field_validator("outputs", "inputs", mode="before") @@ -53,16 +53,16 @@ class TransactionModel(DefaultModel): class TransactionModelResponse(DefaultModel): - index: Optional[int] = Field(default=None) - timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp") + index: int | None = Field(default=None) + timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp") vertex_id: str inputs: dict - outputs: Optional[dict] = None + outputs: dict | None = None status: str - error: Optional[str] = None - flow_id: Optional[str] = Field(default=None, alias="flow_id") - source: Optional[str] = None - target: Optional[str] = None + error: str | None = None + flow_id: str | None = Field(default=None, alias="flow_id") + source: str | None = None + target: str | None = None # validate target_args in case it is a JSON @field_validator("outputs", "inputs", mode="before") @@ -81,9 +81,9 @@ class TransactionModelResponse(DefaultModel): return v -class MessageModel(DefaultModel): - id: Optional[str | UUID] = Field(default=None) - flow_id: Optional[UUID] = Field(default=None) +class DeprecatedMessageModel(DefaultModel): + index: int | None = Field(default=None) + flow_id: str | None = Field(default=None, alias="flow_id") timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) sender: str sender_name: str @@ -112,7 +112,53 @@ class MessageModel(DefaultModel): return v @classmethod - def from_message(cls, message: Message, flow_id: Optional[str] = None): + def from_message(cls, message: Message, flow_id: str | None = None): + # first check if the record has all the required fields + if message.text is None or not message.sender or not message.sender_name: + raise ValueError("The message does not have the required fields (text, sender, sender_name).") + return cls( + sender=message.sender, + sender_name=message.sender_name, + text=message.text, + session_id=message.session_id, + files=message.files or [], + timestamp=message.timestamp, + flow_id=flow_id, + ) + + +class MessageModel(DefaultModel): + id: str | UUID | None = Field(default=None) + flow_id: UUID | None = Field(default=None) + timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + sender: str + sender_name: str + session_id: str + text: str + files: list[str] = [] + + @field_validator("files", mode="before") + @classmethod + def validate_files(cls, v): + if isinstance(v, str): + v = json.loads(v) + return v + + @field_serializer("timestamp") + @classmethod + def serialize_timestamp(cls, v): + v = v.replace(microsecond=0) + return v.strftime("%Y-%m-%d %H:%M:%S") + + @field_serializer("files") + @classmethod + def serialize_files(cls, v): + if isinstance(v, list): + return json.dumps(v) + return v + + @classmethod + def from_message(cls, message: Message, flow_id: str | None = None): # first check if the record has all the required fields if message.text is None or not message.sender or not message.sender_name: raise ValueError("The message does not have the required fields (text, sender, sender_name).") @@ -139,8 +185,8 @@ class MessageModelRequest(MessageModel): class VertexBuildModel(DefaultModel): - index: Optional[int] = Field(default=None, alias="index", exclude=True) - id: Optional[str] = Field(default=None, alias="id") + index: int | None = Field(default=None, alias="index", exclude=True) + id: str | None = Field(default=None, alias="id") flow_id: str valid: bool params: Any diff --git a/src/backend/base/langflow/services/monitor/service.py b/src/backend/base/langflow/services/monitor/service.py index d15d31329..655682cc2 100644 --- a/src/backend/base/langflow/services/monitor/service.py +++ b/src/backend/base/langflow/services/monitor/service.py @@ -18,14 +18,14 @@ class MonitorService(Service): name = "monitor_service" def __init__(self, settings_service: "SettingsService"): - from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel + from langflow.services.monitor.schema import DeprecatedMessageModel, TransactionModel, VertexBuildModel self.settings_service = settings_service self.base_cache_dir = Path(user_cache_dir("langflow")) self.db_path = self.base_cache_dir / "monitor.duckdb" - self.table_map: dict[str, type[TransactionModel | MessageModel | VertexBuildModel]] = { + self.table_map: dict[str, type[TransactionModel | DeprecatedMessageModel | VertexBuildModel]] = { "transactions": TransactionModel, - "messages": MessageModel, + "messages": DeprecatedMessageModel, "vertex_builds": VertexBuildModel, } @@ -68,12 +68,48 @@ class MonitorService(Service): def get_timestamp(): return datetime.now().strftime("%Y-%m-%d %H:%M:%S") + def get_messages( + self, + flow_id: str | None = None, + sender: str | None = None, + sender_name: str | None = None, + session_id: str | None = None, + order_by: str | None = "timestamp", + order: str | None = "DESC", + limit: int | None = None, + ): + query = "SELECT index, flow_id, sender_name, sender, session_id, text, files, timestamp FROM messages" + conditions = [] + if sender: + conditions.append(f"sender = '{sender}'") + if sender_name: + conditions.append(f"sender_name = '{sender_name}'") + if session_id: + conditions.append(f"session_id = '{session_id}'") + if flow_id: + conditions.append(f"flow_id = '{flow_id}'") + + if conditions: + query += " WHERE " + " AND ".join(conditions) + + if order_by and order: + # Make sure the order is from newest to oldest + query += f" ORDER BY {order_by} {order.upper()}" + + if limit is not None: + query += f" LIMIT {limit}" + + with duckdb.connect(str(self.db_path), read_only=True) as conn: + df = conn.execute(query).df() + + return df + def get_vertex_builds( self, - flow_id: Optional[str] = None, - vertex_id: Optional[str] = None, - valid: Optional[bool] = None, - order_by: Optional[str] = "timestamp", + flow_id: str | None = None, + vertex_id: str | None = None, + valid: bool | None = None, + order_by: str | None = "timestamp", ): query = "SELECT id, index,flow_id, valid, params, data, artifacts, timestamp FROM vertex_builds" conditions = [] @@ -96,7 +132,7 @@ class MonitorService(Service): return df.to_dict(orient="records") - def delete_vertex_builds(self, flow_id: Optional[str] = None): + def delete_vertex_builds(self, flow_id: str | None = None): query = "DELETE FROM vertex_builds" if flow_id: query += f" WHERE flow_id = '{flow_id}'" @@ -109,7 +145,7 @@ class MonitorService(Service): return self.exec_query(query, read_only=False) - def delete_messages(self, message_ids: Union[List[int], str]): + def delete_messages(self, message_ids: list[int] | str): if isinstance(message_ids, list): # If message_ids is a list, join the string representations of the integers ids_str = ",".join(map(str, message_ids)) @@ -132,11 +168,11 @@ class MonitorService(Service): def get_transactions( self, - source: Optional[str] = None, - target: Optional[str] = None, - status: Optional[str] = None, - order_by: Optional[str] = "timestamp", - flow_id: Optional[str] = None, + source: str | None = None, + target: str | None = None, + status: str | None = None, + order_by: str | None = "timestamp", + flow_id: str | None = None, ): query = ( "SELECT index,flow_id, status, error, timestamp, vertex_id, inputs, outputs, target_id FROM transactions"