From 1962e48266ee4e1bf0eee250a944a4d017a4053c Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Tue, 26 Nov 2024 01:17:29 +0100 Subject: [PATCH] ref: Remove usage of deprecated get_event_loop (#4697) Remove usage of deprecated get_event_loop --- src/backend/base/langflow/graph/graph/base.py | 16 ++++- src/backend/base/langflow/load/load.py | 63 ++++++++++++------- .../base/langflow/utils/async_helpers.py | 28 +-------- 3 files changed, 57 insertions(+), 50 deletions(-) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index b1a800360..8a6a8910d 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -40,6 +40,7 @@ from langflow.schema.dotdict import dotdict from langflow.schema.schema import INPUT_FIELD_NAME, InputType from langflow.services.cache.utils import CacheMiss from langflow.services.deps import get_chat_service, get_tracing_service +from langflow.utils.async_helpers import run_until_complete if TYPE_CHECKING: from langflow.api.v1.schemas import InputValueRequest @@ -1345,9 +1346,18 @@ class Graph: files: list[str] | None = None, user_id: str | None = None, ): - # Call astep but synchronously - loop = asyncio.get_event_loop() - return loop.run_until_complete(self.astep(inputs, files, user_id)) + """Runs the next vertex in the graph. + + Note: + This function is a synchronous wrapper around `astep`. + It creates an event loop if one does not exist. + + Args: + inputs: The inputs for the vertex. Defaults to None. + files: The files for the vertex. Defaults to None. + user_id: The user ID. Defaults to None. + """ + return run_until_complete(self.astep(inputs, files, user_id)) async def build_vertex( self, diff --git a/src/backend/base/langflow/load/load.py b/src/backend/base/langflow/load/load.py index 13d0ab051..18d6ded57 100644 --- a/src/backend/base/langflow/load/load.py +++ b/src/backend/base/langflow/load/load.py @@ -9,6 +9,7 @@ from langflow.graph import Graph from langflow.graph.schema import RunOutputs from langflow.logging.logger import configure from langflow.processing.process import process_tweaks, run_graph +from langflow.utils.async_helpers import run_until_complete from langflow.utils.util import update_settings @@ -149,25 +150,45 @@ def run_flow_from_json( disable_logs: bool | None = True, fallback_to_env_vars: bool = False, ) -> list[RunOutputs]: - coro = arun_flow_from_json( - flow, - input_value, - session_id=session_id, - tweaks=tweaks, - input_type=input_type, - output_type=output_type, - output_component=output_component, - log_level=log_level, - log_file=log_file, - env_file=env_file, - cache=cache, - disable_logs=disable_logs, - fallback_to_env_vars=fallback_to_env_vars, + """Run a flow from a JSON file or dictionary. + + Note: + This function is a synchronous wrapper around `arun_flow_from_json`. + It creates an event loop if one does not exist and runs the flow. + + Args: + flow (Union[Path, str, dict]): The path to the JSON file or the JSON dictionary representing the flow. + input_value (str): The input value to be processed by the flow. + session_id (str | None, optional): The session ID to be used for the flow. Defaults to None. + tweaks (Optional[dict], optional): Optional tweaks to be applied to the flow. Defaults to None. + input_type (str, optional): The type of the input value. Defaults to "chat". + output_type (str, optional): The type of the output value. Defaults to "chat". + output_component (Optional[str], optional): The specific component to output. Defaults to None. + log_level (Optional[str], optional): The log level to use. Defaults to None. + log_file (Optional[str], optional): The log file to write logs to. Defaults to None. + env_file (Optional[str], optional): The environment file to load. Defaults to None. + cache (Optional[str], optional): The cache directory to use. Defaults to None. + disable_logs (Optional[bool], optional): Whether to disable logs. Defaults to True. + fallback_to_env_vars (bool, optional): Whether Global Variables should fallback to environment variables if + not found. Defaults to False. + + Returns: + List[RunOutputs]: A list of RunOutputs objects representing the results of running the flow. + """ + return run_until_complete( + arun_flow_from_json( + flow, + input_value, + session_id=session_id, + tweaks=tweaks, + input_type=input_type, + output_type=output_type, + output_component=output_component, + log_level=log_level, + log_file=log_file, + env_file=env_file, + cache=cache, + disable_logs=disable_logs, + fallback_to_env_vars=fallback_to_env_vars, + ) ) - - try: - loop = asyncio.get_running_loop() - except RuntimeError: - return asyncio.run(coro) - - return loop.run_until_complete(coro) diff --git a/src/backend/base/langflow/utils/async_helpers.py b/src/backend/base/langflow/utils/async_helpers.py index 04cfbacae..1b0ba25b4 100644 --- a/src/backend/base/langflow/utils/async_helpers.py +++ b/src/backend/base/langflow/utils/async_helpers.py @@ -1,34 +1,10 @@ import asyncio -import threading def run_until_complete(coro): try: - loop = asyncio.get_event_loop() - if loop.is_running(): - # Run the coroutine in a separate event loop in a new thread - return run_in_thread(coro) - return loop.run_until_complete(coro) + loop = asyncio.get_running_loop() except RuntimeError: # If there's no event loop, create a new one and run the coroutine return asyncio.run(coro) - - -def run_in_thread(coro): - result = None - exception = None - - def target() -> None: - nonlocal result, exception - try: - result = asyncio.run(coro) - except Exception as e: # noqa: BLE001 - exception = e - - thread = threading.Thread(target=target) - thread.start() - thread.join() - - if exception: - raise exception - return result + return loop.run_until_complete(coro)