feat: Migrate messages from monitor service to database

This commit migrates messages from the monitor service to the database. It adds a new function `migrate_messages_from_monitor_service_to_database` in the `utils.py` file, which retrieves messages from the monitor service, adds them to the database, and deletes them from the monitor service. This migration ensures that messages are stored in the database for better reliability and retrieval.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-06-26 12:04:00 -03:00
commit ee4ace8bfe
5 changed files with 158 additions and 50 deletions

View file

@ -1,5 +1,4 @@
import warnings
from typing import List, Optional
from uuid import UUID
from loguru import logger
@ -8,17 +7,18 @@ from sqlmodel import Session, col, select
from langflow.schema.message import Message
from langflow.services.database.models.message.model import MessageRead, MessageTable
from langflow.services.database.utils import migrate_messages_from_monitor_service_to_database
from langflow.services.deps import session_scope
def get_messages(
sender: Optional[str] = None,
sender_name: Optional[str] = None,
session_id: Optional[str] = None,
order_by: Optional[str] = "timestamp",
order: Optional[str] = "DESC",
flow_id: Optional[UUID] = None,
limit: Optional[int] = None,
sender: str | None = None,
sender_name: str | None = None,
session_id: str | None = None,
order_by: str | None = "timestamp",
order: str | None = "DESC",
flow_id: UUID | None = None,
limit: int | None = None,
):
"""
Retrieves messages from the database based on the provided filters, after migrating any messages still held by the monitor service.
@ -33,6 +33,7 @@ def get_messages(
Returns:
list[Message]: A list of Message objects representing the retrieved messages.
"""
migrate_messages_from_monitor_service_to_database()
messages_read: list[Message] = []
with session_scope() as session:
stmt = select(MessageTable)
@ -58,7 +59,7 @@ def get_messages(
return messages_read
def add_messages(messages: Message | list[Message], flow_id: Optional[str] = None):
def add_messages(messages: Message | list[Message], flow_id: str | None = None):
"""
Add a message to the monitor service.
"""
@ -111,8 +112,8 @@ def delete_messages(session_id: str):
def store_message(
message: Message,
flow_id: Optional[str] = None,
) -> List[Message]:
flow_id: str | None = None,
) -> list[Message]:
"""
Stores a message in the memory.

View file

@ -6,22 +6,24 @@ from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import command, util
from alembic.config import Config
from langflow.services.base import Service
from langflow.services.database import models # noqa
from langflow.services.database.models.user.crud import get_user_by_username
from langflow.services.database.utils import Result, TableResults
from langflow.services.deps import get_settings_service
from langflow.services.utils import teardown_superuser
from loguru import logger
from sqlalchemy import event, inspect
from sqlalchemy.engine import Engine
from sqlalchemy.exc import OperationalError
from sqlmodel import Session, SQLModel, create_engine, select, text
from langflow.services.base import Service
from langflow.services.database import models # noqa
from langflow.services.database.models.user.crud import get_user_by_username
from langflow.services.database.utils import Result, TableResults, migrate_messages_from_monitor_service_to_database
from langflow.services.deps import get_settings_service
from langflow.services.utils import teardown_superuser
if TYPE_CHECKING:
from langflow.services.settings.service import SettingsService
from sqlalchemy.engine import Engine
from langflow.services.settings.service import SettingsService
class DatabaseService(Service):
name = "database_service"
@ -205,6 +207,11 @@ class DatabaseService(Service):
logger.error(f"AutogenerateDiffsDetected: {exc}")
if not fix:
raise RuntimeError(f"There's a mismatch between the models and the database.\n{exc}")
try:
migrate_messages_from_monitor_service_to_database(session)
except Exception as exc:
logger.error(f"Error migrating messages from monitor service to database: {exc}")
raise RuntimeError("Error migrating messages from monitor service to database") from exc
if fix:
self.try_downgrade_upgrade_until_success(alembic_cfg)

View file

@ -6,10 +6,28 @@ from alembic.util.exc import CommandError
from loguru import logger
from sqlmodel import Session, text
from langflow.services.deps import get_monitor_service
if TYPE_CHECKING:
from langflow.services.database.service import DatabaseService
def migrate_messages_from_monitor_service_to_database(session: "Session | None" = None) -> None:
    """Move every message held by the monitor service into the database.

    Rows returned by the monitor service's ``get_messages()`` are converted to
    ``Message`` objects, added to ``session`` as ``MessageTable`` rows and
    committed. Only after the commit are the migrated rows deleted from the
    monitor service, identified by their ``index`` value.

    Args:
        session: Database session to add the rows to. When omitted, a session
            is obtained from ``session_scope()`` so the function can also be
            called without arguments.
    """
    if session is None:
        # Some callers invoke this function with no arguments; open a scoped
        # session for them instead of failing with a TypeError.
        # NOTE(review): assumes session_scope() is a context manager yielding
        # a usable Session — confirm against langflow.services.deps.
        from langflow.services.deps import session_scope

        with session_scope() as scoped_session:
            migrate_messages_from_monitor_service_to_database(scoped_session)
        return

    # Imported lazily, as in the original implementation.
    from langflow.schema.message import Message
    from langflow.services.database.models.message import MessageTable

    monitor_service = get_monitor_service()
    messages_df = monitor_service.get_messages()
    if messages_df.empty:
        return

    migrated_ids = []
    for record in messages_df.to_dict(orient="records"):
        migrated_ids.append(record["index"])
        session.add(MessageTable.from_message(Message(**record)))

    # Commit first: rows are removed from the monitor service only once they
    # have been persisted in the database.
    session.commit()
    monitor_service.delete_messages(migrated_ids)
def initialize_database(fix_migration: bool = False):
logger.debug("Initializing database")
from langflow.services.deps import get_db_service

View file

@ -28,15 +28,15 @@ class DefaultModel(BaseModel):
class TransactionModel(DefaultModel):
index: Optional[int] = Field(default=None)
timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp")
index: int | None = Field(default=None)
timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp")
vertex_id: str
target_id: str | None = None
inputs: dict
outputs: Optional[dict] = None
outputs: dict | None = None
status: str
error: Optional[str] = None
flow_id: Optional[str] = Field(default=None, alias="flow_id")
error: str | None = None
flow_id: str | None = Field(default=None, alias="flow_id")
# validate target_args in case it is a JSON
@field_validator("outputs", "inputs", mode="before")
@ -53,16 +53,16 @@ class TransactionModel(DefaultModel):
class TransactionModelResponse(DefaultModel):
index: Optional[int] = Field(default=None)
timestamp: Optional[datetime] = Field(default_factory=datetime.now, alias="timestamp")
index: int | None = Field(default=None)
timestamp: datetime | None = Field(default_factory=datetime.now, alias="timestamp")
vertex_id: str
inputs: dict
outputs: Optional[dict] = None
outputs: dict | None = None
status: str
error: Optional[str] = None
flow_id: Optional[str] = Field(default=None, alias="flow_id")
source: Optional[str] = None
target: Optional[str] = None
error: str | None = None
flow_id: str | None = Field(default=None, alias="flow_id")
source: str | None = None
target: str | None = None
# validate target_args in case it is a JSON
@field_validator("outputs", "inputs", mode="before")
@ -81,9 +81,9 @@ class TransactionModelResponse(DefaultModel):
return v
class MessageModel(DefaultModel):
id: Optional[str | UUID] = Field(default=None)
flow_id: Optional[UUID] = Field(default=None)
class DeprecatedMessageModel(DefaultModel):
index: int | None = Field(default=None)
flow_id: str | None = Field(default=None, alias="flow_id")
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
sender: str
sender_name: str
@ -112,7 +112,53 @@ class MessageModel(DefaultModel):
return v
@classmethod
def from_message(cls, message: Message, flow_id: Optional[str] = None):
def from_message(cls, message: Message, flow_id: str | None = None):
    """Build an instance of ``cls`` from ``message``.

    Args:
        message: Source message; ``text`` must not be None and ``sender`` /
            ``sender_name`` must be truthy.
        flow_id: Optional flow identifier stored on the new instance.

    Raises:
        ValueError: If any of the required fields is missing.
    """
    # Reject messages lacking any of the mandatory fields before building.
    has_required_fields = message.text is not None and message.sender and message.sender_name
    if not has_required_fields:
        raise ValueError("The message does not have the required fields (text, sender, sender_name).")

    field_values = {
        "sender": message.sender,
        "sender_name": message.sender_name,
        "text": message.text,
        "session_id": message.session_id,
        # A missing/empty files value is normalised to an empty list.
        "files": message.files or [],
        "timestamp": message.timestamp,
        "flow_id": flow_id,
    }
    return cls(**field_values)
class MessageModel(DefaultModel):
id: str | UUID | None = Field(default=None)
flow_id: UUID | None = Field(default=None)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
sender: str
sender_name: str
session_id: str
text: str
files: list[str] = []
@field_validator("files", mode="before")
@classmethod
def validate_files(cls, v):
    """Decode ``files`` from JSON when it is given as a string; return any other value unchanged."""
    return json.loads(v) if isinstance(v, str) else v
@field_serializer("timestamp")
@classmethod
def serialize_timestamp(cls, v):
    """Format the timestamp as ``YYYY-MM-DD HH:MM:SS`` with microseconds dropped."""
    return v.replace(microsecond=0).strftime("%Y-%m-%d %H:%M:%S")
@field_serializer("files")
@classmethod
def serialize_files(cls, v):
    """Encode a list of files as a JSON string; return any non-list value unchanged."""
    return json.dumps(v) if isinstance(v, list) else v
@classmethod
def from_message(cls, message: Message, flow_id: str | None = None):
# first check if the record has all the required fields
if message.text is None or not message.sender or not message.sender_name:
raise ValueError("The message does not have the required fields (text, sender, sender_name).")
@ -139,8 +185,8 @@ class MessageModelRequest(MessageModel):
class VertexBuildModel(DefaultModel):
index: Optional[int] = Field(default=None, alias="index", exclude=True)
id: Optional[str] = Field(default=None, alias="id")
index: int | None = Field(default=None, alias="index", exclude=True)
id: str | None = Field(default=None, alias="id")
flow_id: str
valid: bool
params: Any

View file

@ -18,14 +18,14 @@ class MonitorService(Service):
name = "monitor_service"
def __init__(self, settings_service: "SettingsService"):
from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel
from langflow.services.monitor.schema import DeprecatedMessageModel, TransactionModel, VertexBuildModel
self.settings_service = settings_service
self.base_cache_dir = Path(user_cache_dir("langflow"))
self.db_path = self.base_cache_dir / "monitor.duckdb"
self.table_map: dict[str, type[TransactionModel | MessageModel | VertexBuildModel]] = {
self.table_map: dict[str, type[TransactionModel | DeprecatedMessageModel | VertexBuildModel]] = {
"transactions": TransactionModel,
"messages": MessageModel,
"messages": DeprecatedMessageModel,
"vertex_builds": VertexBuildModel,
}
@ -68,12 +68,48 @@ class MonitorService(Service):
def get_timestamp():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def get_messages(
    self,
    flow_id: str | None = None,
    sender: str | None = None,
    sender_name: str | None = None,
    session_id: str | None = None,
    order_by: str | None = "timestamp",
    order: str | None = "DESC",
    limit: int | None = None,
):
    """Query the ``messages`` table of the monitor database.

    Args:
        flow_id: Only return rows with this ``flow_id`` (ignored when falsy).
        sender: Only return rows with this ``sender`` (ignored when falsy).
        sender_name: Only return rows with this ``sender_name`` (ignored when falsy).
        session_id: Only return rows with this ``session_id`` (ignored when falsy).
        order_by: Column to sort by; sorting is applied only when both
            ``order_by`` and ``order`` are truthy.
        order: Sort direction, upper-cased before use.
        limit: Maximum number of rows; no limit when None.

    Returns:
        The DataFrame produced by ``.df()`` on the executed query, with the
        columns index, flow_id, sender_name, sender, session_id, text, files
        and timestamp.
    """
    query = "SELECT index, flow_id, sender_name, sender, session_id, text, files, timestamp FROM messages"

    # Column -> filter value, in the same order the conditions were
    # originally emitted.
    filters = {
        "sender": sender,
        "sender_name": sender_name,
        "session_id": session_id,
        "flow_id": flow_id,
    }
    conditions = []
    params = []
    for column, value in filters.items():
        if value:
            # Bind values instead of splicing them into the SQL text so that
            # quotes in a value can neither break nor alter the query.
            conditions.append(f"{column} = ?")
            params.append(str(value))

    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    if order_by and order:
        # Make sure the order is from newest to oldest
        # NOTE(review): identifiers and keywords cannot be bound as
        # parameters, so order_by/order are still interpolated. They must not
        # carry untrusted input; consider validating them against the
        # selected columns and ASC/DESC.
        query += f" ORDER BY {order_by} {order.upper()}"

    if limit is not None:
        # int() guarantees that only a number ever reaches the SQL text.
        query += f" LIMIT {int(limit)}"

    with duckdb.connect(str(self.db_path), read_only=True) as conn:
        df = conn.execute(query, params).df()

    return df
def get_vertex_builds(
self,
flow_id: Optional[str] = None,
vertex_id: Optional[str] = None,
valid: Optional[bool] = None,
order_by: Optional[str] = "timestamp",
flow_id: str | None = None,
vertex_id: str | None = None,
valid: bool | None = None,
order_by: str | None = "timestamp",
):
query = "SELECT id, index,flow_id, valid, params, data, artifacts, timestamp FROM vertex_builds"
conditions = []
@ -96,7 +132,7 @@ class MonitorService(Service):
return df.to_dict(orient="records")
def delete_vertex_builds(self, flow_id: Optional[str] = None):
def delete_vertex_builds(self, flow_id: str | None = None):
query = "DELETE FROM vertex_builds"
if flow_id:
query += f" WHERE flow_id = '{flow_id}'"
@ -109,7 +145,7 @@ class MonitorService(Service):
return self.exec_query(query, read_only=False)
def delete_messages(self, message_ids: Union[List[int], str]):
def delete_messages(self, message_ids: list[int] | str):
if isinstance(message_ids, list):
# If message_ids is a list, join the string representations of the integers
ids_str = ",".join(map(str, message_ids))
@ -132,11 +168,11 @@ class MonitorService(Service):
def get_transactions(
self,
source: Optional[str] = None,
target: Optional[str] = None,
status: Optional[str] = None,
order_by: Optional[str] = "timestamp",
flow_id: Optional[str] = None,
source: str | None = None,
target: str | None = None,
status: str | None = None,
order_by: str | None = "timestamp",
flow_id: str | None = None,
):
query = (
"SELECT index,flow_id, status, error, timestamp, vertex_id, inputs, outputs, target_id FROM transactions"