feat: Add webhook user authentication under WEBHOOK_AUTH_ENABLE variable (#9139)

* 📝 (endpoints.py): Add get_webhook_user function to handle webhook user authentication
🔧 (endpoints.py): Update webhook_run_flow endpoint to use get_webhook_user for authentication
🔧 (utils.py): Add get_webhook_user function to handle webhook user authentication in services.auth
 (test_webhook.py): Add tests for webhook endpoint authentication and authorization

* Update src/backend/base/langflow/services/auth/utils.py

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* [autofix.ci] apply automated fixes

* Update src/backend/base/langflow/services/auth/utils.py

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>

* 🔧 (utils.py): refactor authentication logic to use existing api_key_security function for better code reuse and readability

* [autofix.ci] apply automated fixes

* 📝 (endpoints.py): Update ConfigResponse class to include webhook_auth_enable field and modify from_settings method to accept auth_settings parameter
📝 (endpoints.py): Update get_config function to pass auth_settings to ConfigResponse.from_settings method
📝 (utils.py): Update get_webhook_user function to use WEBHOOK_AUTH_ENABLE setting for authentication logic
📝 (auth.py): Add WEBHOOK_AUTH_ENABLE setting to AuthSettings class
📝 (index.tsx): Add webhookAuthEnable state and setWebhookAuthEnable function to utilityStore
📝 (use-get-config.ts): Update useGetConfig hook to set webhook_auth_enable value from API response
📝 (get-curl-code.tsx): Update getCurlWebhookCode function to use webhookAuthEnable instead of isAuth parameter
📝 (utilityStore.ts): Add webhookAuthEnable state and setWebhookAuthEnable function to utilityStore
📝 (index.ts): Update GetCodeType type to use webhookAuthEnable instead of isAuth parameter

* refactor: Simplify error messages in get_webhook_user function

- Updated HTTPException messages for flow not found and access denied scenarios to be more concise and user-friendly.
- Improved logging for invalid API key validation to enhance clarity.

* 🐛 (test_webhook.py): fix test descriptions to accurately reflect the conditions being tested
📝 (test_webhook.py): update test descriptions to improve clarity and consistency with actual test conditions

* 🐛 (utils.py): Fix issue where HTTPException was not properly handled when flow owner is not found in get_webhook_user function. Added explicit check and raise HTTPException with appropriate status code and detail message.

* 📝 (test_mcp_util.py): add conditional skip for test when DeepWiki server is rate limiting requests

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Cristhian Zanforlin Lousa 2025-08-29 14:53:33 -03:00 committed by GitHub
commit 836faa73d6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 221 additions and 41 deletions

View file

@ -38,12 +38,11 @@ from langflow.exceptions.serialization import SerializationError
from langflow.graph.graph.base import Graph
from langflow.graph.schema import RunOutputs
from langflow.helpers.flow import get_flow_by_id_or_endpoint_name
from langflow.helpers.user import get_user_by_flow_id_or_endpoint_name
from langflow.interface.initialize.loading import update_params_with_load_from_db_fields
from langflow.logging.logger import logger
from langflow.processing.process import process_tweaks, run_graph_internal
from langflow.schema.graph import Tweaks
from langflow.services.auth.utils import api_key_security, get_current_active_user
from langflow.services.auth.utils import api_key_security, get_current_active_user, get_webhook_user
from langflow.services.cache.utils import save_uploaded_file
from langflow.services.database.models.flow.model import Flow, FlowRead
from langflow.services.database.models.flow.utils import get_all_webhook_components_in_flow
@ -399,16 +398,16 @@ async def simplified_run_flow(
@router.post("/webhook/{flow_id_or_name}", response_model=dict, status_code=HTTPStatus.ACCEPTED) # noqa: RUF100, FAST003
async def webhook_run_flow(
flow_id_or_name: str,
flow: Annotated[Flow, Depends(get_flow_by_id_or_endpoint_name)],
user: Annotated[User, Depends(get_user_by_flow_id_or_endpoint_name)],
request: Request,
background_tasks: BackgroundTasks,
):
"""Run a flow using a webhook request.
Args:
flow (Flow, optional): The flow to be executed. Defaults to Depends(get_flow_by_id).
user (User): The flow user.
flow_id_or_name (str): The flow ID or endpoint name.
flow (Flow): The flow to be executed.
request (Request): The incoming HTTP request.
background_tasks (BackgroundTasks): The background tasks manager.
@ -422,6 +421,10 @@ async def webhook_run_flow(
start_time = time.perf_counter()
await logger.adebug("Received webhook request")
error_msg = ""
# Get the appropriate user for webhook execution based on auth settings
webhook_user = await get_webhook_user(flow_id_or_name, request)
try:
try:
data = await request.body()
@ -453,7 +456,7 @@ async def webhook_run_flow(
simple_run_flow_task,
flow=flow,
input_request=input_request,
api_key_user=user,
api_key_user=webhook_user,
)
except Exception as exc:
error_msg = str(exc)
@ -762,7 +765,7 @@ async def get_config() -> ConfigResponse:
"""
try:
settings_service: SettingsService = get_settings_service()
return ConfigResponse.from_settings(settings_service.settings)
return ConfigResponse.from_settings(settings_service.settings, settings_service.auth_settings)
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc

View file

@ -407,13 +407,15 @@ class ConfigResponse(BaseModel):
public_flow_cleanup_interval: int
public_flow_expiration: int
event_delivery: Literal["polling", "streaming", "direct"]
webhook_auth_enable: bool
@classmethod
def from_settings(cls, settings: Settings) -> "ConfigResponse":
"""Create a ConfigResponse instance using values from a Settings object and global feature flags.
def from_settings(cls, settings: Settings, auth_settings) -> "ConfigResponse":
"""Create a ConfigResponse instance using values from a Settings object and AuthSettings.
Parameters:
settings (Settings): The Settings object containing configuration values.
auth_settings: The AuthSettings object containing authentication configuration values.
Returns:
ConfigResponse: An instance populated with configuration and feature flag values.
@ -431,6 +433,7 @@ class ConfigResponse(BaseModel):
public_flow_cleanup_interval=settings.public_flow_cleanup_interval,
public_flow_expiration=settings.public_flow_expiration,
event_delivery=settings.event_delivery,
webhook_auth_enable=auth_settings.WEBHOOK_AUTH_ENABLE,
)

View file

@ -7,7 +7,7 @@ from typing import TYPE_CHECKING, Annotated
from uuid import UUID
from cryptography.fernet import Fernet
from fastapi import Depends, HTTPException, Security, WebSocketException, status
from fastapi import Depends, HTTPException, Request, Security, WebSocketException, status
from fastapi.security import APIKeyHeader, APIKeyQuery, OAuth2PasswordBearer
from jose import JWTError, jwt
from sqlalchemy.exc import IntegrityError
@ -259,6 +259,83 @@ async def get_current_active_superuser(current_user: Annotated[User, Depends(get
return current_user
async def get_webhook_user(flow_id: str, request: Request) -> UserRead:
"""Get the user for webhook execution.
When WEBHOOK_AUTH_ENABLE=false, allows execution as the flow owner without API key.
When WEBHOOK_AUTH_ENABLE=true, requires API key authentication and validates flow ownership.
Args:
flow_id: The ID of the flow being executed
request: The FastAPI request object
Returns:
UserRead: The user to execute the webhook as
Raises:
HTTPException: If authentication fails or user doesn't have permission
"""
from langflow.helpers.user import get_user_by_flow_id_or_endpoint_name
settings_service = get_settings_service()
if not settings_service.auth_settings.WEBHOOK_AUTH_ENABLE:
# When webhook auth is disabled, run webhook as the flow owner without requiring API key
try:
flow_owner = await get_user_by_flow_id_or_endpoint_name(flow_id)
if flow_owner is None:
raise HTTPException(status_code=404, detail="Flow not found")
return flow_owner # noqa: TRY300
except HTTPException:
raise
except Exception as exc:
raise HTTPException(status_code=404, detail="Flow not found") from exc
# When webhook auth is enabled, require API key authentication
api_key_header_val = request.headers.get("x-api-key")
api_key_query_val = request.query_params.get("x-api-key")
# Check if API key is provided
if not api_key_header_val and not api_key_query_val:
raise HTTPException(status_code=403, detail="API key required when webhook authentication is enabled")
# Use the provided API key (prefer header over query param)
api_key = api_key_header_val or api_key_query_val
try:
# Validate API key directly without AUTO_LOGIN fallback
async with get_db_service().with_session() as db:
result = await check_key(db, api_key)
if not result:
logger.warning("Invalid API key provided for webhook")
raise HTTPException(status_code=403, detail="Invalid API key")
authenticated_user = UserRead.model_validate(result, from_attributes=True)
logger.info("Webhook API key validated successfully")
except HTTPException:
# Re-raise HTTP exceptions as-is
raise
except Exception as exc:
# Handle other exceptions
logger.error(f"Webhook API key validation error: {exc}")
raise HTTPException(status_code=403, detail="API key authentication failed") from exc
# Get flow owner to check if authenticated user owns this flow
try:
flow_owner = await get_user_by_flow_id_or_endpoint_name(flow_id)
if flow_owner is None:
raise HTTPException(status_code=404, detail="Flow not found")
except HTTPException:
raise
except Exception as exc:
raise HTTPException(status_code=404, detail="Flow not found") from exc
if flow_owner.id != authenticated_user.id:
raise HTTPException(status_code=403, detail="Access denied: You can only execute webhooks for flows you own")
return authenticated_user
def verify_password(plain_password, hashed_password):
settings_service = get_settings_service()
return settings_service.auth_settings.pwd_context.verify(plain_password, hashed_password)

View file

@ -40,6 +40,10 @@ class AuthSettings(BaseSettings):
"""If True, the application will skip authentication when AUTO_LOGIN is enabled.
This will be removed in v1.6"""
WEBHOOK_AUTH_ENABLE: bool = False
"""If True, webhook endpoints will require API key authentication.
If False, webhooks run as flow owner without authentication."""
ENABLE_SUPERUSER_CLI: bool = Field(
default=True,
description="Allow creation of superusers via CLI. Set to False in production for security.",

View file

@ -11,6 +11,7 @@ import sys
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from langflow.base.mcp import util
from langflow.base.mcp.util import MCPSessionManager, MCPSseClient, MCPStdioClient, _process_headers, validate_headers

View file

@ -8,31 +8,67 @@ def _check_openai_api_key_in_environment_variables():
pass
async def test_webhook_endpoint(client, added_webhook_test):
# The test is as follows:
# 1. The flow when run will get a "path" from the payload and save a file with the path as the name.
# We will create a temporary file path and send it to the webhook endpoint, then check if the file exists.
# 2. we will delete the file, then send an invalid payload to the webhook endpoint and check if the file exists.
async def test_webhook_endpoint_requires_api_key_when_auto_login_false(client, added_webhook_test):
"""Test that webhook endpoint requires API key when WEBHOOK_AUTH_ENABLE=true."""
# Mock the settings service to enable webhook authentication
from unittest.mock import patch
with patch("langflow.services.auth.utils.get_settings_service") as mock_settings:
mock_auth_settings = type("AuthSettings", (), {"WEBHOOK_AUTH_ENABLE": True})()
mock_settings_service = type("SettingsService", (), {"auth_settings": mock_auth_settings})()
mock_settings.return_value = mock_settings_service
endpoint_name = added_webhook_test["endpoint_name"]
endpoint = f"api/v1/webhook/{endpoint_name}"
payload = {"path": "/tmp/test_file.txt"} # noqa: S108
# Should fail without API key when webhook auth is enabled
response = await client.post(endpoint, json=payload)
assert response.status_code == 403
assert "API key required when webhook authentication is enabled" in response.json()["detail"]
async def test_webhook_endpoint_with_valid_api_key(client, added_webhook_test, created_api_key):
"""Test that webhook works when valid API key is provided."""
endpoint_name = added_webhook_test["endpoint_name"]
endpoint = f"api/v1/webhook/{endpoint_name}"
# Create a temporary file
async with aiofiles.tempfile.TemporaryDirectory() as tmp:
file_path = anyio.Path(tmp) / "test_file.txt"
payload = {"path": str(file_path)}
response = await client.post(endpoint, json=payload)
# Should work with valid API key
response = await client.post(endpoint, headers={"x-api-key": created_api_key.api_key}, json=payload)
assert response.status_code == 202
# Wait a few seconds for the file to be created
assert await file_path.exists(), f"File {file_path} does not exist"
file_does_not_exist = not await file_path.exists()
assert file_does_not_exist, f"File {file_path} still exists"
# Send an invalid payload
payload = {"invalid_key": "invalid_value"}
response = await client.post(endpoint, json=payload)
assert response.status_code == 202
assert not await file_path.exists(), f"File {file_path} should not exist"
async def test_webhook_endpoint_unauthorized_user_flow(client, added_webhook_test):
"""Test that webhook fails when user doesn't own the flow."""
# Mock the settings service to enable webhook authentication
from unittest.mock import patch
with patch("langflow.services.auth.utils.get_settings_service") as mock_settings:
mock_auth_settings = type("AuthSettings", (), {"WEBHOOK_AUTH_ENABLE": True})()
mock_settings_service = type("SettingsService", (), {"auth_settings": mock_auth_settings})()
mock_settings.return_value = mock_settings_service
# This test would need a different user's API key to test authorization
# For now, we'll use an invalid API key to simulate this
endpoint_name = added_webhook_test["endpoint_name"]
endpoint = f"api/v1/webhook/{endpoint_name}"
payload = {"path": "/tmp/test_file.txt"} # noqa: S108
# Should fail with invalid API key
response = await client.post(endpoint, headers={"x-api-key": "invalid_key"}, json=payload)
assert response.status_code == 403
assert "Invalid API key" in response.json()["detail"]
async def test_webhook_flow_on_run_endpoint(client, added_webhook_test, created_api_key):
@ -47,12 +83,47 @@ async def test_webhook_flow_on_run_endpoint(client, added_webhook_test, created_
assert response.status_code == 200, response.json()
async def test_webhook_with_random_payload(client, added_webhook_test):
endpoint_name = added_webhook_test["endpoint_name"]
endpoint = f"api/v1/webhook/{endpoint_name}"
# Just test that "Random Payload" returns 202
response = await client.post(
endpoint,
json="Random Payload",
)
assert response.status_code == 202
async def test_webhook_with_auto_login_enabled(client, added_webhook_test):
"""Test webhook behavior when WEBHOOK_AUTH_ENABLE=false - should work without API key."""
# Mock the settings service to disable webhook authentication (default behavior)
from unittest.mock import patch
with patch("langflow.services.auth.utils.get_settings_service") as mock_settings:
mock_auth_settings = type("AuthSettings", (), {"WEBHOOK_AUTH_ENABLE": False})()
mock_settings_service = type("SettingsService", (), {"auth_settings": mock_auth_settings})()
mock_settings.return_value = mock_settings_service
endpoint_name = added_webhook_test["endpoint_name"]
endpoint = f"api/v1/webhook/{endpoint_name}"
payload = {"path": "/tmp/test_auto_login.txt"} # noqa: S108
# Should work without API key when webhook auth is disabled
response = await client.post(endpoint, json=payload)
assert response.status_code == 202
async def test_webhook_with_random_payload_requires_auth(client, added_webhook_test, created_api_key):
"""Test that webhook with random payload still requires authentication."""
# Mock the settings service to enable webhook authentication
from unittest.mock import patch
with patch("langflow.services.auth.utils.get_settings_service") as mock_settings:
mock_auth_settings = type("AuthSettings", (), {"WEBHOOK_AUTH_ENABLE": True})()
mock_settings_service = type("SettingsService", (), {"auth_settings": mock_auth_settings})()
mock_settings.return_value = mock_settings_service
endpoint_name = added_webhook_test["endpoint_name"]
endpoint = f"api/v1/webhook/{endpoint_name}"
# Should fail without API key
response = await client.post(endpoint, json="Random Payload")
assert response.status_code == 403
# Should work with API key (even with random payload)
response = await client.post(
endpoint,
headers={"x-api-key": created_api_key.api_key},
json="Random Payload",
)
assert response.status_code == 202

View file

@ -3,6 +3,7 @@ import { GRADIENT_CLASS } from "@/constants/constants";
import { customGetHostProtocol } from "@/customization/utils/custom-get-host-protocol";
import { getCurlWebhookCode } from "@/modals/apiModal/utils/get-curl-code";
import ComponentTextModal from "@/modals/textAreaModal";
import { useUtilityStore } from "@/stores/utilityStore";
import { cn } from "../../../../../utils/utils";
import IconComponent from "../../../../common/genericIconComponent";
import { Input } from "../../../../ui/input";
@ -74,6 +75,7 @@ export default function TextAreaComponent({
const inputRef = useRef<HTMLInputElement>(null);
const [isFocused, setIsFocused] = useState(false);
const [passwordVisible, setPasswordVisible] = useState(false);
const webhookAuthEnable = useUtilityStore((state) => state.webhookAuthEnable);
const [cursor, setCursor] = useState<number | null>(null);
const isWebhook = useMemo(
@ -90,7 +92,7 @@ export default function TextAreaComponent({
if (isWebhook && value === WEBHOOK_VALUE) {
const curlWebhookCode = getCurlWebhookCode({
flowId: nodeInformationMetadata?.flowId!,
isAuth: nodeInformationMetadata?.isAuth!,
webhookAuthEnable,
flowName: nodeInformationMetadata?.flowName!,
format: "singleline",
});
@ -99,7 +101,13 @@ export default function TextAreaComponent({
const mcpSSEUrl = `${URL_MCP_SSE}`;
handleOnNewValue({ value: mcpSSEUrl });
}
}, [isWebhook, value, nodeInformationMetadata, handleOnNewValue]);
}, [
isWebhook,
value,
nodeInformationMetadata,
handleOnNewValue,
webhookAuthEnable,
]);
// Restore cursor position after value changes
useEffect(() => {
@ -127,7 +135,7 @@ export default function TextAreaComponent({
if (isWebhook) {
const curlWebhookCode = getCurlWebhookCode({
flowId: nodeInformationMetadata?.flowId!,
isAuth: nodeInformationMetadata?.isAuth!,
webhookAuthEnable,
flowName: nodeInformationMetadata?.flowName!,
format,
});

View file

@ -21,6 +21,7 @@ export interface ConfigResponse {
webhook_polling_interval: number;
serialization_max_items_length: number;
event_delivery: EventDeliveryType;
webhook_auth_enable: boolean;
}
export const useGetConfig: useQueryFunctionType<undefined, ConfigResponse> = (
@ -44,6 +45,9 @@ export const useGetConfig: useQueryFunctionType<undefined, ConfigResponse> = (
(state) => state.setWebhookPollingInterval,
);
const setEventDelivery = useUtilityStore((state) => state.setEventDelivery);
const setWebhookAuthEnable = useUtilityStore(
(state) => state.setWebhookAuthEnable,
);
const { query } = UseRequestProcessor();
@ -66,6 +70,7 @@ export const useGetConfig: useQueryFunctionType<undefined, ConfigResponse> = (
data.webhook_polling_interval ?? DEFAULT_POLLING_INTERVAL,
);
setEventDelivery(data.event_delivery ?? EventDeliveryType.POLLING);
setWebhookAuthEnable(data.webhook_auth_enable ?? true);
}
return data;
};

View file

@ -13,21 +13,24 @@ import {
*
* @param {Object} options - The options for generating the cURL command.
* @param {string} options.flowId - The ID of the flow.
* @param {boolean} options.isAuth - Indicates whether authentication is required.
* @param {boolean} options.webhookAuthEnable - Indicates whether authentication is required for webhooks.
* @param {string} options.endpointName - The name of the webhook endpoint.
* @returns {string} The cURL command.
*/
export function getCurlWebhookCode({
flowId,
isAuth,
webhookAuthEnable,
endpointName,
format = "multiline",
}: GetCodeType & { format?: "multiline" | "singleline" }) {
}: GetCodeType & {
webhookAuthEnable: boolean;
format?: "multiline" | "singleline";
}) {
const { protocol, host } = customGetHostProtocol();
const baseUrl = `${protocol}//${host}/api/v1/webhook/${
endpointName || flowId
}`;
const authHeader = !isAuth ? `-H 'x-api-key: <your api key>'` : "";
const authHeader = webhookAuthEnable ? `-H 'x-api-key: <your api key>'` : "";
if (format === "singleline") {
return `curl -X POST "${baseUrl}" -H 'Content-Type: application/json' ${authHeader} -d '{"any": "data"}'`.trim();
@ -36,7 +39,7 @@ export function getCurlWebhookCode({
return `curl -X POST \\
"${baseUrl}" \\
-H 'Content-Type: application/json' \\${
isAuth ? `\n -H 'x-api-key: <your api key>' \\` : ""
webhookAuthEnable ? `\n -H 'x-api-key: <your api key>' \\` : ""
}${
ENABLE_DATASTAX_LANGFLOW
? `\n -H 'Authorization: Bearer <YOUR_APPLICATION_TOKEN>' \\`

View file

@ -48,4 +48,7 @@ export const useUtilityStore = create<UtilityStoreType>((set, get) => ({
eventDelivery: EventDeliveryType.POLLING,
setEventDelivery: (eventDelivery: EventDeliveryType) =>
set({ eventDelivery }),
webhookAuthEnable: true,
setWebhookAuthEnable: (webhookAuthEnable: boolean) =>
set({ webhookAuthEnable }),
}));

View file

@ -10,7 +10,7 @@ export type GetCodesType = {
export type GetCodeType = {
flowId: string;
flowName: string;
isAuth: boolean;
webhookAuthEnable: boolean;
tweaksBuildedObject?: {};
endpointName?: string | null;
activeTweaks?: boolean;

View file

@ -28,4 +28,6 @@ export type UtilityStoreType = {
setEventDelivery: (eventDelivery: EventDeliveryType) => void;
serializationMaxItemsLength: number;
setSerializationMaxItemsLength: (serializationMaxItemsLength: number) => void;
webhookAuthEnable: boolean;
setWebhookAuthEnable: (webhookAuthEnable: boolean) => void;
};