diff --git a/src/backend/langflow/helpers/flow.py b/src/backend/langflow/helpers/flow.py index cfb0a9881..90d360f57 100644 --- a/src/backend/langflow/helpers/flow.py +++ b/src/backend/langflow/helpers/flow.py @@ -1,5 +1,6 @@ -from typing import TYPE_CHECKING, Any, List, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Coroutine, List, Optional, Tuple, Union +from pydantic.v1 import BaseModel, Field, create_model from sqlmodel import select from langflow.schema.schema import INPUT_FIELD_NAME, Record @@ -8,6 +9,13 @@ from langflow.services.deps import session_scope if TYPE_CHECKING: from langflow.graph.graph.base import Graph + from langflow.graph.vertex.base import Vertex + +INPUT_TYPE_MAP = { + "ChatInput": "str", + "TextInput": "str", + "JSONInput": "dict", +} def list_flows(*, user_id: Optional[str] = None) -> List[Record]: @@ -76,29 +84,110 @@ async def run_flow( return await graph.arun(inputs_list, inputs_components=inputs_components, types=types) -def extract_argument_signatures(arguments): +def generate_function_for_flow(inputs: List["Vertex"], flow_id: str) -> Coroutine: """ - Extracts and formats function argument signatures with type hints. + Generate a dynamic flow function based on the given inputs and flow ID. + + Args: + inputs (List[Vertex]): The list of input vertices for the flow. + flow_id (str): The ID of the flow. + + Returns: + Coroutine: The dynamic flow function. + + Raises: + None + + Example: + inputs = [vertex1, vertex2] + flow_id = "my_flow" + function = generate_function_for_flow(inputs, flow_id) + result = function(input1, input2) """ - type_mapping = {"str": "str"} # Extend this mapping as needed. - return [ - f"{arg['display_name'].replace(' ', '_').lower()}: {type_mapping.get(arg['type'], 'Any')}" for arg in arguments - ] + # Prepare function arguments with type hints and default values + args = [f"{input_.display_name.lower().replace(' ', '_')}: {INPUT_TYPE_MAP[input_.base_name]}" for input_ in inputs] + + # Maintain original argument names for constructing the tweaks dictionary + original_arg_names = [input_.display_name for input_ in inputs] + + # Prepare a Pythonic, valid function argument string + func_args = ", ".join(args) + + # Map original argument names to their corresponding Pythonic variable names in the function + arg_mappings = ", ".join( + f'"{original_name}": {name}' + for original_name, name in zip(original_arg_names, [arg.split(":")[0] for arg in args]) + ) + + func_body = f""" +async def flow_function({func_args}): + tweaks = {{ {arg_mappings} }} + from langflow.helpers.flow import run_flow + from langchain_core.tools import ToolException + try: + return await run_flow( + tweaks={{key: {{'input_value': value}} for key, value in tweaks.items()}}, + flow_id="{flow_id}", + ) + except Exception as e: + raise ToolException(f'Error running flow: ' + e) +""" + + compiled_func = compile(func_body, "", "exec") + local_scope = {} + exec(compiled_func, globals(), local_scope) + return local_scope["dynamic_flow_function"] -def create_function_definition(arg_signatures, body): +def build_function_and_schema(flow_record: Record, graph: "Graph") -> Tuple[Callable, BaseModel]: """ - Constructs the function definition string. + Builds a dynamic function and schema for a given flow. + + Args: + flow_record (Record): The flow record containing information about the flow. + graph (Graph): The graph representing the flow. + + Returns: + Tuple[Callable, BaseModel]: A tuple containing the dynamic function and the schema. """ - func_signature = ", ".join(arg_signatures) - return f"def dynamic_function({func_signature}):\n{body}" + flow_id = flow_record.id + inputs = get_flow_inputs(graph) + dynamic_flow_function = generate_function_for_flow(inputs, flow_id) + schema = build_schema_from_inputs(flow_record.name, inputs) + return dynamic_flow_function, schema -def define_dynamic_function(function_definition): +def get_flow_inputs(graph: "Graph") -> List["Vertex"]: """ - Defines the dynamic function by executing the function definition string - within a local environment and returns the function object. + Retrieves the flow inputs from the given graph. + + Args: + graph (Graph): The graph object representing the flow. + + Returns: + List[Record]: A list of input records, where each record contains the ID, name, and description of the input vertex. """ - local_env = {} - exec(function_definition, globals(), local_env) - return local_env["dynamic_function"] + inputs = [] + for vertex in graph.vertices: + if vertex.is_input: + inputs.append(vertex) + return inputs + + +def build_schema_from_inputs(name: str, inputs: List[tuple[str, str, str]]) -> BaseModel: + """ + Builds a schema from the given inputs. + + Args: + name (str): The name of the schema. + inputs (List[tuple[str, str, str]]): A list of tuples representing the inputs. + Each tuple contains three elements: the input name, the input type, and the input description. + + Returns: + BaseModel: The schema model. + + """ + fields = {} + for input_ in inputs: + fields[input_[1]] = (str, Field(default="", description=input_[2])) + return create_model(name, **fields)