diff --git a/src/backend/base/langflow/__main__.py b/src/backend/base/langflow/__main__.py index 8cb0523be..cbfe65d96 100644 --- a/src/backend/base/langflow/__main__.py +++ b/src/backend/base/langflow/__main__.py @@ -25,9 +25,9 @@ from rich.panel import Panel from rich.table import Table from sqlmodel import select +from langflow.initial_setup.setup import get_or_create_default_folder from langflow.logging.logger import configure, logger from langflow.main import setup_app -from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist from langflow.services.database.utils import session_getter from langflow.services.deps import get_db_service, get_settings_service, session_scope from langflow.services.settings.constants import DEFAULT_SUPERUSER @@ -464,7 +464,7 @@ def superuser( typer.echo("Superuser creation failed.") return # Now create the first folder for the user - result = await create_default_folder_if_it_doesnt_exist(session, user.id) + result = await get_or_create_default_folder(session, user.id) if result: typer.echo("Default folder created successfully.") else: diff --git a/src/backend/base/langflow/api/v1/login.py b/src/backend/base/langflow/api/v1/login.py index d46e40993..2671809ef 100644 --- a/src/backend/base/langflow/api/v1/login.py +++ b/src/backend/base/langflow/api/v1/login.py @@ -7,13 +7,13 @@ from fastapi.security import OAuth2PasswordRequestForm from langflow.api.utils import DbSession from langflow.api.v1.schemas import Token +from langflow.initial_setup.setup import get_or_create_default_folder from langflow.services.auth.utils import ( authenticate_user, create_refresh_token, create_user_longterm_token, create_user_tokens, ) -from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist from langflow.services.database.models.user.crud import get_user_by_id from langflow.services.deps import get_settings_service, get_variable_service @@ -68,7 +68,7 @@ async def login_to_get_access_token( ) await get_variable_service().initialize_user_variables(user.id, db) # Create default folder for user if it doesn't exist - await create_default_folder_if_it_doesnt_exist(db, user.id) + _ = await get_or_create_default_folder(db, user.id) return tokens raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, diff --git a/src/backend/base/langflow/api/v1/users.py b/src/backend/base/langflow/api/v1/users.py index 81e965fbf..e0c545f39 100644 --- a/src/backend/base/langflow/api/v1/users.py +++ b/src/backend/base/langflow/api/v1/users.py @@ -9,12 +9,12 @@ from sqlmodel.sql.expression import SelectOfScalar from langflow.api.utils import CurrentActiveUser, DbSession from langflow.api.v1.schemas import UsersResponse +from langflow.initial_setup.setup import get_or_create_default_folder from langflow.services.auth.utils import ( get_current_active_superuser, get_password_hash, verify_password, ) -from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist from langflow.services.database.models.user import User, UserCreate, UserRead, UserUpdate from langflow.services.database.models.user.crud import get_user_by_id, update_user from langflow.services.deps import get_settings_service @@ -35,7 +35,7 @@ async def add_user( session.add(new_user) await session.commit() await session.refresh(new_user) - folder = await create_default_folder_if_it_doesnt_exist(session, new_user.id) + folder = await get_or_create_default_folder(session, new_user.id) if not folder: raise HTTPException(status_code=500, detail="Error creating default folder") except IntegrityError as e: diff --git a/src/backend/base/langflow/initial_setup/setup.py b/src/backend/base/langflow/initial_setup/setup.py index 4a6d136f0..2e3994ac7 100644 --- a/src/backend/base/langflow/initial_setup/setup.py +++ b/src/backend/base/langflow/initial_setup/setup.py @@ -17,6 +17,7 @@ from uuid import UUID import anyio import httpx import orjson +import sqlalchemy as sa from aiofile import async_open from emoji import demojize, purely_emoji from loguru import logger @@ -29,11 +30,8 @@ from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBU from langflow.initial_setup.constants import STARTER_FOLDER_DESCRIPTION, STARTER_FOLDER_NAME from langflow.services.auth.utils import create_super_user from langflow.services.database.models.flow.model import Flow, FlowCreate -from langflow.services.database.models.folder.model import Folder, FolderCreate -from langflow.services.database.models.folder.utils import ( - create_default_folder_if_it_doesnt_exist, - get_default_folder_id, -) +from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME +from langflow.services.database.models.folder.model import Folder, FolderCreate, FolderRead from langflow.services.database.models.user.crud import get_user_by_username from langflow.services.deps import get_settings_service, get_storage_service, get_variable_service, session_scope from langflow.template.field.prompt import DEFAULT_PROMPT_INTUT_TYPES @@ -595,6 +593,10 @@ async def load_flows_from_directory() -> None: if user is None: msg = "Superuser not found in the database" raise NoResultFound(msg) + + # Ensure that the default folder exists for this user + _ = await get_or_create_default_folder(session, user.id) + async for file_path in anyio.Path(flows_path).iterdir(): if not await file_path.is_file() or file_path.suffix != ".json": continue @@ -707,11 +709,9 @@ async def upsert_flow_from_file(file_content: AnyStr, filename: str, session: As existing.updated_at = datetime.now(tz=timezone.utc).astimezone() existing.user_id = user_id - # Generally, folder_id should not be None, but we must check this due to the previous - # behavior where flows could be added and folder_id was None, orphaning - # them within Langflow. + # Ensure that the flow is associated with an existing default folder if existing.folder_id is None: - folder_id = await get_default_folder_id(session, user_id) + folder_id = await get_or_create_default_folder(session, user_id) existing.folder_id = folder_id if isinstance(existing.id, str): @@ -725,11 +725,11 @@ async def upsert_flow_from_file(file_content: AnyStr, filename: str, session: As else: logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}") - # Current behavior loads all new flows into default folder - folder_id = await get_default_folder_id(session, user_id) + # Assign the newly created flow to the default folder + folder = await get_or_create_default_folder(session, user_id) flow["user_id"] = user_id - flow["folder_id"] = folder_id - flow = Flow.model_validate(flow, from_attributes=True) + flow["folder_id"] = folder.id + flow = Flow.model_validate(flow) flow.updated_at = datetime.now(tz=timezone.utc).astimezone() session.add(flow) @@ -816,5 +816,42 @@ async def initialize_super_user_if_needed() -> None: async with session_scope() as async_session: super_user = await create_super_user(db=async_session, username=username, password=password) await get_variable_service().initialize_user_variables(super_user.id, async_session) - await create_default_folder_if_it_doesnt_exist(async_session, super_user.id) + _ = await get_or_create_default_folder(async_session, super_user.id) logger.info("Super user initialized") + + +async def get_or_create_default_folder(session: AsyncSession, user_id: UUID) -> FolderRead: + """Ensure the default folder exists for the given user_id. If it doesn't exist, create it. + + Uses an idempotent insertion approach to handle concurrent creation gracefully. + + This implementation avoids an external distributed lock and works with both SQLite and PostgreSQL. + + Args: + session (AsyncSession): The active database session. + user_id (UUID): The ID of the user who owns the folder. + + Returns: + UUID: The ID of the default folder. + """ + stmt = select(Folder).where(Folder.user_id == user_id, Folder.name == DEFAULT_FOLDER_NAME) + result = await session.exec(stmt) + folder = result.first() + if folder: + return FolderRead.model_validate(folder, from_attributes=True) + + try: + folder_obj = Folder(user_id=user_id, name=DEFAULT_FOLDER_NAME) + session.add(folder_obj) + await session.commit() + await session.refresh(folder_obj) + except sa.exc.IntegrityError as e: + # Another worker may have created the folder concurrently. + await session.rollback() + result = await session.exec(stmt) + folder = result.first() + if folder: + return FolderRead.model_validate(folder, from_attributes=True) + msg = "Failed to get or create default folder" + raise ValueError(msg) from e + return FolderRead.model_validate(folder_obj, from_attributes=True) diff --git a/src/backend/base/langflow/services/database/models/folder/utils.py b/src/backend/base/langflow/services/database/models/folder/utils.py index c7f8f4aee..42687b505 100644 --- a/src/backend/base/langflow/services/database/models/folder/utils.py +++ b/src/backend/base/langflow/services/database/models/folder/utils.py @@ -3,6 +3,7 @@ from uuid import UUID from sqlmodel import and_, select, update from sqlmodel.ext.asyncio.session import AsyncSession +from langflow.initial_setup.setup import get_or_create_default_folder from langflow.services.database.models.flow.model import Flow from .constants import DEFAULT_FOLDER_DESCRIPTION, DEFAULT_FOLDER_NAME @@ -40,5 +41,5 @@ async def get_default_folder_id(session: AsyncSession, user_id: UUID): await session.exec(select(Folder).where(Folder.name == DEFAULT_FOLDER_NAME, Folder.user_id == user_id)) ).first() if not folder: - folder = await create_default_folder_if_it_doesnt_exist(session, user_id) + folder = await get_or_create_default_folder(session, user_id) return folder.id diff --git a/src/backend/tests/unit/initial_setup/test_setup_functions.py b/src/backend/tests/unit/initial_setup/test_setup_functions.py new file mode 100644 index 000000000..2ca996635 --- /dev/null +++ b/src/backend/tests/unit/initial_setup/test_setup_functions.py @@ -0,0 +1,52 @@ +import asyncio +from uuid import uuid4 + +import pytest +from langflow.initial_setup.setup import DEFAULT_FOLDER_NAME, get_or_create_default_folder, session_scope +from langflow.services.database.models.folder.model import FolderRead + + +@pytest.mark.usefixtures("client") +async def test_get_or_create_default_folder_creation() -> None: + """Test that a default folder is created for a new user. + + This test verifies that when no default folder exists for a given user, + get_or_create_default_folder creates one with the expected name and assigns it an ID. + """ + test_user_id = uuid4() + async with session_scope() as session: + folder = await get_or_create_default_folder(session, test_user_id) + assert folder.name == DEFAULT_FOLDER_NAME, "The folder name should match the default." + assert hasattr(folder, "id"), "The folder should have an 'id' attribute after creation." + + +@pytest.mark.usefixtures("client") +async def test_get_or_create_default_folder_idempotency() -> None: + """Test that subsequent calls to get_or_create_default_folder return the same folder. + + The function should be idempotent such that if a default folder already exists, + calling the function again does not create a new one. + """ + test_user_id = uuid4() + async with session_scope() as session: + folder_first = await get_or_create_default_folder(session, test_user_id) + folder_second = await get_or_create_default_folder(session, test_user_id) + assert folder_first.id == folder_second.id, "Both calls should return the same folder instance." + + +@pytest.mark.usefixtures("client") +async def test_get_or_create_default_folder_concurrent_calls() -> None: + """Test concurrent invocations of get_or_create_default_folder. + + This test ensures that when multiple concurrent calls are made for the same user, + only one default folder is created, demonstrating idempotency under concurrent access. + """ + test_user_id = uuid4() + + async def get_folder() -> FolderRead: + async with session_scope() as session: + return await get_or_create_default_folder(session, test_user_id) + + results = await asyncio.gather(get_folder(), get_folder(), get_folder()) + folder_ids = {folder.id for folder in results} + assert len(folder_ids) == 1, "Concurrent calls must return a single, consistent folder instance." diff --git a/src/frontend/tests/extended/regression/generalBugs-shard-10.spec.ts b/src/frontend/tests/extended/regression/generalBugs-shard-10.spec.ts index f281875ea..56db662c5 100644 --- a/src/frontend/tests/extended/regression/generalBugs-shard-10.spec.ts +++ b/src/frontend/tests/extended/regression/generalBugs-shard-10.spec.ts @@ -34,21 +34,15 @@ test( //connection 1 - const elementPrompt = await page + await page .getByTestId("handle-prompt-shownode-prompt message-right") - .first(); - await elementPrompt.hover(); - await page.mouse.down(); + .first() + .click(); - await page.locator('//*[@id="react-flow-id"]').hover(); - - const elementChatOutput = await page + await page .getByTestId("handle-chatoutput-shownode-text-left") - .first(); - await elementChatOutput.hover(); - await page.mouse.up(); - - await page.locator('//*[@id="react-flow-id"]').hover(); + .first() + .click(); await page.getByTestId("button_open_prompt_modal").click(); @@ -70,7 +64,7 @@ test( await page.getByText("Close").last().click(); - await page.getByText("Prompt", { exact: true }).click(); + await page.getByText("Prompt", { exact: true }).last().click(); await page.getByTestId("more-options-modal").click(); @@ -80,8 +74,6 @@ test( expect(page.locator(".border-ring-frozen")).toHaveCount(1); - await page.locator('//*[@id="react-flow-id"]').click(); - await page.getByTestId("button_open_prompt_modal").click(); await page.getByTestId("edit-prompt-sanitized").first().click();