🔒 chore(utils.py): refactor import statements and remove unused imports for better code organization and readability

🔒 chore(utils.py): refactor auth_scheme_dependency to use OAuth2PasswordBearer for better security
🔒 chore(utils.py): refactor validate_api_key to use check_key function from crud module for better code reuse
🔒 chore(utils.py): add api_key_security function to handle API key authentication logic
🔒 chore(utils.py): refactor get_current_user to use oauth2_login dependency for better security
🔒 chore(utils.py): remove unused validate_api_key function
🔒 chore(auth.py): remove unused oauth2_scheme variable for better code organization
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-26 17:38:43 -03:00
commit 0f3bf9192d
2 changed files with 42 additions and 20 deletions

View file

@ -1,10 +1,10 @@
from datetime import datetime, timedelta, timezone
from fastapi import Depends, HTTPException, Request, status
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import APIKeyHeader, APIKeyQuery, OAuth2PasswordBearer
from jose import JWTError, jwt
from typing import Annotated, Coroutine
from uuid import UUID
from langflow.services.auth.service import AuthManager
from langflow.services.database.models.api_key.api_key import ApiKey
from langflow.services.database.models.api_key.crud import check_key
from langflow.services.database.models.user.user import User
from langflow.services.database.models.user.crud import (
get_user_by_id,
@ -12,19 +12,51 @@ from langflow.services.database.models.user.crud import (
update_user_last_login_at,
)
from langflow.services.utils import get_session, get_settings_manager
from sqlmodel import Session, select
from sqlmodel import Session
oauth2_login = OAuth2PasswordBearer(tokenUrl="api/v1/login")
API_KEY_NAME = "api-key"
api_key_query = APIKeyQuery(
name=API_KEY_NAME, scheme_name="API key query", auto_error=False
)
api_key_header = APIKeyHeader(
name=API_KEY_NAME, scheme_name="API key header", auto_error=False
)
async def auth_scheme_dependency(request: Request):
settings_manager = (
get_settings_manager()
) # Assuming get_settings_manager is defined
# Source: https://github.com/mrtolkien/fastapi_simple_security/blob/master/fastapi_simple_security/security_api_key.py
async def api_key_security(
query_param: str = Security(api_key_query),
header_param: str = Security(api_key_header),
db: Session = Depends(get_session),
):
settings_manager = get_settings_manager()
if settings_manager.auth_settings.AUTO_LOGIN:
return settings_manager.auth_settings.API_KEY_SECRET_KEY
return await AuthManager(settings_manager).run_oauth2_scheme(request)
elif not query_param and not header_param:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="An API key must be passed as query or header",
)
elif query_param and check_key(db, query_param):
return query_param
elif header_param and check_key(db, header_param):
return header_param
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid or missing API key",
)
async def get_current_user(
token: Annotated[str, Depends(auth_scheme_dependency)],
token: Annotated[str, Depends(oauth2_login)],
db: Session = Depends(get_session),
) -> User:
settings_manager = get_settings_manager()
@ -58,14 +90,6 @@ async def get_current_user(
return user
async def validate_api_key(
token: Annotated[str, Depends(auth_scheme_dependency)],
db: Session = Depends(get_session),
) -> bool:
hashed_api_key = get_password_hash(token)
return db.exec(select(ApiKey).where(ApiKey.hashed_api_key == hashed_api_key)).one_or_none() # type: ignore
def get_current_active_user(current_user: Annotated[User, Depends(get_current_user)]):
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")

View file

@ -3,7 +3,6 @@ import secrets
from pydantic import BaseSettings
from passlib.context import CryptContext
from fastapi.security import OAuth2PasswordBearer
class AuthSettings(BaseSettings):
@ -27,7 +26,6 @@ class AuthSettings(BaseSettings):
FIRST_SUPERUSER_PASSWORD: str = "langflow"
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{API_V1_STR}/login")
class Config:
validate_assignment = True