Refactor FlowToolComponent in FlowTool.py
This commit is contained in:
parent
11f9445b29
commit
8cedc611ad
1 changed files with 7 additions and 78 deletions
|
|
@ -1,13 +1,12 @@
|
|||
from typing import Any, Callable, List, Optional, Text, Tuple
|
||||
from typing import Any, List, Optional, Text
|
||||
|
||||
from langchain_core.tools import StructuredTool
|
||||
from loguru import logger
|
||||
from pydantic.v1 import BaseModel, Field, create_model
|
||||
|
||||
from langflow import CustomComponent
|
||||
from langflow.field_typing import Tool
|
||||
from langflow.graph.graph.base import Graph
|
||||
from langflow.schema import Record
|
||||
from langflow.helpers.flow import build_function_and_schema
|
||||
from langflow.schema.dotdict import dotdict
|
||||
|
||||
|
||||
|
|
@ -16,45 +15,6 @@ class FlowToolComponent(CustomComponent):
|
|||
description = "Construct a Tool from a function that runs the loaded Flow."
|
||||
field_order = ["flow_name", "name", "description", "return_direct"]
|
||||
|
||||
def generate_function_for_flow(self, inputs: List[tuple[str, str, str]], flow_id: str) -> Callable:
|
||||
# Prepare function arguments with type hints and default values
|
||||
args = [f'{input_[1].lower().replace(" ", "_")}: str = ""' for input_ in inputs]
|
||||
# Maintain original argument names for constructing the tweaks dictionary
|
||||
original_arg_names = [input_[1] for input_ in inputs]
|
||||
# Prepare a Pythonic, valid function argument string
|
||||
func_args = ", ".join(args)
|
||||
# Map original argument names to their corresponding Pythonic variable names in the function
|
||||
arg_mappings = ", ".join(
|
||||
[
|
||||
f'"{original_name}": {name}'
|
||||
for original_name, name in zip(original_arg_names, [arg.split(":")[0] for arg in args])
|
||||
]
|
||||
)
|
||||
|
||||
func_body = f"""
|
||||
async def dynamic_flow_function({func_args}):
|
||||
tweaks = {{ {arg_mappings} }}
|
||||
from langflow.helpers.flow import run_flow # Ensure this import exists or adjust accordingly
|
||||
return await run_flow(
|
||||
tweaks={{key: {{'input_value': value}} for key, value in tweaks.items()}},
|
||||
flow_id="{flow_id}",
|
||||
)
|
||||
"""
|
||||
local_scope = {}
|
||||
exec(func_body, globals(), local_scope)
|
||||
return local_scope["dynamic_flow_function"]
|
||||
|
||||
async def build_function_and_schema(self, flow_name: str) -> Tuple[Callable, BaseModel]:
|
||||
flow_record = self.get_flow(flow_name)
|
||||
if not flow_record:
|
||||
raise ValueError(f"Flow {flow_name} not found.")
|
||||
flow_id = flow_record.id # Assuming the flow record has an 'id' attribute
|
||||
graph = Graph.from_payload(flow_record.data["data"])
|
||||
inputs = self.get_flow_inputs(graph)
|
||||
dynamic_flow_function = self.generate_function_for_flow(inputs, flow_id)
|
||||
schema = self.build_schema_from_inputs(flow_name, inputs)
|
||||
return dynamic_flow_function, schema
|
||||
|
||||
def get_flow_names(self) -> List[str]:
|
||||
flow_records = self.list_flows()
|
||||
return [flow_record.data["name"] for flow_record in flow_records]
|
||||
|
|
@ -82,41 +42,6 @@ async def dynamic_flow_function({func_args}):
|
|||
|
||||
return build_config
|
||||
|
||||
def get_flow_inputs(self, graph: Graph) -> List[Record]:
|
||||
"""
|
||||
Retrieves the flow inputs from the given graph.
|
||||
|
||||
Args:
|
||||
graph (Graph): The graph object representing the flow.
|
||||
|
||||
Returns:
|
||||
List[Record]: A list of input records, where each record contains the ID, name, and description of the input vertex.
|
||||
"""
|
||||
inputs = []
|
||||
for vertex in graph.vertices:
|
||||
if vertex.is_input:
|
||||
inputs.append((vertex.id, vertex.display_name, vertex.description))
|
||||
logger.debug(inputs)
|
||||
return inputs
|
||||
|
||||
def build_schema_from_inputs(self, name: str, inputs: List[tuple[str, str, str]]) -> BaseModel:
|
||||
"""
|
||||
Builds a schema from the given inputs.
|
||||
|
||||
Args:
|
||||
name (str): The name of the schema.
|
||||
inputs (List[tuple[str, str, str]]): A list of tuples representing the inputs.
|
||||
Each tuple contains three elements: the input name, the input type, and the input description.
|
||||
|
||||
Returns:
|
||||
BaseModel: The schema model.
|
||||
|
||||
"""
|
||||
fields = {}
|
||||
for input_ in inputs:
|
||||
fields[input_[1]] = (str, Field(default="", description=input_[2]))
|
||||
return create_model(name, **fields)
|
||||
|
||||
def build_config(self):
|
||||
return {
|
||||
"flow_name": {
|
||||
|
|
@ -142,7 +67,11 @@ async def dynamic_flow_function({func_args}):
|
|||
}
|
||||
|
||||
async def build(self, flow_name: str, name: str, description: str, return_direct: bool = False) -> Tool:
|
||||
dynamic_flow_function, schema = await self.build_function_and_schema(flow_name)
|
||||
flow_record = self.get_flow(flow_name)
|
||||
if not flow_record:
|
||||
raise ValueError("Flow not found.")
|
||||
graph = Graph.from_payload(flow_record.data["data"])
|
||||
dynamic_flow_function, schema = build_function_and_schema(flow_record, graph)
|
||||
tool = StructuredTool.from_function(
|
||||
coroutine=dynamic_flow_function,
|
||||
name=name,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue