diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index 7890d9ac8..643835aa7 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import re import sqlite3 import time from contextlib import asynccontextmanager, contextmanager @@ -145,28 +146,45 @@ class DatabaseService(Service): async with AsyncSession(self.async_engine, expire_on_commit=False) as session: yield session - async def migrate_flows_if_auto_login(self) -> None: - # if auto_login is enabled, we need to migrate the flows - # to the default superuser if they don't have a user id - # associated with them + async def assign_orphaned_flows_to_superuser(self) -> None: + """Assign flows without a user ID to the default superuser if auto_login is enabled.""" + # Get the settings service to check if auto_login is enabled settings_service = get_settings_service() if settings_service.auth_settings.AUTO_LOGIN: async with self.with_async_session() as session: - # `== None` translates to SQL `is NULL` + # Select flows where user_id is NULL (orphaned flows) stmt = select(models.Flow).where(models.Flow.user_id == None) # noqa: E711 - flows = (await session.exec(stmt)).all() - if flows: - logger.debug("Migrating flows to default superuser") - username = settings_service.auth_settings.SUPERUSER - user = await get_user_by_username(session, username) - if not user: + orphaned_flows = (await session.exec(stmt)).all() + + if orphaned_flows: + logger.debug("Assigning orphaned flows to the default superuser") + + # Get the default superuser + superuser_username = settings_service.auth_settings.SUPERUSER + superuser = await get_user_by_username(session, superuser_username) + + if not superuser: logger.error("Default superuser not found") msg = "Default superuser not found" raise RuntimeError(msg) - for flow in flows: - flow.user_id = user.id + + stmt = select(models.Flow.name).where(models.Flow.user_id == superuser.id) + result = await session.exec(stmt) + superuser_flows_names = result.all() + # Assign each orphaned flow to the superuser + for flow in orphaned_flows: + flow.user_id = superuser.id + if flow.name in superuser_flows_names: + name_match = re.search(r"\((\d+)\)$", flow.name) + if not name_match: + flow.name = f"{flow.name} (1)" + else: + num = int(name_match.group(1)) + 1 + flow.name = re.sub(r"\(\d+\)$", f"({num})", flow.name) + + # Commit the changes to the database await session.commit() - logger.debug("Flows migrated successfully") + logger.debug("Successfully assigned orphaned flows to the default superuser") def check_schema_health(self) -> bool: inspector = inspect(self.engine) diff --git a/src/backend/base/langflow/services/utils.py b/src/backend/base/langflow/services/utils.py index d857c6e9e..962d128f5 100644 --- a/src/backend/base/langflow/services/utils.py +++ b/src/backend/base/langflow/services/utils.py @@ -166,8 +166,8 @@ async def initialize_services(*, fix_migration: bool = False) -> None: async with get_db_service().with_async_session() as session: await setup_superuser(get_service(ServiceType.SETTINGS_SERVICE), session) try: - await get_db_service().migrate_flows_if_auto_login() + await get_db_service().assign_orphaned_flows_to_superuser() except Exception as exc: - msg = "Error migrating flows" + msg = "Error assigning orphaned flows to the superuser" logger.exception(msg) raise RuntimeError(msg) from exc