* refactor: move tests folder to src/backend * chore(Makefile): update pytest commands to run tests from the correct directory paths for unit and integration tests * refactor: update file path in test_custom_component.py The file path in the test_custom_component.py file has been updated to use the correct relative path to the component_multiple_outputs.py file. This change ensures that the test code can access the correct file and improves the reliability of the test.
232 lines
8.5 KiB
Python
232 lines
8.5 KiB
Python
from datetime import datetime
|
|
|
|
import pytest
|
|
|
|
from langflow.services.auth.utils import create_super_user, get_password_hash
|
|
from langflow.services.database.models.user import UserUpdate
|
|
from langflow.services.database.models.user.model import User
|
|
from langflow.services.database.utils import session_getter
|
|
from langflow.services.deps import get_db_service, get_settings_service
|
|
|
|
|
|
@pytest.fixture
|
|
def super_user(client):
|
|
settings_manager = get_settings_service()
|
|
auth_settings = settings_manager.auth_settings
|
|
with session_getter(get_db_service()) as session:
|
|
return create_super_user(
|
|
db=session,
|
|
username=auth_settings.SUPERUSER,
|
|
password=auth_settings.SUPERUSER_PASSWORD,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def super_user_headers(client, super_user):
|
|
settings_service = get_settings_service()
|
|
auth_settings = settings_service.auth_settings
|
|
login_data = {
|
|
"username": auth_settings.SUPERUSER,
|
|
"password": auth_settings.SUPERUSER_PASSWORD,
|
|
}
|
|
response = client.post("/api/v1/login", data=login_data)
|
|
assert response.status_code == 200
|
|
tokens = response.json()
|
|
a_token = tokens["access_token"]
|
|
return {"Authorization": f"Bearer {a_token}"}
|
|
|
|
|
|
@pytest.fixture
|
|
def deactivated_user():
|
|
with session_getter(get_db_service()) as session:
|
|
user = User(
|
|
username="deactivateduser",
|
|
password=get_password_hash("testpassword"),
|
|
is_active=False,
|
|
is_superuser=False,
|
|
last_login_at=datetime.now(),
|
|
)
|
|
session.add(user)
|
|
session.commit()
|
|
session.refresh(user)
|
|
return user
|
|
|
|
|
|
def test_user_waiting_for_approval(
|
|
client,
|
|
):
|
|
# Create a user that is not active and has never logged in
|
|
with session_getter(get_db_service()) as session:
|
|
user = User(
|
|
username="waitingforapproval",
|
|
password=get_password_hash("testpassword"),
|
|
is_active=False,
|
|
last_login_at=None,
|
|
)
|
|
session.add(user)
|
|
session.commit()
|
|
|
|
login_data = {"username": "waitingforapproval", "password": "testpassword"}
|
|
response = client.post("/api/v1/login", data=login_data)
|
|
assert response.status_code == 400
|
|
assert response.json()["detail"] == "Waiting for approval"
|
|
|
|
|
|
def test_deactivated_user_cannot_login(client, deactivated_user):
|
|
login_data = {"username": deactivated_user.username, "password": "testpassword"}
|
|
response = client.post("/api/v1/login", data=login_data)
|
|
assert response.status_code == 401, response.json()
|
|
assert response.json()["detail"] == "Inactive user", response.text
|
|
|
|
|
|
def test_deactivated_user_cannot_access(client, deactivated_user, logged_in_headers):
|
|
# Assuming the headers for deactivated_user
|
|
response = client.get("/api/v1/users", headers=logged_in_headers)
|
|
assert response.status_code == 403, response.json()
|
|
assert response.json()["detail"] == "The user doesn't have enough privileges", response.text
|
|
|
|
|
|
def test_data_consistency_after_update(client, active_user, logged_in_headers, super_user_headers):
|
|
user_id = active_user.id
|
|
update_data = UserUpdate(is_active=False)
|
|
|
|
response = client.patch(f"/api/v1/users/{user_id}", json=update_data.model_dump(), headers=super_user_headers)
|
|
assert response.status_code == 200, response.json()
|
|
|
|
# Fetch the updated user from the database
|
|
response = client.get("/api/v1/users/whoami", headers=logged_in_headers)
|
|
assert response.status_code == 401, response.json()
|
|
assert response.json()["detail"] == "User not found or is inactive."
|
|
|
|
|
|
def test_data_consistency_after_delete(client, test_user, super_user_headers):
|
|
user_id = test_user.get("id")
|
|
response = client.delete(f"/api/v1/users/{user_id}", headers=super_user_headers)
|
|
assert response.status_code == 200, response.json()
|
|
|
|
# Attempt to fetch the deleted user from the database
|
|
response = client.get("/api/v1/users", headers=super_user_headers)
|
|
assert response.status_code == 200
|
|
assert all(user["id"] != user_id for user in response.json()["users"])
|
|
|
|
|
|
def test_inactive_user(client):
|
|
# Create a user that is not active and has a last_login_at value
|
|
with session_getter(get_db_service()) as session:
|
|
user = User(
|
|
username="inactiveuser",
|
|
password=get_password_hash("testpassword"),
|
|
is_active=False,
|
|
last_login_at=datetime(2023, 1, 1, 0, 0, 0),
|
|
)
|
|
session.add(user)
|
|
session.commit()
|
|
|
|
login_data = {"username": "inactiveuser", "password": "testpassword"}
|
|
response = client.post("/api/v1/login", data=login_data)
|
|
assert response.status_code == 401
|
|
assert response.json()["detail"] == "Inactive user"
|
|
|
|
|
|
def test_add_user(client, test_user):
|
|
assert test_user["username"] == "testuser"
|
|
|
|
|
|
# This is not used in the Frontend at the moment
|
|
# def test_read_current_user(client: TestClient, active_user):
|
|
# # First we need to login to get the access token
|
|
# login_data = {"username": "testuser", "password": "testpassword"}
|
|
# response = client.post("/api/v1/login", data=login_data)
|
|
# assert response.status_code == 200
|
|
|
|
# headers = {"Authorization": f"Bearer {response.json()['access_token']}"}
|
|
|
|
# response = client.get("/api/v1/user", headers=headers)
|
|
# assert response.status_code == 200, response.json()
|
|
# assert response.json()["username"] == "testuser"
|
|
|
|
|
|
def test_read_all_users(client, super_user_headers):
|
|
response = client.get("/api/v1/users", headers=super_user_headers)
|
|
assert response.status_code == 200, response.json()
|
|
assert isinstance(response.json()["users"], list)
|
|
|
|
|
|
def test_normal_user_cant_read_all_users(client, logged_in_headers):
|
|
response = client.get("/api/v1/users", headers=logged_in_headers)
|
|
assert response.status_code == 403, response.json()
|
|
assert response.json() == {"detail": "The user doesn't have enough privileges"}
|
|
|
|
|
|
def test_patch_user(client, active_user, logged_in_headers):
|
|
user_id = active_user.id
|
|
update_data = UserUpdate(
|
|
username="newname",
|
|
)
|
|
|
|
response = client.patch(f"/api/v1/users/{user_id}", json=update_data.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 200, response.json()
|
|
update_data = UserUpdate(
|
|
profile_image="new_image",
|
|
)
|
|
|
|
response = client.patch(f"/api/v1/users/{user_id}", json=update_data.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 200, response.json()
|
|
|
|
|
|
def test_patch_reset_password(client, active_user, logged_in_headers):
|
|
user_id = active_user.id
|
|
update_data = UserUpdate(
|
|
password="newpassword",
|
|
)
|
|
|
|
response = client.patch(
|
|
f"/api/v1/users/{user_id}/reset-password",
|
|
json=update_data.model_dump(),
|
|
headers=logged_in_headers,
|
|
)
|
|
assert response.status_code == 200, response.json()
|
|
# Now we need to test if the new password works
|
|
login_data = {"username": active_user.username, "password": "newpassword"}
|
|
response = client.post("/api/v1/login", data=login_data)
|
|
assert response.status_code == 200
|
|
|
|
|
|
def test_patch_user_wrong_id(client, active_user, logged_in_headers):
|
|
user_id = "wrong_id"
|
|
update_data = UserUpdate(
|
|
username="newname",
|
|
)
|
|
|
|
response = client.patch(f"/api/v1/users/{user_id}", json=update_data.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 422, response.json()
|
|
json_response = response.json()
|
|
detail = json_response["detail"]
|
|
error = detail[0]
|
|
assert error["loc"] == ["path", "user_id"]
|
|
assert error["type"] == "uuid_parsing"
|
|
|
|
|
|
def test_delete_user(client, test_user, super_user_headers):
|
|
user_id = test_user["id"]
|
|
response = client.delete(f"/api/v1/users/{user_id}", headers=super_user_headers)
|
|
assert response.status_code == 200
|
|
assert response.json() == {"detail": "User deleted"}
|
|
|
|
|
|
def test_delete_user_wrong_id(client, test_user, super_user_headers):
|
|
user_id = "wrong_id"
|
|
response = client.delete(f"/api/v1/users/{user_id}", headers=super_user_headers)
|
|
assert response.status_code == 422
|
|
json_response = response.json()
|
|
detail = json_response["detail"]
|
|
error = detail[0]
|
|
assert error["loc"] == ["path", "user_id"]
|
|
assert error["type"] == "uuid_parsing"
|
|
|
|
|
|
def test_normal_user_cant_delete_user(client, test_user, logged_in_headers):
|
|
user_id = test_user["id"]
|
|
response = client.delete(f"/api/v1/users/{user_id}", headers=logged_in_headers)
|
|
assert response.status_code == 403
|
|
assert response.json() == {"detail": "The user doesn't have enough privileges"}
|