diff --git a/src/backend/langflow/api/v1/endpoints.py b/src/backend/langflow/api/v1/endpoints.py index 6ebb04182..ccba9674c 100644 --- a/src/backend/langflow/api/v1/endpoints.py +++ b/src/backend/langflow/api/v1/endpoints.py @@ -3,26 +3,24 @@ from typing import Annotated, Optional, Union import sqlalchemy as sa from fastapi import APIRouter, Body, Depends, HTTPException, UploadFile, status -from loguru import logger -from sqlmodel import select - from langflow.api.utils import update_frontend_node_with_template_values -from langflow.api.v1.schemas import ( - CustomComponentCode, - ProcessResponse, - TaskResponse, - TaskStatusResponse, - UploadFileResponse, -) +from langflow.api.v1.schemas import (CustomComponentCode, ProcessResponse, + TaskResponse, TaskStatusResponse, + UploadFileResponse) from langflow.interface.custom.custom_component import CustomComponent from langflow.interface.custom.directory_reader import DirectoryReader -from langflow.interface.types import build_custom_component_template, create_and_validate_component +from langflow.interface.custom.utils import (build_custom_component_template, + create_and_validate_component) from langflow.processing.process import process_graph_cached, process_tweaks -from langflow.services.auth.utils import api_key_security, get_current_active_user +from langflow.services.auth.utils import (api_key_security, + get_current_active_user) from langflow.services.cache.utils import save_uploaded_file from langflow.services.database.models.flow import Flow from langflow.services.database.models.user.model import User -from langflow.services.deps import get_session, get_session_service, get_settings_service, get_task_service +from langflow.services.deps import (get_session, get_session_service, + get_settings_service, get_task_service) +from loguru import logger +from sqlmodel import select try: from langflow.worker import process_graph_cached_task @@ -32,9 +30,8 @@ except ImportError: raise NotImplementedError("Celery is not installed") -from sqlmodel import Session - from langflow.services.task.service import TaskService +from sqlmodel import Session # build router router = APIRouter(tags=["Base"]) @@ -226,7 +223,7 @@ async def custom_component( @router.post("/custom_component/reload", status_code=HTTPStatus.OK) async def reload_custom_component(path: str, user: User = Depends(get_current_active_user)): - from langflow.interface.types import build_custom_component_template + from langflow.interface.custom.utils import build_custom_component_template try: reader = DirectoryReader("") diff --git a/src/backend/langflow/interface/custom/code_parser/__init__.py b/src/backend/langflow/interface/custom/code_parser/__init__.py new file mode 100644 index 000000000..328fa4d47 --- /dev/null +++ b/src/backend/langflow/interface/custom/code_parser/__init__.py @@ -0,0 +1,3 @@ +from .code_parser import CodeParser + +__all__ = ["CodeParser"] diff --git a/src/backend/langflow/interface/custom/code_parser.py b/src/backend/langflow/interface/custom/code_parser/code_parser.py similarity index 100% rename from src/backend/langflow/interface/custom/code_parser.py rename to src/backend/langflow/interface/custom/code_parser/code_parser.py diff --git a/src/backend/langflow/interface/custom/code_parser/utils.py b/src/backend/langflow/interface/custom/code_parser/utils.py new file mode 100644 index 000000000..5675ea85b --- /dev/null +++ b/src/backend/langflow/interface/custom/code_parser/utils.py @@ -0,0 +1,39 @@ +import re +from types import GenericAlias +from typing import Any + + +def extract_inner_type(return_type: str) -> str: + """ + Extracts the inner type from a type hint that is a list. + """ + if match := re.match(r"list\[(.*)\]", return_type, re.IGNORECASE): + return match[1] + return return_type + + +def extract_inner_type_from_generic_alias(return_type: GenericAlias) -> Any: + """ + Extracts the inner type from a type hint that is a list. + """ + if return_type.__origin__ == list: + return list(return_type.__args__) + + return return_type + + +def extract_union_types(return_type: str) -> list[str]: + """ + Extracts the inner type from a type hint that is a list. + """ + # If the return type is a Union, then we need to parse it + return_type = return_type.replace("Union", "").replace("[", "").replace("]", "") + return_types = return_type.split(",") + return [item.strip() for item in return_types] + + +def extract_union_types_from_generic_alias(return_type: GenericAlias) -> list: + """ + Extracts the inner type from a type hint that is a Union. + """ + return list(return_type.__args__) \ No newline at end of file diff --git a/src/backend/langflow/interface/custom/custom_component.py b/src/backend/langflow/interface/custom/custom_component.py index c2a7c4c74..948618d66 100644 --- a/src/backend/langflow/interface/custom/custom_component.py +++ b/src/backend/langflow/interface/custom/custom_component.py @@ -1,6 +1,7 @@ import operator from typing import Any, Callable, ClassVar, List, Optional, Union from uuid import UUID +from langflow.interface.custom.code_parser.utils import extract_inner_type_from_generic_alias import yaml from cachetools import TTLCache, cachedmethod @@ -8,8 +9,7 @@ from fastapi import HTTPException from langflow.interface.custom.component import Component from langflow.interface.custom.directory_reader import DirectoryReader -from langflow.interface.custom.utils import ( - extract_inner_type_from_generic_alias, +from langflow.interface.custom.code_parser.utils import ( extract_union_types_from_generic_alias, ) from langflow.services.database.models.flow import Flow diff --git a/src/backend/langflow/interface/custom/directory_reader/__init__.py b/src/backend/langflow/interface/custom/directory_reader/__init__.py new file mode 100644 index 000000000..2e8e0b9e5 --- /dev/null +++ b/src/backend/langflow/interface/custom/directory_reader/__init__.py @@ -0,0 +1,3 @@ +from .directory_reader import DirectoryReader + +__all__ = ["DirectoryReader"] diff --git a/src/backend/langflow/interface/custom/directory_reader.py b/src/backend/langflow/interface/custom/directory_reader/directory_reader.py similarity index 98% rename from src/backend/langflow/interface/custom/directory_reader.py rename to src/backend/langflow/interface/custom/directory_reader/directory_reader.py index e80f0bd28..41dd11483 100644 --- a/src/backend/langflow/interface/custom/directory_reader.py +++ b/src/backend/langflow/interface/custom/directory_reader/directory_reader.py @@ -1,6 +1,7 @@ -import os import ast +import os import zlib + from loguru import logger @@ -63,12 +64,13 @@ class DirectoryReader: return len(file_content.strip()) == 0 def filter_loaded_components(self, data: dict, with_errors: bool) -> dict: + from langflow.interface.custom.utils import build_component items = [ { "name": menu["name"], "path": menu["path"], "components": [ - component + (*build_component(component),component) for component in menu["components"] if (component["error"] if with_errors else not component["error"]) ], diff --git a/src/backend/langflow/interface/custom/directory_reader/utils.py b/src/backend/langflow/interface/custom/directory_reader/utils.py new file mode 100644 index 000000000..2dbbd2caa --- /dev/null +++ b/src/backend/langflow/interface/custom/directory_reader/utils.py @@ -0,0 +1,147 @@ +from langflow.interface.custom.directory_reader import DirectoryReader +from langflow.template.frontend_node.custom_components import \ + CustomComponentFrontendNode +from loguru import logger + + +def merge_nested_dicts_with_renaming(dict1, dict2): + for key, value in dict2.items(): + if key in dict1 and isinstance(value, dict) and isinstance(dict1.get(key), dict): + for sub_key, sub_value in value.items(): + # if sub_key in dict1[key]: + # new_key = get_new_key(dict1[key], sub_key) + # dict1[key][new_key] = sub_value + # else: + dict1[key][sub_key] = sub_value + else: + dict1[key] = value + return dict1 + + +def build_invalid_menu(invalid_components): + """Build the invalid menu.""" + if not invalid_components.get("menu"): + return {} + + logger.debug("------------------- INVALID COMPONENTS -------------------") + invalid_menu = {} + for menu_item in invalid_components["menu"]: + menu_name = menu_item["name"] + invalid_menu[menu_name] = build_invalid_menu_items(menu_item) + return invalid_menu + + +def build_valid_menu(valid_components): + """Build the valid menu.""" + valid_menu = {} + logger.debug("------------------- VALID COMPONENTS -------------------") + for menu_item in valid_components["menu"]: + menu_name = menu_item["name"] + valid_menu[menu_name] = build_menu_items(menu_item) + return valid_menu + + +def build_and_validate_all_files(reader: DirectoryReader, file_list): + """Build and validate all files""" + data = reader.build_component_menu_list(file_list) + + valid_components = reader.filter_loaded_components(data=data, with_errors=False) + invalid_components = reader.filter_loaded_components(data=data, with_errors=True) + + return valid_components, invalid_components + + +def load_files_from_path(path: str): + """Load all files from a given path""" + reader = DirectoryReader(path, False) + + return reader.get_files() + + +def build_custom_component_list_from_path(path: str): + """Build a list of custom components for the langchain from a given path""" + file_list = load_files_from_path(path) + reader = DirectoryReader(path, False) + + valid_components, invalid_components = build_and_validate_all_files(reader, file_list) + + valid_menu = build_valid_menu(valid_components) + invalid_menu = build_invalid_menu(invalid_components) + + return merge_nested_dicts_with_renaming(valid_menu, invalid_menu) + + +def create_invalid_component_template(component, component_name): + """Create a template for an invalid component.""" + component_code = component["code"] + component_frontend_node = CustomComponentFrontendNode( + description="ERROR - Check your Python Code", + display_name=f"ERROR - {component_name}", + ) + + + component_frontend_node.error = component.get("error", None) + field = component_frontend_node.template.get_field("code") + field.value = component_code + component_frontend_node.template.update_field("code", field) + return component_frontend_node.model_dump(by_alias=True, exclude_none=True) + + +def log_invalid_component_details(component): + """Log details of an invalid component.""" + logger.debug(component) + logger.debug(f"Component Path: {component.get('path', None)}") + logger.debug(f"Component Error: {component.get('error', None)}") + + +def build_invalid_component(component): + """Build a single invalid component.""" + component_name = component["name"] + component_template = create_invalid_component_template(component, component_name) + log_invalid_component_details(component) + return component_name, component_template + + +def build_invalid_menu_items(menu_item): + """Build invalid menu items for a given menu.""" + menu_items = {} + for component in menu_item["components"]: + try: + component_name, component_template = build_invalid_component(component) + menu_items[component_name] = component_template + logger.debug(f"Added {component_name} to invalid menu.") + except Exception as exc: + logger.exception(f"Error while creating custom component [{component_name}]: {str(exc)}") + return menu_items + + +def get_new_key(dictionary, original_key): + counter = 1 + new_key = original_key + " (" + str(counter) + ")" + while new_key in dictionary: + counter += 1 + new_key = original_key + " (" + str(counter) + ")" + return new_key + + +def determine_component_name(component): + """Determine the name of the component.""" + component_output_types = component["output_types"] + if len(component_output_types) == 1: + return component_output_types[0] + else: + file_name = component.get("file").split(".")[0] + return "".join(word.capitalize() for word in file_name.split("_")) if "_" in file_name else file_name + + +def build_menu_items(menu_item): + """Build menu items for a given menu.""" + menu_items = {} + for component_name, component_template, component in menu_item["components"]: + try: + menu_items[component_name] = component_template + logger.debug(f"Added {component_name} to valid menu.") + except Exception as exc: + logger.error(f"Error loading Component: {component['output_types']}") + logger.exception(f"Error while building custom component {component['output_types']}: {exc}") + return menu_items \ No newline at end of file diff --git a/src/backend/langflow/interface/custom/utils.py b/src/backend/langflow/interface/custom/utils.py index e527670a0..30eaab719 100644 --- a/src/backend/langflow/interface/custom/utils.py +++ b/src/backend/langflow/interface/custom/utils.py @@ -1,40 +1,379 @@ -import re -from types import GenericAlias -from typing import Any +import traceback +from typing import Any, Dict, List, Optional, Union +from uuid import UUID + +from fastapi import HTTPException +from langflow.field_typing.range_spec import RangeSpec +from langflow.interface.custom.code_parser.utils import extract_inner_type +from langflow.interface.custom.custom_component import CustomComponent +from langflow.interface.custom.directory_reader.utils import ( + build_custom_component_list_from_path, determine_component_name, + merge_nested_dicts_with_renaming) +from langflow.interface.importing.utils import eval_custom_component_code +from langflow.template.field.base import TemplateField +from langflow.template.frontend_node.custom_components import \ + CustomComponentFrontendNode +from langflow.utils.util import get_base_classes +from loguru import logger -def extract_inner_type(return_type: str) -> str: +def add_output_types(frontend_node: CustomComponentFrontendNode, return_types: List[str]): + """Add output types to the frontend node""" + for return_type in return_types: + if return_type is None: + raise HTTPException( + status_code=400, + detail={ + "error": ("Invalid return type. Please check your code and try again."), + "traceback": traceback.format_exc(), + }, + ) + if hasattr(return_type, "__name__"): + return_type = return_type.__name__ + elif hasattr(return_type, "__class__"): + return_type = return_type.__class__.__name__ + else: + return_type = str(return_type) + + frontend_node.add_output_type(return_type) + + +def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: List[str]): + """Add base classes to the frontend node""" + for return_type_instance in return_types: + if return_type_instance is None: + raise HTTPException( + status_code=400, + detail={ + "error": ("Invalid return type. Please check your code and try again."), + "traceback": traceback.format_exc(), + }, + ) + + base_classes = get_base_classes(return_type_instance) + + for base_class in base_classes: + frontend_node.add_base_class(base_class) + + +def extract_type_from_optional(field_type): """ - Extracts the inner type from a type hint that is a list. + Extract the type from a string formatted as "Optional[]". + + Parameters: + field_type (str): The string from which to extract the type. + + Returns: + str: The extracted type, or an empty string if no type was found. """ - if match := re.match(r"list\[(.*)\]", return_type, re.IGNORECASE): - return match[1] - return return_type + match = re.search(r"\[(.*?)\]$", field_type) + return match[1] if match else None -def extract_inner_type_from_generic_alias(return_type: GenericAlias) -> Any: - """ - Extracts the inner type from a type hint that is a list. - """ - if return_type.__origin__ == list: - return list(return_type.__args__) +def get_field_properties(extra_field): + """Get the properties of an extra field""" + field_name = extra_field["name"] + field_type = extra_field.get("type", "str") + field_value = extra_field.get("default", "") + field_required = "optional" not in field_type.lower() - return return_type + if not field_required: + field_type = extract_type_from_optional(field_type) + if field_value is not None: + with contextlib.suppress(Exception): + field_value = ast.literal_eval(field_value) + return field_name, field_type, field_value, field_required -def extract_union_types_from_generic_alias(return_type: GenericAlias) -> list: - """ - Extracts the inner type from a type hint that is a Union. - """ - return list(return_type.__args__) +def process_type(field_type: str): + if field_type.startswith("list") or field_type.startswith("List"): + return extract_inner_type(field_type) + return "prompt" if field_type == "Prompt" else field_type + + +def add_new_custom_field( + frontend_node: CustomComponentFrontendNode, + field_name: str, + field_type: str, + field_value: Any, + field_required: bool, + field_config: dict, +): + # Check field_config if any of the keys are in it + # if it is, update the value + display_name = field_config.pop("display_name", field_name) + field_type = field_config.pop("field_type", field_type) + field_contains_list = "list" in field_type.lower() + field_type = process_type(field_type) + field_value = field_config.pop("value", field_value) + field_advanced = field_config.pop("advanced", False) + + if field_type == "bool" and field_value is None: + field_value = False + + # If options is a list, then it's a dropdown + # If options is None, then it's a list of strings + is_list = isinstance(field_config.get("options"), list) + field_config["is_list"] = is_list or field_config.get("is_list", False) or field_contains_list + + if "name" in field_config: + warnings.warn("The 'name' key in field_config is used to build the object and can't be changed.") + required = field_config.pop("required", field_required) + placeholder = field_config.pop("placeholder", "") + + new_field = TemplateField( + name=field_name, + field_type=field_type, + value=field_value, + show=True, + required=required, + advanced=field_advanced, + placeholder=placeholder, + display_name=display_name, + **sanitize_field_config(field_config), + ) + frontend_node.template.upsert_field(field_name, new_field) + if isinstance(frontend_node.custom_fields, dict): + frontend_node.custom_fields[field_name] = None + + return frontend_node + + +def add_extra_fields(frontend_node, field_config, function_args): + """Add extra fields to the frontend node""" + if not function_args: + return + + # sort function_args which is a list of dicts + function_args.sort(key=lambda x: x["name"]) + + for extra_field in function_args: + if "name" not in extra_field or extra_field["name"] == "self": + continue + + field_name, field_type, field_value, field_required = get_field_properties(extra_field) + config = field_config.get(field_name, {}) + frontend_node = add_new_custom_field( + frontend_node, + field_name, + field_type, + field_value, + field_required, + config, + ) + + +def get_field_dict(field: Union[TemplateField, dict]): + """Get the field dictionary from a TemplateField or a dict""" + if isinstance(field, TemplateField): + return field.model_dump(by_alias=True, exclude_none=True) + return field + + +def run_build_config( + custom_component: CustomComponent, user_id: Optional[Union[str, UUID]] = None, update_field=None +): + """Build the field configuration for a custom component""" + + try: + if custom_component.code is None: + return {} + elif isinstance(custom_component.code, str): + custom_class = eval_custom_component_code(custom_component.code) + else: + raise ValueError("Invalid code type") + except Exception as exc: + logger.error(f"Error while evaluating custom component code: {str(exc)}") + raise HTTPException( + status_code=400, + detail={ + "error": ("Invalid type convertion. Please check your code and try again."), + "traceback": traceback.format_exc(), + }, + ) from exc + + try: + build_config: Dict = custom_class(user_id=user_id).build_config() + + for field_name, field in build_config.items(): + # Allow user to build TemplateField as well + # as a dict with the same keys as TemplateField + field_dict = get_field_dict(field) + if update_field is not None and field_name != update_field: + continue + try: + update_field_dict(field_dict) + build_config[field_name] = field_dict + except Exception as exc: + logger.error(f"Error while getting build_config: {str(exc)}") + + return build_config + + except Exception as exc: + logger.error(f"Error while building field config: {str(exc)}") + raise HTTPException( + status_code=400, + detail={ + "error": ("Invalid type convertion. Please check your code and try again."), + "traceback": traceback.format_exc(), + }, + ) from exc + + +def sanitize_template_config(template_config): + """Sanitize the template config""" + attributes = { + "display_name", + "description", + "beta", + "documentation", + "output_types", + } + for key in template_config.copy(): + if key not in attributes: + template_config.pop(key, None) + + return template_config + + +def build_frontend_node(template_config): + """Build a frontend node for a custom component""" + try: + sanitized_template_config = sanitize_template_config(template_config) + return CustomComponentFrontendNode(**sanitized_template_config) + except Exception as exc: + logger.error(f"Error while building base frontend node: {exc}") + raise exc + + +def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code, field_config): + code_field = TemplateField( + dynamic=True, + required=True, + placeholder="", + multiline=True, + value=raw_code, + password=False, + name="code", + advanced=field_config.pop("advanced", False), + field_type="code", + is_list=False, + ) + frontend_node.template.add_field(code_field) + + return frontend_node + + +def build_custom_component_template( + custom_component: CustomComponent, + user_id: Optional[Union[str, UUID]] = None, + update_field: Optional[str] = None, +) -> Optional[Dict[str, Any]]: + """Build a custom component template for the langchain""" + try: + logger.debug("Building custom component template") + frontend_node = build_frontend_node(custom_component.template_config) + + logger.debug("Built base frontend node") + + logger.debug("Updated attributes") + field_config = run_build_config(custom_component, user_id=user_id, update_field=update_field) + logger.debug("Built field config") + entrypoint_args = custom_component.get_function_entrypoint_args + + add_extra_fields(frontend_node, field_config, entrypoint_args) + logger.debug("Added extra fields") + frontend_node = add_code_field(frontend_node, custom_component.code, field_config.get("code", {})) + logger.debug("Added code field") + add_base_classes(frontend_node, custom_component.get_function_entrypoint_return_type) + add_output_types(frontend_node, custom_component.get_function_entrypoint_return_type) + logger.debug("Added base classes") + return frontend_node.to_dict(add_name=False) + except Exception as exc: + if isinstance(exc, HTTPException): + raise exc + raise HTTPException( + status_code=400, + detail={ + "error": ("Invalid type convertion. Please check your code and try again."), + "traceback": traceback.format_exc(), + }, + ) from exc + + +def create_component_template(component): + """Create a template for a component.""" + component_code = component["code"] + component_output_types = component["output_types"] + + component_extractor = CustomComponent(code=component_code) + component_extractor.validate() + + component_template = build_custom_component_template(component_extractor) + component_template["output_types"] = component_output_types + return component_template + + +def build_custom_components(settings_service): + """Build custom components from the specified paths.""" + if not settings_service.settings.COMPONENTS_PATH: + return {} + + logger.info(f"Building custom components from {settings_service.settings.COMPONENTS_PATH}") + custom_components_from_file = {} + processed_paths = set() + for path in settings_service.settings.COMPONENTS_PATH: + path_str = str(path) + if path_str in processed_paths: + continue + + custom_component_dict = build_custom_component_list_from_path(path_str) + if custom_component_dict: + category = next(iter(custom_component_dict)) + logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}") + custom_components_from_file = merge_nested_dicts_with_renaming( + custom_components_from_file, custom_component_dict + ) + processed_paths.add(path_str) + + return custom_components_from_file + + +def create_and_validate_component(code: str) -> CustomComponent: + component = CustomComponent(code=code) + component.validate() + return component + + +def update_field_dict(field_dict): + """Update the field dictionary by calling options() or value() if they are callable""" + if "options" in field_dict and callable(field_dict["options"]): + field_dict["options"] = field_dict["options"]() + # Also update the "refresh" key + field_dict["refresh"] = True + + if "value" in field_dict and callable(field_dict["value"]): + field_dict["value"] = field_dict["value"](field_dict.get("options", [])) + field_dict["refresh"] = True + + # Let's check if "range_spec" is a RangeSpec object + if "rangeSpec" in field_dict and isinstance(field_dict["rangeSpec"], RangeSpec): + field_dict["rangeSpec"] = field_dict["rangeSpec"].model_dump() + + +def sanitize_field_config(field_config: Dict): + # If any of the already existing keys are in field_config, remove them + for key in ["name", "field_type", "value", "required", "placeholder", "display_name", "advanced", "show"]: + field_config.pop(key, None) + return field_config + + +def build_component(component): + """Build a single component.""" + logger.debug(f"Building component: {component.get('name'), component.get('output_types')}") + component_name = determine_component_name(component) + component_template = create_component_template(component) + return component_name, component_template + -def extract_union_types(return_type: str) -> list[str]: - """ - Extracts the inner type from a type hint that is a list. - """ - # If the return type is a Union, then we need to parse it - return_type = return_type.replace("Union", "").replace("[", "").replace("]", "") - return_types = return_type.split(",") - return_types = [item.strip() for item in return_types] - return return_types diff --git a/src/backend/langflow/interface/types.py b/src/backend/langflow/interface/types.py index b6e4250f9..59b2ec66e 100644 --- a/src/backend/langflow/interface/types.py +++ b/src/backend/langflow/interface/types.py @@ -1,24 +1,12 @@ -import ast -import contextlib -import re -import traceback -import warnings -from typing import Any, Dict, List, Optional, Union -from uuid import UUID from cachetools import LRUCache, cached -from fastapi import HTTPException -from loguru import logger - -from langflow.field_typing.range_spec import RangeSpec from langflow.interface.agents.base import agent_creator from langflow.interface.chains.base import chain_creator -from langflow.interface.custom.custom_component import CustomComponent -from langflow.interface.custom.directory_reader import DirectoryReader -from langflow.interface.custom.utils import extract_inner_type +from langflow.interface.custom.directory_reader.utils import \ + merge_nested_dicts_with_renaming +from langflow.interface.custom.utils import build_custom_components from langflow.interface.document_loaders.base import documentloader_creator from langflow.interface.embeddings.base import embedding_creator -from langflow.interface.importing.utils import eval_custom_component_code from langflow.interface.llms.base import llm_creator from langflow.interface.memories.base import memory_creator from langflow.interface.output_parsers.base import output_parser_creator @@ -30,9 +18,6 @@ from langflow.interface.tools.base import tool_creator from langflow.interface.utilities.base import utility_creator from langflow.interface.vector_store.base import vectorstore_creator from langflow.interface.wrappers.base import wrapper_creator -from langflow.template.field.base import TemplateField -from langflow.template.frontend_node.custom_components import CustomComponentFrontendNode -from langflow.utils.util import get_base_classes # Used to get the base_classes list @@ -80,481 +65,6 @@ def build_langchain_types_dict(): # sourcery skip: dict-assign-update-to-union return all_types -def process_type(field_type: str): - if field_type.startswith("list") or field_type.startswith("List"): - return extract_inner_type(field_type) - return "prompt" if field_type == "Prompt" else field_type - - -# TODO: Move to correct place -def add_new_custom_field( - frontend_node: CustomComponentFrontendNode, - field_name: str, - field_type: str, - field_value: Any, - field_required: bool, - field_config: dict, -): - # Check field_config if any of the keys are in it - # if it is, update the value - display_name = field_config.pop("display_name", field_name) - field_type = field_config.pop("field_type", field_type) - field_contains_list = "list" in field_type.lower() - field_type = process_type(field_type) - field_value = field_config.pop("value", field_value) - field_advanced = field_config.pop("advanced", False) - - if field_type == "bool" and field_value is None: - field_value = False - - # If options is a list, then it's a dropdown - # If options is None, then it's a list of strings - is_list = isinstance(field_config.get("options"), list) - field_config["is_list"] = is_list or field_config.get("is_list", False) or field_contains_list - - if "name" in field_config: - warnings.warn("The 'name' key in field_config is used to build the object and can't be changed.") - required = field_config.pop("required", field_required) - placeholder = field_config.pop("placeholder", "") - - new_field = TemplateField( - name=field_name, - field_type=field_type, - value=field_value, - show=True, - required=required, - advanced=field_advanced, - placeholder=placeholder, - display_name=display_name, - **sanitize_field_config(field_config), - ) - frontend_node.template.upsert_field(field_name, new_field) - if isinstance(frontend_node.custom_fields, dict): - frontend_node.custom_fields[field_name] = None - - return frontend_node - - -def sanitize_field_config(field_config: Dict): - # If any of the already existing keys are in field_config, remove them - for key in ["name", "field_type", "value", "required", "placeholder", "display_name", "advanced", "show"]: - field_config.pop(key, None) - return field_config - - -# TODO: Move to correct place -def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code, field_config): - code_field = TemplateField( - dynamic=True, - required=True, - placeholder="", - multiline=True, - value=raw_code, - password=False, - name="code", - advanced=field_config.pop("advanced", False), - field_type="code", - is_list=False, - ) - frontend_node.template.add_field(code_field) - - return frontend_node - - -def extract_type_from_optional(field_type): - """ - Extract the type from a string formatted as "Optional[]". - - Parameters: - field_type (str): The string from which to extract the type. - - Returns: - str: The extracted type, or an empty string if no type was found. - """ - match = re.search(r"\[(.*?)\]$", field_type) - return match[1] if match else None - - -def build_frontend_node(template_config): - """Build a frontend node for a custom component""" - try: - sanitized_template_config = sanitize_template_config(template_config) - return CustomComponentFrontendNode(**sanitized_template_config) - except Exception as exc: - logger.error(f"Error while building base frontend node: {exc}") - raise exc - - -def sanitize_template_config(template_config): - """Sanitize the template config""" - attributes = { - "display_name", - "description", - "beta", - "documentation", - "output_types", - } - for key in template_config.copy(): - if key not in attributes: - template_config.pop(key, None) - - return template_config - - -def build_field_config( - custom_component: CustomComponent, user_id: Optional[Union[str, UUID]] = None, update_field=None -): - """Build the field configuration for a custom component""" - - try: - if custom_component.code is None: - return {} - elif isinstance(custom_component.code, str): - custom_class = eval_custom_component_code(custom_component.code) - else: - raise ValueError("Invalid code type") - except Exception as exc: - logger.error(f"Error while evaluating custom component code: {str(exc)}") - raise HTTPException( - status_code=400, - detail={ - "error": ("Invalid type convertion. Please check your code and try again."), - "traceback": traceback.format_exc(), - }, - ) from exc - - try: - build_config: Dict = custom_class(user_id=user_id).build_config() - - for field_name, field in build_config.items(): - # Allow user to build TemplateField as well - # as a dict with the same keys as TemplateField - field_dict = get_field_dict(field) - if update_field is not None and field_name != update_field: - continue - try: - update_field_dict(field_dict) - build_config[field_name] = field_dict - except Exception as exc: - logger.error(f"Error while getting build_config: {str(exc)}") - - return build_config - - except Exception as exc: - logger.error(f"Error while building field config: {str(exc)}") - raise HTTPException( - status_code=400, - detail={ - "error": ("Invalid type convertion. Please check your code and try again."), - "traceback": traceback.format_exc(), - }, - ) from exc - - -def get_field_dict(field): - """Get the field dictionary from a TemplateField or a dict""" - if isinstance(field, TemplateField): - return field.model_dump(by_alias=True, exclude_none=True) - return field - - -def update_field_dict(field_dict): - """Update the field dictionary by calling options() or value() if they are callable""" - if "options" in field_dict and callable(field_dict["options"]): - field_dict["options"] = field_dict["options"]() - # Also update the "refresh" key - field_dict["refresh"] = True - - if "value" in field_dict and callable(field_dict["value"]): - field_dict["value"] = field_dict["value"](field_dict.get("options", [])) - field_dict["refresh"] = True - - # Let's check if "range_spec" is a RangeSpec object - if "rangeSpec" in field_dict and isinstance(field_dict["rangeSpec"], RangeSpec): - field_dict["rangeSpec"] = field_dict["rangeSpec"].model_dump() - - -def add_extra_fields(frontend_node, field_config, function_args): - """Add extra fields to the frontend node""" - if not function_args: - return - - # sort function_args which is a list of dicts - function_args.sort(key=lambda x: x["name"]) - - for extra_field in function_args: - if "name" not in extra_field or extra_field["name"] == "self": - continue - - field_name, field_type, field_value, field_required = get_field_properties(extra_field) - config = field_config.get(field_name, {}) - frontend_node = add_new_custom_field( - frontend_node, - field_name, - field_type, - field_value, - field_required, - config, - ) - - -def get_field_properties(extra_field): - """Get the properties of an extra field""" - field_name = extra_field["name"] - field_type = extra_field.get("type", "str") - field_value = extra_field.get("default", "") - field_required = "optional" not in field_type.lower() - - if not field_required: - field_type = extract_type_from_optional(field_type) - if field_value is not None: - with contextlib.suppress(Exception): - field_value = ast.literal_eval(field_value) - return field_name, field_type, field_value, field_required - - -def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: List[str]): - """Add base classes to the frontend node""" - for return_type_instance in return_types: - if return_type_instance is None: - raise HTTPException( - status_code=400, - detail={ - "error": ("Invalid return type. Please check your code and try again."), - "traceback": traceback.format_exc(), - }, - ) - - base_classes = get_base_classes(return_type_instance) - - for base_class in base_classes: - frontend_node.add_base_class(base_class) - - -def add_output_types(frontend_node: CustomComponentFrontendNode, return_types: List[str]): - """Add output types to the frontend node""" - for return_type in return_types: - if return_type is None: - raise HTTPException( - status_code=400, - detail={ - "error": ("Invalid return type. Please check your code and try again."), - "traceback": traceback.format_exc(), - }, - ) - if hasattr(return_type, "__name__"): - return_type = return_type.__name__ - elif hasattr(return_type, "__class__"): - return_type = return_type.__class__.__name__ - else: - return_type = str(return_type) - - frontend_node.add_output_type(return_type) - - -def build_custom_component_template( - custom_component: CustomComponent, - user_id: Optional[Union[str, UUID]] = None, - update_field: Optional[str] = None, -) -> Optional[Dict[str, Any]]: - """Build a custom component template for the langchain""" - try: - logger.debug("Building custom component template") - frontend_node = build_frontend_node(custom_component.template_config) - - logger.debug("Built base frontend node") - - logger.debug("Updated attributes") - field_config = build_field_config(custom_component, user_id=user_id, update_field=update_field) - logger.debug("Built field config") - entrypoint_args = custom_component.get_function_entrypoint_args - - add_extra_fields(frontend_node, field_config, entrypoint_args) - logger.debug("Added extra fields") - frontend_node = add_code_field(frontend_node, custom_component.code, field_config.get("code", {})) - logger.debug("Added code field") - add_base_classes(frontend_node, custom_component.get_function_entrypoint_return_type) - add_output_types(frontend_node, custom_component.get_function_entrypoint_return_type) - logger.debug("Added base classes") - return frontend_node.to_dict(add_name=False) - except Exception as exc: - if isinstance(exc, HTTPException): - raise exc - raise HTTPException( - status_code=400, - detail={ - "error": ("Invalid type convertion. Please check your code and try again."), - "traceback": traceback.format_exc(), - }, - ) from exc - - -def load_files_from_path(path: str): - """Load all files from a given path""" - reader = DirectoryReader(path, False) - - return reader.get_files() - - -def build_and_validate_all_files(reader: DirectoryReader, file_list): - """Build and validate all files""" - data = reader.build_component_menu_list(file_list) - - valid_components = reader.filter_loaded_components(data=data, with_errors=False) - invalid_components = reader.filter_loaded_components(data=data, with_errors=True) - - return valid_components, invalid_components - - -def build_valid_menu(valid_components): - """Build the valid menu.""" - valid_menu = {} - logger.debug("------------------- VALID COMPONENTS -------------------") - for menu_item in valid_components["menu"]: - menu_name = menu_item["name"] - valid_menu[menu_name] = build_menu_items(menu_item) - return valid_menu - - -def build_menu_items(menu_item): - """Build menu items for a given menu.""" - menu_items = {} - for component in menu_item["components"]: - try: - component_name, component_template = build_component(component) - menu_items[component_name] = component_template - logger.debug(f"Added {component_name} to valid menu.") - except Exception as exc: - logger.error(f"Error loading Component: {component['output_types']}") - logger.exception(f"Error while building custom component {component['output_types']}: {exc}") - return menu_items - - -def build_component(component): - """Build a single component.""" - logger.debug(f"Building component: {component.get('name'), component.get('output_types')}") - component_name = determine_component_name(component) - component_template = create_component_template(component) - return component_name, component_template - - -def determine_component_name(component): - """Determine the name of the component.""" - component_output_types = component["output_types"] - if len(component_output_types) == 1: - return component_output_types[0] - else: - file_name = component.get("file").split(".")[0] - return "".join(word.capitalize() for word in file_name.split("_")) if "_" in file_name else file_name - - -def create_component_template(component): - """Create a template for a component.""" - component_code = component["code"] - component_output_types = component["output_types"] - - component_extractor = CustomComponent(code=component_code) - component_extractor.validate() - - component_template = build_custom_component_template(component_extractor) - component_template["output_types"] = component_output_types - return component_template - - -def build_invalid_menu(invalid_components): - """Build the invalid menu.""" - if not invalid_components.get("menu"): - return {} - - logger.debug("------------------- INVALID COMPONENTS -------------------") - invalid_menu = {} - for menu_item in invalid_components["menu"]: - menu_name = menu_item["name"] - invalid_menu[menu_name] = build_invalid_menu_items(menu_item) - return invalid_menu - - -def build_invalid_menu_items(menu_item): - """Build invalid menu items for a given menu.""" - menu_items = {} - for component in menu_item["components"]: - try: - component_name, component_template = build_invalid_component(component) - menu_items[component_name] = component_template - logger.debug(f"Added {component_name} to invalid menu.") - except Exception as exc: - logger.exception(f"Error while creating custom component [{component_name}]: {str(exc)}") - return menu_items - - -def build_invalid_component(component): - """Build a single invalid component.""" - component_name = component["name"] - component_template = create_invalid_component_template(component, component_name) - log_invalid_component_details(component) - return component_name, component_template - - -def create_invalid_component_template(component, component_name): - """Create a template for an invalid component.""" - component_code = component["code"] - component_template = ( - CustomComponentFrontendNode( - description="ERROR - Check your Python Code", - display_name=f"ERROR - {component_name}", - ) - .to_dict() - .get(type(CustomComponent()).__name__) - ) - - component_template["error"] = component.get("error", None) - component_template.get("template").get("code")["value"] = component_code - return component_template - - -def log_invalid_component_details(component): - """Log details of an invalid component.""" - logger.debug(component) - logger.debug(f"Component Path: {component.get('path', None)}") - logger.debug(f"Component Error: {component.get('error', None)}") - - -def get_new_key(dictionary, original_key): - counter = 1 - new_key = original_key + " (" + str(counter) + ")" - while new_key in dictionary: - counter += 1 - new_key = original_key + " (" + str(counter) + ")" - return new_key - - -def merge_nested_dicts_with_renaming(dict1, dict2): - for key, value in dict2.items(): - if key in dict1 and isinstance(value, dict) and isinstance(dict1.get(key), dict): - for sub_key, sub_value in value.items(): - # if sub_key in dict1[key]: - # new_key = get_new_key(dict1[key], sub_key) - # dict1[key][new_key] = sub_value - # else: - dict1[key][sub_key] = sub_value - else: - dict1[key] = value - return dict1 - - -def build_custom_component_list_from_path(path: str): - """Build a list of custom components for the langchain from a given path""" - file_list = load_files_from_path(path) - reader = DirectoryReader(path, False) - - valid_components, invalid_components = build_and_validate_all_files(reader, file_list) - - valid_menu = build_valid_menu(valid_components) - invalid_menu = build_invalid_menu(invalid_components) - - return merge_nested_dicts_with_renaming(valid_menu, invalid_menu) - - def get_all_types_dict(settings_service): """Get all types dictionary combining native and custom components.""" native_components = build_langchain_types_dict() @@ -562,41 +72,3 @@ def get_all_types_dict(settings_service): return merge_nested_dicts_with_renaming(native_components, custom_components_from_file) -def build_custom_components(settings_service): - """Build custom components from the specified paths.""" - if not settings_service.settings.COMPONENTS_PATH: - return {} - - logger.info(f"Building custom components from {settings_service.settings.COMPONENTS_PATH}") - custom_components_from_file = {} - processed_paths = set() - for path in settings_service.settings.COMPONENTS_PATH: - path_str = str(path) - if path_str in processed_paths: - continue - - custom_component_dict = build_custom_component_list_from_path(path_str) - if custom_component_dict: - category = next(iter(custom_component_dict)) - logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}") - custom_components_from_file = merge_nested_dicts_with_renaming( - custom_components_from_file, custom_component_dict - ) - processed_paths.add(path_str) - - return custom_components_from_file - - -def merge_nested_dicts(dict1, dict2): - for key, value in dict2.items(): - if isinstance(value, dict) and isinstance(dict1.get(key), dict): - dict1[key] = merge_nested_dicts(dict1[key], value) - else: - dict1[key] = value - return dict1 - - -def create_and_validate_component(code: str) -> CustomComponent: - component = CustomComponent(code=code) - component.validate() - return component diff --git a/tests/test_custom_component.py b/tests/test_custom_component.py index 35eaba00e..98dd587f5 100644 --- a/tests/test_custom_component.py +++ b/tests/test_custom_component.py @@ -4,11 +4,12 @@ from uuid import uuid4 import pytest from fastapi import HTTPException - from langflow.interface.custom.base import CustomComponent from langflow.interface.custom.code_parser import CodeParser, CodeSyntaxError -from langflow.interface.custom.component import Component, ComponentCodeNullError -from langflow.interface.types import build_custom_component_template, create_and_validate_component +from langflow.interface.custom.component import (Component, + ComponentCodeNullError) +from langflow.interface.custom.utils import (build_custom_component_template, + create_and_validate_component) from langflow.services.database.models.flow import Flow, FlowCreate code_default = """