fix: enforce authentication for superuser cli command (#9152)

* Enforce authentication for superuser cli command

* shorten security md

* cleanup

* use session_scope

* re-add uvlock

* [autofix.ci] apply automated fixes

* ruff

* update env example

* [autofix.ci] apply automated fixes

* better exception handling

* [autofix.ci] apply automated fixes

* update tests to not use mocks

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes

* Remove old test

* Catch exceptions for typer

* Try output instead of stdout

* Use xdist to run in serial

* Separate create superuse

* [autofix.ci] apply automated fixes

* Ruff

* [autofix.ci] apply automated fixes

* lint

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Jordan Frazier 2025-08-14 16:29:35 -04:00 committed by GitHub
commit c188ec113c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 353 additions and 112 deletions

View file

@ -15,7 +15,9 @@ import click
import httpx
import typer
from dotenv import load_dotenv
from fastapi import HTTPException
from httpx import HTTPError
from jose import JWTError
from multiprocess import cpu_count
from multiprocess.context import Process
from packaging import version as pkg_version
@ -29,9 +31,9 @@ from langflow.cli.progress import create_langflow_progress
from langflow.initial_setup.setup import get_or_create_default_folder
from langflow.logging.logger import configure, logger
from langflow.main import setup_app
from langflow.services.database.utils import session_getter
from langflow.services.auth.utils import check_key, get_current_user_by_jwt
from langflow.services.deps import get_db_service, get_settings_service, session_scope
from langflow.services.settings.constants import DEFAULT_SUPERUSER
from langflow.services.settings.constants import DEFAULT_SUPERUSER, DEFAULT_SUPERUSER_PASSWORD
from langflow.services.utils import initialize_services
from langflow.utils.version import fetch_latest_version, get_version_info
from langflow.utils.version import is_pre_release as langflow_is_pre_release
@ -632,41 +634,138 @@ def print_banner(host: str, port: int, protocol: str) -> None:
@app.command()
def superuser(
username: str = typer.Option(..., prompt=True, help="Username for the superuser."),
password: str = typer.Option(..., prompt=True, hide_input=True, help="Password for the superuser."),
username: str = typer.Option(
None, help="Username for the superuser. Defaults to 'langflow' when AUTO_LOGIN is enabled."
),
password: str = typer.Option(
None, help="Password for the superuser. Defaults to 'langflow' when AUTO_LOGIN is enabled."
),
log_level: str = typer.Option("error", help="Logging level.", envvar="LANGFLOW_LOG_LEVEL"),
auth_token: str = typer.Option(
None, help="Authentication token of existing superuser.", envvar="LANGFLOW_SUPERUSER_TOKEN"
),
) -> None:
"""Create a superuser."""
"""Create a superuser.
When AUTO_LOGIN is enabled, uses default credentials.
In production mode, requires authentication.
"""
configure(log_level=log_level)
db_service = get_db_service()
async def _create_superuser():
await initialize_services()
async with session_getter(db_service) as session:
from langflow.services.auth.utils import create_super_user
asyncio.run(_create_superuser(username, password, auth_token))
if await create_super_user(db=session, username=username, password=password):
# Verify that the superuser was created
from langflow.services.database.models.user.model import User
stmt = select(User).where(User.username == username)
user: User = (await session.exec(stmt)).first()
if user is None or not user.is_superuser:
typer.echo("Superuser creation failed.")
return
# Now create the first folder for the user
result = await get_or_create_default_folder(session, user.id)
if result:
typer.echo("Default folder created successfully.")
else:
msg = "Could not create default folder."
raise RuntimeError(msg)
typer.echo("Superuser created successfully.")
async def _create_superuser(username: str, password: str, auth_token: str | None):
"""Create a superuser."""
await initialize_services()
else:
settings_service = get_settings_service()
# Check if superuser creation via CLI is enabled
if not settings_service.auth_settings.ENABLE_SUPERUSER_CLI:
typer.echo("Error: Superuser creation via CLI is disabled.")
typer.echo("Set LANGFLOW_ENABLE_SUPERUSER_CLI=true to enable this feature.")
raise typer.Exit(1)
if settings_service.auth_settings.AUTO_LOGIN:
# Force default credentials for AUTO_LOGIN mode
username = DEFAULT_SUPERUSER
password = DEFAULT_SUPERUSER_PASSWORD
else:
# Production mode - prompt for credentials if not provided
if not username:
username = typer.prompt("Username")
if not password:
password = typer.prompt("Password", hide_input=True)
from langflow.services.database.models.user.crud import get_all_superusers
existing_superusers = []
async with session_scope() as session:
# Note that the default superuser is created by the initialize_services() function,
# but leaving this check here in case we change that behavior
existing_superusers = await get_all_superusers(session)
is_first_setup = len(existing_superusers) == 0
# If AUTO_LOGIN is true, only allow default superuser creation
if settings_service.auth_settings.AUTO_LOGIN:
if not is_first_setup:
typer.echo("Error: Cannot create additional superusers when AUTO_LOGIN is enabled.")
typer.echo("AUTO_LOGIN mode is for development with only the default superuser.")
typer.echo("To create additional superusers:")
typer.echo("1. Set LANGFLOW_AUTO_LOGIN=false")
typer.echo("2. Run this command again with --auth-token")
raise typer.Exit(1)
typer.echo(f"AUTO_LOGIN enabled. Creating default superuser '{username}'...")
typer.echo(f"Note: Default credentials are {DEFAULT_SUPERUSER}/{DEFAULT_SUPERUSER_PASSWORD}")
# AUTO_LOGIN is false - production mode
elif is_first_setup:
typer.echo("No superusers found. Creating first superuser...")
else:
# Authentication is required in production mode
if not auth_token:
typer.echo("Error: Creating a superuser requires authentication.")
typer.echo("Please provide --auth-token with a valid superuser API key or JWT token.")
typer.echo("To get a token, use: `uv run langflow api_key`")
raise typer.Exit(1)
# Validate the auth token
try:
auth_user = None
async with session_scope() as session:
# Try JWT first
user = None
try:
user = await get_current_user_by_jwt(auth_token, session)
except (JWTError, HTTPException):
# Try API key
api_key_result = await check_key(session, auth_token)
if api_key_result and hasattr(api_key_result, "is_superuser"):
user = api_key_result
auth_user = user
if not auth_user or not auth_user.is_superuser:
typer.echo(
"Error: Invalid token or insufficient privileges. Only superusers can create other superusers."
)
raise typer.Exit(1)
except typer.Exit:
raise # Re-raise typer.Exit without wrapping
except Exception as e: # noqa: BLE001
typer.echo(f"Error: Authentication failed - {e!s}")
raise typer.Exit(1) from None
# Auth complete, create the superuser
async with session_scope() as session:
from langflow.services.auth.utils import create_super_user
if await create_super_user(db=session, username=username, password=password):
# Verify that the superuser was created
from langflow.services.database.models.user.model import User
stmt = select(User).where(User.username == username)
created_user: User = (await session.exec(stmt)).first()
if created_user is None or not created_user.is_superuser:
typer.echo("Superuser creation failed.")
return
# Now create the first folder for the user
result = await get_or_create_default_folder(session, created_user.id)
if result:
typer.echo("Default folder created successfully.")
else:
msg = "Could not create default folder."
raise RuntimeError(msg)
asyncio.run(_create_superuser())
# Log the superuser creation for audit purposes
logger.warning(
f"SECURITY AUDIT: New superuser '{username}' created via CLI command"
+ (" by authenticated user" if auth_token else " (first-time setup)")
)
typer.echo("Superuser created successfully.")
else:
logger.error(f"SECURITY AUDIT: Failed attempt to create superuser '{username}' via CLI")
typer.echo("Superuser creation failed.")
# command to copy the langflow database from the cache to the current directory
@ -749,6 +848,7 @@ def api_key(
settings_service = get_settings_service()
auth_settings = settings_service.auth_settings
if not auth_settings.AUTO_LOGIN:
# TODO: Allow non-auto-login users to create API keys via CLI
typer.echo("Auto login is disabled. API keys cannot be created through the CLI.")
return None

View file

@ -60,3 +60,10 @@ async def update_user_last_login_at(user_id: UUID, db: AsyncSession):
return await update_user(user, user_data, db)
except Exception as e: # noqa: BLE001
logger.error(f"Error updating user last login at: {e!s}")
async def get_all_superusers(db: AsyncSession) -> list[User]:
"""Get all superuser accounts from the database."""
stmt = select(User).where(User.is_superuser == True) # noqa: E712
result = await db.exec(stmt)
return list(result.all())

View file

@ -27,12 +27,25 @@ class AuthSettings(BaseSettings):
API_KEY_ALGORITHM: str = "HS256"
API_V1_STR: str = "/api/v1"
AUTO_LOGIN: bool = True
AUTO_LOGIN: bool = Field(
default=True, # TODO: Set to False in v1.6
description=(
"Enable automatic login with default credentials. "
"SECURITY WARNING: This bypasses authentication and should only be used in development environments. "
"Set to False in production."
),
)
"""If True, the application will attempt to log in automatically as a super user."""
skip_auth_auto_login: bool = True
"""If True, the application will skip authentication when AUTO_LOGIN is enabled.
This will be removed in v1.6"""
ENABLE_SUPERUSER_CLI: bool = Field(
default=True,
description="Allow creation of superusers via CLI. Set to False in production for security.",
)
"""If True, allows creation of superusers via the CLI 'langflow superuser' command."""
NEW_USER_IS_ACTIVE: bool = False
SUPERUSER: str = DEFAULT_SUPERUSER
SUPERUSER_PASSWORD: str = DEFAULT_SUPERUSER_PASSWORD

View file

@ -68,15 +68,20 @@ async def get_or_create_super_user(session: AsyncSession, username, password, is
return await create_super_user(username, password, db=session)
async def setup_superuser(settings_service, session: AsyncSession) -> None:
async def setup_superuser(settings_service: SettingsService, session: AsyncSession) -> None:
if settings_service.auth_settings.AUTO_LOGIN:
logger.debug("AUTO_LOGIN is set to True. Creating default superuser.")
username = DEFAULT_SUPERUSER
password = DEFAULT_SUPERUSER_PASSWORD
else:
# Remove the default superuser if it exists
await teardown_superuser(settings_service, session)
username = settings_service.auth_settings.SUPERUSER
password = settings_service.auth_settings.SUPERUSER_PASSWORD
username = settings_service.auth_settings.SUPERUSER
password = settings_service.auth_settings.SUPERUSER_PASSWORD
if not username or not password:
msg = "Username and password must be set"
raise ValueError(msg)
is_default = (username == DEFAULT_SUPERUSER) and (password == DEFAULT_SUPERUSER_PASSWORD)

View file

@ -1,9 +1,11 @@
import socket
import threading
import time
from unittest.mock import patch
import pytest
from langflow.__main__ import app
import typer
from langflow.__main__ import _create_superuser, app
from langflow.services import deps
@ -57,7 +59,89 @@ def test_components_path(runner, default_settings, tmp_path):
assert str(temp_dir) in settings_service.settings.components_path
def test_superuser(runner):
result = runner.invoke(app, ["superuser"], input="admin\nadmin\n")
assert result.exit_code == 0, result.stdout
assert "Superuser created successfully." in result.stdout
@pytest.mark.xdist_group(name="serial-superuser-tests")
class TestSuperuserCommand:
"""Deterministic tests for the superuser CLI command."""
@pytest.mark.asyncio
async def test_additional_superuser_requires_auth_production(self, client, active_super_user): # noqa: ARG002
"""Test additional superuser creation requires authentication in production."""
# We already have active_super_user from the fixture, so we're not in first setup
with (
patch("langflow.services.deps.get_settings_service") as mock_settings,
patch("langflow.__main__.get_settings_service") as mock_settings2,
):
# Configure settings for production mode (AUTO_LOGIN=False)
mock_auth_settings = type("MockAuthSettings", (), {"AUTO_LOGIN": False, "ENABLE_SUPERUSER_CLI": True})()
mock_settings.return_value.auth_settings = mock_auth_settings
mock_settings2.return_value.auth_settings = mock_auth_settings
# Try to create a superuser without auth - should fail
with pytest.raises(typer.Exit) as exc_info:
await _create_superuser("newuser", "newpass", None)
assert exc_info.value.exit_code == 1
@pytest.mark.asyncio
async def test_additional_superuser_blocked_in_auto_login_mode(self, client, active_super_user): # noqa: ARG002
"""Test additional superuser creation blocked when AUTO_LOGIN=true."""
# We already have active_super_user from the fixture, so we're not in first setup
with (
patch("langflow.services.deps.get_settings_service") as mock_settings,
patch("langflow.__main__.get_settings_service") as mock_settings2,
):
# Configure settings for AUTO_LOGIN mode
mock_auth_settings = type("MockAuthSettings", (), {"AUTO_LOGIN": True, "ENABLE_SUPERUSER_CLI": True})()
mock_settings.return_value.auth_settings = mock_auth_settings
mock_settings2.return_value.auth_settings = mock_auth_settings
# Try to create a superuser - should fail
with pytest.raises(typer.Exit) as exc_info:
await _create_superuser("newuser", "newpass", None)
assert exc_info.value.exit_code == 1
@pytest.mark.asyncio
async def test_cli_disabled_blocks_creation(self, client): # noqa: ARG002
"""Test ENABLE_SUPERUSER_CLI=false blocks superuser creation."""
with (
patch("langflow.services.deps.get_settings_service") as mock_settings,
patch("langflow.__main__.get_settings_service") as mock_settings2,
):
mock_auth_settings = type("MockAuthSettings", (), {"AUTO_LOGIN": True, "ENABLE_SUPERUSER_CLI": False})()
mock_settings.return_value.auth_settings = mock_auth_settings
mock_settings2.return_value.auth_settings = mock_auth_settings
# Try to create a superuser - should fail
with pytest.raises(typer.Exit) as exc_info:
await _create_superuser("admin", "password", None)
assert exc_info.value.exit_code == 1
@pytest.mark.skip(reason="Skip -- default superuser is created by initialize_services() function")
@pytest.mark.asyncio
async def test_auto_login_forces_default_credentials(self, client):
"""Test AUTO_LOGIN=true forces default credentials."""
# Since client fixture already creates default user, we need to test in a clean DB scenario
# But that's why this test is skipped - the behavior is already handled by initialize_services
@pytest.mark.asyncio
async def test_failed_auth_token_validation(self, client, active_super_user): # noqa: ARG002
"""Test failed superuser creation with invalid auth token."""
# We already have active_super_user from the fixture, so we're not in first setup
with (
patch("langflow.services.deps.get_settings_service") as mock_settings,
patch("langflow.__main__.get_settings_service") as mock_settings2,
patch("langflow.__main__.get_current_user_by_jwt", side_effect=Exception("Invalid token")),
patch("langflow.__main__.check_key", return_value=None),
):
# Configure settings for production mode (AUTO_LOGIN=False)
mock_auth_settings = type("MockAuthSettings", (), {"AUTO_LOGIN": False, "ENABLE_SUPERUSER_CLI": True})()
mock_settings.return_value.auth_settings = mock_auth_settings
mock_settings2.return_value.auth_settings = mock_auth_settings
# Try to create a superuser with invalid token - should fail
with pytest.raises(typer.Exit) as exc_info:
await _create_superuser("newuser", "newpass", "invalid-token")
assert exc_info.value.exit_code == 1