232 lines
8.5 KiB
Python
232 lines
8.5 KiB
Python
from datetime import datetime
|
|
|
|
import pytest
|
|
|
|
from langflow.services.auth.utils import create_super_user, get_password_hash
|
|
from langflow.services.database.models.user import UserUpdate
|
|
from langflow.services.database.models.user.model import User
|
|
from langflow.services.database.utils import session_getter
|
|
from langflow.services.deps import get_db_service, get_settings_service
|
|
|
|
|
|
@pytest.fixture
def super_user(client):
    """Create the superuser from the configured credentials and return it.

    The username and password are read from the auth settings exposed by the
    settings service. ``client`` is not used directly.
    NOTE(review): presumably it is requested so the application and database
    are initialised first -- confirm against the conftest.
    """
    auth = get_settings_service().auth_settings
    with session_getter(get_db_service()) as db_session:
        return create_super_user(
            db=db_session,
            username=auth.SUPERUSER,
            password=auth.SUPERUSER_PASSWORD,
        )
|
|
|
|
|
|
@pytest.fixture
def super_user_headers(client, super_user):
    """Log in with the configured superuser credentials.

    Returns a headers dict carrying the access token from the login response
    as a bearer token. Fails if the login request does not answer 200.
    """
    auth = get_settings_service().auth_settings
    credentials = {
        "username": auth.SUPERUSER,
        "password": auth.SUPERUSER_PASSWORD,
    }
    login_response = client.post("/api/v1/login", data=credentials)
    assert login_response.status_code == 200
    access_token = login_response.json()["access_token"]
    return {"Authorization": f"Bearer {access_token}"}
|
|
|
|
|
|
@pytest.fixture
def deactivated_user():
    """Persist and return a non-superuser account with ``is_active=False``.

    The account is created with ``last_login_at`` set to the current time,
    then added, committed and refreshed before being returned.
    """
    with session_getter(get_db_service()) as db_session:
        inactive_account = User(
            username="deactivateduser",
            password=get_password_hash("testpassword"),
            is_active=False,
            is_superuser=False,
            last_login_at=datetime.now(),
        )
        db_session.add(inactive_account)
        db_session.commit()
        # Reload the row after the commit before handing the object out.
        db_session.refresh(inactive_account)
    return inactive_account
|
|
|
|
|
|
def test_user_waiting_for_approval(client):
    """An inactive account with no recorded login gets "Waiting for approval"."""
    # Seed an inactive account whose last_login_at is empty.
    with session_getter(get_db_service()) as db_session:
        pending_account = User(
            username="waitingforapproval",
            password=get_password_hash("testpassword"),
            is_active=False,
            last_login_at=None,
        )
        db_session.add(pending_account)
        db_session.commit()

    resp = client.post(
        "/api/v1/login",
        data={"username": "waitingforapproval", "password": "testpassword"},
    )
    assert resp.status_code == 400
    assert resp.json()["detail"] == "Waiting for approval"
|
|
|
|
|
|
def test_deactivated_user_cannot_login(client, deactivated_user):
    """Logging in as the deactivated user is rejected with "Inactive user"."""
    credentials = {
        "username": deactivated_user.username,
        "password": "testpassword",
    }
    resp = client.post("/api/v1/login", data=credentials)
    assert resp.status_code == 400, resp.json()
    assert resp.json()["detail"] == "Inactive user"
|
|
|
|
|
|
def test_deactivated_user_cannot_access(client, deactivated_user, logged_in_headers):
    """Listing users with ``logged_in_headers`` is refused for lack of privileges."""
    # NOTE(review): ``logged_in_headers`` is defined outside this file; the
    # original comment assumed it belongs to ``deactivated_user`` -- confirm.
    resp = client.get("/api/v1/users", headers=logged_in_headers)
    assert resp.status_code == 400, resp.json()
    assert resp.json()["detail"] == "The user doesn't have enough privileges"
|
|
|
|
|
|
def test_data_consistency_after_update(client, active_user, logged_in_headers, super_user_headers):
    """Deactivating a user via PATCH makes ``whoami`` reject ``logged_in_headers``."""
    target_url = f"/api/v1/users/{active_user.id}"
    payload = UserUpdate(is_active=False).model_dump()

    # The superuser headers are used to switch the account off.
    patch_resp = client.patch(target_url, json=payload, headers=super_user_headers)
    assert patch_resp.status_code == 200, patch_resp.json()

    # A follow-up request with the previously issued headers must now fail.
    whoami_resp = client.get("/api/v1/users/whoami", headers=logged_in_headers)
    assert whoami_resp.status_code == 401, whoami_resp.json()
    assert whoami_resp.json()["detail"] == "Could not validate credentials"
|
|
|
|
|
|
def test_data_consistency_after_delete(client, test_user, super_user_headers):
    """A user deleted by the superuser no longer shows up in the user listing."""
    deleted_id = test_user.get("id")
    delete_resp = client.delete(f"/api/v1/users/{deleted_id}", headers=super_user_headers)
    assert delete_resp.status_code == 200, delete_resp.json()

    # List all users and make sure the deleted id is gone.
    listing_resp = client.get("/api/v1/users", headers=super_user_headers)
    assert listing_resp.status_code == 200
    assert not any(entry["id"] == deleted_id for entry in listing_resp.json()["users"])
