diff --git a/src/backend/base/langflow/memory.py b/src/backend/base/langflow/memory.py index d1793740d..28052525e 100644 --- a/src/backend/base/langflow/memory.py +++ b/src/backend/base/langflow/memory.py @@ -1,5 +1,4 @@ import warnings -from typing import List, Optional from uuid import UUID from loguru import logger @@ -8,17 +7,18 @@ from sqlmodel import Session, col, select from langflow.schema.message import Message from langflow.services.database.models.message.model import MessageRead, MessageTable +from langflow.services.database.utils import migrate_messages_from_monitor_service_to_database from langflow.services.deps import session_scope def get_messages( - sender: Optional[str] = None, - sender_name: Optional[str] = None, - session_id: Optional[str] = None, - order_by: Optional[str] = "timestamp", - order: Optional[str] = "DESC", - flow_id: Optional[UUID] = None, - limit: Optional[int] = None, + sender: str | None = None, + sender_name: str | None = None, + session_id: str | None = None, + order_by: str | None = "timestamp", + order: str | None = "DESC", + flow_id: UUID | None = None, + limit: int | None = None, ): """ Retrieves messages from the monitor service based on the provided filters. @@ -33,6 +33,7 @@ def get_messages( Returns: List[Data]: A list of Data objects representing the retrieved messages. """ + migrate_messages_from_monitor_service_to_database() messages_read: list[Message] = [] with session_scope() as session: stmt = select(MessageTable) @@ -58,7 +59,7 @@ def get_messages( return messages_read -def add_messages(messages: Message | list[Message], flow_id: Optional[str] = None): +def add_messages(messages: Message | list[Message], flow_id: str | None = None): """ Add a message to the monitor service. """ @@ -111,8 +112,8 @@ def delete_messages(session_id: str): def store_message( message: Message, - flow_id: Optional[str] = None, -) -> List[Message]: + flow_id: str | None = None, +) -> list[Message]: """ Stores a message in the memory. diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index ceeaf3e38..ef9f0e8c8 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -6,22 +6,24 @@ from typing import TYPE_CHECKING import sqlalchemy as sa from alembic import command, util from alembic.config import Config -from langflow.services.base import Service -from langflow.services.database import models # noqa -from langflow.services.database.models.user.crud import get_user_by_username -from langflow.services.database.utils import Result, TableResults -from langflow.services.deps import get_settings_service -from langflow.services.utils import teardown_superuser from loguru import logger from sqlalchemy import event, inspect from sqlalchemy.engine import Engine from sqlalchemy.exc import OperationalError from sqlmodel import Session, SQLModel, create_engine, select, text +from langflow.services.base import Service +from langflow.services.database import models # noqa +from langflow.services.database.models.user.crud import get_user_by_username +from langflow.services.database.utils import Result, TableResults, migrate_messages_from_monitor_service_to_database +from langflow.services.deps import get_settings_service +from langflow.services.utils import teardown_superuser + if TYPE_CHECKING: - from langflow.services.settings.service import SettingsService from sqlalchemy.engine import Engine + from langflow.services.settings.service import SettingsService + class DatabaseService(Service): name = "database_service" @@ -205,6 +207,11 @@ class DatabaseService(Service): logger.error(f"AutogenerateDiffsDetected: {exc}") if not fix: raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}") + try: + migrate_messages_from_monitor_service_to_database(session) + except Exception as exc: + logger.error(f"Error migrating messages from monitor service to database: {exc}") + raise RuntimeError("Error migrating messages from monitor service to database") from exc if fix: self.try_downgrade_upgrade_until_success(alembic_cfg) diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index cf2c92cb3..f285bc631 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -6,10 +6,28 @@ from alembic.util.exc import CommandError from loguru import logger from sqlmodel import Session, text +from langflow.services.deps import get_monitor_service + if TYPE_CHECKING: from langflow.services.database.service import DatabaseService +def migrate_messages_from_monitor_service_to_database(session): + from langflow.schema.message import Message + from langflow.services.database.models.message import MessageTable + + monitor_service = get_monitor_service() + messages_df = monitor_service.get_messages() + if not messages_df.empty: + messages_ids = [] + for message in messages_df.to_dict(orient="records"): + messages_ids.append(message["index"]) + message = Message(**message) + session.add(MessageTable.from_message(message)) + session.commit() + monitor_service.delete_messages(messages_ids) + + def initialize_database(fix_migration: bool = False): logger.debug("Initializing database") from langflow.services.deps import get_db_service diff --git a/src/backend/base/langflow/services/monitor/schema.py b/src/backend/base/langflow/services/monitor/schema.py index eeea846a1..939c34397 100644 --- a/src/backend/base/langflow/services/monitor/schema.py +++ b/src/backend/base/langflow/services/monitor/schema.py @@ -28,15 +28,15 @@ class DefaultModel(BaseModel): class TransactionModel(DefaultModel): - index: Optional[int] = Field(default=None) - timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp") + index: int | None = Field(default=None) + timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp") vertex_id: str target_id: str | None = None inputs: dict - outputs: Optional[dict] = None + outputs: dict | None = None status: str - error: Optional[str] = None - flow_id: Optional[str] = Field(default=None, alias="flow_id") + error: str | None = None + flow_id: str | None = Field(default=None, alias="flow_id") # validate target_args in case it is a JSON @field_validator("outputs", "inputs", mode="before") @@ -53,16 +53,16 @@ class TransactionModel(DefaultModel): class TransactionModelResponse(DefaultModel): - index: Optional[int] = Field(default=None) - timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp") + index: int | None = Field(default=None) + timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp") vertex_id: str inputs: dict - outputs: Optional[dict] = None + outputs: dict | None = None status: str - error: Optional[str] = None - flow_id: Optional[str] = Field(default=None, alias="flow_id") - source: Optional[str] = None - target: Optional[str] = None + error: str | None = None + flow_id: str | None = Field(default=None, alias="flow_id") + source: str | None = None + target: str | None = None # validate target_args in case it is a JSON @field_validator("outputs", "inputs", mode="before") @@ -81,9 +81,9 @@ class TransactionModelResponse(DefaultModel): return v -class MessageModel(DefaultModel): - id: Optional[str | UUID] = Field(default=None) - flow_id: Optional[UUID] = Field(default=None) +class DeprecatedMessageModel(DefaultModel): + index: int | None = Field(default=None) + flow_id: str | None = Field(default=None, alias="flow_id") timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) sender: str sender_name: str @@ -112,7 +112,53 @@ class MessageModel(DefaultModel): return v @classmethod - def from_message(cls, message: Message, flow_id: Optional[str] = None): + def from_message(cls, message: Message, flow_id: str | None = None): + # first check if the record has all the required fields + if message.text is None or not message.sender or not message.sender_name: + raise ValueError("The message does not have the required fields (text, sender, sender_name).") + return cls( + sender=message.sender, + sender_name=message.sender_name, + text=message.text, + session_id=message.session_id, + files=message.files or [], + timestamp=message.timestamp, + flow_id=flow_id, + ) + + +class MessageModel(DefaultModel): + id: str | UUID | None = Field(default=None) + flow_id: UUID | None = Field(default=None) + timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + sender: str + sender_name: str + session_id: str + text: str + files: list[str] = [] + + @field_validator("files", mode="before") + @classmethod + def validate_files(cls, v): + if isinstance(v, str): + v = json.loads(v) + return v + + @field_serializer("timestamp") + @classmethod + def serialize_timestamp(cls, v): + v = v.replace(microsecond=0) + return v.strftime("%Y-%m-%d %H:%M:%S") + + @field_serializer("files") + @classmethod + def serialize_files(cls, v): + if isinstance(v, list): + return json.dumps(v) + return v + + @classmethod + def from_message(cls, message: Message, flow_id: str | None = None): # first check if the record has all the required fields if message.text is None or not message.sender or not message.sender_name: raise ValueError("The message does not have the required fields (text, sender, sender_name).") @@ -139,8 +185,8 @@ class MessageModelRequest(MessageModel): class VertexBuildModel(DefaultModel): - index: Optional[int] = Field(default=None, alias="index", exclude=True) - id: Optional[str] = Field(default=None, alias="id") + index: int | None = Field(default=None, alias="index", exclude=True) + id: str | None = Field(default=None, alias="id") flow_id: str valid: bool params: Any diff --git a/src/backend/base/langflow/services/monitor/service.py b/src/backend/base/langflow/services/monitor/service.py index d15d31329..655682cc2 100644 --- a/src/backend/base/langflow/services/monitor/service.py +++ b/src/backend/base/langflow/services/monitor/service.py @@ -18,14 +18,14 @@ class MonitorService(Service): name = "monitor_service" def __init__(self, settings_service: "SettingsService"): - from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel + from langflow.services.monitor.schema import DeprecatedMessageModel, TransactionModel, VertexBuildModel self.settings_service = settings_service self.base_cache_dir = Path(user_cache_dir("langflow")) self.db_path = self.base_cache_dir / "monitor.duckdb" - self.table_map: dict[str, type[TransactionModel | MessageModel | VertexBuildModel]] = { + self.table_map: dict[str, type[TransactionModel | DeprecatedMessageModel | VertexBuildModel]] = { "transactions": TransactionModel, - "messages": MessageModel, + "messages": DeprecatedMessageModel, "vertex_builds": VertexBuildModel, } @@ -68,12 +68,48 @@ class MonitorService(Service): def get_timestamp(): return datetime.now().strftime("%Y-%m-%d %H:%M:%S") + def get_messages( + self, + flow_id: str | None = None, + sender: str | None = None, + sender_name: str | None = None, + session_id: str | None = None, + order_by: str | None = "timestamp", + order: str | None = "DESC", + limit: int | None = None, + ): + query = "SELECT index, flow_id, sender_name, sender, session_id, text, files, timestamp FROM messages" + conditions = [] + if sender: + conditions.append(f"sender = '{sender}'") + if sender_name: + conditions.append(f"sender_name = '{sender_name}'") + if session_id: + conditions.append(f"session_id = '{session_id}'") + if flow_id: + conditions.append(f"flow_id = '{flow_id}'") + + if conditions: + query += " WHERE " + " AND ".join(conditions) + + if order_by and order: + # Make sure the order is from newest to oldest + query += f" ORDER BY {order_by} {order.upper()}" + + if limit is not None: + query += f" LIMIT {limit}" + + with duckdb.connect(str(self.db_path), read_only=True) as conn: + df = conn.execute(query).df() + + return df + def get_vertex_builds( self, - flow_id: Optional[str] = None, - vertex_id: Optional[str] = None, - valid: Optional[bool] = None, - order_by: Optional[str] = "timestamp", + flow_id: str | None = None, + vertex_id: str | None = None, + valid: bool | None = None, + order_by: str | None = "timestamp", ): query = "SELECT id, index,flow_id, valid, params, data, artifacts, timestamp FROM vertex_builds" conditions = [] @@ -96,7 +132,7 @@ class MonitorService(Service): return df.to_dict(orient="records") - def delete_vertex_builds(self, flow_id: Optional[str] = None): + def delete_vertex_builds(self, flow_id: str | None = None): query = "DELETE FROM vertex_builds" if flow_id: query += f" WHERE flow_id = '{flow_id}'" @@ -109,7 +145,7 @@ class MonitorService(Service): return self.exec_query(query, read_only=False) - def delete_messages(self, message_ids: Union[List[int], str]): + def delete_messages(self, message_ids: list[int] | str): if isinstance(message_ids, list): # If message_ids is a list, join the string representations of the integers ids_str = ",".join(map(str, message_ids)) @@ -132,11 +168,11 @@ class MonitorService(Service): def get_transactions( self, - source: Optional[str] = None, - target: Optional[str] = None, - status: Optional[str] = None, - order_by: Optional[str] = "timestamp", - flow_id: Optional[str] = None, + source: str | None = None, + target: str | None = None, + status: str | None = None, + order_by: str | None = "timestamp", + flow_id: str | None = None, ): query = ( "SELECT index,flow_id, status, error, timestamp, vertex_id, inputs, outputs, target_id FROM transactions"