Merge branch 'zustand/io/migration' of personal:logspace-ai/langflow into zustand/io/migration

This commit is contained in:
anovazzi1 2024-04-01 16:46:34 -03:00
commit 33e171e52b
10 changed files with 209 additions and 78 deletions

View file

@ -290,6 +290,7 @@ def run_langflow(host, port, log_level, options, app):
host=host,
port=port,
log_level=log_level.lower(),
loop="asyncio",
)
else:
from langflow.server import LangflowApplication

View file

@ -23,7 +23,7 @@ from langflow.graph.schema import RunOutputs
from langflow.interface.custom.custom_component import CustomComponent
from langflow.interface.custom.directory_reader import DirectoryReader
from langflow.interface.custom.utils import build_custom_component_template
from langflow.processing.process import process_tweaks, run_graph
from langflow.processing.process import process_tweaks, run_graph_internal
from langflow.services.auth.utils import api_key_security, get_current_active_user
from langflow.services.cache.utils import save_uploaded_file
from langflow.services.database.models.flow import Flow
@ -52,7 +52,7 @@ def get_all(
@router.post("/run/{flow_id}", response_model=RunResponse, response_model_exclude_none=True)
async def simplified_run_flow_with_caching(
async def simplified_run_flow(
db: Annotated[Session, Depends(get_session)],
flow_id: str,
input_request: SimplifiedAPIRequest = SimplifiedAPIRequest(),
@ -66,15 +66,24 @@ async def simplified_run_flow_with_caching(
### Parameters:
- `db` (Session): Database session for executing queries.
- `flow_id` (str): Unique identifier of the flow to be executed.
- `input_request` (SimplifiedAPIRequest): A request model containing:
- `input_value` (Optional[str], default=""): Input value to pass to the flow.
- `input_type` (Optional[Literal["chat", "text", "any"]], default="chat"): Type of the input value, determining how the input is interpreted.
- `output_type` (Optional[Literal["chat", "text", "any", "debug"]], default="chat"): Desired type of output, affecting which components' outputs are included in the response.
- `tweaks` (Optional[Tweaks], default=None): Adjustments to the flow's behavior, allowing for custom execution parameters.
- `session_id` (Optional[str], default=None): An identifier for reusing session data, aiding in performance for subsequent requests.
- `input_request` (SimplifiedAPIRequest): Request object containing input values, types, output selection, tweaks, and session ID.
- `api_key_user` (User): User object derived from the provided API key, used for authentication.
- `session_service` (SessionService): Service for managing flow sessions, essential for session reuse and caching.
### SimplifiedAPIRequest:
- `input_value` (Optional[str], default=""): Input value to pass to the flow.
- `input_type` (Optional[Literal["chat", "text", "any"]], default="chat"): Type of the input value, determining how the input is interpreted.
- `output_type` (Optional[Literal["chat", "text", "any", "debug"]], default="chat"): Desired type of output, affecting which components' outputs are included in the response. If set to "debug", all outputs are returned.
- `output_component` (Optional[str], default=None): Specific component output to retrieve. If provided, only the output of the specified component is returned. This overrides the `output_type` parameter.
- `tweaks` (Optional[Tweaks], default=None): Adjustments to the flow's behavior, allowing for custom execution parameters.
- `session_id` (Optional[str], default=None): An identifier for reusing session data, aiding in performance for subsequent requests.
### Tweaks
A dictionary of tweaks to customize the flow execution. The tweaks can be used to modify the flow's parameters and components. Tweaks can be overridden by the input values.
You can use Component's `id` or Display Name as key to tweak a specific component (e.g., `{"Component Name": {"parameter_name": "value"}}`).
You can also use the parameter name as key to tweak all components with that parameter (e.g., `{"parameter_name": "value"}`).
### Returns:
- A `RunResponse` object containing the execution results, including selected (or all, based on `output_type`) outputs of the flow and the session ID, facilitating result retrieval and further interactions in a session context.
@ -128,16 +137,19 @@ async def simplified_run_flow_with_caching(
# if the output type is debug, we return all outputs
# if the output type is any, we return all outputs that are either chat or text
# if the output type is chat or text, we return only the outputs that match the type
outputs = [
vertex.id
for vertex in graph.vertices
if input_request.output_type == "debug"
or (
vertex.is_output
and (input_request.output_type == "any" or input_request.output_type in vertex.id.lower())
)
]
task_result, session_id = await run_graph(
if input_request.output_component:
outputs = [input_request.output_component]
else:
outputs = [
vertex.id
for vertex in graph.vertices
if input_request.output_type == "debug"
or (
vertex.is_output
and (input_request.output_type == "any" or input_request.output_type in vertex.id.lower())
)
]
task_result, session_id = await run_graph_internal(
graph=graph,
flow_id=flow_id,
session_id=input_request.session_id,
@ -171,7 +183,7 @@ async def simplified_run_flow_with_caching(
@router.post("/run/advanced/{flow_id}", response_model=RunResponse, response_model_exclude_none=True)
async def experimental_run_flow_with_caching(
async def experimental_run_flow(
session: Annotated[Session, Depends(get_session)],
flow_id: str,
inputs: Optional[List[InputValueRequest]] = [InputValueRequest(components=[], input_value="")],
@ -243,7 +255,7 @@ async def experimental_run_flow_with_caching(
graph_data = flow.data
graph_data = process_tweaks(graph_data, tweaks or {})
graph = Graph.from_payload(graph_data, flow_id=flow_id)
task_result, session_id = await run_graph(
task_result, session_id = await run_graph_internal(
graph=graph,
flow_id=flow_id,
session_id=session_id,

View file

@ -1,7 +1,7 @@
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Union
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field, RootModel, field_validator, model_serializer
@ -9,7 +9,6 @@ from pydantic import BaseModel, ConfigDict, Field, RootModel, field_validator, m
from langflow.graph.schema import RunOutputs
from langflow.schema import dotdict
from langflow.schema.schema import InputType, OutputType
from langflow.schema.schema import InputType, OutputType
from langflow.services.database.models.api_key.model import ApiKeyRead
from langflow.services.database.models.base import orjson_dumps
from langflow.services.database.models.flow import FlowCreate, FlowRead
@ -318,5 +317,9 @@ class SimplifiedAPIRequest(BaseModel):
input_value: Optional[str] = Field(default="", description="The input value")
input_type: Optional[InputType] = Field(default="chat", description="The input type")
output_type: Optional[OutputType] = Field(default="chat", description="The output type")
output_component: Optional[str] = Field(
default="",
description="If there are multiple output components, you can specify the component to get the output from.",
)
tweaks: Optional[Tweaks] = Field(default=None, description="The tweaks")
session_id: Optional[str] = Field(default=None, description="The session id")

View file

@ -1,9 +1,10 @@
import json
from pathlib import Path
from typing import Optional, Union
from typing import List, Optional, Union
from langflow.graph import Graph
from langflow.processing.process import process_tweaks
from langflow.graph.schema import RunOutputs
from langflow.processing.process import process_tweaks, run_graph
def load_flow_from_json(flow: Union[Path, str, dict], tweaks: Optional[dict] = None) -> Graph:
@ -31,3 +32,36 @@ def load_flow_from_json(flow: Union[Path, str, dict], tweaks: Optional[dict] = N
graph = Graph.from_payload(graph_data)
return graph
def run_flow_from_json(
flow: Union[Path, str, dict],
input_value: str,
tweaks: Optional[dict] = None,
input_type: str = "chat",
output_type: str = "chat",
output_component: Optional[str] = None,
) -> List[RunOutputs]:
"""
Runs a JSON flow by loading it from a file or dictionary and executing it with the given input value.
Args:
flow (Union[Path, str, dict]): The path to the JSON file, or the JSON dictionary representing the flow.
input_value (str): The input value to be processed by the flow.
tweaks (Optional[dict], optional): Optional tweaks to be applied to the flow. Defaults to None.
input_type (str, optional): The type of the input value. Defaults to "chat".
output_type (str, optional): The type of the output value. Defaults to "chat".
output_component (Optional[str], optional): The specific output component to retrieve. Defaults to None.
Returns:
None: The result of running the flow.
"""
graph = load_flow_from_json(flow, tweaks)
result = run_graph(
graph=graph,
input_value=input_value,
input_type=input_type,
output_type=output_type,
output_component=output_component,
)
return result

View file

@ -124,10 +124,10 @@ class Result(BaseModel):
session_id: str
async def run_graph(
async def run_graph_internal(
graph: "Graph",
flow_id: str,
stream: bool,
stream: bool = False,
session_id: Optional[str] = None,
inputs: Optional[List["InputValueRequest"]] = None,
outputs: Optional[List[str]] = None,
@ -167,6 +167,58 @@ async def run_graph(
return run_outputs, session_id_str
def run_graph(
graph: "Graph",
input_value: str,
input_type: str,
output_type: str,
output_component: Optional[str] = None,
) -> List[RunOutputs]:
"""
Runs the given Langflow Graph with the specified input and returns the outputs.
Args:
graph (Graph): The graph to be executed.
input_value (str): The input value to be passed to the graph.
input_type (str): The type of the input value.
output_type (str): The type of the desired output.
output_component (Optional[str], optional): The specific output component to retrieve. Defaults to None.
Returns:
List[RunOutputs]: A list of RunOutputs objects representing the outputs of the graph.
"""
inputs = [InputValueRequest(components=[], input_value=input_value, type=input_type)]
if output_component:
outputs = [output_component]
else:
outputs = [
vertex.id
for vertex in graph.vertices
if output_type == "debug"
or (vertex.is_output and (output_type == "any" or output_type in vertex.id.lower()))
]
components = []
inputs_list = []
types = []
for input_value_request in inputs:
if input_value_request.input_value is None:
logger.warning("InputValueRequest input_value cannot be None, defaulting to an empty string.")
input_value_request.input_value = ""
components.append(input_value_request.components or [])
inputs_list.append({INPUT_FIELD_NAME: input_value_request.input_value})
types.append(input_value_request.type)
run_outputs = graph.run(
inputs_list,
components,
types,
outputs or [],
stream=False,
session_id="",
)
return run_outputs
def validate_input(
graph_data: Dict[str, Any], tweaks: Union["Tweaks", Dict[str, Dict[str, Any]]]
) -> List[Dict[str, Any]]:

View file

@ -1,11 +1,16 @@
from gunicorn.app.base import BaseApplication # type: ignore
from uvicorn.workers import UvicornWorker
class LangflowUvicornWorker(UvicornWorker):
CONFIG_KWARGS = {"loop": "asyncio"}
class LangflowApplication(BaseApplication):
def __init__(self, app, options=None):
self.options = options or {}
self.options["worker_class"] = "uvicorn.workers.UvicornWorker"
self.options["worker_class"] = "langflow.server.LangflowUvicornWorker"
self.application = app
super().__init__()

View file

@ -289,11 +289,10 @@ export function buildTweakObject(tweak: tweakType) {
/**
* Function to get Chat Input Field
* @param {FlowType} flow - The current flow.
* @param {FlowsState} tabsState - The current tabs state.
* @returns {string} - The chat input field
*/
export function getChatInputField(flow: FlowType, flowState?: FlowState) {
export function getChatInputField(flowState?: FlowState) {
let chat_input_field = "text";
if (flowState && flowState.input_keys) {
@ -305,13 +304,14 @@ export function getChatInputField(flow: FlowType, flowState?: FlowState) {
/**
* Function to get the python code for the API
* @param {string} flowId - The id of the flow
* @param {boolean} isAuth - If the API is authenticated
* @param {any[]} tweak - The tweaks
* @returns {string} - The python code
*/
export function getPythonApiCode(
flow: FlowType,
isAuth: boolean,
tweak?: any[],
flowState?: FlowState
tweak?: any[]
): string {
const flowId = flow.id;
@ -320,13 +320,10 @@ export function getPythonApiCode(
// node.data.id
// }
const tweaks = buildTweaks(flow);
const inputs = buildInputs();
return `import requests
from typing import Optional
BASE_API_URL = "${window.location.protocol}//${
window.location.host
}/api/v1/process"
BASE_API_URL = "${window.location.protocol}//${window.location.host}/api/v1/run"
FLOW_ID = "${flowId}"
# You can tweak the flow by adding a tweaks dictionary
# e.g {"OpenAI-XXXXX": {"model_name": "gpt-4"}}
@ -336,9 +333,12 @@ TWEAKS = ${
: JSON.stringify(tweaks, null, 2)
}
def run_flow(inputs: dict, flow_id: str, tweaks: Optional[dict] = None${
!isAuth ? `, api_key: Optional[str] = None` : ""
}) -> dict:
def run_flow(message: str,
flow_id: str,
output_type: str = "chat",
input_type: str = "chat",
tweaks: Optional[dict] = None,
api_key: Optional[str] = None) -> dict:
"""
Run a flow with a given message and optional tweaks.
@ -349,7 +349,11 @@ def run_flow(inputs: dict, flow_id: str, tweaks: Optional[dict] = None${
"""
api_url = f"{BASE_API_URL}/{flow_id}"
payload = {"inputs": inputs}
payload = {
"input_value": message,
"output_type": output_type,
"input_type": input_type,
}
headers = None
if tweaks:
payload["tweaks"] = tweaks
@ -359,9 +363,9 @@ def run_flow(inputs: dict, flow_id: str, tweaks: Optional[dict] = None${
return response.json()
# Setup any tweaks you want to apply to the flow
inputs = ${inputs}
message = "message"
${!isAuth ? `api_key = "<your api key>"` : ""}
print(run_flow(inputs, flow_id=FLOW_ID, tweaks=TWEAKS${
print(run_flow(message=message, flow_id=FLOW_ID, tweaks=TWEAKS${
!isAuth ? `, api_key=api_key` : ""
}))`;
}
@ -369,28 +373,27 @@ print(run_flow(inputs, flow_id=FLOW_ID, tweaks=TWEAKS${
/**
* Function to get the curl code for the API
* @param {string} flowId - The id of the flow
* @param {boolean} isAuth - If the API is authenticated
* @returns {string} - The curl code
*/
export function getCurlCode(
flow: FlowType,
isAuth: boolean,
tweak?: any[],
flowState?: FlowState
tweak?: any[]
): string {
const flowId = flow.id;
const tweaks = buildTweaks(flow);
const inputs = buildInputs();
const arrayOfOutputs = getOutputIds(flow);
return `curl -X POST \\
${window.location.protocol}//${window.location.host}/api/v1/run/${flowId} \\
${window.location.protocol}//${
window.location.host
}/api/v1/run/${flowId}?stream=false \\
-H 'Content-Type: application/json'\\${
!isAuth ? `\n -H 'x-api-key: <your api key>'\\` : ""
}
-d '{"inputs": [${inputs}],
"outputs": [${arrayOfOutputs}],
"stream": false,
-d '{"input_value": "message",
"output_type": "chat",
"input_type": "chat",
"tweaks": ${
tweak && tweak.length > 0
? buildTweakObject(tweak)
@ -419,26 +422,23 @@ export function getOutputIds(flow) {
/**
* Function to get the python code for the API
* @param {string} flow - The current flow
* @param {any[]} tweak - The tweaks
* @returns {string} - The python code
*/
export function getPythonCode(
flow: FlowType,
tweak?: any[],
flowState?: FlowState
): string {
export function getPythonCode(flow: FlowType, tweak?: any[]): string {
const flowName = flow.name;
const tweaks = buildTweaks(flow);
const inputs = buildInputs();
return `from langflow.load import load_flow_from_json
return `from langflow.load import run_flow_from_json
TWEAKS = ${
tweak && tweak.length > 0
? buildTweakObject(tweak)
: JSON.stringify(tweaks, null, 2)
}
flow = load_flow_from_json("${flowName}.json", tweaks=TWEAKS)
# Now you can use it like any chain
inputs = ${inputs}
flow(inputs)`;
result = run_flow_from_json(flow="${flowName}.json",
input_value="message",
tweaks=TWEAKS)`;
}
/**
@ -454,7 +454,7 @@ export function getWidgetCode(
const flowId = flow.id;
const flowName = flow.name;
const inputs = buildInputs();
let chat_input_field = getChatInputField(flow, flowState);
let chat_input_field = getChatInputField(flowState);
return `<script src="https://cdn.jsdelivr.net/gh/logspace-ai/langflow-embedded-chat@main/dist/build/static/js/bundle.min.js"></script>