From 244a967517ec9538c252d588a746a39ce2e7745f Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 21 Aug 2023 16:33:56 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A8=20refactor(service.py):=20remove?= =?UTF-8?q?=20unused=20imports=20and=20unused=20functions=20in=20the=20Aut?= =?UTF-8?q?hManager=20class=20=F0=9F=94=A7=20chore(service.py):=20refactor?= =?UTF-8?q?=20the=20AuthManager=20class=20to=20accept=20a=20settings=5Fman?= =?UTF-8?q?ager=20parameter=20in=20the=20constructor=20for=20better=20depe?= =?UTF-8?q?ndency=20injection=20=F0=9F=94=A7=20chore(service.py):=20refact?= =?UTF-8?q?or=20the=20run=5Foauth2=5Fscheme=20method=20in=20the=20AuthMana?= =?UTF-8?q?ger=20class=20to=20use=20the=20oauth2=5Fscheme=20from=20the=20s?= =?UTF-8?q?ettings=5Fmanager=20instead=20of=20a=20hardcoded=20value?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ✨ feat(auth/utils.py): add authentication and authorization utilities for user authentication and token generation 🔒 chore(auth/utils.py): add auth_scheme_dependency function to handle authentication scheme dependency 🔒 chore(auth/utils.py): add get_current_user function to retrieve the current authenticated user 🔒 chore(auth/utils.py): add get_current_active_user function to retrieve the current active authenticated user 🔒 chore(auth/utils.py): add verify_password function to verify the password 🔒 chore(auth/utils.py): add get_password_hash function to get the hashed password 🔒 chore(auth/utils.py): add create_token function to create a JWT token 🔒 chore(auth/utils.py): add create_super_user function to create a super user 🔒 chore(auth/utils.py): add create_user_longterm_token function to create a long-term token for a user 🔒 chore(auth/utils.py): add create_user_api_key function to create an API key for a user 🔒 chore(auth/utils.py): add get_user_id_from_token function to get the user ID from a token 🔒 chore(auth/utils.py): add create_user_tokens function to create access and refresh tokens for a user 🔒 chore(auth/utils.py): add create_refresh_token function to create new access and refresh tokens using a refresh token 🔒 chore(auth/utils.py): add authenticate_user function to authenticate a user with username and password --- src/backend/langflow/services/auth/service.py | 227 +----------------- src/backend/langflow/services/auth/utils.py | 222 +++++++++++++++++ 2 files changed, 228 insertions(+), 221 deletions(-) create mode 100644 src/backend/langflow/services/auth/utils.py diff --git a/src/backend/langflow/services/auth/service.py b/src/backend/langflow/services/auth/service.py index 5e25aa207..c5f380298 100644 --- a/src/backend/langflow/services/auth/service.py +++ b/src/backend/langflow/services/auth/service.py @@ -1,228 +1,13 @@ -from uuid import UUID -from typing import Annotated -from jose import JWTError, jwt from langflow.services.base import Service -from langflow.services.database.models.user import ( - User, - get_user_by_id, - get_user_by_username, -) -from sqlalchemy.orm import Session -from passlib.context import CryptContext -from fastapi.security import OAuth2PasswordBearer -from fastapi import Depends, HTTPException, status -from datetime import datetime, timedelta, timezone - -from langflow.services.utils import get_settings_manager, get_session - -from langflow.services.database.models.user import ( - update_user_last_login_at, -) - - -pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login") class AuthManager(Service): name = "auth_manager" - def __init__(self): - self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") - self.oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login") + def __init__(self, settings_manager): + self.settings_manager = settings_manager - -async def get_current_user( - token: Annotated[str, Depends(oauth2_scheme)], db: Session = Depends(get_session) -) -> User: - settings_manager = get_settings_manager() - - credentials_exception = HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Could not validate credentials", - headers={"WWW-Authenticate": "Bearer"}, - ) - - try: - payload = jwt.decode( - token, - settings_manager.auth_settings.SECRET_KEY, - algorithms=[settings_manager.auth_settings.ALGORITHM], - ) - user_id: UUID = payload.get("sub") # type: ignore - token_type: str = payload.get("type") # type: ignore - - if user_id is None or token_type: - raise credentials_exception - except JWTError as e: - raise credentials_exception from e - - user = get_user_by_id(db, user_id) # type: ignore - if user is None: - raise credentials_exception - return user - - -async def get_current_active_user( - current_user: Annotated[User, Depends(get_current_user)] -): - if not current_user.is_active: - raise HTTPException(status_code=400, detail="Inactive user") - return current_user - - -def verify_password(plain_password, hashed_password): - return pwd_context.verify(plain_password, hashed_password) - - -def get_password_hash(password): - return pwd_context.hash(password) - - -def create_token(data: dict, expires_delta: timedelta): - settings_manager = get_settings_manager() - - to_encode = data.copy() - expire = datetime.now(timezone.utc) + expires_delta - to_encode["exp"] = expire - - return jwt.encode( - to_encode, - settings_manager.auth_settings.SECRET_KEY, - algorithm=settings_manager.auth_settings.ALGORITHM, - ) - - -def create_super_user(db: Session = Depends(get_session)) -> User: - settings_manager = get_settings_manager() - - super_user = get_user_by_username( - db, settings_manager.auth_settings.FIRST_SUPERUSER - ) - - if not super_user: - super_user = User( - username=settings_manager.auth_settings.FIRST_SUPERUSER, - password=get_password_hash( - settings_manager.auth_settings.FIRST_SUPERUSER_PASSWORD - ), - is_superuser=True, - is_active=True, - last_login_at=None, - ) - - db.add(super_user) - db.commit() - db.refresh(super_user) - - return super_user - - -def create_user_longterm_token(db: Session = Depends(get_session)) -> dict: - super_user = create_super_user(db) - - access_token_expires_longterm = timedelta(days=365) - access_token = create_token( - data={"sub": str(super_user.id)}, - expires_delta=access_token_expires_longterm, - ) - - # Update: last_login_at - update_user_last_login_at(super_user.id, db) - - return { - "access_token": access_token, - "refresh_token": None, - "token_type": "bearer", - } - - -def create_user_api_key(user_id: UUID) -> dict: - access_token = create_token( - data={"sub": str(user_id), "role": "api_key"}, - expires_delta=timedelta(days=365 * 2), - ) - - return {"api_key": access_token} - - -def get_user_id_from_token(token: str) -> UUID: - try: - user_id = jwt.get_unverified_claims(token)["sub"] - return UUID(user_id) - except (KeyError, JWTError, ValueError): - return UUID(int=0) - - -def create_user_tokens( - user_id: UUID, db: Session = Depends(get_session), update_last_login: bool = False -) -> dict: - settings_manager = get_settings_manager() - - access_token_expires = timedelta( - minutes=settings_manager.auth_settings.ACCESS_TOKEN_EXPIRE_MINUTES - ) - access_token = create_token( - data={"sub": str(user_id)}, - expires_delta=access_token_expires, - ) - - refresh_token_expires = timedelta( - minutes=settings_manager.auth_settings.REFRESH_TOKEN_EXPIRE_MINUTES - ) - refresh_token = create_token( - data={"sub": str(user_id), "type": "rf"}, - expires_delta=refresh_token_expires, - ) - - # Update: last_login_at - if update_last_login: - update_user_last_login_at(user_id, db) - - return { - "access_token": access_token, - "refresh_token": refresh_token, - "token_type": "bearer", - } - - -def create_refresh_token(refresh_token: str, db: Session = Depends(get_session)): - settings_manager = get_settings_manager() - - try: - payload = jwt.decode( - refresh_token, - settings_manager.auth_settings.SECRET_KEY, - algorithms=[settings_manager.auth_settings.ALGORITHM], - ) - user_id: UUID = payload.get("sub") # type: ignore - token_type: str = payload.get("type") # type: ignore - - if user_id is None or token_type is None: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token" - ) - - return create_user_tokens(user_id, db) - - except JWTError as e: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Invalid refresh token", - ) from e - - -def authenticate_user( - username: str, password: str, db: Session = Depends(get_session) -) -> User | None: - user = get_user_by_username(db, username) - - if not user: - return None - - if not user.is_active: - if not user.last_login_at: - raise HTTPException(status_code=400, detail="Waiting for approval") - raise HTTPException(status_code=400, detail="Inactive user") - - return user if verify_password(password, user.password) else None + # We need to define a function that can be passed to the Depends() function. + # This function will be called by FastAPI to run oauth2_scheme + def run_oauth2_scheme(self, *args, **kwargs): + return self.settings_manager.auth_settings.oauth2_scheme(*args, **kwargs) diff --git a/src/backend/langflow/services/auth/utils.py b/src/backend/langflow/services/auth/utils.py new file mode 100644 index 000000000..f897151e7 --- /dev/null +++ b/src/backend/langflow/services/auth/utils.py @@ -0,0 +1,222 @@ +from datetime import datetime, timedelta, timezone +from fastapi import Depends, HTTPException, status +from jose import JWTError, jwt +from typing import Annotated +from uuid import UUID +from langflow.services.auth.service import AuthManager +from langflow.services.database.models.user import ( + User, + get_user_by_id, + get_user_by_username, + update_user_last_login_at, +) +from langflow.services.utils import get_session, get_settings_manager +from sqlalchemy.orm import Session + + +def auth_scheme_dependency(*args, **kwargs): + settings_manager = ( + get_settings_manager() + ) # Assuming get_settings_manager is defined + + return AuthManager(settings_manager).run_oauth2_scheme(*args, **kwargs) + + +async def get_current_user( + token: Annotated[str, Depends(auth_scheme_dependency)], + db: Session = Depends(get_session), +) -> User: + settings_manager = get_settings_manager() + + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode( + token, + settings_manager.auth_settings.SECRET_KEY, + algorithms=[settings_manager.auth_settings.ALGORITHM], + ) + user_id: UUID = payload.get("sub") # type: ignore + token_type: str = payload.get("type") # type: ignore + + if user_id is None or token_type: + raise credentials_exception + except JWTError as e: + raise credentials_exception from e + + user = get_user_by_id(db, user_id) # type: ignore + if user is None: + raise credentials_exception + return user + + +async def get_current_active_user( + current_user: Annotated[User, Depends(get_current_user)] +): + if not current_user.is_active: + raise HTTPException(status_code=400, detail="Inactive user") + return current_user + + +def verify_password(plain_password, hashed_password): + settings_manager = get_settings_manager() + return settings_manager.auth_settings.pwd_context.verify( + plain_password, hashed_password + ) + + +def get_password_hash(password): + settings_manager = get_settings_manager() + return settings_manager.auth_settings.pwd_context.hash(password) + + +def create_token(data: dict, expires_delta: timedelta): + settings_manager = get_settings_manager() + + to_encode = data.copy() + expire = datetime.now(timezone.utc) + expires_delta + to_encode["exp"] = expire + + return jwt.encode( + to_encode, + settings_manager.auth_settings.SECRET_KEY, + algorithm=settings_manager.auth_settings.ALGORITHM, + ) + + +def create_super_user(db: Session = Depends(get_session)) -> User: + settings_manager = get_settings_manager() + + super_user = get_user_by_username( + db, settings_manager.auth_settings.FIRST_SUPERUSER + ) + + if not super_user: + super_user = User( + username=settings_manager.auth_settings.FIRST_SUPERUSER, + password=get_password_hash( + settings_manager.auth_settings.FIRST_SUPERUSER_PASSWORD + ), + is_superuser=True, + is_active=True, + last_login_at=None, + ) + + db.add(super_user) + db.commit() + db.refresh(super_user) + + return super_user + + +def create_user_longterm_token(db: Session = Depends(get_session)) -> dict: + super_user = create_super_user(db) + + access_token_expires_longterm = timedelta(days=365) + access_token = create_token( + data={"sub": str(super_user.id)}, + expires_delta=access_token_expires_longterm, + ) + + # Update: last_login_at + update_user_last_login_at(super_user.id, db) + + return { + "access_token": access_token, + "refresh_token": None, + "token_type": "bearer", + } + + +def create_user_api_key(user_id: UUID) -> dict: + access_token = create_token( + data={"sub": str(user_id), "role": "api_key"}, + expires_delta=timedelta(days=365 * 2), + ) + + return {"api_key": access_token} + + +def get_user_id_from_token(token: str) -> UUID: + try: + user_id = jwt.get_unverified_claims(token)["sub"] + return UUID(user_id) + except (KeyError, JWTError, ValueError): + return UUID(int=0) + + +def create_user_tokens( + user_id: UUID, db: Session = Depends(get_session), update_last_login: bool = False +) -> dict: + settings_manager = get_settings_manager() + + access_token_expires = timedelta( + minutes=settings_manager.auth_settings.ACCESS_TOKEN_EXPIRE_MINUTES + ) + access_token = create_token( + data={"sub": str(user_id)}, + expires_delta=access_token_expires, + ) + + refresh_token_expires = timedelta( + minutes=settings_manager.auth_settings.REFRESH_TOKEN_EXPIRE_MINUTES + ) + refresh_token = create_token( + data={"sub": str(user_id), "type": "rf"}, + expires_delta=refresh_token_expires, + ) + + # Update: last_login_at + if update_last_login: + update_user_last_login_at(user_id, db) + + return { + "access_token": access_token, + "refresh_token": refresh_token, + "token_type": "bearer", + } + + +def create_refresh_token(refresh_token: str, db: Session = Depends(get_session)): + settings_manager = get_settings_manager() + + try: + payload = jwt.decode( + refresh_token, + settings_manager.auth_settings.SECRET_KEY, + algorithms=[settings_manager.auth_settings.ALGORITHM], + ) + user_id: UUID = payload.get("sub") # type: ignore + token_type: str = payload.get("type") # type: ignore + + if user_id is None or token_type is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token" + ) + + return create_user_tokens(user_id, db) + + except JWTError as e: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid refresh token", + ) from e + + +def authenticate_user( + username: str, password: str, db: Session = Depends(get_session) +) -> User | None: + user = get_user_by_username(db, username) + + if not user: + return None + + if not user.is_active: + if not user.last_login_at: + raise HTTPException(status_code=400, detail="Waiting for approval") + raise HTTPException(status_code=400, detail="Inactive user") + + return user if verify_password(password, user.password) else None