🔧 fix(test_endpoints.py): fix typo in test_process_flow_without_autologin function to improve code readability

 feat(test_endpoints.py): add helper functions run_post and poll_task_status to improve code modularity and reusability
🔧 fix(test_endpoints.py): fix typo in test_basic_chat_with_two_session_ids_and_names function to improve code readability
 feat(test_endpoints.py): add async_test marker to test_vector_store_in_process function to indicate it is an asynchronous test
 feat(test_endpoints.py): add distributed_client parameter to test_vector_store_in_process function to test distributed client functionality
 feat(test_endpoints.py): add async_test marker to test_async_task_processing function to indicate it is an asynchronous test
 feat(test_endpoints.py): add distributed_client parameter to test_async_task_processing function to test distributed client functionality
 feat(test_endpoints.py): add async_test marker to test_async_task_processing_vector_store function to indicate it is an asynchronous test
 feat(test_endpoints.py): add distributed_client parameter to test_async_task_processing_vector_store function to test distributed client functionality
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-09-22 11:03:54 -03:00
commit 1f720a665e

View file

@ -8,7 +8,33 @@ from fastapi.testclient import TestClient
from langflow.interface.tools.constants import CUSTOM_TOOLS
from langflow.template.frontend_node.chains import TimeTravelGuideChainNode
from tests.utils import poll_task_status, run_post
import time
def run_post(client, flow_id, headers, post_data):
response = client.post(
f"api/v1/process/{flow_id}",
headers=headers,
json=post_data,
)
assert response.status_code == 200, response.json()
return response.json()
# Helper function to poll task status
def poll_task_status(client, headers, task_id, max_attempts=20, sleep_time=1):
for _ in range(max_attempts):
task_status_response = client.get(
f"api/v1/task/{task_id}/status",
headers=headers,
)
if (
task_status_response.status_code == 200
and task_status_response.json()["status"] == "SUCCESS"
):
return task_status_response.json()
time.sleep(sleep_time)
return None # Return None if task did not complete in time
PROMPT_REQUEST = {
@ -187,7 +213,7 @@ def test_process_flow_without_autologin(client, flow, monkeypatch, created_api_k
# Dummy POST data
post_data = {
"inputs": {"key": "value"},
"inputs": {"input": "value"},
"tweaks": None,
"clear_cache": False,
"session_id": None,
@ -451,11 +477,14 @@ def test_basic_chat_with_two_session_ids_and_names(client, added_flow, created_a
assert name in response_json["result"]["text"]
def test_vector_store_in_process(client, added_vector_store, created_api_key):
@pytest.mark.async_test
def test_vector_store_in_process(
distributed_client, added_vector_store, created_api_key
):
# Run the /api/v1/process/{flow_id} endpoint
headers = {"x-api-key": created_api_key.api_key}
post_data = {"inputs": {"input": "What is Langflow?"}}
response = client.post(
response = distributed_client.post(
f"api/v1/process/{added_vector_store.get('id')}",
headers=headers,
json=post_data,
@ -470,12 +499,12 @@ def test_vector_store_in_process(client, added_vector_store, created_api_key):
# Test function without loop
@pytest.mark.async_test
def test_async_task_processing(client, added_flow, created_api_key):
def test_async_task_processing(distributed_client, added_flow, created_api_key):
headers = {"x-api-key": created_api_key.api_key}
post_data = {"inputs": {"text": "Hi, My name is Gabriel"}}
# Run the /api/v1/process/{flow_id} endpoint with sync=False
response = client.post(
response = distributed_client.post(
f"api/v1/process/{added_flow.get('id')}",
headers=headers,
json={**post_data, "sync": False},
@ -487,7 +516,7 @@ def test_async_task_processing(client, added_flow, created_api_key):
assert task_id is not None
# Polling the task status using the helper function
task_status_json = poll_task_status(client, headers, task_id)
task_status_json = poll_task_status(distributed_client, headers, task_id)
assert task_status_json is not None, "Task did not complete in time"
# Validate that the task completed successfully and the result is as expected
@ -502,7 +531,7 @@ def test_async_task_processing_vector_store(
client, added_vector_store, created_api_key
):
headers = {"x-api-key": created_api_key.api_key}
post_data = {"inputs": {"input": "What is Langflow?"}}
post_data = {"inputs": {"input": "How do I upload examples?"}}
# Run the /api/v1/process/{flow_id} endpoint with sync=False
response = client.post(
@ -525,6 +554,6 @@ def test_async_task_processing_vector_store(
# Validate that the task completed successfully and the result is as expected
assert "result" in task_status_json, task_status_json
assert "output" in task_status_json["result"], task_status_json["result"]
assert "Langflow is" in task_status_json["result"]["output"], task_status_json[
assert "Langflow" in task_status_json["result"]["output"], task_status_json[
"result"
]