🐛 fix(utils.py): raise HTTPException with status code 400 and detail message when FIRST_SUPERUSER credentials are missing in api_key_security function

🐛 fix(utils.py): raise credentials_exception when SECRET_KEY is None in get_current_user function
🐛 fix(utils.py): raise HTTPException with status code 400 and detail message when FIRST_SUPERUSER credentials are missing in create_user_longterm_token function
🐛 fix(auth.py): set SECRET_KEY default value to empty string and disallow mutation in AuthSettings class
🐛 fix(auth.py): set FIRST_SUPERUSER and FIRST_SUPERUSER_PASSWORD as optional fields with default values and disallow mutation in AuthSettings class
🐛 fix(manager.py): raise ValueError when CONFIG_DIR is not set in settings
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-31 11:17:16 -03:00
commit 9eca124b17
3 changed files with 39 additions and 9 deletions

View file

@ -37,7 +37,12 @@ async def api_key_security(
result: Optional[Union[ApiKey, User]] = None
if settings_manager.auth_settings.AUTO_LOGIN:
# Get the first user
settings_manager.auth_settings.FIRST_SUPERUSER
if not settings_manager.auth_settings.FIRST_SUPERUSER:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Missing first superuser credentials",
)
result = get_user_by_username(
db, settings_manager.auth_settings.FIRST_SUPERUSER
)
@ -80,6 +85,9 @@ async def get_current_user(
if isinstance(token, Coroutine):
token = await token
if settings_manager.auth_settings.SECRET_KEY is None:
raise credentials_exception
try:
payload = jwt.decode(
token,
@ -150,9 +158,9 @@ def create_token(data: dict, expires_delta: timedelta):
def create_super_user(
username: str,
password: str,
db: Session = Depends(get_session),
username: Optional[str] = None,
password: Optional[str] = None,
) -> User:
super_user = get_user_by_username(db, username)
@ -176,7 +184,12 @@ def create_user_longterm_token(db: Session = Depends(get_session)) -> dict:
settings_manager = get_settings_manager()
username = settings_manager.auth_settings.FIRST_SUPERUSER
password = settings_manager.auth_settings.FIRST_SUPERUSER_PASSWORD
super_user = create_super_user(db, username=username, password=password)
if not username or not password:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Missing first superuser credentials",
)
super_user = create_super_user(db=db, username=username, password=password)
access_token_expires_longterm = timedelta(days=365)
access_token = create_token(

View file

@ -11,10 +11,11 @@ from langflow.utils.logger import logger
class AuthSettings(BaseSettings):
# Login settings
CONFIG_DIR: str
SECRET_KEY: Optional[str] = Field(
None,
SECRET_KEY: str = Field(
default="",
description="Secret key for JWT. If not provided, a random one will be generated.",
env="LANGFLOW_SECRET_KEY",
allow_mutation=False,
)
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
@ -30,8 +31,18 @@ class AuthSettings(BaseSettings):
# If AUTO_LOGIN = True
# > The application does not request login and logs in automatically as a super user.
AUTO_LOGIN: bool = False
FIRST_SUPERUSER: str = "langflow"
FIRST_SUPERUSER_PASSWORD: str = "langflow"
FIRST_SUPERUSER: Optional[str] = Field(
"langflow",
description="First super user to be created if AUTO_LOGIN is True.",
env="LANGFLOW_FIRST_SUPERUSER",
allow_mutation=False,
)
FIRST_SUPERUSER_PASSWORD: Optional[str] = Field(
"langflow",
description="First super user password to be created if AUTO_LOGIN is True.",
env="LANGFLOW_FIRST_SUPERUSER_PASSWORD",
allow_mutation=False,
)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

View file

@ -35,5 +35,11 @@ class SettingsManager(Service):
)
settings = Settings(**settings_dict)
auth_settings = AuthSettings(CONFIG_DIR=settings.CONFIG_DIR)
if not settings.CONFIG_DIR:
raise ValueError("CONFIG_DIR must be set in settings")
auth_settings = AuthSettings(
CONFIG_DIR=settings.CONFIG_DIR,
FIRST_SUPERUSER=None,
FIRST_SUPERUSER_PASSWORD=None,
)
return cls(settings, auth_settings)