* Add options field to FIELD_FORMAT_ATTRIBUTES constant and import pathlib in test_initial_setup.py * Update TEXT_FILE_TYPES in utils.py and handle missing file path error in Vertex class * Fix tweak value assignment in process.py and clear session cache in test_process.py * New lock * Update repository URLs and fix file paths in code blocks * Fix data retrieval in test_pickle_graph and test_pickle_each_vertex in test_graph.py * Refactor load_starter_projects function to include type hinting in setup.py * Update name of Basic Prompting (Hello, world!) project to Basic Prompting (Hello, World) * Refactor Graph.process() method to accept start_component_id parameter * Refactor test_endpoints.py to use "Chat Output" instead of "Prompt Output" and "ChatOutput" instead of "TextOutput"
96 lines
3.7 KiB
Python
96 lines
3.7 KiB
Python
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from langflow.graph.graph.base import Graph
|
|
from langflow.graph.schema import RunOutputs
|
|
from langflow.initial_setup.setup import (
|
|
STARTER_FOLDER_NAME,
|
|
create_or_update_starter_projects,
|
|
get_project_data,
|
|
load_starter_projects,
|
|
)
|
|
from langflow.memory import delete_messages
|
|
from langflow.processing.process import process_tweaks
|
|
from langflow.services.database.models.flow.model import Flow
|
|
from langflow.services.deps import session_scope
|
|
from sqlalchemy import func
|
|
from sqlmodel import select
|
|
|
|
|
|
def test_load_starter_projects():
|
|
projects = load_starter_projects()
|
|
assert isinstance(projects, list)
|
|
assert all(isinstance(project[1], dict) for project in projects)
|
|
assert all(isinstance(project[0], Path) for project in projects)
|
|
|
|
|
|
def test_get_project_data():
|
|
projects = load_starter_projects()
|
|
for _, project in projects:
|
|
(
|
|
project_name,
|
|
project_description,
|
|
project_is_component,
|
|
updated_at_datetime,
|
|
project_data,
|
|
project_icon,
|
|
project_icon_bg_color,
|
|
) = get_project_data(project)
|
|
assert isinstance(project_name, str)
|
|
assert isinstance(project_description, str)
|
|
assert isinstance(project_is_component, bool)
|
|
assert isinstance(updated_at_datetime, datetime)
|
|
assert isinstance(project_data, dict)
|
|
assert isinstance(project_icon, str) or project_icon is None
|
|
assert isinstance(project_icon_bg_color, str) or project_icon_bg_color is None
|
|
|
|
|
|
def test_create_or_update_starter_projects(client):
|
|
with session_scope() as session:
|
|
# Run the function to create or update projects
|
|
create_or_update_starter_projects()
|
|
|
|
# Get the number of projects returned by load_starter_projects
|
|
num_projects = len(load_starter_projects())
|
|
|
|
# Get the number of projects in the database
|
|
num_db_projects = session.exec(select(func.count(Flow.id)).where(Flow.folder == STARTER_FOLDER_NAME)).one()
|
|
|
|
# Check that the number of projects in the database is the same as the number of projects returned by load_starter_projects
|
|
assert num_db_projects == num_projects
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_starter_projects_can_run_successfully(client):
|
|
with session_scope() as session:
|
|
# Run the function to create or update projects
|
|
create_or_update_starter_projects()
|
|
|
|
# Get the number of projects returned by load_starter_projects
|
|
num_projects = len(load_starter_projects())
|
|
|
|
# Get the number of projects in the database
|
|
num_db_projects = session.exec(select(func.count(Flow.id)).where(Flow.folder == STARTER_FOLDER_NAME)).one()
|
|
|
|
# Check that the number of projects in the database is the same as the number of projects returned by load_starter_projects
|
|
assert num_db_projects == num_projects
|
|
|
|
# Get all the starter projects
|
|
projects = session.exec(select(Flow).where(Flow.folder == STARTER_FOLDER_NAME)).all()
|
|
graphs: list[tuple[str, Graph]] = []
|
|
for project in projects:
|
|
# Add tweaks to make file_path work
|
|
tweaks = {"path": __file__}
|
|
graph_data = process_tweaks(project.data, tweaks)
|
|
graph_object = Graph.from_payload(graph_data, flow_id=project.id)
|
|
graphs.append((project.name, graph_object))
|
|
assert len(graphs) == len(projects)
|
|
for name, graph in graphs:
|
|
outputs = await graph.arun(
|
|
inputs={},
|
|
outputs=[],
|
|
session_id="test",
|
|
)
|
|
assert all(isinstance(output, RunOutputs) for output in outputs), f"Project {name} error: {outputs}"
|
|
delete_messages(session_id="test")
|