773 lines
32 KiB
Python
773 lines
32 KiB
Python
import asyncio
|
|
import json
|
|
from typing import NamedTuple
|
|
from uuid import UUID, uuid4
|
|
|
|
import orjson
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
from langflow.api.v1.schemas import FlowListCreate, ResultDataResponse
|
|
from langflow.graph.utils import log_transaction, log_vertex_build
|
|
from langflow.initial_setup.setup import load_starter_projects
|
|
from langflow.services.database.models.base import orjson_dumps
|
|
from langflow.services.database.models.flow import Flow, FlowCreate, FlowUpdate
|
|
from langflow.services.database.models.folder.model import FolderCreate
|
|
from langflow.services.database.utils import session_getter
|
|
from langflow.services.deps import get_db_service
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def json_style():
|
|
# class FlowStyleBase(SQLModel):
|
|
# color: str = Field(index=True)
|
|
# emoji: str = Field(index=False)
|
|
# flow_id: UUID = Field(default=None, foreign_key="flow.id")
|
|
return orjson_dumps(
|
|
{
|
|
"color": "red",
|
|
"emoji": "👍",
|
|
}
|
|
)
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_create_flow(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
flow = orjson.loads(json_flow)
|
|
data = flow["data"]
|
|
flow = FlowCreate(name=str(uuid4()), description="description", data=data)
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
assert response.json()["name"] == flow.name
|
|
assert response.json()["data"] == flow.data
|
|
# flow is optional so we can create a flow without a flow
|
|
flow = FlowCreate(name=str(uuid4()))
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(exclude_unset=True), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
assert response.json()["name"] == flow.name
|
|
assert response.json()["data"] == flow.data
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_read_flows(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
flow_data = orjson.loads(json_flow)
|
|
data = flow_data["data"]
|
|
flow = FlowCreate(name=str(uuid4()), description="description", data=data)
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
assert response.json()["name"] == flow.name
|
|
assert response.json()["data"] == flow.data
|
|
|
|
flow = FlowCreate(name=str(uuid4()), description="description", data=data)
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
assert response.json()["name"] == flow.name
|
|
assert response.json()["data"] == flow.data
|
|
|
|
response = await client.get("api/v1/flows/", headers=logged_in_headers)
|
|
assert response.status_code == 200
|
|
assert len(response.json()) > 0
|
|
|
|
|
|
async def test_read_flows_pagination_with_params(client: AsyncClient, logged_in_headers):
|
|
response = await client.get(
|
|
"api/v1/flows/", headers=logged_in_headers, params={"page": 3, "size": 10, "get_all": False}
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json()["page"] == 3
|
|
assert response.json()["size"] == 10
|
|
assert response.json()["pages"] == 0
|
|
assert response.json()["total"] == 0
|
|
assert len(response.json()["items"]) == 0
|
|
|
|
|
|
async def test_read_flows_pagination_with_flows(client: AsyncClient, logged_in_headers):
|
|
number_of_flows = 30
|
|
flows = [FlowCreate(name=f"Flow {i}", description="description", data={}) for i in range(number_of_flows)]
|
|
flow_ids = []
|
|
for flow in flows:
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
flow_ids.append(response.json()["id"])
|
|
|
|
response = await client.get(
|
|
"api/v1/flows/", headers=logged_in_headers, params={"page": 3, "size": 10, "get_all": False}
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json()["page"] == 3
|
|
assert response.json()["size"] == 10
|
|
assert response.json()["pages"] == 3
|
|
assert response.json()["total"] == number_of_flows
|
|
assert len(response.json()["items"]) == 10
|
|
|
|
response = await client.get(
|
|
"api/v1/flows/", headers=logged_in_headers, params={"page": 4, "size": 10, "get_all": False}
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json()["page"] == 4
|
|
assert response.json()["size"] == 10
|
|
assert response.json()["pages"] == 3
|
|
assert response.json()["total"] == number_of_flows
|
|
assert len(response.json()["items"]) == 0
|
|
|
|
|
|
async def test_read_flows_custom_page_size(client: AsyncClient, logged_in_headers):
|
|
number_of_flows = 30
|
|
flows = [FlowCreate(name=f"Flow {i}", description="description", data={}) for i in range(number_of_flows)]
|
|
for flow in flows:
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
|
|
response = await client.get(
|
|
"api/v1/flows/", headers=logged_in_headers, params={"page": 1, "size": 15, "get_all": False}
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json()["page"] == 1
|
|
assert response.json()["size"] == 15
|
|
assert response.json()["pages"] == 2
|
|
assert response.json()["total"] == number_of_flows
|
|
assert len(response.json()["items"]) == 15
|
|
|
|
|
|
async def test_read_flows_invalid_page(client: AsyncClient, logged_in_headers):
|
|
number_of_flows = 30
|
|
flows = [FlowCreate(name=f"Flow {i}", description="description", data={}) for i in range(number_of_flows)]
|
|
flow_ids = []
|
|
for flow in flows:
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
flow_ids.append(response.json()["id"])
|
|
|
|
response = await client.get(
|
|
"api/v1/flows/", headers=logged_in_headers, params={"page": 0, "size": 10, "get_all": False}
|
|
)
|
|
assert response.status_code == 422 # Assuming 422 is the status code for invalid input
|
|
|
|
|
|
async def test_read_flows_invalid_size(client: AsyncClient, logged_in_headers):
|
|
number_of_flows = 30
|
|
flows = [FlowCreate(name=f"Flow {i}", description="description", data={}) for i in range(number_of_flows)]
|
|
flow_ids = []
|
|
for flow in flows:
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
flow_ids.append(response.json()["id"])
|
|
|
|
response = await client.get(
|
|
"api/v1/flows/", headers=logged_in_headers, params={"page": 1, "size": 0, "get_all": False}
|
|
)
|
|
assert response.status_code == 422 # Assuming 422 is the status code for invalid input
|
|
|
|
|
|
async def test_read_flows_no_pagination_params(client: AsyncClient, logged_in_headers):
|
|
number_of_flows = 30
|
|
flows = [FlowCreate(name=f"Flow {i}", description="description", data={}) for i in range(number_of_flows)]
|
|
for flow in flows:
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
|
|
response = await client.get("api/v1/flows/", headers=logged_in_headers, params={"get_all": False})
|
|
assert response.status_code == 200
|
|
# Assert default pagination values, adjust these according to your API's default behavior
|
|
assert response.json()["page"] == 1
|
|
assert response.json()["size"] == 50
|
|
assert response.json()["pages"] == 1
|
|
assert response.json()["total"] == number_of_flows
|
|
assert len(response.json()["items"]) == number_of_flows
|
|
|
|
|
|
async def test_read_flows_components_only_paginated(client: AsyncClient, logged_in_headers):
|
|
number_of_flows = 10
|
|
flows = [
|
|
FlowCreate(name=f"Flow {i}", description="description", data={}, is_component=True)
|
|
for i in range(number_of_flows)
|
|
]
|
|
for flow in flows:
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
response = await client.get(
|
|
"api/v1/flows/", headers=logged_in_headers, params={"components_only": True, "get_all": False}
|
|
)
|
|
assert response.status_code == 200
|
|
response_json = response.json()
|
|
assert response_json["total"] == 10
|
|
assert response_json["pages"] == 1
|
|
assert response_json["page"] == 1
|
|
assert response_json["size"] == 50
|
|
assert all(flow["is_component"] is True for flow in response_json["items"])
|
|
|
|
|
|
async def test_read_flows_components_only(client: AsyncClient, logged_in_headers):
|
|
number_of_flows = 10
|
|
flows = [
|
|
FlowCreate(name=f"Flow {i}", description="description", data={}, is_component=True)
|
|
for i in range(number_of_flows)
|
|
]
|
|
for flow in flows:
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
response = await client.get("api/v1/flows/", headers=logged_in_headers, params={"components_only": True})
|
|
assert response.status_code == 200
|
|
response_json = response.json()
|
|
assert all(flow["is_component"] is True for flow in response_json)
|
|
|
|
|
|
async def test_read_flow(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
flow = orjson.loads(json_flow)
|
|
data = flow["data"]
|
|
unique_name = str(uuid4())
|
|
flow = FlowCreate(name=unique_name, description="description", data=data)
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
flow_id = response.json()["id"] # flow_id should be a UUID but is a string
|
|
# turn it into a UUID
|
|
flow_id = UUID(flow_id)
|
|
|
|
response = await client.get(f"api/v1/flows/{flow_id}", headers=logged_in_headers)
|
|
assert response.status_code == 200
|
|
assert response.json()["name"] == flow.name
|
|
assert response.json()["data"] == flow.data
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_update_flow(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
flow = orjson.loads(json_flow)
|
|
data = flow["data"]
|
|
|
|
flow = FlowCreate(name="Test Flow", description="description", data=data)
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
|
|
flow_id = response.json()["id"]
|
|
updated_flow = FlowUpdate(
|
|
name="Updated Flow",
|
|
description="updated description",
|
|
data=data,
|
|
)
|
|
response = await client.patch(f"api/v1/flows/{flow_id}", json=updated_flow.model_dump(), headers=logged_in_headers)
|
|
|
|
assert response.status_code == 200
|
|
assert response.json()["name"] == updated_flow.name
|
|
assert response.json()["description"] == updated_flow.description
|
|
# assert response.json()["data"] == updated_flow.data
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_delete_flow(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
flow = orjson.loads(json_flow)
|
|
data = flow["data"]
|
|
flow = FlowCreate(name="Test Flow", description="description", data=data)
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
flow_id = response.json()["id"]
|
|
response = await client.delete(f"api/v1/flows/{flow_id}", headers=logged_in_headers)
|
|
assert response.status_code == 200
|
|
assert response.json()["message"] == "Flow deleted successfully"
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_delete_flows(client: AsyncClient, logged_in_headers):
|
|
# Create ten flows
|
|
number_of_flows = 10
|
|
flows = [FlowCreate(name=f"Flow {i}", description="description", data={}) for i in range(number_of_flows)]
|
|
flow_ids = []
|
|
for flow in flows:
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
flow_ids.append(response.json()["id"])
|
|
|
|
response = await client.request("DELETE", "api/v1/flows/", headers=logged_in_headers, json=flow_ids)
|
|
assert response.status_code == 200, response.content
|
|
assert response.json().get("deleted") == number_of_flows
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_delete_flows_with_transaction_and_build(client: AsyncClient, logged_in_headers):
|
|
# Create ten flows
|
|
number_of_flows = 10
|
|
flows = [FlowCreate(name=f"Flow {i}", description="description", data={}) for i in range(number_of_flows)]
|
|
flow_ids = []
|
|
for flow in flows:
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
flow_ids.append(response.json()["id"])
|
|
|
|
class VertexTuple(NamedTuple):
|
|
id: str
|
|
|
|
# Create a transaction for each flow
|
|
for flow_id in flow_ids:
|
|
await log_transaction(
|
|
str(flow_id), source=VertexTuple(id="vid"), target=VertexTuple(id="tid"), status="success"
|
|
)
|
|
|
|
# Create a build for each flow
|
|
for flow_id in flow_ids:
|
|
build = {
|
|
"valid": True,
|
|
"params": {},
|
|
"data": ResultDataResponse(),
|
|
"artifacts": {},
|
|
"vertex_id": "vid",
|
|
"flow_id": flow_id,
|
|
}
|
|
log_vertex_build(
|
|
flow_id=build["flow_id"],
|
|
vertex_id=build["vertex_id"],
|
|
valid=build["valid"],
|
|
params=build["params"],
|
|
data=build["data"],
|
|
artifacts=build.get("artifacts"),
|
|
)
|
|
|
|
response = await client.request("DELETE", "api/v1/flows/", headers=logged_in_headers, json=flow_ids)
|
|
assert response.status_code == 200, response.content
|
|
assert response.json().get("deleted") == number_of_flows
|
|
|
|
for flow_id in flow_ids:
|
|
response = await client.request(
|
|
"GET", "api/v1/monitor/transactions", params={"flow_id": flow_id}, headers=logged_in_headers
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json() == []
|
|
|
|
for flow_id in flow_ids:
|
|
response = await client.request(
|
|
"GET", "api/v1/monitor/builds", params={"flow_id": flow_id}, headers=logged_in_headers
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json() == {"vertex_builds": {}}
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_delete_folder_with_flows_with_transaction_and_build(client: AsyncClient, logged_in_headers):
|
|
# Create a new folder
|
|
folder_name = f"Test Folder {uuid4()}"
|
|
folder = FolderCreate(name=folder_name, description="Test folder description", components_list=[], flows_list=[])
|
|
|
|
response = await client.post("api/v1/folders/", json=folder.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201, f"Expected status code 201, but got {response.status_code}"
|
|
|
|
created_folder = response.json()
|
|
folder_id = created_folder["id"]
|
|
|
|
# Create ten flows
|
|
number_of_flows = 10
|
|
flows = [FlowCreate(name=f"Flow {i}", description="description", data={}) for i in range(number_of_flows)]
|
|
flow_ids = []
|
|
for flow in flows:
|
|
flow.folder_id = folder_id
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
flow_ids.append(response.json()["id"])
|
|
|
|
class VertexTuple(NamedTuple):
|
|
id: str
|
|
|
|
# Create a transaction for each flow
|
|
for flow_id in flow_ids:
|
|
await log_transaction(
|
|
str(flow_id), source=VertexTuple(id="vid"), target=VertexTuple(id="tid"), status="success"
|
|
)
|
|
|
|
# Create a build for each flow
|
|
for flow_id in flow_ids:
|
|
build = {
|
|
"valid": True,
|
|
"params": {},
|
|
"data": ResultDataResponse(),
|
|
"artifacts": {},
|
|
"vertex_id": "vid",
|
|
"flow_id": flow_id,
|
|
}
|
|
log_vertex_build(
|
|
flow_id=build["flow_id"],
|
|
vertex_id=build["vertex_id"],
|
|
valid=build["valid"],
|
|
params=build["params"],
|
|
data=build["data"],
|
|
artifacts=build.get("artifacts"),
|
|
)
|
|
|
|
response = await client.request("DELETE", f"api/v1/folders/{folder_id}", headers=logged_in_headers)
|
|
assert response.status_code == 204
|
|
|
|
for flow_id in flow_ids:
|
|
response = await client.request(
|
|
"GET", "api/v1/monitor/transactions", params={"flow_id": flow_id}, headers=logged_in_headers
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json() == []
|
|
|
|
for flow_id in flow_ids:
|
|
response = await client.request(
|
|
"GET", "api/v1/monitor/builds", params={"flow_id": flow_id}, headers=logged_in_headers
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json() == {"vertex_builds": {}}
|
|
|
|
|
|
async def test_get_flows_from_folder_pagination(client: AsyncClient, logged_in_headers):
|
|
# Create a new folder
|
|
folder_name = f"Test Folder {uuid4()}"
|
|
folder = FolderCreate(name=folder_name, description="Test folder description", components_list=[], flows_list=[])
|
|
|
|
response = await client.post("api/v1/folders/", json=folder.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201, f"Expected status code 201, but got {response.status_code}"
|
|
|
|
created_folder = response.json()
|
|
folder_id = created_folder["id"]
|
|
|
|
response = await client.get(
|
|
f"api/v1/folders/{folder_id}", headers=logged_in_headers, params={"page": 1, "size": 50}
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json()["folder"]["name"] == folder_name
|
|
assert response.json()["folder"]["description"] == "Test folder description"
|
|
assert response.json()["flows"]["page"] == 1
|
|
assert response.json()["flows"]["size"] == 50
|
|
assert response.json()["flows"]["pages"] == 0
|
|
assert response.json()["flows"]["total"] == 0
|
|
assert len(response.json()["flows"]["items"]) == 0
|
|
|
|
|
|
async def test_get_flows_from_folder_pagination_with_params(client: AsyncClient, logged_in_headers):
|
|
# Create a new folder
|
|
folder_name = f"Test Folder {uuid4()}"
|
|
folder = FolderCreate(name=folder_name, description="Test folder description", components_list=[], flows_list=[])
|
|
|
|
response = await client.post("api/v1/folders/", json=folder.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201, f"Expected status code 201, but got {response.status_code}"
|
|
|
|
created_folder = response.json()
|
|
folder_id = created_folder["id"]
|
|
|
|
response = await client.get(
|
|
f"api/v1/folders/{folder_id}", headers=logged_in_headers, params={"page": 3, "size": 10}
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json()["folder"]["name"] == folder_name
|
|
assert response.json()["folder"]["description"] == "Test folder description"
|
|
assert response.json()["flows"]["page"] == 3
|
|
assert response.json()["flows"]["size"] == 10
|
|
assert response.json()["flows"]["pages"] == 0
|
|
assert response.json()["flows"]["total"] == 0
|
|
assert len(response.json()["flows"]["items"]) == 0
|
|
|
|
|
|
@pytest.mark.usefixtures("session")
|
|
async def test_create_flows(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
flow = orjson.loads(json_flow)
|
|
data = flow["data"]
|
|
# Create test data
|
|
flow_unique_name = str(uuid4())
|
|
flow_2_unique_name = str(uuid4())
|
|
flow_list = FlowListCreate(
|
|
flows=[
|
|
FlowCreate(name=flow_unique_name, description="description", data=data),
|
|
FlowCreate(name=flow_2_unique_name, description="description", data=data),
|
|
]
|
|
)
|
|
# Make request to endpoint
|
|
response = await client.post("api/v1/flows/batch/", json=flow_list.dict(), headers=logged_in_headers)
|
|
# Check response status code
|
|
assert response.status_code == 201
|
|
# Check response data
|
|
response_data = response.json()
|
|
assert len(response_data) == 2
|
|
assert flow_unique_name in response_data[0]["name"]
|
|
assert response_data[0]["description"] == "description"
|
|
assert response_data[0]["data"] == data
|
|
assert response_data[1]["name"] == flow_2_unique_name
|
|
assert response_data[1]["description"] == "description"
|
|
assert response_data[1]["data"] == data
|
|
|
|
|
|
@pytest.mark.usefixtures("session")
|
|
async def test_upload_file(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
flow = orjson.loads(json_flow)
|
|
data = flow["data"]
|
|
# Create test data
|
|
flow_unique_name = str(uuid4())
|
|
flow_2_unique_name = str(uuid4())
|
|
flow_list = FlowListCreate(
|
|
flows=[
|
|
FlowCreate(name=flow_unique_name, description="description", data=data),
|
|
FlowCreate(name=flow_2_unique_name, description="description", data=data),
|
|
]
|
|
)
|
|
file_contents = orjson_dumps(flow_list.dict())
|
|
response = await client.post(
|
|
"api/v1/flows/upload/",
|
|
files={"file": ("examples.json", file_contents, "application/json")},
|
|
headers=logged_in_headers,
|
|
)
|
|
# Check response status code
|
|
assert response.status_code == 201
|
|
# Check response data
|
|
response_data = response.json()
|
|
assert len(response_data) == 2
|
|
assert flow_unique_name in response_data[0]["name"]
|
|
assert response_data[0]["description"] == "description"
|
|
assert response_data[0]["data"] == data
|
|
assert response_data[1]["name"] == flow_2_unique_name
|
|
assert response_data[1]["description"] == "description"
|
|
assert response_data[1]["data"] == data
|
|
|
|
|
|
@pytest.mark.usefixtures("session")
|
|
async def test_download_file(
|
|
client: AsyncClient,
|
|
json_flow,
|
|
active_user,
|
|
logged_in_headers,
|
|
):
|
|
flow = orjson.loads(json_flow)
|
|
data = flow["data"]
|
|
# Create test data
|
|
flow_unique_name = str(uuid4())
|
|
flow_2_unique_name = str(uuid4())
|
|
flow_list = FlowListCreate(
|
|
flows=[
|
|
FlowCreate(name=flow_unique_name, description="description", data=data),
|
|
FlowCreate(name=flow_2_unique_name, description="description", data=data),
|
|
]
|
|
)
|
|
db_manager = get_db_service()
|
|
with session_getter(db_manager) as _session:
|
|
saved_flows = []
|
|
for flow in flow_list.flows:
|
|
flow.user_id = active_user.id
|
|
db_flow = Flow.model_validate(flow, from_attributes=True)
|
|
_session.add(db_flow)
|
|
saved_flows.append(db_flow)
|
|
_session.commit()
|
|
# Make request to endpoint inside the session context
|
|
flow_ids = [str(db_flow.id) for db_flow in saved_flows] # Convert UUIDs to strings
|
|
flow_ids_json = json.dumps(flow_ids)
|
|
response = await client.post(
|
|
"api/v1/flows/download/",
|
|
data=flow_ids_json,
|
|
headers={**logged_in_headers, "Content-Type": "application/json"},
|
|
)
|
|
# Check response status code
|
|
assert response.status_code == 200, response.json()
|
|
# Check response data
|
|
# Since the endpoint now returns a zip file, we need to check the content type and the filename in the headers
|
|
assert response.headers["Content-Type"] == "application/x-zip-compressed"
|
|
assert "attachment; filename=" in response.headers["Content-Disposition"]
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_create_flow_with_invalid_data(client: AsyncClient, logged_in_headers):
|
|
flow = {"name": "a" * 256, "data": "Invalid flow data"}
|
|
response = await client.post("api/v1/flows/", json=flow, headers=logged_in_headers)
|
|
assert response.status_code == 422
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_get_nonexistent_flow(client: AsyncClient, logged_in_headers):
|
|
uuid = uuid4()
|
|
response = await client.get(f"api/v1/flows/{uuid}", headers=logged_in_headers)
|
|
assert response.status_code == 404
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_update_flow_idempotency(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
flow_data = orjson.loads(json_flow)
|
|
data = flow_data["data"]
|
|
flow_data = FlowCreate(name="Test Flow", description="description", data=data)
|
|
response = await client.post("api/v1/flows/", json=flow_data.dict(), headers=logged_in_headers)
|
|
flow_id = response.json()["id"]
|
|
updated_flow = FlowCreate(name="Updated Flow", description="description", data=data)
|
|
response1 = await client.put(f"api/v1/flows/{flow_id}", json=updated_flow.model_dump(), headers=logged_in_headers)
|
|
response2 = await client.put(f"api/v1/flows/{flow_id}", json=updated_flow.model_dump(), headers=logged_in_headers)
|
|
assert response1.json() == response2.json()
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_update_nonexistent_flow(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
flow_data = orjson.loads(json_flow)
|
|
data = flow_data["data"]
|
|
uuid = uuid4()
|
|
updated_flow = FlowCreate(
|
|
name="Updated Flow",
|
|
description="description",
|
|
data=data,
|
|
)
|
|
response = await client.patch(f"api/v1/flows/{uuid}", json=updated_flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 404, response.text
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_delete_nonexistent_flow(client: AsyncClient, logged_in_headers):
|
|
uuid = uuid4()
|
|
response = await client.delete(f"api/v1/flows/{uuid}", headers=logged_in_headers)
|
|
assert response.status_code == 404
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_read_only_starter_projects(client: AsyncClient, logged_in_headers):
|
|
response = await client.get("api/v1/flows/basic_examples/", headers=logged_in_headers)
|
|
starter_projects = await asyncio.to_thread(load_starter_projects)
|
|
assert response.status_code == 200
|
|
assert len(response.json()) == len(starter_projects)
|
|
|
|
|
|
def test_sqlite_pragmas():
|
|
db_service = get_db_service()
|
|
|
|
with db_service.with_session() as session:
|
|
from sqlalchemy import text
|
|
|
|
assert session.exec(text("PRAGMA journal_mode;")).scalar() == "wal"
|
|
assert session.exec(text("PRAGMA synchronous;")).scalar() == 1
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_read_folder(client: AsyncClient, logged_in_headers):
|
|
# Create a new folder
|
|
folder_name = f"Test Folder {uuid4()}"
|
|
folder = FolderCreate(name=folder_name, description="Test folder description")
|
|
response = await client.post("api/v1/folders/", json=folder.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
created_folder = response.json()
|
|
folder_id = created_folder["id"]
|
|
|
|
# Read the folder
|
|
response = await client.get(f"api/v1/folders/{folder_id}", headers=logged_in_headers)
|
|
assert response.status_code == 200
|
|
folder_data = response.json()
|
|
assert folder_data["name"] == folder_name
|
|
assert folder_data["description"] == "Test folder description"
|
|
assert "flows" in folder_data
|
|
assert isinstance(folder_data["flows"], list)
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_read_folder_with_pagination(client: AsyncClient, logged_in_headers):
|
|
# Create a new folder
|
|
folder_name = f"Test Folder {uuid4()}"
|
|
folder = FolderCreate(name=folder_name, description="Test folder description")
|
|
response = await client.post("api/v1/folders/", json=folder.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
created_folder = response.json()
|
|
folder_id = created_folder["id"]
|
|
|
|
# Read the folder with pagination
|
|
response = await client.get(
|
|
f"api/v1/folders/{folder_id}", headers=logged_in_headers, params={"page": 1, "size": 10}
|
|
)
|
|
assert response.status_code == 200
|
|
folder_data = response.json()
|
|
assert isinstance(folder_data, dict)
|
|
assert "folder" in folder_data
|
|
assert "flows" in folder_data
|
|
assert folder_data["folder"]["name"] == folder_name
|
|
assert folder_data["folder"]["description"] == "Test folder description"
|
|
assert folder_data["flows"]["page"] == 1
|
|
assert folder_data["flows"]["size"] == 10
|
|
assert isinstance(folder_data["flows"]["items"], list)
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_read_folder_with_flows(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
# Create a new folder
|
|
folder_name = f"Test Folder {uuid4()}"
|
|
flow_name = f"Test Flow {uuid4()}"
|
|
folder = FolderCreate(name=folder_name, description="Test folder description")
|
|
response = await client.post("api/v1/folders/", json=folder.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
created_folder = response.json()
|
|
folder_id = created_folder["id"]
|
|
|
|
# Create a flow in the folder
|
|
flow_data = orjson.loads(json_flow)
|
|
data = flow_data["data"]
|
|
flow = FlowCreate(name=flow_name, description="description", data=data)
|
|
flow.folder_id = folder_id
|
|
response = await client.post("api/v1/flows/", json=flow.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
|
|
# Read the folder with flows
|
|
response = await client.get(f"api/v1/folders/{folder_id}", headers=logged_in_headers)
|
|
assert response.status_code == 200
|
|
folder_data = response.json()
|
|
assert folder_data["name"] == folder_name
|
|
assert folder_data["description"] == "Test folder description"
|
|
assert len(folder_data["flows"]) == 1
|
|
assert folder_data["flows"][0]["name"] == flow_name
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_read_nonexistent_folder(client: AsyncClient, logged_in_headers):
|
|
nonexistent_id = str(uuid4())
|
|
response = await client.get(f"api/v1/folders/{nonexistent_id}", headers=logged_in_headers)
|
|
assert response.status_code == 404
|
|
assert response.json()["detail"] == "Folder not found"
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_read_folder_with_search(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
# Create a new folder
|
|
folder_name = f"Test Folder {uuid4()}"
|
|
folder = FolderCreate(name=folder_name, description="Test folder description")
|
|
response = await client.post("api/v1/folders/", json=folder.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
created_folder = response.json()
|
|
folder_id = created_folder["id"]
|
|
|
|
# Create two flows in the folder
|
|
flow_data = orjson.loads(json_flow)
|
|
flow_name_1 = f"Test Flow 1 {uuid4()}"
|
|
flow_name_2 = f"Another Flow {uuid4()}"
|
|
|
|
flow1 = FlowCreate(
|
|
name=flow_name_1, description="Test flow description", data=flow_data["data"], folder_id=folder_id
|
|
)
|
|
flow2 = FlowCreate(
|
|
name=flow_name_2, description="Another flow description", data=flow_data["data"], folder_id=folder_id
|
|
)
|
|
flow1.folder_id = folder_id
|
|
flow2.folder_id = folder_id
|
|
await client.post("api/v1/flows/", json=flow1.model_dump(), headers=logged_in_headers)
|
|
await client.post("api/v1/flows/", json=flow2.model_dump(), headers=logged_in_headers)
|
|
|
|
# Read the folder with search
|
|
response = await client.get(
|
|
f"api/v1/folders/{folder_id}", headers=logged_in_headers, params={"search": "Test", "page": 1, "size": 10}
|
|
)
|
|
assert response.status_code == 200
|
|
folder_data = response.json()
|
|
assert len(folder_data["flows"]["items"]) == 1
|
|
assert folder_data["flows"]["items"][0]["name"] == flow_name_1
|
|
|
|
|
|
@pytest.mark.usefixtures("active_user")
|
|
async def test_read_folder_with_component_filter(client: AsyncClient, json_flow: str, logged_in_headers):
|
|
# Create a new folder
|
|
folder_name = f"Test Folder {uuid4()}"
|
|
folder = FolderCreate(name=folder_name, description="Test folder description")
|
|
response = await client.post("api/v1/folders/", json=folder.model_dump(), headers=logged_in_headers)
|
|
assert response.status_code == 201
|
|
created_folder = response.json()
|
|
folder_id = created_folder["id"]
|
|
|
|
# Create a component flow in the folder
|
|
flow_data = orjson.loads(json_flow)
|
|
component_flow_name = f"Component Flow {uuid4()}"
|
|
component_flow = FlowCreate(
|
|
name=component_flow_name,
|
|
description="Component flow description",
|
|
data=flow_data["data"],
|
|
folder_id=folder_id,
|
|
is_component=True,
|
|
)
|
|
component_flow.folder_id = folder_id
|
|
await client.post("api/v1/flows/", json=component_flow.model_dump(), headers=logged_in_headers)
|
|
|
|
# Read the folder with component filter
|
|
response = await client.get(
|
|
f"api/v1/folders/{folder_id}", headers=logged_in_headers, params={"is_component": True, "page": 1, "size": 10}
|
|
)
|
|
assert response.status_code == 200
|
|
folder_data = response.json()
|
|
assert len(folder_data["flows"]["items"]) == 1
|
|
assert folder_data["flows"]["items"][0]["name"] == component_flow_name
|
|
assert folder_data["flows"]["items"][0]["is_component"] == True # noqa: E712
|