From ee4ace8bfe661480bcd93d28334008c47e4556e4 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 12:04:00 -0300 Subject: [PATCH 01/17] feat: Migrate messages from monitor service to database This commit migrates messages from the monitor service to the database. It adds a new function `migrate_messages_from_monitor_service_to_database` in the `utils.py` file, which retrieves messages from the monitor service, adds them to the database, and deletes them from the monitor service. This migration ensures that messages are stored in the database for better reliability and retrieval. --- src/backend/base/langflow/memory.py | 23 +++--- .../langflow/services/database/service.py | 21 +++-- .../base/langflow/services/database/utils.py | 18 ++++ .../base/langflow/services/monitor/schema.py | 82 +++++++++++++++---- .../base/langflow/services/monitor/service.py | 64 +++++++++++---- 5 files changed, 158 insertions(+), 50 deletions(-) diff --git a/src/backend/base/langflow/memory.py b/src/backend/base/langflow/memory.py index d1793740d..28052525e 100644 --- a/src/backend/base/langflow/memory.py +++ b/src/backend/base/langflow/memory.py @@ -1,5 +1,4 @@ import warnings -from typing import List, Optional from uuid import UUID from loguru import logger @@ -8,17 +7,18 @@ from sqlmodel import Session, col, select from langflow.schema.message import Message from langflow.services.database.models.message.model import MessageRead, MessageTable +from langflow.services.database.utils import migrate_messages_from_monitor_service_to_database from langflow.services.deps import session_scope def get_messages( - sender: Optional[str] = None, - sender_name: Optional[str] = None, - session_id: Optional[str] = None, - order_by: Optional[str] = "timestamp", - order: Optional[str] = "DESC", - flow_id: Optional[UUID] = None, - limit: Optional[int] = None, + sender: str | None = None, + sender_name: str | None = None, + session_id: str | None = None, + order_by: str | 
None = "timestamp", + order: str | None = "DESC", + flow_id: UUID | None = None, + limit: int | None = None, ): """ Retrieves messages from the monitor service based on the provided filters. @@ -33,6 +33,7 @@ def get_messages( Returns: List[Data]: A list of Data objects representing the retrieved messages. """ + migrate_messages_from_monitor_service_to_database() messages_read: list[Message] = [] with session_scope() as session: stmt = select(MessageTable) @@ -58,7 +59,7 @@ def get_messages( return messages_read -def add_messages(messages: Message | list[Message], flow_id: Optional[str] = None): +def add_messages(messages: Message | list[Message], flow_id: str | None = None): """ Add a message to the monitor service. """ @@ -111,8 +112,8 @@ def delete_messages(session_id: str): def store_message( message: Message, - flow_id: Optional[str] = None, -) -> List[Message]: + flow_id: str | None = None, +) -> list[Message]: """ Stores a message in the memory. diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index ceeaf3e38..ef9f0e8c8 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -6,22 +6,24 @@ from typing import TYPE_CHECKING import sqlalchemy as sa from alembic import command, util from alembic.config import Config -from langflow.services.base import Service -from langflow.services.database import models # noqa -from langflow.services.database.models.user.crud import get_user_by_username -from langflow.services.database.utils import Result, TableResults -from langflow.services.deps import get_settings_service -from langflow.services.utils import teardown_superuser from loguru import logger from sqlalchemy import event, inspect from sqlalchemy.engine import Engine from sqlalchemy.exc import OperationalError from sqlmodel import Session, SQLModel, create_engine, select, text +from langflow.services.base import 
Service +from langflow.services.database import models # noqa +from langflow.services.database.models.user.crud import get_user_by_username +from langflow.services.database.utils import Result, TableResults, migrate_messages_from_monitor_service_to_database +from langflow.services.deps import get_settings_service +from langflow.services.utils import teardown_superuser + if TYPE_CHECKING: - from langflow.services.settings.service import SettingsService from sqlalchemy.engine import Engine + from langflow.services.settings.service import SettingsService + class DatabaseService(Service): name = "database_service" @@ -205,6 +207,11 @@ class DatabaseService(Service): logger.error(f"AutogenerateDiffsDetected: {exc}") if not fix: raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}") + try: + migrate_messages_from_monitor_service_to_database(session) + except Exception as exc: + logger.error(f"Error migrating messages from monitor service to database: {exc}") + raise RuntimeError("Error migrating messages from monitor service to database") from exc if fix: self.try_downgrade_upgrade_until_success(alembic_cfg) diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index cf2c92cb3..f285bc631 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -6,10 +6,28 @@ from alembic.util.exc import CommandError from loguru import logger from sqlmodel import Session, text +from langflow.services.deps import get_monitor_service + if TYPE_CHECKING: from langflow.services.database.service import DatabaseService +def migrate_messages_from_monitor_service_to_database(session): + from langflow.schema.message import Message + from langflow.services.database.models.message import MessageTable + + monitor_service = get_monitor_service() + messages_df = monitor_service.get_messages() + if not messages_df.empty: + messages_ids = 
[] + for message in messages_df.to_dict(orient="records"): + messages_ids.append(message["index"]) + message = Message(**message) + session.add(MessageTable.from_message(message)) + session.commit() + monitor_service.delete_messages(messages_ids) + + def initialize_database(fix_migration: bool = False): logger.debug("Initializing database") from langflow.services.deps import get_db_service diff --git a/src/backend/base/langflow/services/monitor/schema.py b/src/backend/base/langflow/services/monitor/schema.py index eeea846a1..939c34397 100644 --- a/src/backend/base/langflow/services/monitor/schema.py +++ b/src/backend/base/langflow/services/monitor/schema.py @@ -28,15 +28,15 @@ class DefaultModel(BaseModel): class TransactionModel(DefaultModel): - index: Optional[int] = Field(default=None) - timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp") + index: int | None = Field(default=None) + timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp") vertex_id: str target_id: str | None = None inputs: dict - outputs: Optional[dict] = None + outputs: dict | None = None status: str - error: Optional[str] = None - flow_id: Optional[str] = Field(default=None, alias="flow_id") + error: str | None = None + flow_id: str | None = Field(default=None, alias="flow_id") # validate target_args in case it is a JSON @field_validator("outputs", "inputs", mode="before") @@ -53,16 +53,16 @@ class TransactionModel(DefaultModel): class TransactionModelResponse(DefaultModel): - index: Optional[int] = Field(default=None) - timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp") + index: int | None = Field(default=None) + timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp") vertex_id: str inputs: dict - outputs: Optional[dict] = None + outputs: dict | None = None status: str - error: Optional[str] = None - flow_id: Optional[str] = Field(default=None, alias="flow_id") - 
source: Optional[str] = None - target: Optional[str] = None + error: str | None = None + flow_id: str | None = Field(default=None, alias="flow_id") + source: str | None = None + target: str | None = None # validate target_args in case it is a JSON @field_validator("outputs", "inputs", mode="before") @@ -81,9 +81,9 @@ class TransactionModelResponse(DefaultModel): return v -class MessageModel(DefaultModel): - id: Optional[str | UUID] = Field(default=None) - flow_id: Optional[UUID] = Field(default=None) +class DeprecatedMessageModel(DefaultModel): + index: int | None = Field(default=None) + flow_id: str | None = Field(default=None, alias="flow_id") timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) sender: str sender_name: str @@ -112,7 +112,53 @@ class MessageModel(DefaultModel): return v @classmethod - def from_message(cls, message: Message, flow_id: Optional[str] = None): + def from_message(cls, message: Message, flow_id: str | None = None): + # first check if the record has all the required fields + if message.text is None or not message.sender or not message.sender_name: + raise ValueError("The message does not have the required fields (text, sender, sender_name).") + return cls( + sender=message.sender, + sender_name=message.sender_name, + text=message.text, + session_id=message.session_id, + files=message.files or [], + timestamp=message.timestamp, + flow_id=flow_id, + ) + + +class MessageModel(DefaultModel): + id: str | UUID | None = Field(default=None) + flow_id: UUID | None = Field(default=None) + timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + sender: str + sender_name: str + session_id: str + text: str + files: list[str] = [] + + @field_validator("files", mode="before") + @classmethod + def validate_files(cls, v): + if isinstance(v, str): + v = json.loads(v) + return v + + @field_serializer("timestamp") + @classmethod + def serialize_timestamp(cls, v): + v = v.replace(microsecond=0) + return 
v.strftime("%Y-%m-%d %H:%M:%S") + + @field_serializer("files") + @classmethod + def serialize_files(cls, v): + if isinstance(v, list): + return json.dumps(v) + return v + + @classmethod + def from_message(cls, message: Message, flow_id: str | None = None): # first check if the record has all the required fields if message.text is None or not message.sender or not message.sender_name: raise ValueError("The message does not have the required fields (text, sender, sender_name).") @@ -139,8 +185,8 @@ class MessageModelRequest(MessageModel): class VertexBuildModel(DefaultModel): - index: Optional[int] = Field(default=None, alias="index", exclude=True) - id: Optional[str] = Field(default=None, alias="id") + index: int | None = Field(default=None, alias="index", exclude=True) + id: str | None = Field(default=None, alias="id") flow_id: str valid: bool params: Any diff --git a/src/backend/base/langflow/services/monitor/service.py b/src/backend/base/langflow/services/monitor/service.py index d15d31329..655682cc2 100644 --- a/src/backend/base/langflow/services/monitor/service.py +++ b/src/backend/base/langflow/services/monitor/service.py @@ -18,14 +18,14 @@ class MonitorService(Service): name = "monitor_service" def __init__(self, settings_service: "SettingsService"): - from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel + from langflow.services.monitor.schema import DeprecatedMessageModel, TransactionModel, VertexBuildModel self.settings_service = settings_service self.base_cache_dir = Path(user_cache_dir("langflow")) self.db_path = self.base_cache_dir / "monitor.duckdb" - self.table_map: dict[str, type[TransactionModel | MessageModel | VertexBuildModel]] = { + self.table_map: dict[str, type[TransactionModel | DeprecatedMessageModel | VertexBuildModel]] = { "transactions": TransactionModel, - "messages": MessageModel, + "messages": DeprecatedMessageModel, "vertex_builds": VertexBuildModel, } @@ -68,12 +68,48 @@ class 
MonitorService(Service): def get_timestamp(): return datetime.now().strftime("%Y-%m-%d %H:%M:%S") + def get_messages( + self, + flow_id: str | None = None, + sender: str | None = None, + sender_name: str | None = None, + session_id: str | None = None, + order_by: str | None = "timestamp", + order: str | None = "DESC", + limit: int | None = None, + ): + query = "SELECT index, flow_id, sender_name, sender, session_id, text, files, timestamp FROM messages" + conditions = [] + if sender: + conditions.append(f"sender = '{sender}'") + if sender_name: + conditions.append(f"sender_name = '{sender_name}'") + if session_id: + conditions.append(f"session_id = '{session_id}'") + if flow_id: + conditions.append(f"flow_id = '{flow_id}'") + + if conditions: + query += " WHERE " + " AND ".join(conditions) + + if order_by and order: + # Make sure the order is from newest to oldest + query += f" ORDER BY {order_by} {order.upper()}" + + if limit is not None: + query += f" LIMIT {limit}" + + with duckdb.connect(str(self.db_path), read_only=True) as conn: + df = conn.execute(query).df() + + return df + def get_vertex_builds( self, - flow_id: Optional[str] = None, - vertex_id: Optional[str] = None, - valid: Optional[bool] = None, - order_by: Optional[str] = "timestamp", + flow_id: str | None = None, + vertex_id: str | None = None, + valid: bool | None = None, + order_by: str | None = "timestamp", ): query = "SELECT id, index,flow_id, valid, params, data, artifacts, timestamp FROM vertex_builds" conditions = [] @@ -96,7 +132,7 @@ class MonitorService(Service): return df.to_dict(orient="records") - def delete_vertex_builds(self, flow_id: Optional[str] = None): + def delete_vertex_builds(self, flow_id: str | None = None): query = "DELETE FROM vertex_builds" if flow_id: query += f" WHERE flow_id = '{flow_id}'" @@ -109,7 +145,7 @@ class MonitorService(Service): return self.exec_query(query, read_only=False) - def delete_messages(self, message_ids: Union[List[int], str]): + def 
delete_messages(self, message_ids: list[int] | str): if isinstance(message_ids, list): # If message_ids is a list, join the string representations of the integers ids_str = ",".join(map(str, message_ids)) @@ -132,11 +168,11 @@ class MonitorService(Service): def get_transactions( self, - source: Optional[str] = None, - target: Optional[str] = None, - status: Optional[str] = None, - order_by: Optional[str] = "timestamp", - flow_id: Optional[str] = None, + source: str | None = None, + target: str | None = None, + status: str | None = None, + order_by: str | None = "timestamp", + flow_id: str | None = None, ): query = ( "SELECT index,flow_id, status, error, timestamp, vertex_id, inputs, outputs, target_id FROM transactions" From bb7f365ccd93b2eda1218f92a1568e7353b748a3 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 16:11:29 -0300 Subject: [PATCH 02/17] chore: Update schema and service files for DuckDbMessageModel --- src/backend/base/langflow/services/monitor/schema.py | 5 ++--- src/backend/base/langflow/services/monitor/service.py | 8 ++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/backend/base/langflow/services/monitor/schema.py b/src/backend/base/langflow/services/monitor/schema.py index 939c34397..587e7628e 100644 --- a/src/backend/base/langflow/services/monitor/schema.py +++ b/src/backend/base/langflow/services/monitor/schema.py @@ -1,6 +1,6 @@ import json from datetime import datetime, timezone -from typing import Any, Optional +from typing import Any from uuid import UUID from pydantic import BaseModel, Field, field_serializer, field_validator @@ -81,8 +81,7 @@ class TransactionModelResponse(DefaultModel): return v -class DeprecatedMessageModel(DefaultModel): - index: int | None = Field(default=None) +class DuckDbMessageModel(DefaultModel): flow_id: str | None = Field(default=None, alias="flow_id") timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) sender: str diff --git 
a/src/backend/base/langflow/services/monitor/service.py b/src/backend/base/langflow/services/monitor/service.py index 655682cc2..4f0655602 100644 --- a/src/backend/base/langflow/services/monitor/service.py +++ b/src/backend/base/langflow/services/monitor/service.py @@ -1,6 +1,6 @@ from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, List, Optional, Union +from typing import TYPE_CHECKING, Union import duckdb from loguru import logger @@ -18,14 +18,14 @@ class MonitorService(Service): name = "monitor_service" def __init__(self, settings_service: "SettingsService"): - from langflow.services.monitor.schema import DeprecatedMessageModel, TransactionModel, VertexBuildModel + from langflow.services.monitor.schema import DuckDbMessageModel, TransactionModel, VertexBuildModel self.settings_service = settings_service self.base_cache_dir = Path(user_cache_dir("langflow")) self.db_path = self.base_cache_dir / "monitor.duckdb" - self.table_map: dict[str, type[TransactionModel | DeprecatedMessageModel | VertexBuildModel]] = { + self.table_map: dict[str, type[TransactionModel | DuckDbMessageModel | VertexBuildModel]] = { "transactions": TransactionModel, - "messages": DeprecatedMessageModel, + "messages": DuckDbMessageModel, "vertex_builds": VertexBuildModel, } From 347ec5659815ad46c93b68e65a6ef49c83a999ff Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 16:11:39 -0300 Subject: [PATCH 03/17] feat: Add flow_id assignment in MessageBase constructor --- .../base/langflow/services/database/models/message/model.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/backend/base/langflow/services/database/models/message/model.py b/src/backend/base/langflow/services/database/models/message/model.py index 5a775d3d9..128088398 100644 --- a/src/backend/base/langflow/services/database/models/message/model.py +++ b/src/backend/base/langflow/services/database/models/message/model.py @@ -34,6 +34,8 @@ class 
MessageBase(SQLModel): timestamp = datetime.fromisoformat(message.timestamp) else: timestamp = message.timestamp + if not flow_id and message.flow_id: + flow_id = message.flow_id return cls( sender=message.sender, sender_name=message.sender_name, From 5ff9bed407439769df9939d754d1466518510424 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 16:11:54 -0300 Subject: [PATCH 04/17] Migrate messages from monitor service to database --- .../base/langflow/services/database/utils.py | 64 ++++++++++++++++--- 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index f285bc631..bcb178e7f 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -4,28 +4,76 @@ from typing import TYPE_CHECKING from alembic.util.exc import CommandError from loguru import logger -from sqlmodel import Session, text +from sqlmodel import Session, select, text from langflow.services.deps import get_monitor_service if TYPE_CHECKING: from langflow.services.database.service import DatabaseService +from typing import Dict, List -def migrate_messages_from_monitor_service_to_database(session): + +def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: from langflow.schema.message import Message from langflow.services.database.models.message import MessageTable monitor_service = get_monitor_service() messages_df = monitor_service.get_messages() - if not messages_df.empty: - messages_ids = [] - for message in messages_df.to_dict(orient="records"): - messages_ids.append(message["index"]) - message = Message(**message) - session.add(MessageTable.from_message(message)) + + if messages_df.empty: + logger.info("No messages to migrate.") + return True + + original_messages: List[Dict] = messages_df.to_dict(orient="records") + + db_messages = 
session.exec(select(MessageTable)).all() + db_messages = [msg[0] for msg in db_messages] + db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id, msg.session_id)): msg for msg in db_messages} + # Filter out messages that already exist in the database + original_messages_filtered = [] + for message in original_messages: + key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"])) + if key not in db_msg_dict: + original_messages_filtered.append(message) + if not original_messages_filtered: + logger.info("No messages to migrate.") + return True + try: + # Bulk insert messages + session.bulk_insert_mappings( + MessageTable, [MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered] + ) session.commit() + except Exception as e: + logger.error(f"Error during message insertion: {str(e)}") + session.rollback() + return False + + # Create a dictionary for faster lookup + + all_ok = True + for orig_msg in original_messages_filtered: + key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"])) + matching_db_msg = db_msg_dict.get(key) + + if matching_db_msg is None: + logger.warning(f"Message not found in database: {orig_msg}") + all_ok = False + else: + # Validate other fields + if any(getattr(matching_db_msg, k) != v for k, v in orig_msg.items() if k != "index"): + logger.warning(f"Message mismatch in database: {orig_msg}") + all_ok = False + + if all_ok: + messages_ids = [message["index"] for message in original_messages] monitor_service.delete_messages(messages_ids) + logger.info("Migration completed successfully. Original messages deleted.") + else: + logger.warning("Migration completed with errors. 
Original messages not deleted.") + + return all_ok def initialize_database(fix_migration: bool = False): From 86f25f3b2c794e1b13235dd588a495003402b493 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 16:12:00 -0300 Subject: [PATCH 05/17] feat: Add flow_id serialization in Message class This commit adds the `serialize_flow_id` method to the `Message` class in the `message.py` file. This method serializes the `flow_id` attribute of a `Message` object, converting it to a UUID if it is a string. This ensures consistent serialization of the `flow_id` attribute when working with the `Message` class. --- src/backend/base/langflow/schema/message.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/backend/base/langflow/schema/message.py b/src/backend/base/langflow/schema/message.py index c03c0cb71..5ad7d1922 100644 --- a/src/backend/base/langflow/schema/message.py +++ b/src/backend/base/langflow/schema/message.py @@ -41,6 +41,12 @@ class Message(Data): value = str(value) return value + @field_serializer("flow_id") + def serialize_flow_id(value): + if isinstance(value, str): + return UUID(value) + return value + @field_validator("files", mode="before") @classmethod def validate_files(cls, value): From 4b4aefc8bc4885d51e278ba829d09812ed37389b Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 16:12:12 -0300 Subject: [PATCH 06/17] feat: Migrate messages from monitor service to database --- src/backend/base/langflow/memory.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/backend/base/langflow/memory.py b/src/backend/base/langflow/memory.py index 28052525e..f052acdee 100644 --- a/src/backend/base/langflow/memory.py +++ b/src/backend/base/langflow/memory.py @@ -33,7 +33,8 @@ def get_messages( Returns: List[Data]: A list of Data objects representing the retrieved messages. 
""" - migrate_messages_from_monitor_service_to_database() + with session_scope() as session: + migrate_messages_from_monitor_service_to_database(session) messages_read: list[Message] = [] with session_scope() as session: stmt = select(MessageTable) From 3afda401f3a4019d51c4bbe7fcd0730b59a15c49 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 16:12:18 -0300 Subject: [PATCH 07/17] chore: Update filterwarnings in pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 52d1ecd01..264e67a3c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -146,7 +146,7 @@ ignore-regex = '.*(Stati Uniti|Tense=Pres).*' minversion = "6.0" testpaths = ["tests", "integration"] console_output_style = "progress" -filterwarnings = ["ignore::DeprecationWarning"] +filterwarnings = ["ignore::DeprecationWarning", "ignore::ResourceWarning"] log_cli = true markers = ["async_test", "api_key_required"] From 1341860dea7ec94955306c09ca433b92ab93b4ca Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 16:12:22 -0300 Subject: [PATCH 08/17] Refactor test_get_messages function to use list comprehension --- tests/unit/test_messages.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/unit/test_messages.py b/tests/unit/test_messages.py index 198387db8..059d82b61 100644 --- a/tests/unit/test_messages.py +++ b/tests/unit/test_messages.py @@ -35,16 +35,20 @@ def created_messages(session): return messages_read -def test_get_messages(session): - add_messages(Message(text="Test message 1", sender="User", sender_name="User", session_id="session_id2")) - add_messages(Message(text="Test message 2", sender="User", sender_name="User", session_id="session_id2")) +def test_get_messages(): + add_messages( + [ + Message(text="Test message 1", sender="User", sender_name="User", session_id="session_id2"), + Message(text="Test message 2", 
sender="User", sender_name="User", session_id="session_id2"), + ] + ) messages = get_messages(sender="User", session_id="session_id2", limit=2) assert len(messages) == 2 assert messages[0].text == "Test message 1" assert messages[1].text == "Test message 2" -def test_add_messages(session): +def test_add_messages(): message = Message(text="New Test message", sender="User", sender_name="User", session_id="new_session_id") messages = add_messages(message) assert len(messages) == 1 @@ -65,7 +69,7 @@ def test_delete_messages(session): assert len(messages) == 0 -def test_store_message(session): +def test_store_message(): message = Message(text="Stored message", sender="User", sender_name="User", session_id="stored_session_id") stored_messages = store_message(message) assert len(stored_messages) == 1 From 8e466b83ba9817a161cbc91eae3cb0c90d0195a3 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 16:13:08 -0300 Subject: [PATCH 09/17] feat: Add field validation for flow_id in MessageTable model This commit adds field validation for the `flow_id` attribute in the `MessageTable` model. The `validate_flow_id` class method is implemented to ensure that the `flow_id` value is either `None` or a valid UUID. This validation helps maintain data integrity and consistency when working with the `MessageTable` model. 
--- .../langflow/services/database/models/message/model.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/backend/base/langflow/services/database/models/message/model.py b/src/backend/base/langflow/services/database/models/message/model.py index 128088398..69b461753 100644 --- a/src/backend/base/langflow/services/database/models/message/model.py +++ b/src/backend/base/langflow/services/database/models/message/model.py @@ -54,6 +54,15 @@ class MessageTable(MessageBase, table=True): flow: "Flow" = Relationship(back_populates="messages") files: List[str] = Field(sa_column=Column(JSON)) + @field_validator("flow_id", mode="before") + @classmethod + def validate_flow_id(cls, value): + if value is None: + return value + if isinstance(value, str): + value = UUID(value) + return value + # Needed for Column(JSON) class Config: arbitrary_types_allowed = True From e6b42d3a60919fb802e5fc439de5baf74407655c Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 17:35:59 -0300 Subject: [PATCH 10/17] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20(utils.py):=20ignore?= =?UTF-8?q?=20type=20error?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/base/langflow/services/database/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index bcb178e7f..7cc889f0f 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -42,7 +42,8 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: try: # Bulk insert messages session.bulk_insert_mappings( - MessageTable, [MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered] + MessageTable, + [MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered], # type: ignore ) 
session.commit() except Exception as e: From 34a88f508836c464c00a1447e27981207819ad7f Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 17:38:17 -0300 Subject: [PATCH 11/17] add type ignore --- src/backend/base/langflow/services/database/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index 7cc889f0f..17150abf6 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -28,7 +28,7 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: original_messages: List[Dict] = messages_df.to_dict(orient="records") db_messages = session.exec(select(MessageTable)).all() - db_messages = [msg[0] for msg in db_messages] + db_messages = [msg[0] for msg in db_messages] # type: ignore db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id, msg.session_id)): msg for msg in db_messages} # Filter out messages that already exist in the database original_messages_filtered = [] From 93ce1ab14e16c3cf79bc5b66cd330942d38a0e56 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 17:38:23 -0300 Subject: [PATCH 12/17] refactor: Update from_message method in MessageBase model to accept UUID as flow_id parameter --- .../base/langflow/services/database/models/message/model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/base/langflow/services/database/models/message/model.py b/src/backend/base/langflow/services/database/models/message/model.py index 69b461753..7c0b9dc8f 100644 --- a/src/backend/base/langflow/services/database/models/message/model.py +++ b/src/backend/base/langflow/services/database/models/message/model.py @@ -26,7 +26,7 @@ class MessageBase(SQLModel): return value @classmethod - def from_message(cls, message: "Message", flow_id: str | None = 
None): + def from_message(cls, message: "Message", flow_id: str | UUID | None = None): # first check if the record has all the required fields if message.text is None or not message.sender or not message.sender_name: raise ValueError("The message does not have the required fields (text, sender, sender_name).") From 04b1ce8b16d59ed0d9be16f1dbbcd94864d44811 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 17:39:45 -0300 Subject: [PATCH 13/17] refactor: Update migrate_messages_from_monitor_service_to_database function in utils.py This commit refactors the migrate_messages_from_monitor_service_to_database function in utils.py to correctly handle the session_id parameter. The session_id is now included in the key used to filter out messages that already exist in the database, ensuring that duplicate messages are not inserted. This improves the data integrity and consistency of the database. --- src/backend/base/langflow/services/database/utils.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index 17150abf6..fa40c725f 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -29,11 +29,11 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: db_messages = session.exec(select(MessageTable)).all() db_messages = [msg[0] for msg in db_messages] # type: ignore - db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id, msg.session_id)): msg for msg in db_messages} + db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id), msg.session_id): msg for msg in db_messages} # Filter out messages that already exist in the database original_messages_filtered = [] for message in original_messages: - key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"])) + key = 
(message["text"], message["timestamp"].isoformat(), str(message["flow_id"]), message["session_id"]) if key not in db_msg_dict: original_messages_filtered.append(message) if not original_messages_filtered: @@ -42,8 +42,8 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: try: # Bulk insert messages session.bulk_insert_mappings( - MessageTable, - [MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered], # type: ignore + MessageTable, # type: ignore + [MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered], ) session.commit() except Exception as e: @@ -55,7 +55,7 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: all_ok = True for orig_msg in original_messages_filtered: - key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"])) + key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"]), orig_msg["session_id"]) matching_db_msg = db_msg_dict.get(key) if matching_db_msg is None: From 16622522bed9c2286c13d282ebf9d82428a80ac0 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 17:48:36 -0300 Subject: [PATCH 14/17] refactor: Stop re-raising errors when migrating messages from monitor service to database --- src/backend/base/langflow/services/database/service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index ef9f0e8c8..32e0b08e2 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -211,7 +211,6 @@ class DatabaseService(Service): migrate_messages_from_monitor_service_to_database(session) except Exception as exc: logger.error(f"Error migrating messages from monitor service to database: {exc}") - raise RuntimeError("Error migrating messages from monitor 
service to database") from exc if fix: self.try_downgrade_upgrade_until_success(alembic_cfg) From 23f199e57f57e0eeb5bb886974646e7a76e7e763 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 17:48:44 -0300 Subject: [PATCH 15/17] refactor: Add index field to DuckDbMessageModel in monitor schema This commit adds the `index` field to the `DuckDbMessageModel` class in the `monitor/schema.py` file. The `index` field is of type `int` and is used to store the index value for the message. This modification enhances the schema of the `DuckDbMessageModel` and allows for more efficient querying and sorting of messages based on their index. --- src/backend/base/langflow/services/monitor/schema.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/backend/base/langflow/services/monitor/schema.py b/src/backend/base/langflow/services/monitor/schema.py index 587e7628e..41018047f 100644 --- a/src/backend/base/langflow/services/monitor/schema.py +++ b/src/backend/base/langflow/services/monitor/schema.py @@ -82,6 +82,7 @@ class TransactionModelResponse(DefaultModel): class DuckDbMessageModel(DefaultModel): + index: int flow_id: str | None = Field(default=None, alias="flow_id") timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) sender: str From d54672a3e300ab31dc92a9a6ca4eadf520bb5894 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 17:48:54 -0300 Subject: [PATCH 16/17] refactor: Update DuckDbMessageModel import in monitor service This commit updates the import statement for the `DuckDbMessageModel` class in the `monitor/service.py` file. The import is modified to reflect the recent changes made to the `monitor/schema.py` file, where the `DuckDbMessageModel` class was added. This update ensures that the correct class is imported and used in the `add_row` method of the `MonitorService` class. 
--- src/backend/base/langflow/services/monitor/service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/base/langflow/services/monitor/service.py b/src/backend/base/langflow/services/monitor/service.py index 4f0655602..f644fd871 100644 --- a/src/backend/base/langflow/services/monitor/service.py +++ b/src/backend/base/langflow/services/monitor/service.py @@ -10,7 +10,7 @@ from langflow.services.base import Service from langflow.services.monitor.utils import add_row_to_table, drop_and_create_table_if_schema_mismatch if TYPE_CHECKING: - from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel + from langflow.services.monitor.schema import DuckDbMessageModel, TransactionModel, VertexBuildModel from langflow.services.settings.service import SettingsService @@ -48,7 +48,7 @@ class MonitorService(Service): def add_row( self, table_name: str, - data: Union[dict, "TransactionModel", "MessageModel", "VertexBuildModel"], + data: Union[dict, "TransactionModel", "DuckDbMessageModel", "VertexBuildModel"], ): # Make sure the model passed matches the table From 00a753631ce22b18b90cec543120e8da7b78087c Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 19:56:21 -0300 Subject: [PATCH 17/17] feat: Add default value for index field in DuckDbMessageModel This commit makes the `index` field of the `DuckDbMessageModel` class optional. Its type is widened from `int` to `int | None`, and it is given a default value of `None` via `Field(default=None, alias="index")`. As a result, the `index` field can be omitted when creating instances of the `DuckDbMessageModel` class. 
--- src/backend/base/langflow/services/monitor/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/base/langflow/services/monitor/schema.py b/src/backend/base/langflow/services/monitor/schema.py index 41018047f..de0bb17bb 100644 --- a/src/backend/base/langflow/services/monitor/schema.py +++ b/src/backend/base/langflow/services/monitor/schema.py @@ -82,7 +82,7 @@ class TransactionModelResponse(DefaultModel): class DuckDbMessageModel(DefaultModel): - index: int + index: int | None = Field(default=None, alias="index") flow_id: str | None = Field(default=None, alias="flow_id") timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) sender: str