feat: encrypt oauth auth settings at rest (#9490)
* encrypt oauth auth settings at rest * [autofix.ci] apply automated fixes * Fix rebase changes and add env to env server config * Correctly unmask secretstr before encryption * update mcp-composer args * [autofix.ci] apply automated fixes * ruff * ruff * ruff * [autofix.ci] apply automated fixes * ruff * catch invalidtoken error * ruff * [autofix.ci] apply automated fixes * ruff * [autofix.ci] apply automated fixes * ruff * ruff * [autofix.ci] apply automated fixes * ruff * [autofix.ci] apply automated fixes * fix test * [autofix.ci] apply automated fixes * remove oauth mention in server config * [autofix.ci] apply automated fixes * ruff --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
parent
9ae41d6251
commit
a642691fb1
7 changed files with 520 additions and 16 deletions
|
|
@ -0,0 +1,122 @@
|
|||
"""Encrypt existing MCP auth_settings credentials
|
||||
|
||||
Revision ID: 0882f9657f22
|
||||
Revises: 1cb603706752
|
||||
Create Date: 2025-08-21 20:11:26.504681
|
||||
|
||||
"""
|
||||
import json
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
import sqlmodel
|
||||
from sqlalchemy.engine.reflection import Inspector
|
||||
from langflow.utils import migration
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = '0882f9657f22'
|
||||
down_revision: Union[str, None] = '1cb603706752'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""Encrypt sensitive fields in existing auth_settings data."""
|
||||
conn = op.get_bind()
|
||||
|
||||
# Import encryption utilities
|
||||
try:
|
||||
from langflow.services.auth.mcp_encryption import encrypt_auth_settings
|
||||
from langflow.services.deps import get_settings_service
|
||||
|
||||
# Check if the folder table exists
|
||||
inspector = sa.inspect(conn)
|
||||
if 'folder' not in inspector.get_table_names():
|
||||
return
|
||||
|
||||
# Query all folders with auth_settings
|
||||
result = conn.execute(
|
||||
sa.text("SELECT id, auth_settings FROM folder WHERE auth_settings IS NOT NULL")
|
||||
)
|
||||
|
||||
# Encrypt auth_settings for each folder
|
||||
for row in result:
|
||||
folder_id = row.id
|
||||
auth_settings = row.auth_settings
|
||||
|
||||
if auth_settings:
|
||||
try:
|
||||
# Parse JSON if it's a string
|
||||
if isinstance(auth_settings, str):
|
||||
auth_settings_dict = json.loads(auth_settings)
|
||||
else:
|
||||
auth_settings_dict = auth_settings
|
||||
|
||||
# Encrypt sensitive fields
|
||||
encrypted_settings = encrypt_auth_settings(auth_settings_dict)
|
||||
|
||||
# Update the record with encrypted data
|
||||
if encrypted_settings:
|
||||
conn.execute(
|
||||
sa.text("UPDATE folder SET auth_settings = :auth_settings WHERE id = :id"),
|
||||
{"auth_settings": json.dumps(encrypted_settings), "id": folder_id}
|
||||
)
|
||||
except Exception as e:
|
||||
# Log the error but continue with other records
|
||||
print(f"Warning: Failed to encrypt auth_settings for folder {folder_id}: {e}")
|
||||
|
||||
except ImportError as e:
|
||||
# If encryption utilities are not available, skip the migration
|
||||
print(f"Warning: Encryption utilities not available, skipping encryption migration: {e}")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
"""Decrypt sensitive fields in auth_settings data (for rollback)."""
|
||||
conn = op.get_bind()
|
||||
|
||||
# Import decryption utilities
|
||||
try:
|
||||
from langflow.services.auth.mcp_encryption import decrypt_auth_settings
|
||||
from langflow.services.deps import get_settings_service
|
||||
|
||||
# Check if the folder table exists
|
||||
inspector = sa.inspect(conn)
|
||||
if 'folder' not in inspector.get_table_names():
|
||||
return
|
||||
|
||||
# Query all folders with auth_settings
|
||||
result = conn.execute(
|
||||
sa.text("SELECT id, auth_settings FROM folder WHERE auth_settings IS NOT NULL")
|
||||
)
|
||||
|
||||
# Decrypt auth_settings for each folder
|
||||
for row in result:
|
||||
folder_id = row.id
|
||||
auth_settings = row.auth_settings
|
||||
|
||||
if auth_settings:
|
||||
try:
|
||||
# Parse JSON if it's a string
|
||||
if isinstance(auth_settings, str):
|
||||
auth_settings_dict = json.loads(auth_settings)
|
||||
else:
|
||||
auth_settings_dict = auth_settings
|
||||
|
||||
# Decrypt sensitive fields
|
||||
decrypted_settings = decrypt_auth_settings(auth_settings_dict)
|
||||
|
||||
# Update the record with decrypted data
|
||||
if decrypted_settings:
|
||||
conn.execute(
|
||||
sa.text("UPDATE folder SET auth_settings = :auth_settings WHERE id = :id"),
|
||||
{"auth_settings": json.dumps(decrypted_settings), "id": folder_id}
|
||||
)
|
||||
except Exception as e:
|
||||
# Log the error but continue with other records
|
||||
print(f"Warning: Failed to decrypt auth_settings for folder {folder_id}: {e}")
|
||||
|
||||
except ImportError as e:
|
||||
# If decryption utilities are not available, skip the migration
|
||||
print(f"Warning: Decryption utilities not available, skipping decryption migration: {e}")
|
||||
|
|
@ -8,7 +8,7 @@ from datetime import datetime, timezone
|
|||
from ipaddress import ip_address
|
||||
from pathlib import Path
|
||||
from subprocess import CalledProcessError
|
||||
from typing import Annotated
|
||||
from typing import Annotated, Any
|
||||
from uuid import UUID
|
||||
|
||||
from anyio import BrokenResourceError
|
||||
|
|
@ -39,6 +39,7 @@ from langflow.api.v1.schemas import (
|
|||
from langflow.base.mcp.constants import MAX_MCP_SERVER_NAME_LENGTH
|
||||
from langflow.base.mcp.util import sanitize_mcp_name
|
||||
from langflow.logging import logger
|
||||
from langflow.services.auth.mcp_encryption import decrypt_auth_settings, encrypt_auth_settings
|
||||
from langflow.services.database.models import Flow, Folder
|
||||
from langflow.services.database.models.api_key.crud import check_key, create_api_key
|
||||
from langflow.services.database.models.api_key.model import ApiKeyCreate
|
||||
|
|
@ -205,7 +206,7 @@ async def list_project_tools(
|
|||
)
|
||||
try:
|
||||
tool = MCPSettings(
|
||||
id=str(flow.id),
|
||||
id=flow.id,
|
||||
action_name=name,
|
||||
action_description=description,
|
||||
mcp_enabled=flow.mcp_enabled,
|
||||
|
|
@ -219,10 +220,14 @@ async def list_project_tools(
|
|||
await logger.awarning(msg)
|
||||
continue
|
||||
|
||||
# Get project-level auth settings
|
||||
# Get project-level auth settings and decrypt sensitive fields
|
||||
auth_settings = None
|
||||
if project.auth_settings:
|
||||
auth_settings = AuthSettings(**project.auth_settings)
|
||||
from langflow.api.v1.schemas import AuthSettings
|
||||
|
||||
# Decrypt sensitive fields before returning
|
||||
decrypted_settings = decrypt_auth_settings(project.auth_settings)
|
||||
auth_settings = AuthSettings(**decrypted_settings) if decrypted_settings else None
|
||||
|
||||
except Exception as e:
|
||||
msg = f"Error listing project tools: {e!s}"
|
||||
|
|
@ -336,11 +341,33 @@ async def update_project_mcp_settings(
|
|||
if not project:
|
||||
raise HTTPException(status_code=404, detail="Project not found")
|
||||
|
||||
# Update project-level auth settings
|
||||
if request.auth_settings:
|
||||
project.auth_settings = request.auth_settings.model_dump(mode="json")
|
||||
else:
|
||||
project.auth_settings = None
|
||||
# Update project-level auth settings with encryption
|
||||
if "auth_settings" in request.model_fields_set:
|
||||
if request.auth_settings is None:
|
||||
# Explicitly set to None - clear auth settings
|
||||
project.auth_settings = None
|
||||
else:
|
||||
# Use python mode to get raw values without SecretStr masking
|
||||
auth_model = request.auth_settings
|
||||
auth_dict = auth_model.model_dump(mode="python", exclude_none=True)
|
||||
|
||||
# Extract actual secret values before encryption
|
||||
from pydantic import SecretStr
|
||||
|
||||
# Handle api_key if it's a SecretStr
|
||||
api_key_val = getattr(auth_model, "api_key", None)
|
||||
if isinstance(api_key_val, SecretStr):
|
||||
auth_dict["api_key"] = api_key_val.get_secret_value()
|
||||
|
||||
# Handle oauth_client_secret if it's a SecretStr
|
||||
client_secret_val = getattr(auth_model, "oauth_client_secret", None)
|
||||
if isinstance(client_secret_val, SecretStr):
|
||||
auth_dict["oauth_client_secret"] = client_secret_val.get_secret_value()
|
||||
|
||||
# Encrypt and store
|
||||
encrypted_settings = encrypt_auth_settings(auth_dict)
|
||||
project.auth_settings = encrypted_settings
|
||||
|
||||
session.add(project)
|
||||
|
||||
# Query flows in the project
|
||||
|
|
@ -458,7 +485,7 @@ async def install_mcp_config(
|
|||
should_generate_api_key = not settings_service.auth_settings.AUTO_LOGIN
|
||||
elif project.auth_settings:
|
||||
# When MCP_COMPOSER is enabled, only generate if auth_type is "apikey"
|
||||
auth_settings = AuthSettings(**project.auth_settings)
|
||||
auth_settings = AuthSettings(**project.auth_settings) if project.auth_settings else AuthSettings()
|
||||
should_generate_api_key = auth_settings.auth_type == "apikey"
|
||||
|
||||
if should_generate_api_key:
|
||||
|
|
@ -498,7 +525,7 @@ async def install_mcp_config(
|
|||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, stderr = await proc.communicate()
|
||||
stdout, _ = await proc.communicate()
|
||||
|
||||
if proc.returncode == 0 and stdout.strip():
|
||||
wsl_ip = stdout.decode().strip().split()[0] # Get first IP address
|
||||
|
|
@ -508,8 +535,8 @@ async def install_mcp_config(
|
|||
except OSError as e:
|
||||
await logger.awarning("Failed to get WSL IP address: %s. Using default URL.", str(e))
|
||||
|
||||
# Build the base args for mcp-proxy
|
||||
args = ["mcp-proxy"]
|
||||
# Base args
|
||||
args = ["mcp-composer"] if FEATURE_FLAGS.mcp_composer else ["mcp-proxy"]
|
||||
|
||||
# Add authentication args based on MCP_COMPOSER feature flag and auth settings
|
||||
if not FEATURE_FLAGS.mcp_composer:
|
||||
|
|
@ -518,6 +545,11 @@ async def install_mcp_config(
|
|||
if generated_api_key:
|
||||
args.extend(["--headers", "x-api-key", generated_api_key])
|
||||
elif project.auth_settings:
|
||||
# Decrypt sensitive fields before using them
|
||||
decrypted_settings = decrypt_auth_settings(project.auth_settings)
|
||||
auth_settings = AuthSettings(**decrypted_settings) if decrypted_settings else AuthSettings()
|
||||
args.extend(["--auth_type", auth_settings.auth_type])
|
||||
|
||||
# When MCP_COMPOSER is enabled, only add headers if auth_type is "apikey"
|
||||
auth_settings = AuthSettings(**project.auth_settings)
|
||||
if auth_settings.auth_type == "apikey" and generated_api_key:
|
||||
|
|
@ -525,7 +557,10 @@ async def install_mcp_config(
|
|||
# If no auth_settings or auth_type is "none", don't add any auth headers
|
||||
|
||||
# Add the SSE URL
|
||||
args.append(sse_url)
|
||||
if FEATURE_FLAGS.mcp_composer:
|
||||
args.extend(["--sse-url", sse_url])
|
||||
else:
|
||||
args.append(sse_url)
|
||||
|
||||
if os_type == "Windows":
|
||||
command = "cmd"
|
||||
|
|
@ -535,7 +570,7 @@ async def install_mcp_config(
|
|||
name = project.name
|
||||
|
||||
# Create the MCP configuration
|
||||
server_config = {
|
||||
server_config: dict[str, Any] = {
|
||||
"command": command,
|
||||
"args": args,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ from pydantic import (
|
|||
BaseModel,
|
||||
ConfigDict,
|
||||
Field,
|
||||
SecretStr,
|
||||
field_serializer,
|
||||
field_validator,
|
||||
model_serializer,
|
||||
|
|
@ -449,7 +450,7 @@ class AuthSettings(BaseModel):
|
|||
oauth_server_url: str | None = None
|
||||
oauth_callback_path: str | None = None
|
||||
oauth_client_id: str | None = None
|
||||
oauth_client_secret: str | None = None
|
||||
oauth_client_secret: SecretStr | None = None
|
||||
oauth_auth_url: str | None = None
|
||||
oauth_token_url: str | None = None
|
||||
oauth_mcp_scope: str | None = None
|
||||
|
|
|
|||
104
src/backend/base/langflow/services/auth/mcp_encryption.py
Normal file
104
src/backend/base/langflow/services/auth/mcp_encryption.py
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
"""MCP Authentication encryption utilities for secure credential storage."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
from cryptography.fernet import InvalidToken
|
||||
from loguru import logger
|
||||
|
||||
from langflow.services.auth import utils as auth_utils
|
||||
from langflow.services.deps import get_settings_service
|
||||
|
||||
# Fields that should be encrypted when stored
|
||||
SENSITIVE_FIELDS = [
|
||||
"oauth_client_secret",
|
||||
"api_key",
|
||||
]
|
||||
|
||||
|
||||
def encrypt_auth_settings(auth_settings: dict[str, Any] | None) -> dict[str, Any] | None:
|
||||
"""Encrypt sensitive fields in auth_settings dictionary.
|
||||
|
||||
Args:
|
||||
auth_settings: Dictionary containing authentication settings
|
||||
|
||||
Returns:
|
||||
Dictionary with sensitive fields encrypted, or None if input is None
|
||||
"""
|
||||
if auth_settings is None:
|
||||
return None
|
||||
|
||||
settings_service = get_settings_service()
|
||||
encrypted_settings = auth_settings.copy()
|
||||
|
||||
for field in SENSITIVE_FIELDS:
|
||||
if encrypted_settings.get(field):
|
||||
try:
|
||||
# Only encrypt if the value is not already encrypted
|
||||
# Try to decrypt first - if it fails, it's not encrypted
|
||||
try:
|
||||
auth_utils.decrypt_api_key(encrypted_settings[field], settings_service)
|
||||
# If decrypt succeeds, it's already encrypted
|
||||
logger.debug(f"Field {field} is already encrypted")
|
||||
except (ValueError, TypeError, KeyError, InvalidToken):
|
||||
# If decrypt fails, the value is plaintext and needs encryption
|
||||
encrypted_value = auth_utils.encrypt_api_key(encrypted_settings[field], settings_service)
|
||||
encrypted_settings[field] = encrypted_value
|
||||
logger.debug(f"Encrypted field {field}")
|
||||
except (ValueError, TypeError, KeyError) as e:
|
||||
logger.error(f"Failed to encrypt field {field}: {e}")
|
||||
raise
|
||||
|
||||
return encrypted_settings
|
||||
|
||||
|
||||
def decrypt_auth_settings(auth_settings: dict[str, Any] | None) -> dict[str, Any] | None:
|
||||
"""Decrypt sensitive fields in auth_settings dictionary.
|
||||
|
||||
Args:
|
||||
auth_settings: Dictionary containing encrypted authentication settings
|
||||
|
||||
Returns:
|
||||
Dictionary with sensitive fields decrypted, or None if input is None
|
||||
"""
|
||||
if auth_settings is None:
|
||||
return None
|
||||
|
||||
settings_service = get_settings_service()
|
||||
decrypted_settings = auth_settings.copy()
|
||||
|
||||
for field in SENSITIVE_FIELDS:
|
||||
if decrypted_settings.get(field):
|
||||
try:
|
||||
decrypted_value = auth_utils.decrypt_api_key(decrypted_settings[field], settings_service)
|
||||
decrypted_settings[field] = decrypted_value
|
||||
logger.debug(f"Decrypted field {field}")
|
||||
except (ValueError, TypeError, KeyError, InvalidToken) as e:
|
||||
# If decryption fails, assume the value is already plaintext
|
||||
# This handles backward compatibility with existing unencrypted data
|
||||
logger.debug(f"Field {field} appears to be plaintext or decryption failed: {e}")
|
||||
# Keep the original value
|
||||
|
||||
return decrypted_settings
|
||||
|
||||
|
||||
def is_encrypted(value: str) -> bool:
|
||||
"""Check if a value appears to be encrypted.
|
||||
|
||||
Args:
|
||||
value: String value to check
|
||||
|
||||
Returns:
|
||||
True if the value appears to be encrypted (base64 Fernet token)
|
||||
"""
|
||||
if not value:
|
||||
return False
|
||||
|
||||
settings_service = get_settings_service()
|
||||
try:
|
||||
# Try to decrypt - if it succeeds, it's encrypted
|
||||
auth_utils.decrypt_api_key(value, settings_service)
|
||||
except (ValueError, TypeError, KeyError, InvalidToken):
|
||||
# If decryption fails, it's not encrypted
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
|
@ -434,6 +434,81 @@ async def test_user_can_update_own_flow_mcp_settings(
|
|||
assert updated_flow.mcp_enabled is False
|
||||
|
||||
|
||||
async def test_update_project_auth_settings_encryption(
|
||||
client: AsyncClient, user_test_project, test_flow_for_update, logged_in_headers
|
||||
):
|
||||
"""Test that sensitive auth_settings fields are encrypted when stored."""
|
||||
# Create settings with sensitive data
|
||||
json_payload = {
|
||||
"settings": [
|
||||
{
|
||||
"id": str(test_flow_for_update.id),
|
||||
"action_name": "test_action",
|
||||
"action_description": "Test description",
|
||||
"mcp_enabled": True,
|
||||
"name": test_flow_for_update.name,
|
||||
"description": test_flow_for_update.description,
|
||||
}
|
||||
],
|
||||
"auth_settings": {
|
||||
"auth_type": "oauth",
|
||||
"oauth_host": "localhost",
|
||||
"oauth_port": "3000",
|
||||
"oauth_server_url": "http://localhost:3000",
|
||||
"oauth_callback_path": "/callback",
|
||||
"oauth_client_id": "test-client-id",
|
||||
"oauth_client_secret": "test-oauth-secret-value-456",
|
||||
"oauth_auth_url": "https://oauth.example.com/auth",
|
||||
"oauth_token_url": "https://oauth.example.com/token",
|
||||
"oauth_mcp_scope": "read write",
|
||||
"oauth_provider_scope": "user:email",
|
||||
},
|
||||
}
|
||||
|
||||
# Send the update request
|
||||
response = await client.patch(
|
||||
f"/api/v1/mcp/project/{user_test_project.id}",
|
||||
json=json_payload,
|
||||
headers=logged_in_headers,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
# Verify the sensitive data is encrypted in the database
|
||||
async with session_scope() as session:
|
||||
updated_project = await session.get(Folder, user_test_project.id)
|
||||
assert updated_project is not None
|
||||
assert updated_project.auth_settings is not None
|
||||
|
||||
# Check that sensitive field is encrypted (not plaintext)
|
||||
stored_value = updated_project.auth_settings.get("oauth_client_secret")
|
||||
assert stored_value is not None
|
||||
assert stored_value != "test-oauth-secret-value-456" # Should be encrypted
|
||||
|
||||
# The encrypted value should be a base64-like string (Fernet token)
|
||||
assert len(stored_value) > 50 # Encrypted values are longer
|
||||
|
||||
# Now test that the GET endpoint returns the data (SecretStr will be masked)
|
||||
response = await client.get(
|
||||
f"/api/v1/mcp/project/{user_test_project.id}",
|
||||
headers=logged_in_headers,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
|
||||
# SecretStr fields are masked in the response for security
|
||||
assert data["auth_settings"]["oauth_client_secret"] == "**********" # noqa: S105
|
||||
assert data["auth_settings"]["oauth_client_id"] == "test-client-id"
|
||||
assert data["auth_settings"]["auth_type"] == "oauth"
|
||||
|
||||
# Verify that decryption is working by checking the actual decrypted value in the backend
|
||||
from langflow.services.auth.mcp_encryption import decrypt_auth_settings
|
||||
|
||||
async with session_scope() as session:
|
||||
project = await session.get(Folder, user_test_project.id)
|
||||
decrypted_settings = decrypt_auth_settings(project.auth_settings)
|
||||
assert decrypted_settings["oauth_client_secret"] == "test-oauth-secret-value-456" # noqa: S105
|
||||
|
||||
|
||||
async def test_project_sse_creation(user_test_project):
|
||||
"""Test that SSE transport and MCP server are correctly created for a project."""
|
||||
# Test getting an SSE transport for the first time
|
||||
|
|
|
|||
0
src/backend/tests/unit/services/auth/__init__.py
Normal file
0
src/backend/tests/unit/services/auth/__init__.py
Normal file
167
src/backend/tests/unit/services/auth/test_mcp_encryption.py
Normal file
167
src/backend/tests/unit/services/auth/test_mcp_encryption.py
Normal file
|
|
@ -0,0 +1,167 @@
|
|||
"""Test MCP authentication encryption functionality."""
|
||||
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
from cryptography.fernet import Fernet
|
||||
from langflow.services.auth.mcp_encryption import (
|
||||
decrypt_auth_settings,
|
||||
encrypt_auth_settings,
|
||||
is_encrypted,
|
||||
)
|
||||
from pydantic import SecretStr
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_settings_service():
|
||||
"""Mock settings service for testing."""
|
||||
mock_service = Mock()
|
||||
# Generate a valid Fernet key that's already properly formatted
|
||||
# Fernet.generate_key() returns a URL-safe base64-encoded 32-byte key
|
||||
valid_key = Fernet.generate_key()
|
||||
# Decode it to string for storage
|
||||
valid_key_str = valid_key.decode("utf-8")
|
||||
|
||||
# Create a proper SecretStr object
|
||||
secret_key_obj = SecretStr(valid_key_str)
|
||||
mock_service.auth_settings.SECRET_KEY = secret_key_obj
|
||||
return mock_service
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_auth_settings():
|
||||
"""Sample auth settings with sensitive data."""
|
||||
return {
|
||||
"auth_type": "oauth",
|
||||
"oauth_host": "localhost",
|
||||
"oauth_port": "3000",
|
||||
"oauth_server_url": "http://localhost:3000",
|
||||
"oauth_callback_path": "/callback",
|
||||
"oauth_client_id": "my-client-id",
|
||||
"oauth_client_secret": "super-secret-password-123",
|
||||
"oauth_auth_url": "https://oauth.example.com/auth",
|
||||
"oauth_token_url": "https://oauth.example.com/token",
|
||||
"oauth_mcp_scope": "read write",
|
||||
"oauth_provider_scope": "user:email",
|
||||
}
|
||||
|
||||
|
||||
class TestMCPEncryption:
|
||||
"""Test MCP encryption functionality."""
|
||||
|
||||
@patch("langflow.services.auth.mcp_encryption.get_settings_service")
|
||||
def test_encrypt_auth_settings(self, mock_get_settings, mock_settings_service, sample_auth_settings):
|
||||
"""Test that sensitive fields are encrypted."""
|
||||
mock_get_settings.return_value = mock_settings_service
|
||||
|
||||
# Encrypt the settings
|
||||
encrypted = encrypt_auth_settings(sample_auth_settings)
|
||||
|
||||
# Check that sensitive fields are encrypted
|
||||
assert encrypted is not None
|
||||
assert encrypted["oauth_client_secret"] != sample_auth_settings["oauth_client_secret"]
|
||||
|
||||
# Check that non-sensitive fields remain unchanged
|
||||
assert encrypted["auth_type"] == sample_auth_settings["auth_type"]
|
||||
assert encrypted["oauth_host"] == sample_auth_settings["oauth_host"]
|
||||
assert encrypted["oauth_client_id"] == sample_auth_settings["oauth_client_id"]
|
||||
|
||||
@patch("langflow.services.auth.mcp_encryption.get_settings_service")
|
||||
def test_decrypt_auth_settings(self, mock_get_settings, mock_settings_service, sample_auth_settings):
|
||||
"""Test that encrypted fields can be decrypted."""
|
||||
mock_get_settings.return_value = mock_settings_service
|
||||
|
||||
# First encrypt the settings
|
||||
encrypted = encrypt_auth_settings(sample_auth_settings)
|
||||
|
||||
# Then decrypt them
|
||||
decrypted = decrypt_auth_settings(encrypted)
|
||||
|
||||
# Verify all fields match the original
|
||||
assert decrypted == sample_auth_settings
|
||||
|
||||
@patch("langflow.services.auth.mcp_encryption.get_settings_service")
|
||||
def test_encrypt_none_returns_none(self, mock_get_settings): # noqa: ARG002
|
||||
"""Test that encrypting None returns None."""
|
||||
result = encrypt_auth_settings(None)
|
||||
assert result is None
|
||||
|
||||
@patch("langflow.services.auth.mcp_encryption.get_settings_service")
|
||||
def test_decrypt_none_returns_none(self, mock_get_settings): # noqa: ARG002
|
||||
"""Test that decrypting None returns None."""
|
||||
result = decrypt_auth_settings(None)
|
||||
assert result is None
|
||||
|
||||
@patch("langflow.services.auth.mcp_encryption.get_settings_service")
|
||||
def test_encrypt_empty_dict(self, mock_get_settings): # noqa: ARG002
|
||||
"""Test that encrypting empty dict returns empty dict."""
|
||||
result = encrypt_auth_settings({})
|
||||
assert result == {}
|
||||
|
||||
@patch("langflow.services.auth.mcp_encryption.get_settings_service")
|
||||
def test_idempotent_encryption(self, mock_get_settings, mock_settings_service, sample_auth_settings):
|
||||
"""Test that encrypting already encrypted data doesn't double-encrypt."""
|
||||
mock_get_settings.return_value = mock_settings_service
|
||||
|
||||
# First encryption
|
||||
encrypted_once = encrypt_auth_settings(sample_auth_settings)
|
||||
|
||||
# Second encryption should detect already encrypted fields
|
||||
encrypted_twice = encrypt_auth_settings(encrypted_once)
|
||||
|
||||
# Should be the same
|
||||
assert encrypted_once == encrypted_twice
|
||||
|
||||
@patch("langflow.services.auth.mcp_encryption.get_settings_service")
|
||||
def test_partial_auth_settings(self, mock_get_settings, mock_settings_service):
|
||||
"""Test encryption with only some sensitive fields present."""
|
||||
mock_get_settings.return_value = mock_settings_service
|
||||
|
||||
partial_settings = {
|
||||
"auth_type": "api",
|
||||
"api_key": "sk-test-api-key-123",
|
||||
"username": "admin",
|
||||
}
|
||||
|
||||
encrypted = encrypt_auth_settings(partial_settings)
|
||||
|
||||
# API key should be encrypted
|
||||
assert encrypted["api_key"] != partial_settings["api_key"]
|
||||
|
||||
# Other fields unchanged
|
||||
assert encrypted["auth_type"] == partial_settings["auth_type"]
|
||||
assert encrypted["username"] == partial_settings["username"]
|
||||
|
||||
@patch("langflow.services.auth.mcp_encryption.get_settings_service")
|
||||
def test_backward_compatibility(self, mock_get_settings, mock_settings_service):
|
||||
"""Test that plaintext data is handled gracefully during decryption."""
|
||||
mock_get_settings.return_value = mock_settings_service
|
||||
|
||||
# Simulate legacy plaintext data
|
||||
plaintext_settings = {
|
||||
"auth_type": "oauth",
|
||||
"oauth_client_secret": "plaintext-secret",
|
||||
"oauth_client_id": "client-123",
|
||||
}
|
||||
|
||||
# Decryption should handle plaintext gracefully
|
||||
decrypted = decrypt_auth_settings(plaintext_settings)
|
||||
|
||||
# Should return the same data
|
||||
assert decrypted == plaintext_settings
|
||||
|
||||
@patch("langflow.services.auth.mcp_encryption.get_settings_service")
|
||||
def test_is_encrypted(self, mock_get_settings, mock_settings_service):
|
||||
"""Test the is_encrypted helper function."""
|
||||
mock_get_settings.return_value = mock_settings_service
|
||||
|
||||
# Test with plaintext
|
||||
assert not is_encrypted("plaintext-value")
|
||||
assert not is_encrypted("")
|
||||
assert not is_encrypted(None)
|
||||
|
||||
# Test with encrypted value
|
||||
from langflow.services.auth import utils as auth_utils
|
||||
|
||||
encrypted_value = auth_utils.encrypt_api_key("secret-value", mock_settings_service)
|
||||
assert is_encrypted(encrypted_value)
|
||||
Loading…
Add table
Add a link
Reference in a new issue