From fcece1b29b804000302f3b5304e04189ad217add Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 1 Apr 2024 10:38:01 -0300 Subject: [PATCH] Add new schema for simplified API request --- src/backend/base/langflow/api/v1/endpoints.py | 99 ++++++++++--------- src/backend/base/langflow/api/v1/schemas.py | 12 ++- 2 files changed, 62 insertions(+), 49 deletions(-) diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index 48bd69fc8..50ab58ae9 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -1,5 +1,5 @@ from http import HTTPStatus -from typing import Annotated, List, Literal, Optional, Union +from typing import Annotated, List, Optional, Union import sqlalchemy as sa from fastapi import APIRouter, Body, Depends, HTTPException, UploadFile, status @@ -12,6 +12,7 @@ from langflow.api.v1.schemas import ( InputValueRequest, ProcessResponse, RunResponse, + SimplifiedAPIRequest, TaskStatusResponse, Tweaks, UpdateCustomComponentRequest, @@ -52,91 +53,93 @@ def get_all( @router.post("/run/{flow_id}", response_model=RunResponse, response_model_exclude_none=True) async def simplified_run_flow_with_caching( - session: Annotated[Session, Depends(get_session)], + db: Annotated[Session, Depends(get_session)], flow_id: str, - input_value: Optional[str] = "", - input_type: Optional[Literal["chat", "text", "any"]] = "chat", - output_type: Optional[Literal["chat", "text", "any", "debug"]] = "chat", - tweaks: Annotated[Optional[Tweaks], Body(embed=True)] = None, # noqa: F821 - stream: Annotated[bool, Body(embed=True)] = False, # noqa: F821 - session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821 + input_request: SimplifiedAPIRequest = SimplifiedAPIRequest(), + stream: bool = False, api_key_user: User = Depends(api_key_security), session_service: SessionService = Depends(get_session_service), ): """ - Executes a specified flow by ID, offering options for input, output customization, and performance enhancements through caching. + Executes a specified flow by ID with input customization, performance enhancements through caching, and optional data streaming. - Parameters: - - `session` (Session): Database session for executing queries. - - `flow_id` (str): Unique identifier of the flow to execute. - - `input_value` (Optional[str], default=""): Input value to pass to the flow. Defaults to an empty string. - - `input_type` (Optional[Literal["chat", "text", "any"]], default="chat"): Type of the input value. - - `output_type` (Optional[Literal["chat", "text", "any", "debug"]], default="chat"): Desired type of output. If "debug", all outputs are returned. - - `tweaks` (Optional[Tweaks], default=None): Optional parameter tweaks to customize flow execution. - - `stream` (bool, default=False): If true, outputs are streamed back as they are generated. - - `session_id` (Union[None, str], default=None): Session ID to reuse existing session data, enhancing efficiency. - - `api_key_user` (User): User object derived from the provided API key, ensuring secure access. - - `session_service` (SessionService): Service for session management, crucial for caching and session reuse. + ### Parameters: + - `db` (Session): Database session for executing queries. + - `flow_id` (str): Unique identifier of the flow to be executed. + - `input_request` (SimplifiedAPIRequest): A request model containing: + - `input_value` (Optional[str], default=""): Input value to pass to the flow. + - `input_type` (Optional[Literal["chat", "text", "any"]], default="chat"): Type of the input value, determining how the input is interpreted. + - `output_type` (Optional[Literal["chat", "text", "any", "debug"]], default="chat"): Desired type of output, affecting which components' outputs are included in the response. + - `tweaks` (Optional[Tweaks], default=None): Adjustments to the flow's behavior, allowing for custom execution parameters. + - `session_id` (Optional[str], default=None): An identifier for reusing session data, aiding in performance for subsequent requests. + - `api_key_user` (User): User object derived from the provided API key, used for authentication. + - `session_service` (SessionService): Service for managing flow sessions, essential for session reuse and caching. - Returns: - - `RunResponse`: Object containing the flow execution results and the session ID, allowing for result retrieval and session management. + ### Returns: + - A `RunResponse` object containing the execution results, including selected (or all, based on `output_type`) outputs of the flow and the session ID, facilitating result retrieval and further interactions in a session context. - Raises: - - HTTPException: 404 if the specified flow or session cannot be found; 500 for internal errors during execution. + ### Raises: + - HTTPException: 404 if the specified flow ID curl -X 'POST' \ - Example: - ```http - POST /run/{flow_id} - Content-Type: application/json - x-api-key: YOUR_API_KEY - - { - "input_value": "Sample input", - "input_type": "text", - "output_type": "debug", - "tweaks": {"example_tweak": "value"}, - "stream": true - } + ### Example: + ```bash + curl -X 'POST' \ + 'http:///run/{flow_id}' \ + -H 'accept: application/json' \ + -H 'Content-Type: application/json' \ + -H 'x-api-key: YOU_API_KEY' \ + -H ' + -d '{ + "input_value": "Sample input", + "input_type": "chat", + "output_type": "chat", + "tweaks": {}, + }' ``` - This endpoint serves as a flexible and efficient way to execute flows with customizable inputs and outputs, leveraging caching for improved performance. + This endpoint provides a powerful interface for executing flows with enhanced flexibility and efficiency, supporting a wide range of applications by allowing for dynamic input and output configuration along with performance optimizations through session management and caching. """ try: task_result: List[RunOutputs] = [] artifacts = {} - if session_id: - session_data = await session_service.load_session(session_id, flow_id=flow_id) + if input_request.session_id: + session_data = await session_service.load_session(input_request.session_id, flow_id=flow_id) graph, artifacts = session_data if session_data else (None, None) if graph is None: - raise ValueError(f"Session {session_id} not found") + raise ValueError(f"Session {input_request.session_id} not found") else: # Get the flow that matches the flow_id and belongs to the user # flow = session.query(Flow).filter(Flow.id == flow_id).filter(Flow.user_id == api_key_user.id).first() - flow = session.exec(select(Flow).where(Flow.id == flow_id).where(Flow.user_id == api_key_user.id)).first() + flow = db.exec(select(Flow).where(Flow.id == flow_id).where(Flow.user_id == api_key_user.id)).first() if flow is None: raise ValueError(f"Flow {flow_id} not found") if flow.data is None: raise ValueError(f"Flow {flow_id} has no data") graph_data = flow.data - graph_data = process_tweaks(graph_data, tweaks or {}) + graph_data = process_tweaks(graph_data, input_request.tweaks or {}) graph = Graph.from_payload(graph_data, flow_id=flow_id) - inputs = [InputValueRequest(components=[], input_value=input_value, type=input_type)] + inputs = [ + InputValueRequest(components=[], input_value=input_request.input_value, type=input_request.input_type) + ] # outputs is a list of all components that should return output # we need to get them by checking their type # if the output type is debug, we return all outputs # if the output type is any, we return all outputs that are either chat or text # if the output type is chat or text, we return only the outputs that match the type outputs = [ - vertex + vertex.id for vertex in graph.vertices - if output_type == "debug" - or (vertex.is_output and (output_type == "any" or output_type in vertex.id.lower())) + if input_request.output_type == "debug" + or ( + vertex.is_output + and (input_request.output_type == "any" or input_request.output_type in vertex.id.lower()) + ) ] task_result, session_id = await run_graph( graph=graph, flow_id=flow_id, - session_id=session_id, + session_id=input_request.session_id, inputs=inputs, outputs=outputs, artifacts=artifacts, diff --git a/src/backend/base/langflow/api/v1/schemas.py b/src/backend/base/langflow/api/v1/schemas.py index eb8a4dd8c..bd6adb3a6 100644 --- a/src/backend/base/langflow/api/v1/schemas.py +++ b/src/backend/base/langflow/api/v1/schemas.py @@ -8,6 +8,8 @@ from pydantic import BaseModel, ConfigDict, Field, RootModel, field_validator, m from langflow.graph.schema import RunOutputs from langflow.schema import dotdict +from langflow.schema.schema import InputType, OutputType +from langflow.schema.schema import InputType, OutputType from langflow.services.database.models.api_key.model import ApiKeyRead from langflow.services.database.models.base import orjson_dumps from langflow.services.database.models.flow import FlowCreate, FlowRead @@ -259,7 +261,7 @@ class VerticesBuiltResponse(BaseModel): class InputValueRequest(BaseModel): components: Optional[List[str]] = [] input_value: Optional[str] = None - type: Optional[Literal["chat", "text", "any"]] = Field( + type: Optional[InputType] = Field( "any", description="Defines on which components the input value should be applied. 'any' applies to all input components.", ) @@ -310,3 +312,11 @@ class Tweaks(RootModel): def items(self): return self.root.items() + + +class SimplifiedAPIRequest(BaseModel): + input_value: Optional[str] = Field(default="", description="The input value") + input_type: Optional[InputType] = Field(default="chat", description="The input type") + output_type: Optional[OutputType] = Field(default="chat", description="The output type") + tweaks: Optional[Tweaks] = Field(default=None, description="The tweaks") + session_id: Optional[str] = Field(default=None, description="The session id")