langflow/tests/test_initial_setup.py
Gabriel Luiz Freitas Almeida 836ac08d80
Update datetime handling in ApiKey and Variable models (#1673)
* Update datetime handling in ApiKey and Variable models

* Refactor test_login_successful function in test_login.py

* Fix nullable attribute for created_at and updated_at fields in Variable model
2024-04-10 23:04:10 -03:00

92 lines
3.6 KiB
Python

from datetime import datetime
from pathlib import Path
from langflow.initial_setup.setup import (
STARTER_FOLDER_NAME,
create_or_update_starter_projects,
get_project_data,
load_starter_projects,
)
from langflow.services.database.models.flow.model import Flow
from langflow.services.deps import session_scope
from sqlalchemy import func
from sqlmodel import select
def test_load_starter_projects():
projects = load_starter_projects()
assert isinstance(projects, list)
assert all(isinstance(project[1], dict) for project in projects)
assert all(isinstance(project[0], Path) for project in projects)
def test_get_project_data():
projects = load_starter_projects()
for _, project in projects:
(
project_name,
project_description,
project_is_component,
updated_at_datetime,
project_data,
project_icon,
project_icon_bg_color,
) = get_project_data(project)
assert isinstance(project_name, str)
assert isinstance(project_description, str)
assert isinstance(project_is_component, bool)
assert isinstance(updated_at_datetime, datetime)
assert isinstance(project_data, dict)
assert isinstance(project_icon, str) or project_icon is None
assert isinstance(project_icon_bg_color, str) or project_icon_bg_color is None
def test_create_or_update_starter_projects(client):
with session_scope() as session:
# Run the function to create or update projects
create_or_update_starter_projects()
# Get the number of projects returned by load_starter_projects
num_projects = len(load_starter_projects())
# Get the number of projects in the database
num_db_projects = session.exec(select(func.count(Flow.id)).where(Flow.folder == STARTER_FOLDER_NAME)).one()
# Check that the number of projects in the database is the same as the number of projects returned by load_starter_projects
assert num_db_projects == num_projects
# Some starter projects require integration
# @pytest.mark.asyncio
# async def test_starter_projects_can_run_successfully(client):
# with session_scope() as session:
# # Run the function to create or update projects
# create_or_update_starter_projects()
# # Get the number of projects returned by load_starter_projects
# num_projects = len(load_starter_projects())
# # Get the number of projects in the database
# num_db_projects = session.exec(select(func.count(Flow.id)).where(Flow.folder == STARTER_FOLDER_NAME)).one()
# # Check that the number of projects in the database is the same as the number of projects returned by load_starter_projects
# assert num_db_projects == num_projects
# # Get all the starter projects
# projects = session.exec(select(Flow).where(Flow.folder == STARTER_FOLDER_NAME)).all()
# graphs: list[tuple[str, Graph]] = []
# for project in projects:
# # Add tweaks to make file_path work
# tweaks = {"path": __file__}
# graph_data = process_tweaks(project.data, tweaks)
# graph_object = Graph.from_payload(graph_data, flow_id=project.id)
# graphs.append((project.name, graph_object))
# assert len(graphs) == len(projects)
# for name, graph in graphs:
# outputs = await graph.arun(
# inputs={},
# outputs=[],
# session_id="test",
# )
# assert all(isinstance(output, RunOutputs) for output in outputs), f"Project {name} error: {outputs}"
# delete_messages(session_id="test")