feat(auth): update AUTO_LOGIN authentication to enforce API key or JWT requirement (#8513)

* feat(auth): update AUTO_LOGIN authentication to enforce API key or JWT requirement

* Removed deprecated warning messages and implemented explicit HTTP exceptions for missing API key or JWT in both API and WebSocket authentication methods.
* Enhanced error handling to ensure compliance with the new authentication requirements introduced in v1.5.

* fix(auth): refine error message for AUTO_LOGIN API key requirement

* Updated the error message in the API key security function to clarify that AUTO_LOGIN requires a valid API key, removing the mention of JWT for consistency with the latest authentication requirements.

* feat(auth): introduce SKIP_AUTH_AUTO_LOGIN setting for enhanced authentication flexibility

* Added a new configuration option `SKIP_AUTH_AUTO_LOGIN` to the AuthSettings class, allowing the application to bypass API key validation for auto login.
* Updated the API and WebSocket security functions to utilize this setting, improving error handling and providing a fallback for superuser credentials when authentication is skipped.

* refactor(auth): rename SKIP_AUTH_AUTO_LOGIN to skip_auth_auto_login for consistency

* Updated the `SKIP_AUTH_AUTO_LOGIN` setting in the `AuthSettings` class to `skip_auth_auto_login` to follow Python naming conventions.
* Adjusted references in the API and WebSocket security functions to use the new attribute name, ensuring consistent behavior across the authentication logic.

* feat(auth): add deprecation warning for SKIP_AUTH_AUTO_LOGIN removal

* Introduced a warning log in both API and WebSocket security functions to inform users that the `LANGFLOW_SKIP_AUTH_AUTO_LOGIN` feature will be removed in version 1.6, prompting necessary updates to authentication methods.

* feat(auth): enhance deprecation warnings for AUTO_LOGIN features

* Added constants for deprecation warning and error messages related to `LANGFLOW_SKIP_AUTH_AUTO_LOGIN` and `AUTO_LOGIN` requirements, improving code maintainability and clarity.
* Updated API and WebSocket security functions to utilize these constants for logging and exception handling, ensuring consistent messaging across authentication methods.

* fix(auth): update AUTO_LOGIN_ERROR message to include LANGFLOW_SKIP_AUTH_AUTO_LOGIN usage

* fix(auth): correct logic for API key validation in WebSocket security function

* Adjusted the conditional flow in the `ws_api_key_security` function to ensure that the API key is checked only when necessary, improving the clarity and correctness of the authentication logic.

* [autofix.ci] apply automated fixes

* feat(tests): add authentication token retrieval for starter projects integration tests

* Implemented a helper function to obtain a JWT token for API requests, enhancing the security of the integration tests.
* Updated the test for starter projects to include the token in API requests, ensuring proper authentication during testing.

* feat(auth): add MCP-specific user authentication and active user dependency

* Introduced `get_current_user_mcp` function for MCP-specific user authentication, allowing fallback to username lookup when no API key is provided.
* Added `get_current_active_user_mcp` dependency to manage active user checks for MCP, ensuring proper integration with the authentication flow.

* refactor(api): replace user dependency with CurrentActiveMCPUser in mcp project endpoints

* Updated project-related API endpoints to use CurrentActiveMCPUser for user authentication, enhancing clarity and consistency in user management.
* Removed unused imports and dependencies related to the previous user authentication method, streamlining the codebase.

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-06-24 10:29:27 -03:00 committed by GitHub
commit 3a3e205f6f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 157 additions and 43 deletions

View file

@ -13,7 +13,7 @@ from sqlalchemy import delete
from sqlmodel.ext.asyncio.session import AsyncSession
from langflow.graph.graph.base import Graph
from langflow.services.auth.utils import get_current_active_user
from langflow.services.auth.utils import get_current_active_user, get_current_active_user_mcp
from langflow.services.database.models.flow.model import Flow
from langflow.services.database.models.message.model import MessageTable
from langflow.services.database.models.transactions.model import TransactionTable
@ -33,6 +33,7 @@ MAX_PAGE_SIZE = 50
MIN_PAGE_SIZE = 1
CurrentActiveUser = Annotated[User, Depends(get_current_active_user)]
CurrentActiveMCPUser = Annotated[User, Depends(get_current_active_user_mcp)]
DbSession = Annotated[AsyncSession, Depends(get_session)]

View file

@ -3,13 +3,13 @@ import base64
from collections.abc import Awaitable, Callable
from contextvars import ContextVar
from functools import wraps
from typing import Annotated, Any, ParamSpec, TypeVar
from typing import Any, ParamSpec, TypeVar
from urllib.parse import quote, unquote, urlparse
from uuid import uuid4
import pydantic
from anyio import BrokenResourceError
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi import APIRouter, HTTPException, Request, Response
from fastapi.responses import HTMLResponse, StreamingResponse
from loguru import logger
from mcp import types
@ -17,13 +17,13 @@ from mcp.server import NotificationOptions, Server
from mcp.server.sse import SseServerTransport
from sqlmodel import select
from langflow.api.utils import CurrentActiveMCPUser
from langflow.api.v1.endpoints import simple_run_flow
from langflow.api.v1.schemas import SimplifiedAPIRequest
from langflow.base.mcp.constants import MAX_MCP_TOOL_NAME_LENGTH
from langflow.base.mcp.util import get_flow_snake_case
from langflow.helpers.flow import json_schema_from_flow
from langflow.schema.message import Message
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models.flow.model import Flow
from langflow.services.database.models.user.model import User
from langflow.services.deps import (
@ -345,7 +345,7 @@ async def im_alive():
@router.get("/sse", response_class=StreamingResponse)
async def handle_sse(request: Request, current_user: Annotated[User, Depends(get_current_active_user)]):
async def handle_sse(request: Request, current_user: CurrentActiveMCPUser):
msg = f"Starting SSE connection, server name: {server.name}"
logger.info(msg)
token = current_user_ctx.set(current_user)

View file

@ -9,12 +9,11 @@ from contextvars import ContextVar
from datetime import datetime, timezone
from ipaddress import ip_address
from pathlib import Path
from typing import Annotated
from urllib.parse import quote, unquote, urlparse
from uuid import UUID, uuid4
from anyio import BrokenResourceError
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi import APIRouter, HTTPException, Request, Response
from fastapi.responses import HTMLResponse
from mcp import types
from mcp.server import NotificationOptions, Server
@ -22,6 +21,7 @@ from mcp.server.sse import SseServerTransport
from sqlalchemy.orm import selectinload
from sqlmodel import select
from langflow.api.utils import CurrentActiveMCPUser
from langflow.api.v1.endpoints import simple_run_flow
from langflow.api.v1.mcp import (
current_user_ctx,
@ -34,8 +34,7 @@ from langflow.base.mcp.constants import MAX_MCP_SERVER_NAME_LENGTH, MAX_MCP_TOOL
from langflow.base.mcp.util import get_flow_snake_case, get_unique_name
from langflow.helpers.flow import json_schema_from_flow
from langflow.schema.message import Message
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models import Flow, Folder, User
from langflow.services.database.models import Flow, Folder
from langflow.services.database.models.folder.constants import DEFAULT_FOLDER_NAME, NEW_FOLDER_NAME
from langflow.services.deps import get_settings_service, get_storage_service, session_scope
from langflow.services.storage.utils import build_content_type_from_extension
@ -62,7 +61,7 @@ def get_project_sse(project_id: UUID) -> SseServerTransport:
@router.get("/{project_id}")
async def list_project_tools(
project_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
current_user: CurrentActiveMCPUser,
*,
mcp_enabled: bool = True,
) -> list[MCPSettings]:
@ -136,7 +135,7 @@ async def im_alive():
async def handle_project_sse(
project_id: UUID,
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
current_user: CurrentActiveMCPUser,
):
"""Handle SSE connections for a specific project."""
# Verify project exists and user has access
@ -187,9 +186,7 @@ async def handle_project_sse(
@router.post("/{project_id}")
async def handle_project_messages(
project_id: UUID, request: Request, current_user: Annotated[User, Depends(get_current_active_user)]
):
async def handle_project_messages(project_id: UUID, request: Request, current_user: CurrentActiveMCPUser):
"""Handle POST messages for a project-specific MCP server."""
# Verify project exists and user has access
async with session_scope() as session:
@ -216,9 +213,7 @@ async def handle_project_messages(
@router.post("/{project_id}/")
async def handle_project_messages_with_slash(
project_id: UUID, request: Request, current_user: Annotated[User, Depends(get_current_active_user)]
):
async def handle_project_messages_with_slash(project_id: UUID, request: Request, current_user: CurrentActiveMCPUser):
"""Handle POST messages for a project-specific MCP server with trailing slash."""
# Call the original handler
return await handle_project_messages(project_id, request, current_user)
@ -228,7 +223,7 @@ async def handle_project_messages_with_slash(
async def update_project_mcp_settings(
project_id: UUID,
settings: list[MCPSettings],
current_user: Annotated[User, Depends(get_current_active_user)],
current_user: CurrentActiveMCPUser,
):
"""Update the MCP settings of all flows in a project."""
try:
@ -329,7 +324,7 @@ async def install_mcp_config(
project_id: UUID,
body: MCPInstallRequest,
request: Request,
current_user: Annotated[User, Depends(get_current_active_user)],
current_user: CurrentActiveMCPUser,
):
"""Install MCP server configuration for Cursor or Claude."""
# Check if the request is coming from a local IP address
@ -452,7 +447,7 @@ async def install_mcp_config(
@router.get("/{project_id}/installed")
async def check_installed_mcp_servers(
project_id: UUID,
current_user: Annotated[User, Depends(get_current_active_user)],
current_user: CurrentActiveMCPUser,
):
"""Check if MCP server configuration is installed for this project in Cursor or Claude."""
try:

View file

@ -31,6 +31,12 @@ api_key_query = APIKeyQuery(name=API_KEY_NAME, scheme_name="API key query", auto
api_key_header = APIKeyHeader(name=API_KEY_NAME, scheme_name="API key header", auto_error=False)
MINIMUM_KEY_LENGTH = 32
AUTO_LOGIN_WARNING = "In v1.6 LANGFLOW_SKIP_AUTH_AUTO_LOGIN will be removed. Please update your authentication method."
AUTO_LOGIN_ERROR = (
"Since v1.5, LANGFLOW_AUTO_LOGIN requires a valid API key. "
"Set LANGFLOW_SKIP_AUTH_AUTO_LOGIN=true to skip this check. "
"Please update your authentication method."
)
# Source: https://github.com/mrtolkien/fastapi_simple_security/blob/master/fastapi_simple_security/security_api_key.py
@ -49,19 +55,16 @@ async def api_key_security(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Missing first superuser credentials",
)
warnings.warn(
(
"In v1.5, the default behavior of AUTO_LOGIN authentication will change to require a valid API key"
" or JWT. If you integrated with Langflow prior to v1.5, make sure to update your code to pass an "
"API key or JWT when authenticating with protected endpoints."
),
DeprecationWarning,
stacklevel=2,
)
if query_param or header_param:
result = await check_key(db, query_param or header_param)
else:
result = await get_user_by_username(db, settings_service.auth_settings.SUPERUSER)
if not query_param and not header_param:
if settings_service.auth_settings.skip_auth_auto_login:
result = await get_user_by_username(db, settings_service.auth_settings.SUPERUSER)
logger.warning(AUTO_LOGIN_WARNING)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=AUTO_LOGIN_ERROR,
)
result = await check_key(db, query_param or header_param)
elif not query_param and not header_param:
raise HTTPException(
@ -98,15 +101,17 @@ async def ws_api_key_security(
code=status.WS_1011_INTERNAL_ERROR,
reason="Missing first superuser credentials",
)
warnings.warn(
("In v1.5, AUTO_LOGIN will *require* a valid API key or JWT. Please update your clients accordingly."),
DeprecationWarning,
stacklevel=2,
)
if api_key:
result = await check_key(db, api_key)
if not api_key:
if settings.auth_settings.skip_auth_auto_login:
result = await get_user_by_username(db, settings.auth_settings.SUPERUSER)
logger.warning(AUTO_LOGIN_WARNING)
else:
raise WebSocketException(
code=status.WS_1008_POLICY_VIOLATION,
reason=AUTO_LOGIN_ERROR,
)
else:
result = await get_user_by_username(db, settings.auth_settings.SUPERUSER)
result = await check_key(db, api_key)
# normal path: must provide an API key
else:
@ -473,3 +478,81 @@ def decrypt_api_key(encrypted_api_key: str, settings_service: SettingsService):
)
return fernet.decrypt(encrypted_api_key).decode()
return ""
# MCP-specific authentication functions that always behave as if skip_auth_auto_login is True
async def get_current_user_mcp(
token: Annotated[str, Security(oauth2_login)],
query_param: Annotated[str, Security(api_key_query)],
header_param: Annotated[str, Security(api_key_header)],
db: Annotated[AsyncSession, Depends(get_session)],
) -> User:
"""MCP-specific user authentication that always allows fallback to username lookup.
This function provides authentication for MCP endpoints with special handling:
- If a JWT token is provided, it uses standard JWT authentication
- If no API key is provided and AUTO_LOGIN is enabled, it falls back to
username lookup using the configured superuser credentials
- Otherwise, it validates the provided API key (from query param or header)
"""
if token:
return await get_current_user_by_jwt(token, db)
# MCP-specific authentication logic - always behaves as if skip_auth_auto_login is True
settings_service = get_settings_service()
result: ApiKey | User | None
if settings_service.auth_settings.AUTO_LOGIN:
# Get the first user
if not settings_service.auth_settings.SUPERUSER:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Missing first superuser credentials",
)
if not query_param and not header_param:
# For MCP endpoints, always fall back to username lookup when no API key is provided
result = await get_user_by_username(db, settings_service.auth_settings.SUPERUSER)
if result:
logger.warning(AUTO_LOGIN_WARNING)
return result
else:
result = await check_key(db, query_param or header_param)
elif not query_param and not header_param:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="An API key must be passed as query or header",
)
elif query_param:
result = await check_key(db, query_param)
else:
result = await check_key(db, header_param)
if not result:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid or missing API key",
)
# If result is a User, return it directly
if isinstance(result, User):
return result
# If result is an ApiKey, we need to get the associated user
# This should not happen in normal flow, but adding for completeness
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid authentication result",
)
async def get_current_active_user_mcp(current_user: Annotated[User, Depends(get_current_user_mcp)]):
"""MCP-specific active user dependency.
This dependency is temporary and will be removed once MCP is fully integrated.
"""
if not current_user.is_active:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Inactive user")
return current_user

View file

@ -50,6 +50,10 @@ class AuthSettings(BaseSettings):
COOKIE_DOMAIN: str | None = None
"""The domain attribute of the cookies. If None, the domain is not set."""
skip_auth_auto_login: bool = True
"""If True, the application will skip the authentication auto login, set this to False to revert to pre-v1.5
behavior. This will be removed in v1.6"""
pwd_context: CryptContext = CryptContext(schemes=["bcrypt"], deprecated="auto")
model_config = SettingsConfigDict(validate_assignment=True, extra="ignore", env_prefix="LANGFLOW_")

View file

@ -1,11 +1,36 @@
import { expect, test } from "@playwright/test";
import { awaitBootstrapTest } from "../../utils/await-bootstrap-test";
// Helper function to get JWT token for API requests
async function getAuthToken(request: any) {
const formData = new URLSearchParams();
formData.append("username", "langflow");
formData.append("password", "langflow");
const loginResponse = await request.post("/api/v1/login", {
headers: {
"Content-Type": "application/x-www-form-urlencoded",
},
data: formData.toString(),
});
expect(loginResponse.status()).toBe(200);
const tokenData = await loginResponse.json();
return tokenData.access_token;
}
test(
"vector store from starter projects should have its connections and nodes on the flow",
{ tag: ["@release", "@starter-projects"] },
async ({ page, request }) => {
const response = await request.get("/api/v1/starter-projects");
// Get authentication token
const authToken = await getAuthToken(request);
const response = await request.get("/api/v1/starter-projects", {
headers: {
Authorization: `Bearer ${authToken}`,
},
});
expect(response.status()).toBe(200);
const responseBody = await response.json();
@ -18,7 +43,13 @@ test(
await page.route("**/api/v1/flows/", async (route) => {
if (route.request().method() === "GET") {
try {
const response = await route.fetch();
// Add authorization header to the request
const headers = route.request().headers();
headers["Authorization"] = `Bearer ${authToken}`;
const response = await route.fetch({
headers: headers,
});
const flowsData = await response.json();
const modifiedFlows = flowsData.map((flow) => {