From 162d9edb551a8c74d5e8a71d59df74a56f8f6ffb Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 26 Jun 2024 17:39:45 -0300 Subject: [PATCH] refactor: Update migrate_messages_from_monitor_service_to_database function in utils.py This commit refactors the migrate_messages_from_monitor_service_to_database function in utils.py to correctly handle the session_id parameter. The session_id is now included in the key used to filter out messages that already exist in the database, ensuring that duplicate messages are not inserted. This improves the data integrity and consistency of the database. --- src/backend/base/langflow/services/database/utils.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index 17150abf6..fa40c725f 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -29,11 +29,11 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: db_messages = session.exec(select(MessageTable)).all() db_messages = [msg[0] for msg in db_messages] # type: ignore - db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id, msg.session_id)): msg for msg in db_messages} + db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id), msg.session_id): msg for msg in db_messages} # Filter out messages that already exist in the database original_messages_filtered = [] for message in original_messages: - key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"])) + key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"]), message["session_id"]) if key not in db_msg_dict: original_messages_filtered.append(message) if not original_messages_filtered: @@ -42,8 +42,8 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: try: # Bulk insert messages session.bulk_insert_mappings( - MessageTable, - [MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered], # type: ignore + MessageTable, # type: ignore + [MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered], ) session.commit() except Exception as e: @@ -55,7 +55,7 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: all_ok = True for orig_msg in original_messages_filtered: - key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"])) + key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"]), orig_msg["session_id"]) matching_db_msg = db_msg_dict.get(key) if matching_db_msg is None: