* refactor: turn sorting methods into functions in a separate module - Added `layered_topological_sort` function to perform layered topological sorting of graph vertices, accommodating cycles and input vertex checks. - Introduced `refine_layers` function to ensure proper dependency ordering among vertices. - Implemented helper functions for sorting layers by dependency and filtering vertices based on predecessors. - Enhanced utility functions to support better graph traversal and layer management. This update improves the graph processing capabilities, allowing for more efficient handling of complex graph structures. * feat(tests): enhance graph utility tests with cycle detection and sorting functionality - Added a new fixture `graph_with_loop` to simulate a graph containing cycles for testing purposes. - Improved the `test_large_graph_efficiency` to validate cycle detection in large graphs. - Introduced multiple tests for sorting vertices in graphs with cycles, ensuring correct order and handling of input vertices. - Enhanced assertions to provide clearer error messages for failed tests, improving debugging experience. These changes strengthen the testing framework for graph utilities, ensuring robust handling of complex graph structures. * refactor(graph): remove unused parent_node_map from Graph class initialization - Eliminated the `parent_node_map` parameter from the Graph class constructor, streamlining the graph initialization process. - This change enhances code clarity and reduces unnecessary complexity in graph management. This update contributes to cleaner and more maintainable graph-related code. * refactor(graph): optimize dependency sorting and vertex filtering - Improved the `_max_dependency_index` function by utilizing `index_map.get()` for cleaner code and better handling of missing successors. - Enhanced the `_sort_single_layer_by_dependency` function with a caching mechanism to avoid redundant calculations, improving performance during vertex sorting. - Updated `filter_vertices_up_to_vertex` to use a set for `vertices_ids`, optimizing membership checks and enhancing efficiency in vertex filtering. These changes contribute to more efficient graph processing and improved code readability. * chore: remove unused 'parent_node_map' parameter * [autofix.ci] apply automated fixes * fix: replace old method call with a new func * test: enhance assertions for file existence in webhook tests * refactor(graph): enhance component ID retrieval and chat input sorting - Updated `find_start_component_id` to accept an optional `is_webhook` parameter, allowing for dynamic priority input selection based on the flow type. - Improved `sort_chat_inputs_first` to handle chat input positioning more efficiently, ensuring only one chat input exists and adjusting its position within the layers as needed. - These changes enhance the flexibility and efficiency of graph processing, particularly for webhook flows. * test(graph): update assertions in sort_chat_inputs_first test for accuracy - Modified assertions in the `test_chat_inputs_at_start` function to reflect the correct expected output of the `sort_chat_inputs_first` utility. - Adjusted the expected length and order of the result to ensure accurate validation of chat input sorting functionality. These changes enhance the reliability of the test suite for graph utilities, ensuring that the sorting logic is correctly validated. * test(chat): update assertion in consume_and_assert_stream for accurate ID validation - Modified the assertion in the `consume_and_assert_stream` function to include an additional expected ID, ensuring the test accurately reflects the current output of the chat endpoint. - This change enhances the reliability of the test suite by validating the correct behavior of the chat input sorting functionality. * test(endpoints): update assertion in test_get_vertices for accurate ID validation - Modified the assertion in the `test_get_vertices` function to include an additional expected ID, "Webhook", alongside "ChatInput". - This change ensures the test accurately reflects the current output of the endpoint, enhancing the reliability of the test suite for endpoint functionality. --------- Co-authored-by: italojohnny <italojohnnydosanjos@gmail.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
531 lines
21 KiB
Python
531 lines
21 KiB
Python
import asyncio
|
|
from uuid import UUID, uuid4
|
|
|
|
import pytest
|
|
from fastapi import status
|
|
from httpx import AsyncClient
|
|
from langflow.custom.directory_reader.directory_reader import DirectoryReader
|
|
from langflow.services.deps import get_settings_service
|
|
|
|
|
|
async def run_post(client, flow_id, headers, post_data):
|
|
response = await client.post(
|
|
f"api/v1/process/{flow_id}",
|
|
headers=headers,
|
|
json=post_data,
|
|
)
|
|
assert response.status_code == 200, response.json()
|
|
return response.json()
|
|
|
|
|
|
# Helper function to poll task status
|
|
async def poll_task_status(client, headers, href, max_attempts=20, sleep_time=1):
|
|
for _ in range(max_attempts):
|
|
task_status_response = await client.get(
|
|
href,
|
|
headers=headers,
|
|
)
|
|
if task_status_response.status_code == 200 and task_status_response.json()["status"] == "SUCCESS":
|
|
return task_status_response.json()
|
|
await asyncio.sleep(sleep_time)
|
|
return None # Return None if task did not complete in time
|
|
|
|
|
|
PROMPT_REQUEST = {
|
|
"name": "string",
|
|
"template": "string",
|
|
"frontend_node": {
|
|
"template": {},
|
|
"description": "string",
|
|
"base_classes": ["string"],
|
|
"name": "",
|
|
"display_name": "",
|
|
"documentation": "",
|
|
"custom_fields": {},
|
|
"output_types": [],
|
|
"field_formatters": {
|
|
"formatters": {"openai_api_key": {}},
|
|
"base_formatters": {
|
|
"kwargs": {},
|
|
"optional": {},
|
|
"list": {},
|
|
"dict": {},
|
|
"union": {},
|
|
"multiline": {},
|
|
"show": {},
|
|
"password": {},
|
|
"default": {},
|
|
"headers": {},
|
|
"dict_code_file": {},
|
|
"model_fields": {
|
|
"MODEL_DICT": {
|
|
"OpenAI": [
|
|
"text-davinci-003",
|
|
"text-davinci-002",
|
|
"text-curie-001",
|
|
"text-babbage-001",
|
|
"text-ada-001",
|
|
],
|
|
"ChatOpenAI": [
|
|
"gpt-4-turbo-preview",
|
|
"gpt-4-0125-preview",
|
|
"gpt-4-1106-preview",
|
|
"gpt-4-vision-preview",
|
|
"gpt-3.5-turbo-0125",
|
|
"gpt-3.5-turbo-1106",
|
|
],
|
|
"Anthropic": [
|
|
"claude-v1",
|
|
"claude-v1-100k",
|
|
"claude-instant-v1",
|
|
"claude-instant-v1-100k",
|
|
"claude-v1.3",
|
|
"claude-v1.3-100k",
|
|
"claude-v1.2",
|
|
"claude-v1.0",
|
|
"claude-instant-v1.1",
|
|
"claude-instant-v1.1-100k",
|
|
"claude-instant-v1.0",
|
|
],
|
|
"ChatAnthropic": [
|
|
"claude-v1",
|
|
"claude-v1-100k",
|
|
"claude-instant-v1",
|
|
"claude-instant-v1-100k",
|
|
"claude-v1.3",
|
|
"claude-v1.3-100k",
|
|
"claude-v1.2",
|
|
"claude-v1.0",
|
|
"claude-instant-v1.1",
|
|
"claude-instant-v1.1-100k",
|
|
"claude-instant-v1.0",
|
|
],
|
|
}
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
async def test_get_all(client: AsyncClient, logged_in_headers):
|
|
response = await client.get("api/v1/all", headers=logged_in_headers)
|
|
assert response.status_code == 200
|
|
settings = get_settings_service().settings
|
|
dir_reader = DirectoryReader(settings.components_path[0])
|
|
files = dir_reader.get_files()
|
|
# json_response is a dict of dicts
|
|
all_names = [component_name for _, components in response.json().items() for component_name in components]
|
|
json_response = response.json()
|
|
# We need to test the custom nodes
|
|
assert len(all_names) <= len(
|
|
files
|
|
) # Less or equal because we might have some files that don't have the dependencies installed
|
|
assert "ChatInput" in json_response["inputs"]
|
|
assert "Prompt" in json_response["prompts"]
|
|
assert "ChatOutput" in json_response["outputs"]
|
|
|
|
|
|
async def test_post_validate_code(client: AsyncClient):
|
|
# Test case with a valid import and function
|
|
code1 = """
|
|
import math
|
|
|
|
def square(x):
|
|
return x ** 2
|
|
"""
|
|
response1 = await client.post("api/v1/validate/code", json={"code": code1})
|
|
assert response1.status_code == 200
|
|
assert response1.json() == {"imports": {"errors": []}, "function": {"errors": []}}
|
|
|
|
# Test case with an invalid import and valid function
|
|
code2 = """
|
|
import non_existent_module
|
|
|
|
def square(x):
|
|
return x ** 2
|
|
"""
|
|
response2 = await client.post("api/v1/validate/code", json={"code": code2})
|
|
assert response2.status_code == 200
|
|
assert response2.json() == {
|
|
"imports": {"errors": ["No module named 'non_existent_module'"]},
|
|
"function": {"errors": []},
|
|
}
|
|
|
|
# Test case with a valid import and invalid function syntax
|
|
code3 = """
|
|
import math
|
|
|
|
def square(x)
|
|
return x ** 2
|
|
"""
|
|
response3 = await client.post("api/v1/validate/code", json={"code": code3})
|
|
assert response3.status_code == 200
|
|
assert response3.json() == {
|
|
"imports": {"errors": []},
|
|
"function": {"errors": ["expected ':' (<unknown>, line 4)"]},
|
|
}
|
|
|
|
# Test case with invalid JSON payload
|
|
response4 = await client.post("api/v1/validate/code", json={"invalid_key": code1})
|
|
assert response4.status_code == 422
|
|
|
|
# Test case with an empty code string
|
|
response5 = await client.post("api/v1/validate/code", json={"code": ""})
|
|
assert response5.status_code == 200
|
|
assert response5.json() == {"imports": {"errors": []}, "function": {"errors": []}}
|
|
|
|
# Test case with a syntax error in the code
|
|
code6 = """
|
|
import math
|
|
|
|
def square(x)
|
|
return x ** 2
|
|
"""
|
|
response6 = await client.post("api/v1/validate/code", json={"code": code6})
|
|
assert response6.status_code == 200
|
|
assert response6.json() == {
|
|
"imports": {"errors": []},
|
|
"function": {"errors": ["expected ':' (<unknown>, line 4)"]},
|
|
}
|
|
|
|
|
|
VALID_PROMPT = """
|
|
I want you to act as a naming consultant for new companies.
|
|
|
|
Here are some examples of good company names:
|
|
|
|
- search engine, Google
|
|
- social media, Facebook
|
|
- video sharing, YouTube
|
|
|
|
The name should be short, catchy and easy to remember.
|
|
|
|
What is a good name for a company that makes {product}?
|
|
"""
|
|
|
|
INVALID_PROMPT = "This is an invalid prompt without any input variable."
|
|
|
|
|
|
async def test_valid_prompt(client: AsyncClient):
|
|
PROMPT_REQUEST["template"] = VALID_PROMPT
|
|
response = await client.post("api/v1/validate/prompt", json=PROMPT_REQUEST)
|
|
assert response.status_code == 200
|
|
assert response.json()["input_variables"] == ["product"]
|
|
|
|
|
|
async def test_invalid_prompt(client: AsyncClient):
|
|
PROMPT_REQUEST["template"] = INVALID_PROMPT
|
|
response = await client.post(
|
|
"api/v1/validate/prompt",
|
|
json=PROMPT_REQUEST,
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json()["input_variables"] == []
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("prompt", "expected_input_variables"),
|
|
[
|
|
("{color} is my favorite color.", ["color"]),
|
|
("The weather is {weather} today.", ["weather"]),
|
|
("This prompt has no variables.", []),
|
|
("{a}, {b}, and {c} are variables.", ["a", "b", "c"]),
|
|
],
|
|
)
|
|
async def test_various_prompts(client, prompt, expected_input_variables):
|
|
PROMPT_REQUEST["template"] = prompt
|
|
response = await client.post("api/v1/validate/prompt", json=PROMPT_REQUEST)
|
|
assert response.status_code == 200
|
|
assert response.json()["input_variables"] == expected_input_variables
|
|
|
|
|
|
async def test_get_vertices_flow_not_found(client, logged_in_headers):
|
|
uuid = uuid4()
|
|
response = await client.post(f"/api/v1/build/{uuid}/vertices", headers=logged_in_headers)
|
|
assert response.status_code == 500
|
|
|
|
|
|
async def test_get_vertices(client, added_flow_webhook_test, logged_in_headers):
|
|
flow_id = added_flow_webhook_test["id"]
|
|
response = await client.post(f"/api/v1/build/{flow_id}/vertices", headers=logged_in_headers)
|
|
assert response.status_code == 200
|
|
assert "ids" in response.json()
|
|
# The response should contain the list in this order
|
|
# ['ConversationBufferMemory-Lu2Nb', 'PromptTemplate-5Q0W8', 'ChatOpenAI-vy7fV', 'LLMChain-UjBh1']
|
|
# The important part is before the - (ConversationBufferMemory, PromptTemplate, ChatOpenAI, LLMChain)
|
|
ids = [_id.split("-")[0] for _id in response.json()["ids"]]
|
|
|
|
assert set(ids) == {"ChatInput", "Webhook"}
|
|
|
|
|
|
async def test_build_vertex_invalid_flow_id(client, logged_in_headers):
|
|
uuid = uuid4()
|
|
response = await client.post(f"/api/v1/build/{uuid}/vertices/vertex_id", headers=logged_in_headers)
|
|
assert response.status_code == 500
|
|
|
|
|
|
async def test_build_vertex_invalid_vertex_id(client, added_flow_webhook_test, logged_in_headers):
|
|
flow_id = added_flow_webhook_test["id"]
|
|
response = await client.post(f"/api/v1/build/{flow_id}/vertices/invalid_vertex_id", headers=logged_in_headers)
|
|
assert response.status_code == 500
|
|
|
|
|
|
async def test_successful_run_no_payload(client, simple_api_test, created_api_key):
|
|
headers = {"x-api-key": created_api_key.api_key}
|
|
flow_id = simple_api_test["id"]
|
|
response = await client.post(f"/api/v1/run/{flow_id}", headers=headers)
|
|
assert response.status_code == status.HTTP_200_OK, response.text
|
|
# Add more assertions here to validate the response content
|
|
json_response = response.json()
|
|
assert "session_id" in json_response
|
|
assert "outputs" in json_response
|
|
outer_outputs = json_response["outputs"]
|
|
assert len(outer_outputs) == 1
|
|
outputs_dict = outer_outputs[0]
|
|
assert len(outputs_dict) == 2
|
|
assert "inputs" in outputs_dict
|
|
assert "outputs" in outputs_dict
|
|
assert outputs_dict.get("inputs") == {"input_value": ""}
|
|
assert isinstance(outputs_dict.get("outputs"), list)
|
|
assert len(outputs_dict.get("outputs")) == 1
|
|
ids = [output.get("component_id") for output in outputs_dict.get("outputs")]
|
|
assert all("ChatOutput" in _id for _id in ids)
|
|
display_names = [output.get("component_display_name") for output in outputs_dict.get("outputs")]
|
|
assert all(name in display_names for name in ["Chat Output"])
|
|
output_results_has_results = all("results" in output.get("results") for output in outputs_dict.get("outputs"))
|
|
inner_results = [output.get("results") for output in outputs_dict.get("outputs")]
|
|
|
|
assert all(result is not None for result in inner_results), (outputs_dict, output_results_has_results)
|
|
|
|
|
|
async def test_successful_run_with_output_type_text(client, simple_api_test, created_api_key):
|
|
headers = {"x-api-key": created_api_key.api_key}
|
|
flow_id = simple_api_test["id"]
|
|
payload = {
|
|
"output_type": "text",
|
|
}
|
|
response = await client.post(f"/api/v1/run/{flow_id}", headers=headers, json=payload)
|
|
assert response.status_code == status.HTTP_200_OK, response.text
|
|
# Add more assertions here to validate the response content
|
|
json_response = response.json()
|
|
assert "session_id" in json_response
|
|
assert "outputs" in json_response
|
|
outer_outputs = json_response["outputs"]
|
|
assert len(outer_outputs) == 1
|
|
outputs_dict = outer_outputs[0]
|
|
assert len(outputs_dict) == 2
|
|
assert "inputs" in outputs_dict
|
|
assert "outputs" in outputs_dict
|
|
assert outputs_dict.get("inputs") == {"input_value": ""}
|
|
assert isinstance(outputs_dict.get("outputs"), list)
|
|
assert len(outputs_dict.get("outputs")) == 1
|
|
ids = [output.get("component_id") for output in outputs_dict.get("outputs")]
|
|
assert all("ChatOutput" in _id for _id in ids), ids
|
|
display_names = [output.get("component_display_name") for output in outputs_dict.get("outputs")]
|
|
assert all(name in display_names for name in ["Chat Output"]), display_names
|
|
inner_results = [output.get("results") for output in outputs_dict.get("outputs")]
|
|
expected_keys = ["message"]
|
|
assert all(key in result for result in inner_results for key in expected_keys), outputs_dict
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
async def test_successful_run_with_output_type_any(client, simple_api_test, created_api_key):
|
|
# This one should have both the ChatOutput and TextOutput components
|
|
headers = {"x-api-key": created_api_key.api_key}
|
|
flow_id = simple_api_test["id"]
|
|
payload = {
|
|
"output_type": "any",
|
|
}
|
|
response = await client.post(f"/api/v1/run/{flow_id}", headers=headers, json=payload)
|
|
assert response.status_code == status.HTTP_200_OK, response.text
|
|
# Add more assertions here to validate the response content
|
|
json_response = response.json()
|
|
assert "session_id" in json_response
|
|
assert "outputs" in json_response
|
|
outer_outputs = json_response["outputs"]
|
|
assert len(outer_outputs) == 1
|
|
outputs_dict = outer_outputs[0]
|
|
assert len(outputs_dict) == 2
|
|
assert "inputs" in outputs_dict
|
|
assert "outputs" in outputs_dict
|
|
assert outputs_dict.get("inputs") == {"input_value": ""}
|
|
assert isinstance(outputs_dict.get("outputs"), list)
|
|
assert len(outputs_dict.get("outputs")) == 1
|
|
ids = [output.get("component_id") for output in outputs_dict.get("outputs")]
|
|
assert all("ChatOutput" in _id or "TextOutput" in _id for _id in ids), ids
|
|
display_names = [output.get("component_display_name") for output in outputs_dict.get("outputs")]
|
|
assert all(name in display_names for name in ["Chat Output"]), display_names
|
|
inner_results = [output.get("results") for output in outputs_dict.get("outputs")]
|
|
expected_keys = ["message"]
|
|
assert all(key in result for result in inner_results for key in expected_keys), outputs_dict
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
async def test_successful_run_with_output_type_debug(client, simple_api_test, created_api_key):
|
|
# This one should return outputs for all components
|
|
# Let's just check the amount of outputs(there should be 7)
|
|
headers = {"x-api-key": created_api_key.api_key}
|
|
flow_id = simple_api_test["id"]
|
|
payload = {
|
|
"output_type": "debug",
|
|
}
|
|
response = await client.post(f"/api/v1/run/{flow_id}", headers=headers, json=payload)
|
|
assert response.status_code == status.HTTP_200_OK, response.text
|
|
# Add more assertions here to validate the response content
|
|
json_response = response.json()
|
|
assert "session_id" in json_response
|
|
assert "outputs" in json_response
|
|
outer_outputs = json_response["outputs"]
|
|
assert len(outer_outputs) == 1
|
|
outputs_dict = outer_outputs[0]
|
|
assert len(outputs_dict) == 2
|
|
assert "inputs" in outputs_dict
|
|
assert "outputs" in outputs_dict
|
|
assert outputs_dict.get("inputs") == {"input_value": ""}
|
|
assert isinstance(outputs_dict.get("outputs"), list)
|
|
assert len(outputs_dict.get("outputs")) == 3
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
async def test_successful_run_with_input_type_text(client, simple_api_test, created_api_key):
|
|
headers = {"x-api-key": created_api_key.api_key}
|
|
flow_id = simple_api_test["id"]
|
|
payload = {
|
|
"input_type": "text",
|
|
"output_type": "debug",
|
|
"input_value": "value1",
|
|
}
|
|
response = await client.post(f"/api/v1/run/{flow_id}", headers=headers, json=payload)
|
|
assert response.status_code == status.HTTP_200_OK, response.text
|
|
# Add more assertions here to validate the response content
|
|
json_response = response.json()
|
|
assert "session_id" in json_response
|
|
assert "outputs" in json_response
|
|
outer_outputs = json_response["outputs"]
|
|
assert len(outer_outputs) == 1
|
|
outputs_dict = outer_outputs[0]
|
|
assert len(outputs_dict) == 2
|
|
assert "inputs" in outputs_dict
|
|
assert "outputs" in outputs_dict
|
|
assert outputs_dict.get("inputs") == {"input_value": "value1"}
|
|
assert isinstance(outputs_dict.get("outputs"), list)
|
|
assert len(outputs_dict.get("outputs")) == 3
|
|
# Now we get all components that contain TextInput in the component_id
|
|
text_input_outputs = [output for output in outputs_dict.get("outputs") if "TextInput" in output.get("component_id")]
|
|
assert len(text_input_outputs) == 1
|
|
# Now we check if the input_value is correct
|
|
# We get text key twice because the output is now a Message
|
|
assert all(
|
|
output.get("results").get("text").get("text") == "value1" for output in text_input_outputs
|
|
), text_input_outputs
|
|
|
|
|
|
@pytest.mark.api_key_required
|
|
@pytest.mark.benchmark
|
|
async def test_successful_run_with_input_type_chat(client: AsyncClient, simple_api_test, created_api_key):
|
|
headers = {"x-api-key": created_api_key.api_key}
|
|
flow_id = simple_api_test["id"]
|
|
payload = {
|
|
"input_type": "chat",
|
|
"output_type": "debug",
|
|
"input_value": "value1",
|
|
}
|
|
response = await client.post(f"/api/v1/run/{flow_id}", headers=headers, json=payload)
|
|
assert response.status_code == status.HTTP_200_OK, response.text
|
|
# Add more assertions here to validate the response content
|
|
json_response = response.json()
|
|
assert "session_id" in json_response
|
|
assert "outputs" in json_response
|
|
outer_outputs = json_response["outputs"]
|
|
assert len(outer_outputs) == 1
|
|
outputs_dict = outer_outputs[0]
|
|
assert len(outputs_dict) == 2
|
|
assert "inputs" in outputs_dict
|
|
assert "outputs" in outputs_dict
|
|
assert outputs_dict.get("inputs") == {"input_value": "value1"}
|
|
assert isinstance(outputs_dict.get("outputs"), list)
|
|
assert len(outputs_dict.get("outputs")) == 3
|
|
# Now we get all components that contain TextInput in the component_id
|
|
chat_input_outputs = [output for output in outputs_dict.get("outputs") if "ChatInput" in output.get("component_id")]
|
|
assert len(chat_input_outputs) == 1
|
|
# Now we check if the input_value is correct
|
|
assert all(
|
|
output.get("results").get("message").get("text") == "value1" for output in chat_input_outputs
|
|
), chat_input_outputs
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
async def test_invalid_run_with_input_type_chat(client, simple_api_test, created_api_key):
|
|
headers = {"x-api-key": created_api_key.api_key}
|
|
flow_id = simple_api_test["id"]
|
|
payload = {
|
|
"input_type": "chat",
|
|
"output_type": "debug",
|
|
"input_value": "value1",
|
|
"tweaks": {"Chat Input": {"input_value": "value2"}},
|
|
}
|
|
response = await client.post(f"/api/v1/run/{flow_id}", headers=headers, json=payload)
|
|
assert response.status_code == status.HTTP_400_BAD_REQUEST, response.text
|
|
assert "If you pass an input_value to the chat input, you cannot pass a tweak with the same name." in response.text
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
async def test_successful_run_with_input_type_any(client, simple_api_test, created_api_key):
|
|
headers = {"x-api-key": created_api_key.api_key}
|
|
flow_id = simple_api_test["id"]
|
|
payload = {
|
|
"input_type": "any",
|
|
"output_type": "debug",
|
|
"input_value": "value1",
|
|
}
|
|
response = await client.post(f"/api/v1/run/{flow_id}", headers=headers, json=payload)
|
|
assert response.status_code == status.HTTP_200_OK, response.text
|
|
# Add more assertions here to validate the response content
|
|
json_response = response.json()
|
|
assert "session_id" in json_response
|
|
assert "outputs" in json_response
|
|
outer_outputs = json_response["outputs"]
|
|
assert len(outer_outputs) == 1
|
|
outputs_dict = outer_outputs[0]
|
|
assert len(outputs_dict) == 2
|
|
assert "inputs" in outputs_dict
|
|
assert "outputs" in outputs_dict
|
|
assert outputs_dict.get("inputs") == {"input_value": "value1"}
|
|
assert isinstance(outputs_dict.get("outputs"), list)
|
|
assert len(outputs_dict.get("outputs")) == 3
|
|
# Now we get all components that contain TextInput or ChatInput in the component_id
|
|
any_input_outputs = [
|
|
output
|
|
for output in outputs_dict.get("outputs")
|
|
if "TextInput" in output.get("component_id") or "ChatInput" in output.get("component_id")
|
|
]
|
|
assert len(any_input_outputs) == 2
|
|
# Now we check if the input_value is correct
|
|
all_result_dicts = [output.get("results") for output in any_input_outputs]
|
|
all_message_or_text_dicts = [
|
|
result_dict.get("message", result_dict.get("text")) for result_dict in all_result_dicts
|
|
]
|
|
assert all(
|
|
message_or_text_dict.get("text") == "value1" for message_or_text_dict in all_message_or_text_dicts
|
|
), any_input_outputs
|
|
|
|
|
|
async def test_invalid_flow_id(client, created_api_key):
|
|
headers = {"x-api-key": created_api_key.api_key}
|
|
flow_id = "invalid-flow-id"
|
|
response = await client.post(f"/api/v1/run/{flow_id}", headers=headers)
|
|
assert response.status_code == status.HTTP_404_NOT_FOUND, response.text
|
|
headers = {"x-api-key": created_api_key.api_key}
|
|
flow_id = UUID(int=0)
|
|
response = await client.post(f"/api/v1/run/{flow_id}", headers=headers)
|
|
assert response.status_code == status.HTTP_404_NOT_FOUND, response.text
|
|
# Check if the error detail is as expected
|
|
|
|
|
|
@pytest.mark.benchmark
|
|
async def test_starter_projects(client, created_api_key):
|
|
headers = {"x-api-key": created_api_key.api_key}
|
|
response = await client.get("api/v1/starter-projects/", headers=headers)
|
|
assert response.status_code == status.HTTP_200_OK, response.text
|