Refactor flow.py to generate dynamic flow functions and build schemas
This commit is contained in:
parent
affe488791
commit
11f9445b29
1 changed files with 106 additions and 17 deletions
|
|
@ -1,5 +1,6 @@
|
|||
from typing import TYPE_CHECKING, Any, List, Optional, Union
|
||||
from typing import TYPE_CHECKING, Any, Callable, Coroutine, List, Optional, Tuple, Union
|
||||
|
||||
from pydantic.v1 import BaseModel, Field, create_model
|
||||
from sqlmodel import select
|
||||
|
||||
from langflow.schema.schema import INPUT_FIELD_NAME, Record
|
||||
|
|
@ -8,6 +9,13 @@ from langflow.services.deps import session_scope
|
|||
|
||||
if TYPE_CHECKING:
|
||||
from langflow.graph.graph.base import Graph
|
||||
from langflow.graph.vertex.base import Vertex
|
||||
|
||||
INPUT_TYPE_MAP = {
|
||||
"ChatInput": "str",
|
||||
"TextInput": "str",
|
||||
"JSONInput": "dict",
|
||||
}
|
||||
|
||||
|
||||
def list_flows(*, user_id: Optional[str] = None) -> List[Record]:
|
||||
|
|
@ -76,29 +84,110 @@ async def run_flow(
|
|||
return await graph.arun(inputs_list, inputs_components=inputs_components, types=types)
|
||||
|
||||
|
||||
def extract_argument_signatures(arguments):
|
||||
def generate_function_for_flow(inputs: List["Vertex"], flow_id: str) -> Coroutine:
|
||||
"""
|
||||
Extracts and formats function argument signatures with type hints.
|
||||
Generate a dynamic flow function based on the given inputs and flow ID.
|
||||
|
||||
Args:
|
||||
inputs (List[Vertex]): The list of input vertices for the flow.
|
||||
flow_id (str): The ID of the flow.
|
||||
|
||||
Returns:
|
||||
Coroutine: The dynamic flow function.
|
||||
|
||||
Raises:
|
||||
None
|
||||
|
||||
Example:
|
||||
inputs = [vertex1, vertex2]
|
||||
flow_id = "my_flow"
|
||||
function = generate_function_for_flow(inputs, flow_id)
|
||||
result = function(input1, input2)
|
||||
"""
|
||||
type_mapping = {"str": "str"} # Extend this mapping as needed.
|
||||
return [
|
||||
f"{arg['display_name'].replace(' ', '_').lower()}: {type_mapping.get(arg['type'], 'Any')}" for arg in arguments
|
||||
]
|
||||
# Prepare function arguments with type hints and default values
|
||||
args = [f"{input_.display_name.lower().replace(' ', '_')}: {INPUT_TYPE_MAP[input_.base_name]}" for input_ in inputs]
|
||||
|
||||
# Maintain original argument names for constructing the tweaks dictionary
|
||||
original_arg_names = [input_.display_name for input_ in inputs]
|
||||
|
||||
# Prepare a Pythonic, valid function argument string
|
||||
func_args = ", ".join(args)
|
||||
|
||||
# Map original argument names to their corresponding Pythonic variable names in the function
|
||||
arg_mappings = ", ".join(
|
||||
f'"{original_name}": {name}'
|
||||
for original_name, name in zip(original_arg_names, [arg.split(":")[0] for arg in args])
|
||||
)
|
||||
|
||||
func_body = f"""
|
||||
async def flow_function({func_args}):
|
||||
tweaks = {{ {arg_mappings} }}
|
||||
from langflow.helpers.flow import run_flow
|
||||
from langchain_core.tools import ToolException
|
||||
try:
|
||||
return await run_flow(
|
||||
tweaks={{key: {{'input_value': value}} for key, value in tweaks.items()}},
|
||||
flow_id="{flow_id}",
|
||||
)
|
||||
except Exception as e:
|
||||
raise ToolException(f'Error running flow: ' + e)
|
||||
"""
|
||||
|
||||
compiled_func = compile(func_body, "<string>", "exec")
|
||||
local_scope = {}
|
||||
exec(compiled_func, globals(), local_scope)
|
||||
return local_scope["dynamic_flow_function"]
|
||||
|
||||
|
||||
def create_function_definition(arg_signatures, body):
|
||||
def build_function_and_schema(flow_record: Record, graph: "Graph") -> Tuple[Callable, BaseModel]:
|
||||
"""
|
||||
Constructs the function definition string.
|
||||
Builds a dynamic function and schema for a given flow.
|
||||
|
||||
Args:
|
||||
flow_record (Record): The flow record containing information about the flow.
|
||||
graph (Graph): The graph representing the flow.
|
||||
|
||||
Returns:
|
||||
Tuple[Callable, BaseModel]: A tuple containing the dynamic function and the schema.
|
||||
"""
|
||||
func_signature = ", ".join(arg_signatures)
|
||||
return f"def dynamic_function({func_signature}):\n{body}"
|
||||
flow_id = flow_record.id
|
||||
inputs = get_flow_inputs(graph)
|
||||
dynamic_flow_function = generate_function_for_flow(inputs, flow_id)
|
||||
schema = build_schema_from_inputs(flow_record.name, inputs)
|
||||
return dynamic_flow_function, schema
|
||||
|
||||
|
||||
def define_dynamic_function(function_definition):
|
||||
def get_flow_inputs(graph: "Graph") -> List["Vertex"]:
|
||||
"""
|
||||
Defines the dynamic function by executing the function definition string
|
||||
within a local environment and returns the function object.
|
||||
Retrieves the flow inputs from the given graph.
|
||||
|
||||
Args:
|
||||
graph (Graph): The graph object representing the flow.
|
||||
|
||||
Returns:
|
||||
List[Record]: A list of input records, where each record contains the ID, name, and description of the input vertex.
|
||||
"""
|
||||
local_env = {}
|
||||
exec(function_definition, globals(), local_env)
|
||||
return local_env["dynamic_function"]
|
||||
inputs = []
|
||||
for vertex in graph.vertices:
|
||||
if vertex.is_input:
|
||||
inputs.append(vertex)
|
||||
return inputs
|
||||
|
||||
|
||||
def build_schema_from_inputs(name: str, inputs: List[tuple[str, str, str]]) -> BaseModel:
|
||||
"""
|
||||
Builds a schema from the given inputs.
|
||||
|
||||
Args:
|
||||
name (str): The name of the schema.
|
||||
inputs (List[tuple[str, str, str]]): A list of tuples representing the inputs.
|
||||
Each tuple contains three elements: the input name, the input type, and the input description.
|
||||
|
||||
Returns:
|
||||
BaseModel: The schema model.
|
||||
|
||||
"""
|
||||
fields = {}
|
||||
for input_ in inputs:
|
||||
fields[input_[1]] = (str, Field(default="", description=input_[2]))
|
||||
return create_model(name, **fields)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue