🔒 refactor(auth.py): move SECRET_KEY logic to AuthSettings class to improve code organization and reusability

🔒 refactor(base.py): remove SECRET_KEY field from Settings class since it is now handled by AuthSettings class
🔒 refactor(manager.py): pass CONFIG_DIR to AuthSettings constructor when creating an instance
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-30 18:58:12 -03:00
commit 98dfc01934
3 changed files with 42 additions and 46 deletions

View file

@ -1,13 +1,21 @@
from pathlib import Path
from typing import Optional
import secrets
from langflow.services.settings.utils import read_secret_from_file, write_secret_to_file
from pydantic import BaseSettings
from pydantic import BaseSettings, Field, validator
from passlib.context import CryptContext
from langflow.utils.logger import logger
class AuthSettings(BaseSettings):
# Login settings
SECRET_KEY: str = secrets.token_hex(32)
CONFIG_DIR: str
SECRET_KEY: Optional[str] = Field(
None,
description="Secret key for JWT. If not provided, a random one will be generated.",
env="LANGFLOW_SECRET_KEY",
)
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
REFRESH_TOKEN_EXPIRE_MINUTES: int = 70
@ -31,3 +39,33 @@ class AuthSettings(BaseSettings):
validate_assignment = True
extra = "ignore"
env_prefix = "LANGFLOW_"
@validator("SECRET_KEY", pre=True)
def get_secret_key(cls, value, values):
config_dir = values.get("CONFIG_DIR")
if not config_dir:
logger.debug("No CONFIG_DIR provided, not saving secret key")
return value or secrets.token_urlsafe(32)
secret_key_path = Path(config_dir) / "secret_key"
if value:
logger.debug("Secret key provided")
write_secret_to_file(secret_key_path, value)
else:
logger.debug("No secret key provided, generating a random one")
if secret_key_path.exists():
value = read_secret_from_file(secret_key_path)
logger.debug("Loaded secret key")
if not value:
value = secrets.token_urlsafe(32)
write_secret_to_file(secret_key_path, value)
logger.debug("Saved secret key")
else:
value = secrets.token_urlsafe(32)
write_secret_to_file(secret_key_path, value)
logger.debug("Saved secret key")
return value

View file

@ -1,7 +1,5 @@
import contextlib
import json
import secrets
from langflow.services.settings.utils import read_secret_from_file, write_secret_to_file
import orjson
import os
from shutil import copy2
@ -9,7 +7,7 @@ from typing import Optional, List
from pathlib import Path
import yaml
from pydantic import BaseSettings, Field, root_validator, validator
from pydantic import BaseSettings, root_validator, validator
from langflow.utils.logger import logger
# BASE_COMPONENTS_PATH = str(Path(__file__).parent / "components")
@ -43,46 +41,6 @@ class Settings(BaseSettings):
REMOVE_API_KEYS: bool = False
COMPONENTS_PATH: List[str] = []
# Login settings
SECRET_KEY: Optional[str] = Field(None, env="LANGFLOW_SECRET_KEY")
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
REFRESH_TOKEN_EXPIRE_MINUTES: int = 70
# If AUTO_LOGIN = True
# > The application does not request login and logs in automatically as a super user.
AUTO_LOGIN: bool = True
@validator("SECRET_KEY", pre=True)
def get_secret_key(cls, value, values):
config_dir = values.get("CONFIG_DIR")
if not config_dir:
logger.debug("No CONFIG_DIR provided, not saving secret key")
return value or secrets.token_urlsafe(32)
secret_key_path = Path(config_dir) / "secret_key"
if value:
logger.debug("Secret key provided")
write_secret_to_file(secret_key_path, value)
else:
logger.debug("No secret key provided, generating a random one")
if secret_key_path.exists():
value = read_secret_from_file(secret_key_path)
logger.debug("Loaded secret key")
if not value:
value = secrets.token_urlsafe(32)
write_secret_to_file(secret_key_path, value)
logger.debug("Saved secret key")
else:
value = secrets.token_urlsafe(32)
write_secret_to_file(secret_key_path, value)
logger.debug("Saved secret key")
return value
@validator("CONFIG_DIR", pre=True, allow_reuse=True)
def set_langflow_dir(cls, value):
if not value:

View file

@ -35,5 +35,5 @@ class SettingsManager(Service):
)
settings = Settings(**settings_dict)
auth_settings = AuthSettings()
auth_settings = AuthSettings(CONFIG_DIR=settings.CONFIG_DIR)
return cls(settings, auth_settings)