Add first api tests

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-03-08 14:29:17 -03:00
commit b2c3b3b6d2
2 changed files with 117 additions and 244 deletions

View file

@ -10,35 +10,53 @@ import orjson
import pytest
from fastapi.testclient import TestClient
from httpx import AsyncClient
from sqlmodel import Session, SQLModel, create_engine, select
from sqlmodel.pool import StaticPool
from typer.testing import CliRunner
from langflow.graph.graph.base import Graph
from langflow.initial_setup.setup import STARTER_FOLDER_NAME
from langflow.services.auth.utils import get_password_hash
from langflow.services.database.models.api_key.model import ApiKey
from langflow.services.database.models.flow.model import Flow, FlowCreate
from langflow.services.database.models.user.model import User, UserCreate
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_db_service
from sqlmodel import Session, SQLModel, create_engine
from sqlmodel.pool import StaticPool
from typer.testing import CliRunner
if TYPE_CHECKING:
from langflow.services.database.service import DatabaseService
def pytest_configure():
pytest.BASIC_EXAMPLE_PATH = Path(__file__).parent.absolute() / "data" / "basic_example.json"
pytest.COMPLEX_EXAMPLE_PATH = Path(__file__).parent.absolute() / "data" / "complex_example.json"
pytest.OPENAPI_EXAMPLE_PATH = Path(__file__).parent.absolute() / "data" / "Openapi.json"
pytest.GROUPED_CHAT_EXAMPLE_PATH = Path(__file__).parent.absolute() / "data" / "grouped_chat.json"
pytest.ONE_GROUPED_CHAT_EXAMPLE_PATH = Path(__file__).parent.absolute() / "data" / "one_group_chat.json"
pytest.VECTOR_STORE_GROUPED_EXAMPLE_PATH = Path(__file__).parent.absolute() / "data" / "vector_store_grouped.json"
pytest.BASIC_EXAMPLE_PATH = (
Path(__file__).parent.absolute() / "data" / "basic_example.json"
)
pytest.COMPLEX_EXAMPLE_PATH = (
Path(__file__).parent.absolute() / "data" / "complex_example.json"
)
pytest.OPENAPI_EXAMPLE_PATH = (
Path(__file__).parent.absolute() / "data" / "Openapi.json"
)
pytest.GROUPED_CHAT_EXAMPLE_PATH = (
Path(__file__).parent.absolute() / "data" / "grouped_chat.json"
)
pytest.ONE_GROUPED_CHAT_EXAMPLE_PATH = (
Path(__file__).parent.absolute() / "data" / "one_group_chat.json"
)
pytest.VECTOR_STORE_GROUPED_EXAMPLE_PATH = (
Path(__file__).parent.absolute() / "data" / "vector_store_grouped.json"
)
pytest.BASIC_CHAT_WITH_PROMPT_AND_HISTORY = (
Path(__file__).parent.absolute() / "data" / "BasicChatWithPromptAndHistory.json"
)
pytest.CHAT_INPUT = Path(__file__).parent.absolute() / "data" / "ChatInputTest.json"
pytest.TWO_OUTPUTS = Path(__file__).parent.absolute() / "data" / "TwoOutputsTest.json"
pytest.VECTOR_STORE_PATH = Path(__file__).parent.absolute() / "data" / "Vector_store.json"
pytest.TWO_OUTPUTS = (
Path(__file__).parent.absolute() / "data" / "TwoOutputsTest.json"
)
pytest.VECTOR_STORE_PATH = (
Path(__file__).parent.absolute() / "data" / "Vector_store.json"
)
pytest.CODE_WITH_SYNTAX_ERROR = """
def get_text():
retun "Hello World"
@ -49,7 +67,9 @@ def get_text():
def check_openai_api_key_in_environment_variables():
import os
assert os.environ.get("OPENAI_API_KEY") is not None, "OPENAI_API_KEY is not set in environment variables"
assert (
os.environ.get("OPENAI_API_KEY") is not None
), "OPENAI_API_KEY is not set in environment variables"
@pytest.fixture()
@ -63,7 +83,9 @@ async def async_client() -> AsyncGenerator:
@pytest.fixture(name="session")
def session_fixture():
engine = create_engine("sqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool)
engine = create_engine(
"sqlite://", connect_args={"check_same_thread": False}, poolclass=StaticPool
)
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
yield session
@ -99,7 +121,9 @@ def distributed_client_fixture(session: Session, monkeypatch, distributed_env):
monkeypatch.setenv("LANGFLOW_AUTO_LOGIN", "false")
# monkeypatch langflow.services.task.manager.USE_CELERY to True
# monkeypatch.setattr(manager, "USE_CELERY", True)
monkeypatch.setattr(celery_app, "celery_app", celery_app.make_celery("langflow", Config))
monkeypatch.setattr(
celery_app, "celery_app", celery_app.make_celery("langflow", Config)
)
# def get_session_override():
# return session
@ -250,7 +274,11 @@ def active_user(client):
is_superuser=False,
)
# check if user exists
if active_user := session.query(User).filter(User.username == user.username).first():
if (
active_user := session.query(User)
.filter(User.username == user.username)
.first()
):
return active_user
session.add(user)
session.commit()
@ -273,7 +301,9 @@ def flow(client, json_flow: str, active_user):
from langflow.services.database.models.flow.model import FlowCreate
loaded_json = json.loads(json_flow)
flow_data = FlowCreate(name="test_flow", data=loaded_json.get("data"), user_id=active_user.id)
flow_data = FlowCreate(
name="test_flow", data=loaded_json.get("data"), user_id=active_user.id
)
flow = Flow.model_validate(flow_data)
with session_getter(get_db_service()) as session:
@ -297,7 +327,9 @@ def json_two_outputs():
@pytest.fixture
def added_flow_with_prompt_and_history(client, json_flow_with_prompt_and_history, logged_in_headers):
def added_flow_with_prompt_and_history(
client, json_flow_with_prompt_and_history, logged_in_headers
):
flow = orjson.loads(json_flow_with_prompt_and_history)
data = flow["data"]
flow = FlowCreate(name="Basic Chat", description="description", data=data)
@ -337,7 +369,9 @@ def added_vector_store(client, json_vector_store, logged_in_headers):
vector_store = orjson.loads(json_vector_store)
data = vector_store["data"]
vector_store = FlowCreate(name="Vector Store", description="description", data=data)
response = client.post("api/v1/flows/", json=vector_store.dict(), headers=logged_in_headers)
response = client.post(
"api/v1/flows/", json=vector_store.dict(), headers=logged_in_headers
)
assert response.status_code == 201
assert response.json()["name"] == vector_store.name
assert response.json()["data"] == vector_store.data
@ -355,9 +389,37 @@ def created_api_key(active_user):
)
db_manager = get_db_service()
with session_getter(db_manager) as session:
if existing_api_key := session.query(ApiKey).filter(ApiKey.api_key == api_key.api_key).first():
if (
existing_api_key := session.query(ApiKey)
.filter(ApiKey.api_key == api_key.api_key)
.first()
):
return existing_api_key
session.add(api_key)
session.commit()
session.refresh(api_key)
return api_key
@pytest.fixture(name="starter_project")
def get_starter_project(active_user):
# once the client is created, we can get the starter project
with session_getter(get_db_service()) as session:
flow = session.exec(
select(Flow).where(Flow.folder == STARTER_FOLDER_NAME)
).first()
if not flow:
raise ValueError("No starter project found")
new_flow_create = FlowCreate(
name=flow.name,
description=flow.description,
data=flow.data,
user_id=active_user.id,
)
new_flow = Flow.model_validate(new_flow_create, from_attributes=True)
session.add(new_flow)
session.commit()
session.refresh(new_flow)
new_flow_dict = new_flow.model_dump()
return new_flow_dict

View file

@ -1,6 +1,7 @@
import time
import pytest
from fastapi import status
from fastapi.testclient import TestClient
from langflow.interface.custom.directory_reader.directory_reader import DirectoryReader
@ -467,232 +468,42 @@ def test_build_vertex_invalid_vertex_id(
assert response.status_code == 500
# TODO: Add flow with new components
# def test_build_all_vertices_in_sequence_with_chat_input(
# client, added_flow_chat_input, logged_in_headers
# ):
# flow_id = added_flow_chat_input["id"]
# # First, get all the vertices in the correct sequence
# response = client.get(
# f"/api/v1/build/{flow_id}/vertices", headers=logged_in_headers
# )
# assert response.status_code == 200
# assert "ids" in response.json()
# vertex_ids = response.json()["ids"]
# # Now, iterate through each vertex and build it
# for layer in vertex_ids:
# for vertex_id in layer:
# response = client.post(
# f"/api/v1/build/{flow_id}/vertices/{vertex_id}",
# headers=logged_in_headers,
# )
# json_response = response.json()
# assert (
# response.status_code == 200
# ), f"Failed at vertex {vertex_id}: {json_response}"
# assert "valid" in json_response
# assert json_response["valid"], json_response["params"]
def test_successful_run(client, starter_project, created_api_key):
headers = {"x-api-key": created_api_key.api_key}
flow_id = starter_project["id"]
response = client.post(f"/api/v1/run/{flow_id}", headers=headers)
assert response.status_code == status.HTTP_200_OK, response.text
# Add more assertions here to validate the response content
json_response = response.json()
assert "outputs" in json_response
outer_outputs = json_response["outputs"]
assert len(outer_outputs) == 1
outputs = outer_outputs[0]
assert len(outputs) == 2
keys = ["results", "artifacts", "messages"]
for output in outputs:
assert all(key in output for key in keys)
output = outputs[0]
result = output["results"]["result"]
assert result == "Write a press release \n\n- Cars\n- Bottle\n\n\nAnswer:\n\n"
assert "session_id" in json_response
# def test_build_all_vertices_in_sequence_with_two_outputs(
# client, added_flow_two_outputs, logged_in_headers
# ):
# """This tests the case where a node has two outputs, one of which is Text and the other (in this case) is
# a LLMChain. We need to make sure the correct output is passed in both cases."""
# flow_id = added_flow_two_outputs["id"]
# # First, get all the vertices in the correct sequence
# response = client.get(
# f"/api/v1/build/{flow_id}/vertices", headers=logged_in_headers
# )
# assert response.status_code == 200
# assert "ids" in response.json()
# vertex_ids = response.json()["ids"]
# # Now, iterate through each vertex and build it
# for vertex_id in vertex_ids:
# response = client.post(
# f"/api/v1/build/{flow_id}/vertices/{vertex_id}", headers=logged_in_headers
# )
# json_response = response.json()
# assert (
# response.status_code == 200
# ), f"Failed at vertex {vertex_id}: {json_response}"
# assert "valid" in json_response
# assert json_response["valid"], json_response["params"]
def test_run_with_inputs_and_outputs(client, starter_project, created_api_key):
headers = {"x-api-key": created_api_key.api_key}
flow_id = starter_project["id"]
payload = {
"inputs": [{"components": ["component1"], "input_value": "value1"}],
"outputs": ["Component Name", "component_id"],
}
response = client.post(f"/api/v1/run/{flow_id}", json=payload, headers=headers)
assert response.status_code == status.HTTP_200_OK
# Validate the response structure and content
# def test_basic_chat_in_process(client, flow, created_api_key):
# # Run the /api/v1/process/{flow_id} endpoint
# headers = {"x-api-key": created_api_key.api_key}
# post_data = {"inputs": {"text": "Hi, My name is Gabriel"}}
# response = client.post(
# f"api/v1/process/{flow.get('id')}",
# headers=headers,
# json=post_data,
# )
# assert response.status_code == 200, response.json()
# # Check the response
# assert "Gabriel" in response.json()["result"]["text"]
# # session_id should be returned
# assert "session_id" in response.json()
# assert response.json()["session_id"] is not None
# # New request with the same session_id
# # asking "What is my name?" should return "Gabriel"
# post_data = {
# "inputs": {"text": "What is my name?"},
# "session_id": response.json()["session_id"],
# }
# response = client.post(
# f"api/v1/process/{flow.get('id')}",
# headers=headers,
# json=post_data,
# )
# assert response.status_code == 200, response.json()
# assert "Gabriel" in response.json()["result"]["text"]
# def test_basic_chat_different_session_ids(client, flow, created_api_key):
# # Run the /api/v1/process/{flow_id} endpoint
# headers = {"x-api-key": created_api_key.api_key}
# post_data = {"inputs": {"text": "Hi, My name is Gabriel"}}
# response = client.post(
# f"api/v1/process/{flow.get('id')}",
# headers=headers,
# json=post_data,
# )
# assert response.status_code == 200, response.json()
# # Check the response
# assert "Gabriel" in response.json()["result"]["text"]
# # session_id should be returned
# assert "session_id" in response.json()
# assert response.json()["session_id"] is not None
# session_id1 = response.json()["session_id"]
# # New request with a different session_id
# # asking "What is my name?" should return "Gabriel"
# post_data = {
# "inputs": {"text": "What is my name?"},
# }
# response = client.post(
# f"api/v1/process/{flow.get('id')}",
# headers=headers,
# json=post_data,
# )
# assert response.status_code == 200, response.json()
# assert "Gabriel" not in response.json()["result"]["text"]
# assert session_id1 != response.json()["session_id"]
# def test_basic_chat_with_two_session_ids_and_names(client, flow, created_api_key):
# headers = {"x-api-key": created_api_key.api_key}
# flow_id = flow.get("id")
# names = ["Gabriel", "John"]
# session_ids = []
# for name in names:
# post_data = {"inputs": {"text": f"Hi, My name is {name}"}}
# response_json = run_post(client, flow_id, headers, post_data)
# assert name in response_json["result"]["text"]
# assert "session_id" in response_json
# assert response_json["session_id"] is not None
# session_ids.append(response_json["session_id"])
# for i, name in enumerate(names):
# post_data = {
# "inputs": {"text": "What is my name?"},
# "session_id": session_ids[i],
# }
# response_json = run_post(client, flow_id, headers, post_data)
# assert name in response_json["result"]["text"]
# @pytest.mark.async_test
# def test_vector_store_in_process(
# distributed_client, added_vector_store, created_api_key
# ):
# # Run the /api/v1/process/{flow_id} endpoint
# headers = {"x-api-key": created_api_key.api_key}
# post_data = {"inputs": {"input": "What is Langflow?"}}
# response = distributed_client.post(
# f"api/v1/process/{added_vector_store.get('id')}",
# headers=headers,
# json=post_data,
# )
# assert response.status_code == 200, response.json()
# # Check the response
# assert "Langflow" in response.json()["result"]["output"]
# # session_id should be returned
# assert "session_id" in response.json()
# assert response.json()["session_id"] is not None
# Test function without loop
# @pytest.mark.async_test
# def test_async_task_processing(distributed_client, flow, created_api_key):
# headers = {"x-api-key": created_api_key.api_key}
# post_data = {"inputs": {"text": "Hi, My name is Gabriel"}}
# flow = flow.model_dump()
# # Run the /api/v1/process/{flow_id} endpoint with sync=False
# response = distributed_client.post(
# f"api/v1/process/{flow.get('id')}",
# headers=headers,
# json={**post_data, "sync": False},
# )
# assert response.status_code == 200, response.json()
# # Extract the task ID from the response
# task = response.json().get("task")
# task_id = task.get("id")
# task_href = task.get("href")
# assert task_id is not None
# assert task_href is not None
# assert task_href == f"api/v1/task/{task_id}"
# # Polling the task status using the helper function
# task_status_json = poll_task_status(distributed_client, headers, task_href)
# assert task_status_json is not None, "Task did not complete in time"
# # Validate that the task completed successfully and the result is as expected
# assert "result" in task_status_json, task_status_json
# assert "text" in task_status_json["result"], task_status_json["result"]
# assert "Gabriel" in task_status_json["result"]["text"], task_status_json["result"]
# ! Deactivating this until updating the test
# Test function without loop
# @pytest.mark.async_test
# def test_async_task_processing_vector_store(client, added_vector_store, created_api_key):
# headers = {"x-api-key": created_api_key.api_key}
# post_data = {"inputs": {"input": "How do I upload examples?"}}
# # Run the /api/v1/process/{flow_id} endpoint with sync=False
# response = client.post(
# f"api/v1/process/{added_vector_store.get('id')}",
# headers=headers,
# json={**post_data, "sync": False},
# )
# assert response.status_code == 200, response.json()
# assert "result" in response.json()
# assert "FAILURE" not in response.json()["result"]
# # Extract the task ID from the response
# task = response.json().get("task")
# task_id = task.get("id")
# task_href = task.get("href")
# assert task_id is not None
# assert task_href is not None
# assert task_href == f"api/v1/task/{task_id}"
# # Polling the task status using the helper function
# task_status_json = poll_task_status(client, headers, task_href)
# assert task_status_json is not None, "Task did not complete in time"
# # Validate that the task completed successfully and the result is as expected
# assert "result" in task_status_json, task_status_json
# assert "output" in task_status_json["result"], task_status_json["result"]
# assert "Langflow" in task_status_json["result"]["output"], task_status_json["result"]
def test_invalid_flow_id(client, created_api_key):
headers = {"x-api-key": created_api_key.api_key}
flow_id = "invalid-flow-id"
response = client.post(f"/api/v1/run/{flow_id}", headers=headers)
assert response.status_code == status.HTTP_404_NOT_FOUND
# Check if the error detail is as expected