From abd3321677a7200b213348911728671a1df532b3 Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Wed, 11 Dec 2024 18:53:33 +0100 Subject: [PATCH] ref: Make load_flow_from_json async (#5057) * Make load_flow_from_json async * Add back sync load_flow_from_json for backward compatibility --- src/backend/base/langflow/load/__init__.py | 11 +++- src/backend/base/langflow/load/load.py | 58 ++++++++++++++++--- src/backend/base/langflow/logging/logger.py | 3 +- .../base/langflow/services/settings/base.py | 15 +++-- src/backend/base/langflow/utils/util.py | 4 +- src/backend/tests/unit/test_loading.py | 6 +- 6 files changed, 74 insertions(+), 23 deletions(-) diff --git a/src/backend/base/langflow/load/__init__.py b/src/backend/base/langflow/load/__init__.py index 7c3369761..793f9f092 100644 --- a/src/backend/base/langflow/load/__init__.py +++ b/src/backend/base/langflow/load/__init__.py @@ -1,4 +1,11 @@ -from .load import load_flow_from_json, run_flow_from_json +from .load import aload_flow_from_json, arun_flow_from_json, load_flow_from_json, run_flow_from_json from .utils import get_flow, upload_file -__all__ = ["get_flow", "load_flow_from_json", "run_flow_from_json", "upload_file"] +__all__ = [ + "aload_flow_from_json", + "arun_flow_from_json", + "get_flow", + "load_flow_from_json", + "run_flow_from_json", + "upload_file", +] diff --git a/src/backend/base/langflow/load/load.py b/src/backend/base/langflow/load/load.py index 18d6ded57..7914d43f3 100644 --- a/src/backend/base/langflow/load/load.py +++ b/src/backend/base/langflow/load/load.py @@ -2,6 +2,7 @@ import asyncio import json from pathlib import Path +from aiofile import async_open from dotenv import load_dotenv from loguru import logger @@ -13,7 +14,7 @@ from langflow.utils.async_helpers import run_until_complete from langflow.utils.util import update_settings -def load_flow_from_json( +async def aload_flow_from_json( flow: Path | str | dict, *, tweaks: dict | None = None, @@ -49,14 +50,15 @@ def load_flow_from_json( # override env variables with .env file if env_file: - load_dotenv(env_file, override=True) + await asyncio.to_thread(load_dotenv, env_file, override=True) # Update settings with cache and components path - update_settings(cache=cache) + await update_settings(cache=cache) if isinstance(flow, str | Path): - with Path(flow).open(encoding="utf-8") as f: - flow_graph = json.load(f) + async with async_open(Path(flow).name, encoding="utf-8") as f: + content = await f.read() + flow_graph = json.load(content) # If input is a dictionary, assume it's a JSON object elif isinstance(flow, dict): flow_graph = flow @@ -71,6 +73,49 @@ def load_flow_from_json( return Graph.from_payload(graph_data) +def load_flow_from_json( + flow: Path | str | dict, + *, + tweaks: dict | None = None, + log_level: str | None = None, + log_file: str | None = None, + env_file: str | None = None, + cache: str | None = None, + disable_logs: bool | None = True, +) -> Graph: + """Load a flow graph from a JSON file or a JSON object. + + Args: + flow (Union[Path, str, dict]): The flow to load. It can be a file path (str or Path object) + or a JSON object (dict). + tweaks (Optional[dict]): Optional tweaks to apply to the loaded flow graph. + log_level (Optional[str]): Optional log level to configure for the flow processing. + log_file (Optional[str]): Optional log file to configure for the flow processing. + env_file (Optional[str]): Optional .env file to override environment variables. + cache (Optional[str]): Optional cache path to update the flow settings. + disable_logs (Optional[bool], default=True): Optional flag to disable logs during flow processing. + If log_level or log_file are set, disable_logs is not used. + + Returns: + Graph: The loaded flow graph as a Graph object. + + Raises: + TypeError: If the input is neither a file path (str or Path object) nor a JSON object (dict). + + """ + return run_until_complete( + aload_flow_from_json( + flow, + tweaks=tweaks, + log_level=log_level, + log_file=log_file, + env_file=env_file, + cache=cache, + disable_logs=disable_logs, + ) + ) + + async def arun_flow_from_json( flow: Path | str | dict, input_value: str, @@ -111,8 +156,7 @@ async def arun_flow_from_json( if tweaks is None: tweaks = {} tweaks["stream"] = False - graph = await asyncio.to_thread( - load_flow_from_json, + graph = await aload_flow_from_json( flow=flow, tweaks=tweaks, log_level=log_level, diff --git a/src/backend/base/langflow/logging/logger.py b/src/backend/base/langflow/logging/logger.py index 4db331d58..e705ee3b5 100644 --- a/src/backend/base/langflow/logging/logger.py +++ b/src/backend/base/langflow/logging/logger.py @@ -159,6 +159,7 @@ class AsyncFileSink(AsyncSink): self._sink = FileSink( path=file, rotation="10 MB", # Log rotation based on file size + delay=True, ) super().__init__(self.write_async, None, ErrorInterceptor(_defaults.LOGURU_CATCH, -1)) @@ -248,8 +249,6 @@ def configure( log_file = cache_dir / "langflow.log" logger.debug(f"Log file: {log_file}") try: - log_file.parent.mkdir(parents=True, exist_ok=True) - logger.add( sink=AsyncFileSink(log_file) if async_file else log_file, level=log_level.upper(), diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index 222c13ea1..f9b9db365 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -1,3 +1,4 @@ +import asyncio import contextlib import json import os @@ -7,6 +8,7 @@ from typing import Any, Literal import orjson import yaml +from aiofile import async_open from loguru import logger from pydantic import field_validator from pydantic.fields import FieldInfo @@ -334,8 +336,8 @@ class Settings(BaseSettings): model_config = SettingsConfigDict(validate_assignment=True, extra="ignore", env_prefix="LANGFLOW_") - def update_from_yaml(self, file_path: str, *, dev: bool = False) -> None: - new_settings = load_settings_from_yaml(file_path) + async def update_from_yaml(self, file_path: str, *, dev: bool = False) -> None: + new_settings = await load_settings_from_yaml(file_path) self.components_path = new_settings.components_path or [] self.dev = dev @@ -388,7 +390,7 @@ def save_settings_to_yaml(settings: Settings, file_path: str) -> None: yaml.dump(settings_dict, f) -def load_settings_from_yaml(file_path: str) -> Settings: +async def load_settings_from_yaml(file_path: str) -> Settings: # Check if a string is a valid path or a file name if "/" not in file_path: # Get current path @@ -397,8 +399,9 @@ def load_settings_from_yaml(file_path: str) -> Settings: else: file_path_ = Path(file_path) - with file_path_.open(encoding="utf-8") as f: - settings_dict = yaml.safe_load(f) + async with async_open(file_path_.name, encoding="utf-8") as f: + content = await f.read() + settings_dict = yaml.safe_load(content) settings_dict = {k.upper(): v for k, v in settings_dict.items()} for key in settings_dict: @@ -407,4 +410,4 @@ def load_settings_from_yaml(file_path: str) -> Settings: raise KeyError(msg) logger.debug(f"Loading {len(settings_dict[key])} {key} from {file_path}") - return Settings(**settings_dict) + return await asyncio.to_thread(Settings, **settings_dict) diff --git a/src/backend/base/langflow/utils/util.py b/src/backend/base/langflow/utils/util.py index ee89539c1..6bcd80421 100644 --- a/src/backend/base/langflow/utils/util.py +++ b/src/backend/base/langflow/utils/util.py @@ -402,7 +402,7 @@ def build_loader_repr_from_data(data: list[Data]) -> str: return "0 data" -def update_settings( +async def update_settings( *, config: str | None = None, cache: str | None = None, @@ -424,7 +424,7 @@ def update_settings( settings_service = get_settings_service() if config: logger.debug(f"Loading settings from {config}") - settings_service.settings.update_from_yaml(config, dev=dev) + await settings_service.settings.update_from_yaml(config, dev=dev) if remove_api_keys: logger.debug(f"Setting remove_api_keys to {remove_api_keys}") settings_service.settings.update_settings(remove_api_keys=remove_api_keys) diff --git a/src/backend/tests/unit/test_loading.py b/src/backend/tests/unit/test_loading.py index 6cac5b567..c46180f23 100644 --- a/src/backend/tests/unit/test_loading.py +++ b/src/backend/tests/unit/test_loading.py @@ -1,8 +1,6 @@ -import asyncio - from langflow.graph import Graph from langflow.initial_setup.setup import load_starter_projects -from langflow.load import load_flow_from_json +from langflow.load import aload_flow_from_json # TODO: UPDATE BASIC EXAMPLE # def test_load_flow_from_json(): @@ -24,6 +22,6 @@ async def test_load_flow_from_json_object(): """Test loading a flow from a json file and applying tweaks.""" result = await load_starter_projects() project = result[0][1] - loaded = await asyncio.to_thread(load_flow_from_json, project) + loaded = await aload_flow_from_json(project) assert loaded is not None assert isinstance(loaded, Graph)