Adds new AuthSettings

This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-21 16:12:49 -03:00
commit f4d12f27e6
6 changed files with 262 additions and 21 deletions

View file

@ -0,0 +1,228 @@
from uuid import UUID
from typing import Annotated
from jose import JWTError, jwt
from langflow.services.base import Service
from langflow.services.database.models.user import (
User,
get_user_by_id,
get_user_by_username,
)
from sqlalchemy.orm import Session
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends, HTTPException, status
from datetime import datetime, timedelta, timezone
from langflow.services.utils import get_settings_manager, get_session
from langflow.services.database.models.user import (
update_user_last_login_at,
)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
class AuthManager(Service):
name = "auth_manager"
def __init__(self):
self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
self.oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)], db: Session = Depends(get_session)
) -> User:
settings_manager = get_settings_manager()
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(
token,
settings_manager.auth_settings.SECRET_KEY,
algorithms=[settings_manager.auth_settings.ALGORITHM],
)
user_id: UUID = payload.get("sub") # type: ignore
token_type: str = payload.get("type") # type: ignore
if user_id is None or token_type:
raise credentials_exception
except JWTError as e:
raise credentials_exception from e
user = get_user_by_id(db, user_id) # type: ignore
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
):
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_token(data: dict, expires_delta: timedelta):
settings_manager = get_settings_manager()
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
to_encode["exp"] = expire
return jwt.encode(
to_encode,
settings_manager.auth_settings.SECRET_KEY,
algorithm=settings_manager.auth_settings.ALGORITHM,
)
def create_super_user(db: Session = Depends(get_session)) -> User:
settings_manager = get_settings_manager()
super_user = get_user_by_username(
db, settings_manager.auth_settings.FIRST_SUPERUSER
)
if not super_user:
super_user = User(
username=settings_manager.auth_settings.FIRST_SUPERUSER,
password=get_password_hash(
settings_manager.auth_settings.FIRST_SUPERUSER_PASSWORD
),
is_superuser=True,
is_active=True,
last_login_at=None,
)
db.add(super_user)
db.commit()
db.refresh(super_user)
return super_user
def create_user_longterm_token(db: Session = Depends(get_session)) -> dict:
super_user = create_super_user(db)
access_token_expires_longterm = timedelta(days=365)
access_token = create_token(
data={"sub": str(super_user.id)},
expires_delta=access_token_expires_longterm,
)
# Update: last_login_at
update_user_last_login_at(super_user.id, db)
return {
"access_token": access_token,
"refresh_token": None,
"token_type": "bearer",
}
def create_user_api_key(user_id: UUID) -> dict:
access_token = create_token(
data={"sub": str(user_id), "role": "api_key"},
expires_delta=timedelta(days=365 * 2),
)
return {"api_key": access_token}
def get_user_id_from_token(token: str) -> UUID:
try:
user_id = jwt.get_unverified_claims(token)["sub"]
return UUID(user_id)
except (KeyError, JWTError, ValueError):
return UUID(int=0)
def create_user_tokens(
user_id: UUID, db: Session = Depends(get_session), update_last_login: bool = False
) -> dict:
settings_manager = get_settings_manager()
access_token_expires = timedelta(
minutes=settings_manager.auth_settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
access_token = create_token(
data={"sub": str(user_id)},
expires_delta=access_token_expires,
)
refresh_token_expires = timedelta(
minutes=settings_manager.auth_settings.REFRESH_TOKEN_EXPIRE_MINUTES
)
refresh_token = create_token(
data={"sub": str(user_id), "type": "rf"},
expires_delta=refresh_token_expires,
)
# Update: last_login_at
if update_last_login:
update_user_last_login_at(user_id, db)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
}
def create_refresh_token(refresh_token: str, db: Session = Depends(get_session)):
settings_manager = get_settings_manager()
try:
payload = jwt.decode(
refresh_token,
settings_manager.auth_settings.SECRET_KEY,
algorithms=[settings_manager.auth_settings.ALGORITHM],
)
user_id: UUID = payload.get("sub") # type: ignore
token_type: str = payload.get("type") # type: ignore
if user_id is None or token_type is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token"
)
return create_user_tokens(user_id, db)
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
) from e
def authenticate_user(
username: str, password: str, db: Session = Depends(get_session)
) -> User | None:
user = get_user_by_username(db, username)
if not user:
return None
if not user.is_active:
if not user.last_login_at:
raise HTTPException(status_code=400, detail="Waiting for approval")
raise HTTPException(status_code=400, detail="Inactive user")
return user if verify_password(password, user.password) else None

View file

@ -0,0 +1,29 @@
from typing import Optional
import secrets
from pydantic import BaseSettings
class AuthSettings(BaseSettings):
# Login settings
SECRET_KEY: str = secrets.token_hex(32)
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
REFRESH_TOKEN_EXPIRE_MINUTES: int = 70
# API Key to execute /process endpoint
API_KEY_SECRET_KEY: Optional[
str
] = "b82818e0ad4ff76615c5721ee21004b07d84cd9b87ba4d9cb42374da134b841a"
API_KEY_ALGORITHM: str = "HS256"
# If AUTO_LOGIN = True
# > The application does not request login and logs in automatically as a super user.
AUTO_LOGIN: bool = True
FIRST_SUPERUSER: str = "langflow"
FIRST_SUPERUSER_PASSWORD: str = "langflow"
class Config:
validate_assignment = True
extra = "ignore"
env_prefix = "LANGFLOW_"

View file

@ -2,7 +2,6 @@ import contextlib
import json
import os
from shutil import copy2
import secrets
from typing import Optional, List
from pathlib import Path
@ -41,24 +40,6 @@ class Settings(BaseSettings):
REMOVE_API_KEYS: bool = False
COMPONENTS_PATH: List[str] = []
# Login settings
SECRET_KEY: str = secrets.token_hex(32)
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
REFRESH_TOKEN_EXPIRE_MINUTES: int = 70
# API Key to execute /process endpoint
API_KEY_SECRET_KEY: Optional[
str
] = "b82818e0ad4ff76615c5721ee21004b07d84cd9b87ba4d9cb42374da134b841a"
API_KEY_ALGORITHM: str = "HS256"
# If AUTO_LOGIN = True
# > The application does not request login and logs in automatically as a super user.
AUTO_LOGIN: bool = True
FIRST_SUPERUSER: str = "langflow"
FIRST_SUPERUSER_PASSWORD: str = "langflow"
@validator("CONFIG_DIR", pre=True, allow_reuse=True)
def set_langflow_dir(cls, value):
if not value:

View file

@ -1,4 +1,5 @@
from langflow.services.base import Service
from langflow.services.settings.auth import AuthSettings
from langflow.services.settings.base import Settings
from langflow.utils.logger import logger
import os
@ -8,9 +9,10 @@ import yaml
class SettingsManager(Service):
name = "settings_manager"
def __init__(self, settings: Settings):
def __init__(self, settings: Settings, auth_settings: AuthSettings):
super().__init__()
self.settings = settings
self.auth_settings = auth_settings
@classmethod
def load_settings_from_yaml(cls, file_path: str) -> "SettingsManager":
@ -33,4 +35,5 @@ class SettingsManager(Service):
)
settings = Settings(**settings_dict)
return cls(settings)
auth_settings = AuthSettings()
return cls(settings, auth_settings)