379 lines
13 KiB
Python
379 lines
13 KiB
Python
import ast
|
|
import contextlib
|
|
import re
|
|
import traceback
|
|
import warnings
|
|
from typing import Any, Dict, List, Optional, Union
|
|
from uuid import UUID
|
|
|
|
from fastapi import HTTPException
|
|
from loguru import logger
|
|
|
|
from langflow.field_typing.range_spec import RangeSpec
|
|
from langflow.interface.custom.code_parser.utils import extract_inner_type
|
|
from langflow.interface.custom.custom_component import CustomComponent
|
|
from langflow.interface.custom.directory_reader.utils import (
|
|
build_custom_component_list_from_path,
|
|
determine_component_name,
|
|
merge_nested_dicts_with_renaming,
|
|
)
|
|
from langflow.interface.custom.eval import eval_custom_component_code
|
|
from langflow.template.field.base import TemplateField
|
|
from langflow.template.frontend_node.custom_components import CustomComponentFrontendNode
|
|
from langflow.utils import validate
|
|
from langflow.utils.util import get_base_classes
|
|
|
|
|
|
def add_output_types(frontend_node: CustomComponentFrontendNode, return_types: List[str]):
|
|
"""Add output types to the frontend node"""
|
|
for return_type in return_types:
|
|
if return_type is None:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail={
|
|
"error": ("Invalid return type. Please check your code and try again."),
|
|
"traceback": traceback.format_exc(),
|
|
},
|
|
)
|
|
if hasattr(return_type, "__name__"):
|
|
return_type = return_type.__name__
|
|
elif hasattr(return_type, "__class__"):
|
|
return_type = return_type.__class__.__name__
|
|
else:
|
|
return_type = str(return_type)
|
|
|
|
frontend_node.add_output_type(return_type)
|
|
|
|
|
|
def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: List[str]):
|
|
"""Add base classes to the frontend node"""
|
|
for return_type_instance in return_types:
|
|
if return_type_instance is None:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail={
|
|
"error": ("Invalid return type. Please check your code and try again."),
|
|
"traceback": traceback.format_exc(),
|
|
},
|
|
)
|
|
|
|
base_classes = get_base_classes(return_type_instance)
|
|
|
|
for base_class in base_classes:
|
|
frontend_node.add_base_class(base_class)
|
|
|
|
|
|
def extract_type_from_optional(field_type):
|
|
"""
|
|
Extract the type from a string formatted as "Optional[<type>]".
|
|
|
|
Parameters:
|
|
field_type (str): The string from which to extract the type.
|
|
|
|
Returns:
|
|
str: The extracted type, or an empty string if no type was found.
|
|
"""
|
|
match = re.search(r"\[(.*?)\]$", field_type)
|
|
return match[1] if match else None
|
|
|
|
|
|
def get_field_properties(extra_field):
|
|
"""Get the properties of an extra field"""
|
|
field_name = extra_field["name"]
|
|
field_type = extra_field.get("type", "str")
|
|
field_value = extra_field.get("default", "")
|
|
field_required = "optional" not in field_type.lower()
|
|
|
|
if not field_required:
|
|
field_type = extract_type_from_optional(field_type)
|
|
if field_value is not None:
|
|
with contextlib.suppress(Exception):
|
|
field_value = ast.literal_eval(field_value)
|
|
return field_name, field_type, field_value, field_required
|
|
|
|
|
|
def process_type(field_type: str):
|
|
if field_type.startswith("list") or field_type.startswith("List"):
|
|
return extract_inner_type(field_type)
|
|
return "prompt" if field_type == "Prompt" else field_type
|
|
|
|
|
|
def add_new_custom_field(
|
|
frontend_node: CustomComponentFrontendNode,
|
|
field_name: str,
|
|
field_type: str,
|
|
field_value: Any,
|
|
field_required: bool,
|
|
field_config: dict,
|
|
):
|
|
# Check field_config if any of the keys are in it
|
|
# if it is, update the value
|
|
display_name = field_config.pop("display_name", field_name)
|
|
field_type = field_config.pop("field_type", field_type)
|
|
field_contains_list = "list" in field_type.lower()
|
|
field_type = process_type(field_type)
|
|
field_value = field_config.pop("value", field_value)
|
|
field_advanced = field_config.pop("advanced", False)
|
|
|
|
if field_type == "bool" and field_value is None:
|
|
field_value = False
|
|
|
|
# If options is a list, then it's a dropdown
|
|
# If options is None, then it's a list of strings
|
|
is_list = isinstance(field_config.get("options"), list)
|
|
field_config["is_list"] = is_list or field_config.get("is_list", False) or field_contains_list
|
|
|
|
if "name" in field_config:
|
|
warnings.warn("The 'name' key in field_config is used to build the object and can't be changed.")
|
|
required = field_config.pop("required", field_required)
|
|
placeholder = field_config.pop("placeholder", "")
|
|
|
|
new_field = TemplateField(
|
|
name=field_name,
|
|
field_type=field_type,
|
|
value=field_value,
|
|
show=True,
|
|
required=required,
|
|
advanced=field_advanced,
|
|
placeholder=placeholder,
|
|
display_name=display_name,
|
|
**sanitize_field_config(field_config),
|
|
)
|
|
frontend_node.template.upsert_field(field_name, new_field)
|
|
if isinstance(frontend_node.custom_fields, dict):
|
|
frontend_node.custom_fields[field_name] = None
|
|
|
|
return frontend_node
|
|
|
|
|
|
def add_extra_fields(frontend_node, field_config, function_args):
|
|
"""Add extra fields to the frontend node"""
|
|
if not function_args:
|
|
return
|
|
|
|
# sort function_args which is a list of dicts
|
|
function_args.sort(key=lambda x: x["name"])
|
|
|
|
for extra_field in function_args:
|
|
if "name" not in extra_field or extra_field["name"] == "self":
|
|
continue
|
|
|
|
field_name, field_type, field_value, field_required = get_field_properties(extra_field)
|
|
config = field_config.get(field_name, {})
|
|
frontend_node = add_new_custom_field(
|
|
frontend_node,
|
|
field_name,
|
|
field_type,
|
|
field_value,
|
|
field_required,
|
|
config,
|
|
)
|
|
|
|
|
|
def get_field_dict(field: Union[TemplateField, dict]):
|
|
"""Get the field dictionary from a TemplateField or a dict"""
|
|
if isinstance(field, TemplateField):
|
|
return field.model_dump(by_alias=True, exclude_none=True)
|
|
return field
|
|
|
|
|
|
def run_build_config(custom_component: CustomComponent, user_id: Optional[Union[str, UUID]] = None, update_field=None):
|
|
"""Build the field configuration for a custom component"""
|
|
|
|
try:
|
|
if custom_component.code is None:
|
|
return {}
|
|
elif isinstance(custom_component.code, str):
|
|
custom_class = eval_custom_component_code(custom_component.code)
|
|
else:
|
|
raise ValueError("Invalid code type")
|
|
except Exception as exc:
|
|
logger.error(f"Error while evaluating custom component code: {str(exc)}")
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail={
|
|
"error": ("Invalid type convertion. Please check your code and try again."),
|
|
"traceback": traceback.format_exc(),
|
|
},
|
|
) from exc
|
|
|
|
try:
|
|
build_config: Dict = custom_class(user_id=user_id).build_config()
|
|
|
|
for field_name, field in build_config.items():
|
|
# Allow user to build TemplateField as well
|
|
# as a dict with the same keys as TemplateField
|
|
field_dict = get_field_dict(field)
|
|
if update_field is not None and field_name != update_field:
|
|
continue
|
|
try:
|
|
update_field_dict(field_dict)
|
|
build_config[field_name] = field_dict
|
|
except Exception as exc:
|
|
logger.error(f"Error while getting build_config: {str(exc)}")
|
|
|
|
return build_config
|
|
|
|
except Exception as exc:
|
|
logger.error(f"Error while building field config: {str(exc)}")
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail={
|
|
"error": ("Invalid type convertion. Please check your code and try again."),
|
|
"traceback": traceback.format_exc(),
|
|
},
|
|
) from exc
|
|
|
|
|
|
def sanitize_template_config(template_config):
|
|
"""Sanitize the template config"""
|
|
attributes = {
|
|
"display_name",
|
|
"description",
|
|
"beta",
|
|
"documentation",
|
|
"output_types",
|
|
"icon",
|
|
}
|
|
for key in template_config.copy():
|
|
if key not in attributes:
|
|
template_config.pop(key, None)
|
|
|
|
return template_config
|
|
|
|
|
|
def build_frontend_node(template_config):
|
|
"""Build a frontend node for a custom component"""
|
|
try:
|
|
sanitized_template_config = sanitize_template_config(template_config)
|
|
return CustomComponentFrontendNode(**sanitized_template_config)
|
|
except Exception as exc:
|
|
logger.error(f"Error while building base frontend node: {exc}")
|
|
raise exc
|
|
|
|
|
|
def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code, field_config):
|
|
code_field = TemplateField(
|
|
dynamic=True,
|
|
required=True,
|
|
placeholder="",
|
|
multiline=True,
|
|
value=raw_code,
|
|
password=False,
|
|
name="code",
|
|
advanced=field_config.pop("advanced", False),
|
|
field_type="code",
|
|
is_list=False,
|
|
)
|
|
frontend_node.template.add_field(code_field)
|
|
|
|
return frontend_node
|
|
|
|
|
|
def build_custom_component_template(
|
|
custom_component: CustomComponent,
|
|
user_id: Optional[Union[str, UUID]] = None,
|
|
update_field: Optional[str] = None,
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""Build a custom component template for the langchain"""
|
|
try:
|
|
logger.debug("Building custom component template")
|
|
frontend_node = build_frontend_node(custom_component.template_config)
|
|
|
|
field_config = run_build_config(custom_component, user_id=user_id, update_field=update_field)
|
|
|
|
entrypoint_args = custom_component.get_function_entrypoint_args
|
|
|
|
add_extra_fields(frontend_node, field_config, entrypoint_args)
|
|
|
|
frontend_node = add_code_field(frontend_node, custom_component.code, field_config.get("code", {}))
|
|
|
|
add_base_classes(frontend_node, custom_component.get_function_entrypoint_return_type)
|
|
add_output_types(frontend_node, custom_component.get_function_entrypoint_return_type)
|
|
|
|
return frontend_node.to_dict(add_name=False)
|
|
except Exception as exc:
|
|
if isinstance(exc, HTTPException):
|
|
raise exc
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail={
|
|
"error": ("Invalid type convertion. Please check your code and try again."),
|
|
"traceback": traceback.format_exc(),
|
|
},
|
|
) from exc
|
|
|
|
|
|
def create_component_template(component):
|
|
"""Create a template for a component."""
|
|
component_code = component["code"]
|
|
component_output_types = component["output_types"]
|
|
|
|
component_extractor = CustomComponent(code=component_code)
|
|
|
|
component_template = build_custom_component_template(component_extractor)
|
|
component_template["output_types"] = component_output_types
|
|
return component_template
|
|
|
|
|
|
def build_custom_components(settings_service):
|
|
"""Build custom components from the specified paths."""
|
|
if not settings_service.settings.COMPONENTS_PATH:
|
|
return {}
|
|
|
|
logger.info(f"Building custom components from {settings_service.settings.COMPONENTS_PATH}")
|
|
custom_components_from_file = {}
|
|
processed_paths = set()
|
|
for path in settings_service.settings.COMPONENTS_PATH:
|
|
path_str = str(path)
|
|
if path_str in processed_paths:
|
|
continue
|
|
|
|
custom_component_dict = build_custom_component_list_from_path(path_str)
|
|
if custom_component_dict:
|
|
category = next(iter(custom_component_dict))
|
|
logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}")
|
|
custom_components_from_file = merge_nested_dicts_with_renaming(
|
|
custom_components_from_file, custom_component_dict
|
|
)
|
|
processed_paths.add(path_str)
|
|
|
|
return custom_components_from_file
|
|
|
|
|
|
def update_field_dict(field_dict):
|
|
"""Update the field dictionary by calling options() or value() if they are callable"""
|
|
if "options" in field_dict and callable(field_dict["options"]):
|
|
field_dict["options"] = field_dict["options"]()
|
|
# Also update the "refresh" key
|
|
field_dict["refresh"] = True
|
|
|
|
if "value" in field_dict and callable(field_dict["value"]):
|
|
field_dict["value"] = field_dict["value"](field_dict.get("options", []))
|
|
field_dict["refresh"] = True
|
|
|
|
# Let's check if "range_spec" is a RangeSpec object
|
|
if "rangeSpec" in field_dict and isinstance(field_dict["rangeSpec"], RangeSpec):
|
|
field_dict["rangeSpec"] = field_dict["rangeSpec"].model_dump()
|
|
|
|
|
|
def sanitize_field_config(field_config: Dict):
|
|
# If any of the already existing keys are in field_config, remove them
|
|
for key in ["name", "field_type", "value", "required", "placeholder", "display_name", "advanced", "show"]:
|
|
field_config.pop(key, None)
|
|
return field_config
|
|
|
|
|
|
def build_component(component):
|
|
"""Build a single component."""
|
|
component_name = determine_component_name(component)
|
|
component_template = create_component_template(component)
|
|
logger.debug(f"Building component: {component_name, component.get('output_types')}")
|
|
return component_name, component_template
|
|
|
|
|
|
def get_function(code):
|
|
"""Get the function"""
|
|
function_name = validate.extract_function_name(code)
|
|
|
|
return validate.create_function(code, function_name)
|