ref: split ast parsing / class exec loops (#7248)
* Split ast parsing / class exec loops * remove unnecessary module check * remove comment * [autofix.ci] apply automated fixes * ruff * [autofix.ci] apply automated fixes * Ruff * [autofix.ci] apply automated fixes * ruff --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
parent
2407fe39d6
commit
6c36f4895b
2 changed files with 63 additions and 36 deletions
|
|
@ -306,9 +306,10 @@ class Component(CustomComponent):
|
|||
if module is None:
|
||||
msg = "Could not find module for class"
|
||||
raise ValueError(msg)
|
||||
|
||||
class_code = inspect.getsource(module)
|
||||
self._code = class_code
|
||||
except OSError as e:
|
||||
except (OSError, TypeError) as e:
|
||||
msg = f"Could not find source code for {self.__class__.__name__}"
|
||||
raise ValueError(msg) from e
|
||||
|
||||
|
|
|
|||
|
|
@ -185,25 +185,35 @@ def create_class(code, class_name):
|
|||
if not hasattr(ast, "TypeIgnore"):
|
||||
ast.TypeIgnore = create_type_ignore_class()
|
||||
|
||||
# Replace from langflow import CustomComponent with from langflow.custom import CustomComponent
|
||||
code = code.replace("from langflow import CustomComponent", "from langflow.custom import CustomComponent")
|
||||
code = code.replace(
|
||||
"from langflow.interface.custom.custom_component import CustomComponent",
|
||||
"from langflow.custom import CustomComponent",
|
||||
)
|
||||
# Add DEFAULT_IMPORT_STRING
|
||||
code = DEFAULT_IMPORT_STRING + "\n" + code
|
||||
module = ast.parse(code)
|
||||
exec_globals = prepare_global_scope(module)
|
||||
|
||||
class_code = extract_class_code(module, class_name)
|
||||
compiled_class = compile_class_code(class_code)
|
||||
code = DEFAULT_IMPORT_STRING + "\n" + code
|
||||
try:
|
||||
module = ast.parse(code)
|
||||
exec_globals = prepare_global_scope(module)
|
||||
|
||||
class_code = extract_class_code(module, class_name)
|
||||
compiled_class = compile_class_code(class_code)
|
||||
|
||||
return build_class_constructor(compiled_class, exec_globals, class_name)
|
||||
|
||||
except SyntaxError as e:
|
||||
msg = f"Syntax error in code: {e!s}"
|
||||
raise ValueError(msg) from e
|
||||
except NameError as e:
|
||||
msg = f"Name error (possibly undefined variable): {e!s}"
|
||||
raise ValueError(msg) from e
|
||||
except ValidationError as e:
|
||||
messages = [error["msg"].split(",", 1) for error in e.errors()]
|
||||
error_message = "\n".join([message[1] if len(message) > 1 else message[0] for message in messages])
|
||||
raise ValueError(error_message) from e
|
||||
except Exception as e:
|
||||
msg = f"Error creating class: {e!s}"
|
||||
raise ValueError(msg) from e
|
||||
|
||||
|
||||
def create_type_ignore_class():
|
||||
|
|
@ -232,40 +242,56 @@ def prepare_global_scope(module):
|
|||
ModuleNotFoundError: If a module is not found in the code
|
||||
"""
|
||||
exec_globals = globals().copy()
|
||||
imports = []
|
||||
import_froms = []
|
||||
definitions = []
|
||||
|
||||
for node in module.body:
|
||||
if isinstance(node, ast.Import):
|
||||
for alias in node.names:
|
||||
try:
|
||||
exec_globals[alias.asname or alias.name] = importlib.import_module(alias.name)
|
||||
except ModuleNotFoundError as e:
|
||||
msg = f"Module {alias.name} not found. Please install it and try again."
|
||||
raise ModuleNotFoundError(msg) from e
|
||||
imports.append(node)
|
||||
elif isinstance(node, ast.ImportFrom) and node.module is not None:
|
||||
import_froms.append(node)
|
||||
elif isinstance(node, ast.ClassDef | ast.FunctionDef | ast.Assign):
|
||||
definitions.append(node)
|
||||
|
||||
for node in imports:
|
||||
for alias in node.names:
|
||||
try:
|
||||
module_name = alias.name
|
||||
variable_name = alias.asname or alias.name
|
||||
exec_globals[variable_name] = importlib.import_module(module_name)
|
||||
except ModuleNotFoundError as e:
|
||||
msg = f"Module {alias.name} not found. Please install it and try again."
|
||||
raise ModuleNotFoundError(msg) from e
|
||||
|
||||
for node in import_froms:
|
||||
try:
|
||||
module_name = node.module
|
||||
# Apply warning suppression only when needed
|
||||
if "langchain" in module_name:
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", LangChainDeprecationWarning)
|
||||
imported_module = importlib.import_module(node.module)
|
||||
for alias in node.names:
|
||||
try:
|
||||
# First try getting it as an attribute
|
||||
exec_globals[alias.name] = getattr(imported_module, alias.name)
|
||||
except AttributeError:
|
||||
# If that fails, try importing the full module path
|
||||
full_module_path = f"{node.module}.{alias.name}"
|
||||
exec_globals[alias.name] = importlib.import_module(full_module_path)
|
||||
except ModuleNotFoundError as e:
|
||||
msg = f"Module {node.module} not found. Please install it and try again"
|
||||
raise ModuleNotFoundError(msg) from e
|
||||
elif isinstance(node, ast.ClassDef):
|
||||
# Compile and execute the class definition to properly create the class
|
||||
class_code = compile(ast.Module(body=[node], type_ignores=[]), "<string>", "exec")
|
||||
exec(class_code, exec_globals)
|
||||
elif isinstance(node, ast.FunctionDef):
|
||||
function_code = compile(ast.Module(body=[node], type_ignores=[]), "<string>", "exec")
|
||||
exec(function_code, exec_globals)
|
||||
elif isinstance(node, ast.Assign):
|
||||
assign_code = compile(ast.Module(body=[node], type_ignores=[]), "<string>", "exec")
|
||||
exec(assign_code, exec_globals)
|
||||
imported_module = importlib.import_module(module_name)
|
||||
else:
|
||||
imported_module = importlib.import_module(module_name)
|
||||
|
||||
for alias in node.names:
|
||||
try:
|
||||
# First try getting it as an attribute
|
||||
exec_globals[alias.name] = getattr(imported_module, alias.name)
|
||||
except AttributeError:
|
||||
# If that fails, try importing the full module path
|
||||
full_module_path = f"{module_name}.{alias.name}"
|
||||
exec_globals[alias.name] = importlib.import_module(full_module_path)
|
||||
except ModuleNotFoundError as e:
|
||||
msg = f"Module {node.module} not found. Please install it and try again"
|
||||
raise ModuleNotFoundError(msg) from e
|
||||
|
||||
if definitions:
|
||||
combined_module = ast.Module(body=definitions, type_ignores=[])
|
||||
compiled_code = compile(combined_module, "<string>", "exec")
|
||||
exec(compiled_code, exec_globals)
|
||||
|
||||
return exec_globals
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue