langflow/tests/test_database.py
Gabriel Luiz Freitas Almeida ec585b8acc 🐛 fix(flow.py): change flow field type from str to Dict to allow for JSON data
 feat(flow.py): add validator to ensure flow field is a valid JSON object with required fields
The flow field in the FlowBase model has been changed from a string to a dictionary to allow for JSON data. A validator has been added to ensure that the flow field is a valid JSON object with the required fields. The tests have been updated to reflect these changes.
2023-06-05 13:40:14 -03:00

199 lines
7.1 KiB
Python

from uuid import uuid4
from langflow.api.schemas import FlowListCreate
from langflow.database.models.flow import FlowCreate
import json
from sqlalchemy.orm import Session
from langflow.database.models.flow import Flow
from fastapi.testclient import TestClient
import threading
def test_create_flow(client: TestClient, json_flow: str):
flow = FlowCreate(name="Test Flow", flow=json.loads(json_flow))
response = client.post("/flows/", json=flow.dict())
assert response.status_code == 200
assert response.json()["name"] == flow.name
assert response.json()["flow"] == flow.flow
def test_read_flows(client: TestClient, json_flow: str):
flow = FlowCreate(name="Test Flow", flow=json.loads(json_flow))
response = client.post("/flows/", json=flow.dict())
assert response.status_code == 200
assert response.json()["name"] == flow.name
assert response.json()["flow"] == flow.flow
flow = FlowCreate(name="Test Flow", flow=json.loads(json_flow))
response = client.post("/flows/", json=flow.dict())
assert response.status_code == 200
assert response.json()["name"] == flow.name
assert response.json()["flow"] == flow.flow
response = client.get("/flows/")
assert response.status_code == 200
assert len(response.json()) > 0
def test_read_flow(client: TestClient, json_flow: str):
flow = FlowCreate(name="Test Flow", flow=json.loads(json_flow))
response = client.post("/flows/", json=flow.dict())
flow_id = response.json()["id"]
response = client.get(f"/flows/{flow_id}")
assert response.status_code == 200
assert response.json()["name"] == flow.name
assert response.json()["flow"] == flow.flow
def test_update_flow(client: TestClient, json_flow: str):
flow = FlowCreate(name="Test Flow", flow=json.loads(json_flow))
response = client.post("/flows/", json=flow.dict())
flow_id = response.json()["id"]
updated_flow = FlowCreate(
name="Updated Flow",
flow=json.loads(json_flow.replace("BasicExample", "Updated Flow")),
)
response = client.put(f"/flows/{flow_id}", json=updated_flow.dict())
assert response.status_code == 200
assert response.json()["name"] == updated_flow.name
assert response.json()["flow"] == updated_flow.flow
def test_delete_flow(client: TestClient, json_flow: str):
flow = FlowCreate(name="Test Flow", flow=json.loads(json_flow))
response = client.post("/flows/", json=flow.dict())
flow_id = response.json()["id"]
response = client.delete(f"/flows/{flow_id}")
assert response.status_code == 200
assert response.json()["message"] == "Flow deleted successfully"
def test_create_flows(client: TestClient, session: Session, json_flow: str):
# Create test data
flow_list = FlowListCreate(
flows=[
FlowCreate(name="Flow 1", flow=json.loads(json_flow)),
FlowCreate(name="Flow 2", flow=json.loads(json_flow)),
]
)
# Make request to endpoint
response = client.post("/flows/batch/", json=flow_list.dict())
# Check response status code
assert response.status_code == 200
# Check response data
response_data = response.json()
assert len(response_data) == 2
assert response_data[0]["name"] == "Flow 1"
assert response_data[0]["flow"] == json.loads(json_flow)
assert response_data[1]["name"] == "Flow 2"
assert response_data[1]["flow"] == json.loads(json_flow)
def test_upload_file(client: TestClient, session: Session, json_flow: str):
# Create test data
flow_list = FlowListCreate(
flows=[
FlowCreate(name="Flow 1", flow=json.loads(json_flow)),
FlowCreate(name="Flow 2", flow=json.loads(json_flow)),
]
)
file_contents = json.dumps(flow_list.dict())
response = client.post(
"/flows/upload/",
files={"file": ("examples.json", file_contents, "application/json")},
)
# Check response status code
assert response.status_code == 200
# Check response data
response_data = response.json()
assert len(response_data) == 2
assert response_data[0]["name"] == "Flow 1"
assert response_data[0]["flow"] == json.loads(json_flow)
assert response_data[1]["name"] == "Flow 2"
assert response_data[1]["flow"] == json.loads(json_flow)
def test_download_file(client: TestClient, session: Session, json_flow):
# Create test data
flow_list = FlowListCreate(
flows=[
FlowCreate(name="Flow 1", flow=json.loads(json_flow)),
FlowCreate(name="Flow 2", flow=json.loads(json_flow)),
]
)
for flow in flow_list.flows:
db_flow = Flow.from_orm(flow)
session.add(db_flow)
session.commit()
# Make request to endpoint
response = client.get("/flows/download/")
# Check response status code
assert response.status_code == 200
# Check response data
response_data = json.loads(response.json()["file"])
assert len(response_data) == 2
assert response_data[0]["name"] == "Flow 1"
assert response_data[0]["flow"] == json.loads(json_flow)
assert response_data[1]["name"] == "Flow 2"
assert response_data[1]["flow"] == json.loads(json_flow)
def test_create_flow_with_invalid_data(client: TestClient):
flow = {"name": "a" * 256, "flow": "Invalid flow data"}
response = client.post("/flows/", json=flow)
assert response.status_code == 422
def test_get_nonexistent_flow(client: TestClient):
# uuid4 generates a random UUID
uuid = uuid4()
response = client.get(f"/flows/{uuid}")
assert response.status_code == 404
def test_update_flow_idempotency(client: TestClient, json_flow: str):
flow = FlowCreate(name="Test Flow", flow=json.loads(json_flow))
response = client.post("/flows/", json=flow.dict())
flow_id = response.json()["id"]
updated_flow = FlowCreate(name="Updated Flow", flow=json.loads(json_flow))
response1 = client.put(f"/flows/{flow_id}", json=updated_flow.dict())
response2 = client.put(f"/flows/{flow_id}", json=updated_flow.dict())
assert response1.json() == response2.json()
def test_update_nonexistent_flow(client: TestClient, json_flow: str):
uuid = uuid4()
updated_flow = FlowCreate(
name="Updated Flow",
flow=json.loads(json_flow.replace("BasicExample", "Updated Flow")),
)
response = client.put(f"/flows/{uuid}", json=updated_flow.dict())
assert response.status_code == 404
def test_delete_nonexistent_flow(client: TestClient):
uuid = uuid4()
response = client.delete(f"/flows/{uuid}")
assert response.status_code == 404
def test_read_empty_flows(client: TestClient):
response = client.get("/flows/")
assert response.status_code == 200
assert len(response.json()) == 0
def test_stress_create_flow(client: TestClient, json_flow: str):
flow = FlowCreate(name="Test Flow", flow=json.loads(json_flow))
def create_flow():
response = client.post("/flows/", json=flow.dict())
assert response.status_code == 200
threads = []
for i in range(100):
t = threading.Thread(target=create_flow)
threads.append(t)
t.start()
for t in threads:
t.join()