Add new schema for simplified API request

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-04-01 10:38:01 -03:00
commit fcece1b29b
2 changed files with 62 additions and 49 deletions

View file

@ -1,5 +1,5 @@
from http import HTTPStatus
from typing import Annotated, List, Literal, Optional, Union
from typing import Annotated, List, Optional, Union
import sqlalchemy as sa
from fastapi import APIRouter, Body, Depends, HTTPException, UploadFile, status
@ -12,6 +12,7 @@ from langflow.api.v1.schemas import (
InputValueRequest,
ProcessResponse,
RunResponse,
SimplifiedAPIRequest,
TaskStatusResponse,
Tweaks,
UpdateCustomComponentRequest,
@ -52,91 +53,93 @@ def get_all(
@router.post("/run/{flow_id}", response_model=RunResponse, response_model_exclude_none=True)
async def simplified_run_flow_with_caching(
session: Annotated[Session, Depends(get_session)],
db: Annotated[Session, Depends(get_session)],
flow_id: str,
input_value: Optional[str] = "",
input_type: Optional[Literal["chat", "text", "any"]] = "chat",
output_type: Optional[Literal["chat", "text", "any", "debug"]] = "chat",
tweaks: Annotated[Optional[Tweaks], Body(embed=True)] = None, # noqa: F821
stream: Annotated[bool, Body(embed=True)] = False, # noqa: F821
session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821
input_request: SimplifiedAPIRequest = SimplifiedAPIRequest(),
stream: bool = False,
api_key_user: User = Depends(api_key_security),
session_service: SessionService = Depends(get_session_service),
):
"""
Executes a specified flow by ID, offering options for input, output customization, and performance enhancements through caching.
Executes a specified flow by ID with input customization, performance enhancements through caching, and optional data streaming.
Parameters:
- `session` (Session): Database session for executing queries.
- `flow_id` (str): Unique identifier of the flow to execute.
- `input_value` (Optional[str], default=""): Input value to pass to the flow. Defaults to an empty string.
- `input_type` (Optional[Literal["chat", "text", "any"]], default="chat"): Type of the input value.
- `output_type` (Optional[Literal["chat", "text", "any", "debug"]], default="chat"): Desired type of output. If "debug", all outputs are returned.
- `tweaks` (Optional[Tweaks], default=None): Optional parameter tweaks to customize flow execution.
- `stream` (bool, default=False): If true, outputs are streamed back as they are generated.
- `session_id` (Union[None, str], default=None): Session ID to reuse existing session data, enhancing efficiency.
- `api_key_user` (User): User object derived from the provided API key, ensuring secure access.
- `session_service` (SessionService): Service for session management, crucial for caching and session reuse.
### Parameters:
- `db` (Session): Database session for executing queries.
- `flow_id` (str): Unique identifier of the flow to be executed.
- `input_request` (SimplifiedAPIRequest): A request model containing:
- `input_value` (Optional[str], default=""): Input value to pass to the flow.
- `input_type` (Optional[Literal["chat", "text", "any"]], default="chat"): Type of the input value, determining how the input is interpreted.
- `output_type` (Optional[Literal["chat", "text", "any", "debug"]], default="chat"): Desired type of output, affecting which components' outputs are included in the response.
- `tweaks` (Optional[Tweaks], default=None): Adjustments to the flow's behavior, allowing for custom execution parameters.
- `session_id` (Optional[str], default=None): An identifier for reusing session data, aiding in performance for subsequent requests.
- `api_key_user` (User): User object derived from the provided API key, used for authentication.
- `session_service` (SessionService): Service for managing flow sessions, essential for session reuse and caching.
Returns:
- `RunResponse`: Object containing the flow execution results and the session ID, allowing for result retrieval and session management.
### Returns:
- A `RunResponse` object containing the execution results, including selected (or all, based on `output_type`) outputs of the flow and the session ID, facilitating result retrieval and further interactions in a session context.
Raises:
- HTTPException: 404 if the specified flow or session cannot be found; 500 for internal errors during execution.
### Raises:
- HTTPException: 404 if the specified flow ID curl -X 'POST' \
Example:
```http
POST /run/{flow_id}
Content-Type: application/json
x-api-key: YOUR_API_KEY
{
"input_value": "Sample input",
"input_type": "text",
"output_type": "debug",
"tweaks": {"example_tweak": "value"},
"stream": true
}
### Example:
```bash
curl -X 'POST' \
'http://<your_server>/run/{flow_id}' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-H 'x-api-key: YOU_API_KEY' \
-H '
-d '{
"input_value": "Sample input",
"input_type": "chat",
"output_type": "chat",
"tweaks": {},
}'
```
This endpoint serves as a flexible and efficient way to execute flows with customizable inputs and outputs, leveraging caching for improved performance.
This endpoint provides a powerful interface for executing flows with enhanced flexibility and efficiency, supporting a wide range of applications by allowing for dynamic input and output configuration along with performance optimizations through session management and caching.
"""
try:
task_result: List[RunOutputs] = []
artifacts = {}
if session_id:
session_data = await session_service.load_session(session_id, flow_id=flow_id)
if input_request.session_id:
session_data = await session_service.load_session(input_request.session_id, flow_id=flow_id)
graph, artifacts = session_data if session_data else (None, None)
if graph is None:
raise ValueError(f"Session {session_id} not found")
raise ValueError(f"Session {input_request.session_id} not found")
else:
# Get the flow that matches the flow_id and belongs to the user
# flow = session.query(Flow).filter(Flow.id == flow_id).filter(Flow.user_id == api_key_user.id).first()
flow = session.exec(select(Flow).where(Flow.id == flow_id).where(Flow.user_id == api_key_user.id)).first()
flow = db.exec(select(Flow).where(Flow.id == flow_id).where(Flow.user_id == api_key_user.id)).first()
if flow is None:
raise ValueError(f"Flow {flow_id} not found")
if flow.data is None:
raise ValueError(f"Flow {flow_id} has no data")
graph_data = flow.data
graph_data = process_tweaks(graph_data, tweaks or {})
graph_data = process_tweaks(graph_data, input_request.tweaks or {})
graph = Graph.from_payload(graph_data, flow_id=flow_id)
inputs = [InputValueRequest(components=[], input_value=input_value, type=input_type)]
inputs = [
InputValueRequest(components=[], input_value=input_request.input_value, type=input_request.input_type)
]
# outputs is a list of all components that should return output
# we need to get them by checking their type
# if the output type is debug, we return all outputs
# if the output type is any, we return all outputs that are either chat or text
# if the output type is chat or text, we return only the outputs that match the type
outputs = [
vertex
vertex.id
for vertex in graph.vertices
if output_type == "debug"
or (vertex.is_output and (output_type == "any" or output_type in vertex.id.lower()))
if input_request.output_type == "debug"
or (
vertex.is_output
and (input_request.output_type == "any" or input_request.output_type in vertex.id.lower())
)
]
task_result, session_id = await run_graph(
graph=graph,
flow_id=flow_id,
session_id=session_id,
session_id=input_request.session_id,
inputs=inputs,
outputs=outputs,
artifacts=artifacts,

View file

@ -8,6 +8,8 @@ from pydantic import BaseModel, ConfigDict, Field, RootModel, field_validator, m
from langflow.graph.schema import RunOutputs
from langflow.schema import dotdict
from langflow.schema.schema import InputType, OutputType
from langflow.schema.schema import InputType, OutputType
from langflow.services.database.models.api_key.model import ApiKeyRead
from langflow.services.database.models.base import orjson_dumps
from langflow.services.database.models.flow import FlowCreate, FlowRead
@ -259,7 +261,7 @@ class VerticesBuiltResponse(BaseModel):
class InputValueRequest(BaseModel):
components: Optional[List[str]] = []
input_value: Optional[str] = None
type: Optional[Literal["chat", "text", "any"]] = Field(
type: Optional[InputType] = Field(
"any",
description="Defines on which components the input value should be applied. 'any' applies to all input components.",
)
@ -310,3 +312,11 @@ class Tweaks(RootModel):
def items(self):
return self.root.items()
class SimplifiedAPIRequest(BaseModel):
input_value: Optional[str] = Field(default="", description="The input value")
input_type: Optional[InputType] = Field(default="chat", description="The input type")
output_type: Optional[OutputType] = Field(default="chat", description="The output type")
tweaks: Optional[Tweaks] = Field(default=None, description="The tweaks")
session_id: Optional[str] = Field(default=None, description="The session id")