refactor: Update add_messages function to use database session
This commit refactors the add_messages function in memory.py to use a database session for adding messages to the monitor service. Instead of directly calling the monitor_service.add_message method, the messages are now added using a session object. This change ensures that the messages are properly persisted in the database and improves the reliability of the application.
This commit is contained in:
parent
2f2bc4008f
commit
ed40e55ef1
1 changed files with 14 additions and 14 deletions
|
|
@ -4,8 +4,8 @@ from typing import List, Optional
|
|||
from loguru import logger
|
||||
|
||||
from langflow.schema.message import Message
|
||||
from langflow.services.deps import get_monitor_service
|
||||
from langflow.services.monitor.schema import MessageModel
|
||||
from langflow.services.database.models.message.model import MessageTable
|
||||
from langflow.services.deps import get_monitor_service, session_scope
|
||||
|
||||
|
||||
def get_messages(
|
||||
|
|
@ -64,7 +64,6 @@ def add_messages(messages: Message | list[Message], flow_id: Optional[str] = Non
|
|||
Add a message to the monitor service.
|
||||
"""
|
||||
try:
|
||||
monitor_service = get_monitor_service()
|
||||
if not isinstance(messages, list):
|
||||
messages = [messages]
|
||||
|
||||
|
|
@ -72,19 +71,20 @@ def add_messages(messages: Message | list[Message], flow_id: Optional[str] = Non
|
|||
types = ", ".join([str(type(message)) for message in messages])
|
||||
raise ValueError(f"The messages must be instances of Message. Found: {types}")
|
||||
|
||||
messages_models: list[MessageModel] = []
|
||||
messages_models: list[MessageTable] = []
|
||||
for msg in messages:
|
||||
if not msg.timestamp:
|
||||
msg.timestamp = monitor_service.get_timestamp()
|
||||
messages_models.append(MessageModel.from_message(msg, flow_id=flow_id))
|
||||
messages_models.append(MessageTable.from_message(msg, flow_id=flow_id))
|
||||
with session_scope() as session:
|
||||
for message_model in messages_models:
|
||||
try:
|
||||
session.add(message_model)
|
||||
session.commit()
|
||||
session.refresh(message_model)
|
||||
except Exception as e:
|
||||
logger.error(f"Error adding message to monitor service: {e}")
|
||||
logger.exception(e)
|
||||
raise e
|
||||
|
||||
for message_model in messages_models:
|
||||
try:
|
||||
monitor_service.add_message(message_model)
|
||||
except Exception as e:
|
||||
logger.error(f"Error adding message to monitor service: {e}")
|
||||
logger.exception(e)
|
||||
raise e
|
||||
return messages_models
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue