feat(langflow): add support for async functions in Node's func parameter

fix(langflow): fix Node's func parameter to be a coroutine function if it is a sync function
This commit is contained in:
Gabriel Almeida 2023-04-25 19:11:04 -03:00
commit 60e09a3628
4 changed files with 37 additions and 9 deletions

View file

@ -1 +1,4 @@
from langflow.interface.loading import load_flow_from_json # noqa
from langflow.interface.loading import load_flow_from_json
from langflow.cache import cache_manager
__all__ = ["load_flow_from_json", "cache_manager"]

View file

@ -4,6 +4,7 @@
# - Build each inner agent first, then build the outer agent
import contextlib
import inspect
import types
import warnings
from copy import deepcopy
@ -14,6 +15,7 @@ from langflow.graph.constants import DIRECT_TYPES
from langflow.interface import loading
from langflow.interface.listing import ALL_TYPES_DICT
from langflow.utils.logger import logger
from langflow.utils.util import sync_to_async
class Node:
@ -158,13 +160,20 @@ class Node:
continue
result = value.build()
# If the key is "func", then we need to use the run method
if key == "func" and not isinstance(result, types.FunctionType):
# func can be PythonFunction(code='\ndef upper_case(text: str) -> str:\n return text.upper()\n')
# so we need to check if there is an attribute called run
if hasattr(result, "run"):
result = result.run # type: ignore
elif hasattr(result, "get_function"):
result = result.get_function() # type: ignore
if key == "func":
if not isinstance(result, types.FunctionType):
# func can be PythonFunction(code='\ndef upper_case(text: str) -> str:\n return text.upper()\n')
# so we need to check if there is an attribute called run
if hasattr(result, "run"):
result = result.run # type: ignore
elif hasattr(result, "get_function"):
result = result.get_function() # type: ignore
elif inspect.iscoroutinefunction(result):
self.params["coroutine"] = result
else:
# turn result which is a function into a coroutine
# so that it can be awaited
self.params["coroutine"] = sync_to_async(result)
self.params[key] = result
elif isinstance(value, list) and all(

View file

@ -1,3 +1,5 @@
import asyncio
from functools import partial, wraps
import importlib
import inspect
import re
@ -301,3 +303,17 @@ def update_verbose(d: dict, new_value: bool) -> dict:
elif k == "verbose":
d[k] = new_value
return d
def sync_to_async(func):
"""
Decorator to convert a sync function to an async function.
"""
@wraps(func)
async def async_wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
func_call = partial(func, *args, **kwargs)
return await loop.run_in_executor(None, func_call)
return async_wrapper

View file

@ -155,7 +155,7 @@ def create_function(code, function_name):
exec_globals[function_name] = locals()[function_name]
# Return a function that imports necessary modules and calls the target function
def wrapped_function(*args, **kwargs):
async def wrapped_function(*args, **kwargs):
for module_name, module in exec_globals.items():
if isinstance(module, type(importlib)):
globals()[module_name] = module