fix: assign orphaned flows to superuser if auto-login is enabled (#4715)

* fix: assign orphaned flows to superuser if auto-login is enabled

* fix: orphan flow naming

* fix: ruff errors

* fix: orphan flow naming (again)
This commit is contained in:
Ítalo Johnny 2024-11-19 18:55:07 -03:00 committed by GitHub
commit b978241d16
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 34 additions and 16 deletions

View file

@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
import re
import sqlite3
import time
from contextlib import asynccontextmanager, contextmanager
@ -145,28 +146,45 @@ class DatabaseService(Service):
async with AsyncSession(self.async_engine, expire_on_commit=False) as session:
yield session
async def migrate_flows_if_auto_login(self) -> None:
# if auto_login is enabled, we need to migrate the flows
# to the default superuser if they don't have a user id
# associated with them
async def assign_orphaned_flows_to_superuser(self) -> None:
"""Assign flows without a user ID to the default superuser if auto_login is enabled."""
# Get the settings service to check if auto_login is enabled
settings_service = get_settings_service()
if settings_service.auth_settings.AUTO_LOGIN:
async with self.with_async_session() as session:
# `== None` translates to SQL `is NULL`
# Select flows where user_id is NULL (orphaned flows)
stmt = select(models.Flow).where(models.Flow.user_id == None) # noqa: E711
flows = (await session.exec(stmt)).all()
if flows:
logger.debug("Migrating flows to default superuser")
username = settings_service.auth_settings.SUPERUSER
user = await get_user_by_username(session, username)
if not user:
orphaned_flows = (await session.exec(stmt)).all()
if orphaned_flows:
logger.debug("Assigning orphaned flows to the default superuser")
# Get the default superuser
superuser_username = settings_service.auth_settings.SUPERUSER
superuser = await get_user_by_username(session, superuser_username)
if not superuser:
logger.error("Default superuser not found")
msg = "Default superuser not found"
raise RuntimeError(msg)
for flow in flows:
flow.user_id = user.id
stmt = select(models.Flow.name).where(models.Flow.user_id == superuser.id)
result = await session.exec(stmt)
superuser_flows_names = result.all()
# Assign each orphaned flow to the superuser
for flow in orphaned_flows:
flow.user_id = superuser.id
if flow.name in superuser_flows_names:
name_match = re.search(r"\((\d+)\)$", flow.name)
if not name_match:
flow.name = f"{flow.name} (1)"
else:
num = int(name_match.group(1)) + 1
flow.name = re.sub(r"\(\d+\)$", f"({num})", flow.name)
# Commit the changes to the database
await session.commit()
logger.debug("Flows migrated successfully")
logger.debug("Successfully assigned orphaned flows to the default superuser")
def check_schema_health(self) -> bool:
inspector = inspect(self.engine)

View file

@ -166,8 +166,8 @@ async def initialize_services(*, fix_migration: bool = False) -> None:
async with get_db_service().with_async_session() as session:
await setup_superuser(get_service(ServiceType.SETTINGS_SERVICE), session)
try:
await get_db_service().migrate_flows_if_auto_login()
await get_db_service().assign_orphaned_flows_to_superuser()
except Exception as exc:
msg = "Error migrating flows"
msg = "Error assigning orphaned flows to the superuser"
logger.exception(msg)
raise RuntimeError(msg) from exc