diff --git a/src/backend/langflow/utils/validate.py b/src/backend/langflow/utils/validate.py index 6e6226568..f8b28aa25 100644 --- a/src/backend/langflow/utils/validate.py +++ b/src/backend/langflow/utils/validate.py @@ -7,6 +7,9 @@ from typing import Dict, List, Optional, Union from langflow.field_typing.constants import CUSTOM_COMPONENT_SUPPORTED_TYPES +PROMPT_INPUT_TYPES = ["Document", "BaseOutputParser", "Text", "Record"] + + def add_type_ignores(): if not hasattr(ast, "TypeIgnore"): @@ -43,7 +46,9 @@ def validate_code(code): # Evaluate the function definition for node in tree.body: if isinstance(node, ast.FunctionDef): - code_obj = compile(ast.Module(body=[node], type_ignores=[]), "", "exec") + code_obj = compile( + ast.Module(body=[node], type_ignores=[]), "", "exec" + ) try: exec(code_obj) except Exception as e: @@ -87,15 +92,23 @@ def execute_function(code, function_name, *args, **kwargs): exec_globals, locals(), ) - exec_globals[alias.asname or alias.name] = importlib.import_module(alias.name) + exec_globals[alias.asname or alias.name] = importlib.import_module( + alias.name + ) except ModuleNotFoundError as e: - raise ModuleNotFoundError(f"Module {alias.name} not found. Please install it and try again.") from e + raise ModuleNotFoundError( + f"Module {alias.name} not found. Please install it and try again." + ) from e function_code = next( - node for node in module.body if isinstance(node, ast.FunctionDef) and node.name == function_name + node + for node in module.body + if isinstance(node, ast.FunctionDef) and node.name == function_name ) function_code.parent = None - code_obj = compile(ast.Module(body=[function_code], type_ignores=[]), "", "exec") + code_obj = compile( + ast.Module(body=[function_code], type_ignores=[]), "", "exec" + ) try: exec(code_obj, exec_globals, locals()) except Exception as exc: @@ -122,15 +135,23 @@ def create_function(code, function_name): if isinstance(node, ast.Import): for alias in node.names: try: - exec_globals[alias.asname or alias.name] = importlib.import_module(alias.name) + exec_globals[alias.asname or alias.name] = importlib.import_module( + alias.name + ) except ModuleNotFoundError as e: - raise ModuleNotFoundError(f"Module {alias.name} not found. Please install it and try again.") from e + raise ModuleNotFoundError( + f"Module {alias.name} not found. Please install it and try again." + ) from e function_code = next( - node for node in module.body if isinstance(node, ast.FunctionDef) and node.name == function_name + node + for node in module.body + if isinstance(node, ast.FunctionDef) and node.name == function_name ) function_code.parent = None - code_obj = compile(ast.Module(body=[function_code], type_ignores=[]), "", "exec") + code_obj = compile( + ast.Module(body=[function_code], type_ignores=[]), "", "exec" + ) with contextlib.suppress(Exception): exec(code_obj, exec_globals, locals()) exec_globals[function_name] = locals()[function_name] @@ -192,16 +213,22 @@ def prepare_global_scope(code, module): if isinstance(node, ast.Import): for alias in node.names: try: - exec_globals[alias.asname or alias.name] = importlib.import_module(alias.name) + exec_globals[alias.asname or alias.name] = importlib.import_module( + alias.name + ) except ModuleNotFoundError as e: - raise ModuleNotFoundError(f"Module {alias.name} not found. Please install it and try again.") from e + raise ModuleNotFoundError( + f"Module {alias.name} not found. Please install it and try again." + ) from e elif isinstance(node, ast.ImportFrom) and node.module is not None: try: imported_module = importlib.import_module(node.module) for alias in node.names: exec_globals[alias.name] = getattr(imported_module, alias.name) except ModuleNotFoundError as e: - raise ModuleNotFoundError(f"Module {node.module} not found. Please install it and try again.") from e + raise ModuleNotFoundError( + f"Module {node.module} not found. Please install it and try again." + ) from e return exec_globals @@ -213,7 +240,11 @@ def extract_class_code(module, class_name): :param class_name: Name of the class to extract :return: AST node of the specified class """ - class_code = next(node for node in module.body if isinstance(node, ast.ClassDef) and node.name == class_name) + class_code = next( + node + for node in module.body + if isinstance(node, ast.ClassDef) and node.name == class_name + ) class_code.parent = None return class_code @@ -226,7 +257,9 @@ def compile_class_code(class_code): :param class_code: AST node of the class :return: Compiled code object of the class """ - code_obj = compile(ast.Module(body=[class_code], type_ignores=[]), "", "exec") + code_obj = compile( + ast.Module(body=[class_code], type_ignores=[]), "", "exec" + ) return code_obj @@ -270,7 +303,9 @@ def get_default_imports(code_string): langflow_imports = list(CUSTOM_COMPONENT_SUPPORTED_TYPES.keys()) necessary_imports = find_names_in_code(code_string, langflow_imports) langflow_module = importlib.import_module("langflow.field_typing") - default_imports.update({name: getattr(langflow_module, name) for name in necessary_imports}) + default_imports.update( + {name: getattr(langflow_module, name) for name in necessary_imports} + ) return default_imports