ref: Remove usage of deprecated get_event_loop (#4697)

Remove usage of deprecated get_event_loop
This commit is contained in:
Christophe Bornet 2024-11-26 01:17:29 +01:00 committed by GitHub
commit 1962e48266
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 57 additions and 50 deletions

View file

@ -40,6 +40,7 @@ from langflow.schema.dotdict import dotdict
from langflow.schema.schema import INPUT_FIELD_NAME, InputType
from langflow.services.cache.utils import CacheMiss
from langflow.services.deps import get_chat_service, get_tracing_service
from langflow.utils.async_helpers import run_until_complete
if TYPE_CHECKING:
from langflow.api.v1.schemas import InputValueRequest
@ -1345,9 +1346,18 @@ class Graph:
files: list[str] | None = None,
user_id: str | None = None,
):
# Call astep but synchronously
loop = asyncio.get_event_loop()
return loop.run_until_complete(self.astep(inputs, files, user_id))
"""Runs the next vertex in the graph.
Note:
This function is a synchronous wrapper around `astep`.
It creates an event loop if one does not exist.
Args:
inputs: The inputs for the vertex. Defaults to None.
files: The files for the vertex. Defaults to None.
user_id: The user ID. Defaults to None.
"""
return run_until_complete(self.astep(inputs, files, user_id))
async def build_vertex(
self,

View file

@ -9,6 +9,7 @@ from langflow.graph import Graph
from langflow.graph.schema import RunOutputs
from langflow.logging.logger import configure
from langflow.processing.process import process_tweaks, run_graph
from langflow.utils.async_helpers import run_until_complete
from langflow.utils.util import update_settings
@ -149,25 +150,45 @@ def run_flow_from_json(
disable_logs: bool | None = True,
fallback_to_env_vars: bool = False,
) -> list[RunOutputs]:
coro = arun_flow_from_json(
flow,
input_value,
session_id=session_id,
tweaks=tweaks,
input_type=input_type,
output_type=output_type,
output_component=output_component,
log_level=log_level,
log_file=log_file,
env_file=env_file,
cache=cache,
disable_logs=disable_logs,
fallback_to_env_vars=fallback_to_env_vars,
"""Run a flow from a JSON file or dictionary.
Note:
This function is a synchronous wrapper around `arun_flow_from_json`.
It creates an event loop if one does not exist and runs the flow.
Args:
flow (Union[Path, str, dict]): The path to the JSON file or the JSON dictionary representing the flow.
input_value (str): The input value to be processed by the flow.
session_id (str | None, optional): The session ID to be used for the flow. Defaults to None.
tweaks (Optional[dict], optional): Optional tweaks to be applied to the flow. Defaults to None.
input_type (str, optional): The type of the input value. Defaults to "chat".
output_type (str, optional): The type of the output value. Defaults to "chat".
output_component (Optional[str], optional): The specific component to output. Defaults to None.
log_level (Optional[str], optional): The log level to use. Defaults to None.
log_file (Optional[str], optional): The log file to write logs to. Defaults to None.
env_file (Optional[str], optional): The environment file to load. Defaults to None.
cache (Optional[str], optional): The cache directory to use. Defaults to None.
disable_logs (Optional[bool], optional): Whether to disable logs. Defaults to True.
fallback_to_env_vars (bool, optional): Whether Global Variables should fallback to environment variables if
not found. Defaults to False.
Returns:
List[RunOutputs]: A list of RunOutputs objects representing the results of running the flow.
"""
return run_until_complete(
arun_flow_from_json(
flow,
input_value,
session_id=session_id,
tweaks=tweaks,
input_type=input_type,
output_type=output_type,
output_component=output_component,
log_level=log_level,
log_file=log_file,
env_file=env_file,
cache=cache,
disable_logs=disable_logs,
fallback_to_env_vars=fallback_to_env_vars,
)
)
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(coro)
return loop.run_until_complete(coro)

View file

@ -1,34 +1,10 @@
import asyncio
import threading
def run_until_complete(coro):
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# Run the coroutine in a separate event loop in a new thread
return run_in_thread(coro)
return loop.run_until_complete(coro)
loop = asyncio.get_running_loop()
except RuntimeError:
# If there's no event loop, create a new one and run the coroutine
return asyncio.run(coro)
def run_in_thread(coro):
result = None
exception = None
def target() -> None:
nonlocal result, exception
try:
result = asyncio.run(coro)
except Exception as e: # noqa: BLE001
exception = e
thread = threading.Thread(target=target)
thread.start()
thread.join()
if exception:
raise exception
return result
return loop.run_until_complete(coro)