|
|
|
|
|
|
def test_inactive_user(client):
    """An inactive account with a recorded login gets "Inactive user" on login."""
    # Seed an inactive account that already has a last_login_at timestamp.
    with session_getter(get_db_service()) as db_session:
        inactive_account = User(
            username="inactiveuser",
            password=get_password_hash("testpassword"),
            is_active=False,
            last_login_at=datetime.now(),
        )
        db_session.add(inactive_account)
        db_session.commit()

    resp = client.post(
        "/api/v1/login",
        data={"username": "inactiveuser", "password": "testpassword"},
    )
    assert resp.status_code == 400
    assert resp.json()["detail"] == "Inactive user"
|
|
|
|
|
|
def test_add_user(client, test_user):
    """The ``test_user`` fixture exposes the username ``testuser``."""
    expected_username = "testuser"
    assert test_user["username"] == expected_username
|
|
|
|
|
|
# The test below is disabled: what it exercises is not used in the frontend
# at the moment.
# def test_read_current_user(client: TestClient, active_user):
#     # First we need to log in to get the access token
#     login_data = {"username": "testuser", "password": "testpassword"}
#     response = client.post("/api/v1/login", data=login_data)
#     assert response.status_code == 200
#
#     headers = {"Authorization": f"Bearer {response.json()['access_token']}"}
#
#     response = client.get("/api/v1/user", headers=headers)
#     assert response.status_code == 200, response.json()
#     assert response.json()["username"] == "testuser"
|
|
|
|
|
|
def test_read_all_users(client, super_user_headers):
    """A superuser can list users; the payload carries a ``users`` list."""
    listing_resp = client.get("/api/v1/users", headers=super_user_headers)
    assert listing_resp.status_code == 200, listing_resp.json()
    users_field = listing_resp.json()["users"]
    assert isinstance(users_field, list)
|
|
|
|
|
|
def test_normal_user_cant_read_all_users(client, logged_in_headers):
    """Listing users with ``logged_in_headers`` is refused with a privileges error."""
    expected_body = {"detail": "The user doesn't have enough privileges"}
    resp = client.get("/api/v1/users", headers=logged_in_headers)
    assert resp.status_code == 400, resp.json()
    assert resp.json() == expected_body
|
|
|
|
|
|
def test_patch_user(client, active_user, logged_in_headers):
    """The user can PATCH their username and then their profile image."""
    target_url = f"/api/v1/users/{active_user.id}"

    # Each change is sent as its own request, in this order.
    for changes in ({"username": "newname"}, {"profile_image": "new_image"}):
        payload = UserUpdate(**changes).model_dump()
        resp = client.patch(target_url, json=payload, headers=logged_in_headers)
        assert resp.status_code == 200, resp.json()
|
|
|
|
|
|
def test_patch_reset_password(client, active_user, logged_in_headers):
    """Resetting the password succeeds and the new password can be used to log in."""
    reset_url = f"/api/v1/users/{active_user.id}/reset-password"
    payload = UserUpdate(password="newpassword").model_dump()

    reset_resp = client.patch(reset_url, json=payload, headers=logged_in_headers)
    assert reset_resp.status_code == 200, reset_resp.json()

    # Confirm the change by logging in with the new password.
    credentials = {"username": active_user.username, "password": "newpassword"}
    login_resp = client.post("/api/v1/login", data=credentials)
    assert login_resp.status_code == 200
|
|
|
|
|
|
def test_patch_user_wrong_id(client, active_user, logged_in_headers):
    """PATCHing with a non-UUID id answers 422 with a ``uuid_parsing`` error."""
    bad_id = "wrong_id"
    payload = UserUpdate(username="newname").model_dump()

    resp = client.patch(f"/api/v1/users/{bad_id}", json=payload, headers=logged_in_headers)
    assert resp.status_code == 422, resp.json()

    # Only the first reported validation error is inspected.
    first_error = resp.json()["detail"][0]
    assert first_error["type"] == "uuid_parsing"
    assert first_error["loc"] == ["path", "user_id"]
    assert first_error["input"] == "wrong_id"
|
|
|
|
|
|
def test_delete_user(client, test_user, super_user_headers):
    """A superuser can delete a user and receives the "User deleted" body."""
    target_url = f"/api/v1/users/{test_user['id']}"
    resp = client.delete(target_url, headers=super_user_headers)
    assert resp.status_code == 200
    assert resp.json() == {"detail": "User deleted"}
|
|
|
|
|
|
def test_delete_user_wrong_id(client, test_user, super_user_headers):
    """DELETE with a non-UUID id answers 422 with a ``uuid_parsing`` error."""
    bad_id = "wrong_id"
    resp = client.delete(f"/api/v1/users/{bad_id}", headers=super_user_headers)
    assert resp.status_code == 422

    # Only the first reported validation error is inspected.
    first_error = resp.json()["detail"][0]
    assert first_error["type"] == "uuid_parsing"
    assert first_error["loc"] == ["path", "user_id"]
    assert first_error["input"] == "wrong_id"
|
|
|
|
|
|
def test_normal_user_cant_delete_user(client, test_user, logged_in_headers):
    """DELETE with ``logged_in_headers`` is refused with a privileges error."""
    expected_body = {"detail": "The user doesn't have enough privileges"}
    target_url = f"/api/v1/users/{test_user['id']}"
    resp = client.delete(target_url, headers=logged_in_headers)
    assert resp.status_code == 400
    assert resp.json() == expected_body
|