refactor: Update migrate_messages_from_monitor_service_to_database function in utils.py
This commit refactors the migrate_messages_from_monitor_service_to_database function in utils.py to correctly handle the session_id parameter. The session_id is now included in the key used to filter out messages that already exist in the database, ensuring that duplicate messages are not inserted. This improves the data integrity and consistency of the database.
This commit is contained in:
parent
e456b006ea
commit
162d9edb55
1 changed files with 5 additions and 5 deletions
|
|
@ -29,11 +29,11 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool:
|
|||
|
||||
db_messages = session.exec(select(MessageTable)).all()
|
||||
db_messages = [msg[0] for msg in db_messages] # type: ignore
|
||||
db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id, msg.session_id)): msg for msg in db_messages}
|
||||
db_msg_dict = {(msg.text, msg.timestamp.isoformat(), str(msg.flow_id), msg.session_id): msg for msg in db_messages}
|
||||
# Filter out messages that already exist in the database
|
||||
original_messages_filtered = []
|
||||
for message in original_messages:
|
||||
key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"]))
|
||||
key = (message["text"], message["timestamp"].isoformat(), str(message["flow_id"]), message["session_id"])
|
||||
if key not in db_msg_dict:
|
||||
original_messages_filtered.append(message)
|
||||
if not original_messages_filtered:
|
||||
|
|
@ -42,8 +42,8 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool:
|
|||
try:
|
||||
# Bulk insert messages
|
||||
session.bulk_insert_mappings(
|
||||
MessageTable,
|
||||
[MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered], # type: ignore
|
||||
MessageTable, # type: ignore
|
||||
[MessageTable.from_message(Message(**msg)).model_dump() for msg in original_messages_filtered],
|
||||
)
|
||||
session.commit()
|
||||
except Exception as e:
|
||||
|
|
@ -55,7 +55,7 @@ def migrate_messages_from_monitor_service_to_database(session: Session) -> bool:
|
|||
|
||||
all_ok = True
|
||||
for orig_msg in original_messages_filtered:
|
||||
key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"]))
|
||||
key = (orig_msg["text"], orig_msg["timestamp"].isoformat(), str(orig_msg["flow_id"]), orig_msg["session_id"])
|
||||
matching_db_msg = db_msg_dict.get(key)
|
||||
|
||||
if matching_db_msg is None:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue