From ed40e55ef1a35263cb720e55cc337208c55c49bc Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 23 Jun 2024 22:07:41 -0300 Subject: [PATCH] refactor: Update add_messages function to use database session This commit refactors the add_messages function in memory.py to use a database session for adding messages to the monitor service. Instead of directly calling the monitor_service.add_message method, the messages are now added using a session object. This change ensures that the messages are properly persisted in the database and improves the reliability of the application. --- src/backend/base/langflow/memory.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/backend/base/langflow/memory.py b/src/backend/base/langflow/memory.py index e89682969..c824310d9 100644 --- a/src/backend/base/langflow/memory.py +++ b/src/backend/base/langflow/memory.py @@ -4,8 +4,8 @@ from typing import List, Optional from loguru import logger from langflow.schema.message import Message -from langflow.services.deps import get_monitor_service -from langflow.services.monitor.schema import MessageModel +from langflow.services.database.models.message.model import MessageTable +from langflow.services.deps import get_monitor_service, session_scope def get_messages( @@ -64,7 +64,6 @@ def add_messages(messages: Message | list[Message], flow_id: Optional[str] = Non Add a message to the monitor service. """ try: - monitor_service = get_monitor_service() if not isinstance(messages, list): messages = [messages] @@ -72,19 +71,20 @@ def add_messages(messages: Message | list[Message], flow_id: Optional[str] = Non types = ", ".join([str(type(message)) for message in messages]) raise ValueError(f"The messages must be instances of Message. Found: {types}") - messages_models: list[MessageModel] = [] + messages_models: list[MessageTable] = [] for msg in messages: - if not msg.timestamp: - msg.timestamp = monitor_service.get_timestamp() - messages_models.append(MessageModel.from_message(msg, flow_id=flow_id)) + messages_models.append(MessageTable.from_message(msg, flow_id=flow_id)) + with session_scope() as session: + for message_model in messages_models: + try: + session.add(message_model) + session.commit() + session.refresh(message_model) + except Exception as e: + logger.error(f"Error adding message to monitor service: {e}") + logger.exception(e) + raise e - for message_model in messages_models: - try: - monitor_service.add_message(message_model) - except Exception as e: - logger.error(f"Error adding message to monitor service: {e}") - logger.exception(e) - raise e return messages_models except Exception as e: logger.exception(e)