diff --git a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index 17150abf6..fa40c725f 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -29,11 +29,11 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: db_messages = session.exec(select(MessageTable)).all() db_messages = [msg[0] for msg in db_messages] # type: ignore - db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id, msg.session_id)): msg for msg in db_messages} + db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id), msg.session_id): msg for msg in db_messages} # Filter out messages that already exist in the database original_messages_filtered = [] for message in original_messages: - key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"])) + key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"]), message["session_id"]) if key not in db_msg_dict: original_messages_filtered.append(message) if not original_messages_filtered: @@ -42,8 +42,8 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: try: # Bulk insert messages session.bulk_insert_mappings( - MessageTable, - [MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered], # type: ignore + MessageTable, # type: ignore + [MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered], ) session.commit() except Exception as e: @@ -55,7 +55,7 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool: all_ok = True for orig_msg in original_messages_filtered: - key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"])) + key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"]), orig_msg["session_id"]) matching_db_msg = db_msg_dict.get(key) if matching_db_msg is None: