diff --git a/src/backend/langflow/__init__.py b/src/backend/langflow/__init__.py index 1be5464c2..fb06fe1a7 100644 --- a/src/backend/langflow/__init__.py +++ b/src/backend/langflow/__init__.py @@ -1 +1,4 @@ -from langflow.interface.loading import load_flow_from_json # noqa +from langflow.interface.loading import load_flow_from_json +from langflow.cache import cache_manager + +__all__ = ["load_flow_from_json", "cache_manager"] diff --git a/src/backend/langflow/graph/base.py b/src/backend/langflow/graph/base.py index 012250739..a4e6725da 100644 --- a/src/backend/langflow/graph/base.py +++ b/src/backend/langflow/graph/base.py @@ -4,6 +4,7 @@ # - Build each inner agent first, then build the outer agent import contextlib +import inspect import types import warnings from copy import deepcopy @@ -14,6 +15,7 @@ from langflow.graph.constants import DIRECT_TYPES from langflow.interface import loading from langflow.interface.listing import ALL_TYPES_DICT from langflow.utils.logger import logger +from langflow.utils.util import sync_to_async class Node: @@ -158,13 +160,20 @@ class Node: continue result = value.build() # If the key is "func", then we need to use the run method - if key == "func" and not isinstance(result, types.FunctionType): - # func can be PythonFunction(code='\ndef upper_case(text: str) -> str:\n return text.upper()\n') - # so we need to check if there is an attribute called run - if hasattr(result, "run"): - result = result.run # type: ignore - elif hasattr(result, "get_function"): - result = result.get_function() # type: ignore + if key == "func": + if not isinstance(result, types.FunctionType): + # func can be PythonFunction(code='\ndef upper_case(text: str) -> str:\n return text.upper()\n') + # so we need to check if there is an attribute called run + if hasattr(result, "run"): + result = result.run # type: ignore + elif hasattr(result, "get_function"): + result = result.get_function() # type: ignore + elif inspect.iscoroutinefunction(result): + self.params["coroutine"] = result + else: + # turn result which is a function into a coroutine + # so that it can be awaited + self.params["coroutine"] = sync_to_async(result) self.params[key] = result elif isinstance(value, list) and all( diff --git a/src/backend/langflow/utils/util.py b/src/backend/langflow/utils/util.py index b31a3bed1..eddd59ce1 100644 --- a/src/backend/langflow/utils/util.py +++ b/src/backend/langflow/utils/util.py @@ -1,3 +1,5 @@ +import asyncio +from functools import partial, wraps import importlib import inspect import re @@ -301,3 +303,17 @@ def update_verbose(d: dict, new_value: bool) -> dict: elif k == "verbose": d[k] = new_value return d + + +def sync_to_async(func): + """ + Decorator to convert a sync function to an async function. + """ + + @wraps(func) + async def async_wrapper(*args, **kwargs): + loop = asyncio.get_event_loop() + func_call = partial(func, *args, **kwargs) + return await loop.run_in_executor(None, func_call) + + return async_wrapper diff --git a/src/backend/langflow/utils/validate.py b/src/backend/langflow/utils/validate.py index d1353bd77..59d22a143 100644 --- a/src/backend/langflow/utils/validate.py +++ b/src/backend/langflow/utils/validate.py @@ -155,7 +155,7 @@ def create_function(code, function_name): exec_globals[function_name] = locals()[function_name] # Return a function that imports necessary modules and calls the target function - def wrapped_function(*args, **kwargs): + async def wrapped_function(*args, **kwargs): for module_name, module in exec_globals.items(): if isinstance(module, type(importlib)): globals()[module_name] = module