Improve error handling and logging in get_current_user_by_jwt function (#2165)

chore: Improve error handling and logging in get_current_user_by_jwt function
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-06-14 04:24:52 -07:00 committed by GitHub
commit 5300b9cc08
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 41 additions and 27 deletions

View file

@ -1,14 +1,13 @@
import warnings
from datetime import datetime, timedelta, timezone
from typing import Annotated, Coroutine, Optional, Union
from uuid import UUID
import warnings
from cryptography.fernet import Fernet
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import APIKeyHeader, APIKeyQuery, OAuth2PasswordBearer
from jose import JWTError, jwt
from loguru import logger
from sqlmodel import Session
from starlette.websockets import WebSocket
@ -92,44 +91,58 @@ async def get_current_user_by_jwt(
) -> User:
settings_service = get_settings_service()
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if isinstance(token, Coroutine):
token = await token
if settings_service.auth_settings.SECRET_KEY.get_secret_value() is None:
raise credentials_exception
secret_key = settings_service.auth_settings.SECRET_KEY.get_secret_value()
if secret_key is None:
logger.error("Secret key is not set in settings.")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
# Careful not to leak sensitive information
detail="Authentication failure: Verify authentication settings.",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# Ignore warning about datetime.utcnow
with warnings.catch_warnings():
warnings.simplefilter("ignore")
payload = jwt.decode(
token,
settings_service.auth_settings.SECRET_KEY.get_secret_value(),
algorithms=[settings_service.auth_settings.ALGORITHM],
)
user_id: UUID = payload.get("sub") # type: ignore
token_type: str = payload.get("type") # type: ignore
payload = jwt.decode(token, secret_key, algorithms=[settings_service.auth_settings.ALGORITHM])
user_id: UUID = payload.get("sub")
token_type: str = payload.get("type")
if expires := payload.get("exp", None):
expires_datetime = datetime.fromtimestamp(expires, timezone.utc)
# TypeError: can't compare offset-naive and offset-aware datetimes
if datetime.now(timezone.utc) > expires_datetime:
raise credentials_exception
logger.info("Token expired for user ID: %s", user_id)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired.",
headers={"WWW-Authenticate": "Bearer"},
)
if user_id is None or token_type:
raise credentials_exception
logger.info("Invalid token payload: %s", payload)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token details.",
headers={"WWW-Authenticate": "Bearer"},
)
except JWTError as e:
raise credentials_exception from e
logger.error("JWT decoding error: %s", str(e))
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
) from e
user = get_user_by_id(db, user_id) # type: ignore
user = get_user_by_id(db, user_id)
if user is None or not user.is_active:
raise credentials_exception
logger.info("User not found or inactive: %s", user_id)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or is inactive.",
headers={"WWW-Authenticate": "Bearer"},
)
return user

View file

@ -1,6 +1,7 @@
from datetime import datetime
import pytest
from langflow.services.auth.utils import create_super_user, get_password_hash
from langflow.services.database.models.user import UserUpdate
from langflow.services.database.models.user.model import User
@ -95,7 +96,7 @@ def test_data_consistency_after_update(client, active_user, logged_in_headers, s
# Fetch the updated user from the database
response = client.get("/api/v1/users/whoami", headers=logged_in_headers)
assert response.status_code == 401, response.json()
assert response.json()["detail"] == "Could not validate credentials"
assert response.json()["detail"] == "User not found or is inactive."
def test_data_consistency_after_delete(client, test_user, super_user_headers):