🔒 chore(auth.py): refactor authenticate_user function to use database session instead of fake_db for authentication

🔒 chore(auth.py): refactor get_current_user function to use database session instead of fake_users_db for retrieving user information

🔒 chore(auth.py): refactor get_current_active_user function to use database session instead of fake_users_db for retrieving user information

🔒 chore(user.py): refactor get_user function to use database session instead of fake_users_db for retrieving user information

🔒 chore(login.py): refactor login_for_access_token function to use database session instead of fake_users_db for authentication and token creation

🔒 feat(models.py): add User model to represent user data in the database

🔒 feat(base_control.py): add BaseControl model to represent common control fields in database models
This commit is contained in:
gustavoschaedler 2023-08-03 21:49:31 +01:00
commit f3174033ed
5 changed files with 72 additions and 68 deletions

View file

@ -1,16 +1,14 @@
from typing import Annotated
from fastapi import Depends, HTTPException, status
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta, timezone
from fastapi.security import OAuth2PasswordBearer
from ..models.token import TokenData
from ..models.user import get_user, fake_users_db, User
from langflow.models.token import TokenData
from langflow.models.user import get_user, User
from sqlalchemy.orm import Session
from langflow.database.base import get_session
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "698619adad2d916f1f32d264540976964b3c0d3828e0870a65add5800a8cc6b9"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
@ -37,23 +35,21 @@ def create_access_token(data: dict, expires_delta: timedelta = None):
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
def authenticate_user(db: Session, username: str, password: str):
if user := get_user(db, username):
return user if verify_password(password, user.hashed_password) else False
else:
return False
if not verify_password(password, user.hashed_password):
return False
return user
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)], db: Session = Depends(get_session)
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
@ -63,7 +59,7 @@ async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
except JWTError as e:
raise credentials_exception from e
user = get_user(fake_users_db, username=token_data.username)
user = get_user(db, username=token_data.username)
if user is None:
raise credentials_exception
return user

View file

@ -0,0 +1,7 @@
from pydantic import BaseModel
from datetime import datetime
class BaseControl(BaseModel):
created_at: datetime
updated_at: datetime

View file

@ -0,0 +1,21 @@
from sqlalchemy import Column, String, Boolean, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
from sqlalchemy.dialects.postgresql import UUID
from uuid import uuid4
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(
UUID(as_uuid=True), primary_key=True, default=uuid4, unique=True, nullable=False
)
username = Column(String, unique=True, index=True)
email = Column(String, unique=True, index=True)
disabled = Column(Boolean, default=False)
is_superuser = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

View file

@ -1,39 +1,17 @@
from pydantic import BaseModel
from sqlalchemy.orm import Session
from langflow.models.user import User as DBUser
from langflow.models.base_control import BaseControl
from uuid import UUID
class User(BaseModel):
class User(BaseControl):
id: UUID
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
is_admin: bool | None = False
email: str
disabled: bool = False
is_superuser: bool = False
class UserInDB(User):
hashed_password: str
fake_users_db = {
"gustavo": {
"username": "gustavo",
"full_name": "Gustavo Schaedler",
"email": "gustavopoa@gmail.com",
"hashed_password": "$2b$12$f4R8IHUaVxVchhpWrwhckeJXnPalW1vUbJzcvb1KeovJcuMwE861K", #secret
"disabled": False,
"is_admin": True,
},
"gustavo_disabled": {
"username": "gustavo_disabled",
"full_name": "Gustavo Disabled",
"email": "gustavo_disabled@gmail.com",
"hashed_password": "$2b$12$f4R8IHUaVxVchhpWrwhckeJXnPalW1vUbJzcvb1KeovJcuMwE861K", #secret
"disabled": True,
"is_admin": False,
}
}
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def get_user(db: Session, user_id: UUID) -> User:
db_user = db.query(DBUser).filter(DBUser.id == user_id).first()
return User.from_orm(db_user) if db_user else None # type: ignore

View file

@ -1,35 +1,37 @@
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from langflow.models.token import Token
from langflow.models.user import fake_users_db
from datetime import timedelta
from langflow.auth.auth import (
ACCESS_TOKEN_EXPIRE_MINUTES,
authenticate_user,
create_access_token
ACCESS_TOKEN_EXPIRE_MINUTES,
authenticate_user,
create_access_token,
)
from sqlalchemy.orm import Session
from langflow.database.base import get_session
TOKEN_TYPE = "bearer"
router = APIRouter()
def create_user_token(user: str) -> dict:
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": TOKEN_TYPE}
@router.post("/token", response_model=Token)
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends()
form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_session)
):
user = authenticate_user(
fake_users_db,
form_data.username,
form_data.password
)
if not user:
if user := authenticate_user(db, form_data.username, form_data.password):
return create_user_token(user)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}