ref: Make load_flow_from_json async (#5057)

* Make load_flow_from_json async

* Add back sync load_flow_from_json for backward compatibility
This commit is contained in:
Christophe Bornet 2024-12-11 18:53:33 +01:00 committed by GitHub
commit abd3321677
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 74 additions and 23 deletions

View file

@ -1,4 +1,11 @@
from .load import load_flow_from_json, run_flow_from_json
from .load import aload_flow_from_json, arun_flow_from_json, load_flow_from_json, run_flow_from_json
from .utils import get_flow, upload_file
__all__ = ["get_flow", "load_flow_from_json", "run_flow_from_json", "upload_file"]
__all__ = [
"aload_flow_from_json",
"arun_flow_from_json",
"get_flow",
"load_flow_from_json",
"run_flow_from_json",
"upload_file",
]

View file

@ -2,6 +2,7 @@ import asyncio
import json
from pathlib import Path
from aiofile import async_open
from dotenv import load_dotenv
from loguru import logger
@ -13,7 +14,7 @@ from langflow.utils.async_helpers import run_until_complete
from langflow.utils.util import update_settings
def load_flow_from_json(
async def aload_flow_from_json(
flow: Path | str | dict,
*,
tweaks: dict | None = None,
@ -49,14 +50,15 @@ def load_flow_from_json(
# override env variables with .env file
if env_file:
load_dotenv(env_file, override=True)
await asyncio.to_thread(load_dotenv, env_file, override=True)
# Update settings with cache and components path
update_settings(cache=cache)
await update_settings(cache=cache)
if isinstance(flow, str | Path):
with Path(flow).open(encoding="utf-8") as f:
flow_graph = json.load(f)
async with async_open(Path(flow).name, encoding="utf-8") as f:
content = await f.read()
flow_graph = json.load(content)
# If input is a dictionary, assume it's a JSON object
elif isinstance(flow, dict):
flow_graph = flow
@ -71,6 +73,49 @@ def load_flow_from_json(
return Graph.from_payload(graph_data)
def load_flow_from_json(
flow: Path | str | dict,
*,
tweaks: dict | None = None,
log_level: str | None = None,
log_file: str | None = None,
env_file: str | None = None,
cache: str | None = None,
disable_logs: bool | None = True,
) -> Graph:
"""Load a flow graph from a JSON file or a JSON object.
Args:
flow (Union[Path, str, dict]): The flow to load. It can be a file path (str or Path object)
or a JSON object (dict).
tweaks (Optional[dict]): Optional tweaks to apply to the loaded flow graph.
log_level (Optional[str]): Optional log level to configure for the flow processing.
log_file (Optional[str]): Optional log file to configure for the flow processing.
env_file (Optional[str]): Optional .env file to override environment variables.
cache (Optional[str]): Optional cache path to update the flow settings.
disable_logs (Optional[bool], default=True): Optional flag to disable logs during flow processing.
If log_level or log_file are set, disable_logs is not used.
Returns:
Graph: The loaded flow graph as a Graph object.
Raises:
TypeError: If the input is neither a file path (str or Path object) nor a JSON object (dict).
"""
return run_until_complete(
aload_flow_from_json(
flow,
tweaks=tweaks,
log_level=log_level,
log_file=log_file,
env_file=env_file,
cache=cache,
disable_logs=disable_logs,
)
)
async def arun_flow_from_json(
flow: Path | str | dict,
input_value: str,
@ -111,8 +156,7 @@ async def arun_flow_from_json(
if tweaks is None:
tweaks = {}
tweaks["stream"] = False
graph = await asyncio.to_thread(
load_flow_from_json,
graph = await aload_flow_from_json(
flow=flow,
tweaks=tweaks,
log_level=log_level,

View file

@ -159,6 +159,7 @@ class AsyncFileSink(AsyncSink):
self._sink = FileSink(
path=file,
rotation="10 MB", # Log rotation based on file size
delay=True,
)
super().__init__(self.write_async, None, ErrorInterceptor(_defaults.LOGURU_CATCH, -1))
@ -248,8 +249,6 @@ def configure(
log_file = cache_dir / "langflow.log"
logger.debug(f"Log file: {log_file}")
try:
log_file.parent.mkdir(parents=True, exist_ok=True)
logger.add(
sink=AsyncFileSink(log_file) if async_file else log_file,
level=log_level.upper(),

View file

@ -1,3 +1,4 @@
import asyncio
import contextlib
import json
import os
@ -7,6 +8,7 @@ from typing import Any, Literal
import orjson
import yaml
from aiofile import async_open
from loguru import logger
from pydantic import field_validator
from pydantic.fields import FieldInfo
@ -334,8 +336,8 @@ class Settings(BaseSettings):
model_config = SettingsConfigDict(validate_assignment=True, extra="ignore", env_prefix="LANGFLOW_")
def update_from_yaml(self, file_path: str, *, dev: bool = False) -> None:
new_settings = load_settings_from_yaml(file_path)
async def update_from_yaml(self, file_path: str, *, dev: bool = False) -> None:
new_settings = await load_settings_from_yaml(file_path)
self.components_path = new_settings.components_path or []
self.dev = dev
@ -388,7 +390,7 @@ def save_settings_to_yaml(settings: Settings, file_path: str) -> None:
yaml.dump(settings_dict, f)
def load_settings_from_yaml(file_path: str) -> Settings:
async def load_settings_from_yaml(file_path: str) -> Settings:
# Check if a string is a valid path or a file name
if "/" not in file_path:
# Get current path
@ -397,8 +399,9 @@ def load_settings_from_yaml(file_path: str) -> Settings:
else:
file_path_ = Path(file_path)
with file_path_.open(encoding="utf-8") as f:
settings_dict = yaml.safe_load(f)
async with async_open(file_path_.name, encoding="utf-8") as f:
content = await f.read()
settings_dict = yaml.safe_load(content)
settings_dict = {k.upper(): v for k, v in settings_dict.items()}
for key in settings_dict:
@ -407,4 +410,4 @@ def load_settings_from_yaml(file_path: str) -> Settings:
raise KeyError(msg)
logger.debug(f"Loading {len(settings_dict[key])} {key} from {file_path}")
return Settings(**settings_dict)
return await asyncio.to_thread(Settings, **settings_dict)

View file

@ -402,7 +402,7 @@ def build_loader_repr_from_data(data: list[Data]) -> str:
return "0 data"
def update_settings(
async def update_settings(
*,
config: str | None = None,
cache: str | None = None,
@ -424,7 +424,7 @@ def update_settings(
settings_service = get_settings_service()
if config:
logger.debug(f"Loading settings from {config}")
settings_service.settings.update_from_yaml(config, dev=dev)
await settings_service.settings.update_from_yaml(config, dev=dev)
if remove_api_keys:
logger.debug(f"Setting remove_api_keys to {remove_api_keys}")
settings_service.settings.update_settings(remove_api_keys=remove_api_keys)

View file

@ -1,8 +1,6 @@
import asyncio
from langflow.graph import Graph
from langflow.initial_setup.setup import load_starter_projects
from langflow.load import load_flow_from_json
from langflow.load import aload_flow_from_json
# TODO: UPDATE BASIC EXAMPLE
# def test_load_flow_from_json():
@ -24,6 +22,6 @@ async def test_load_flow_from_json_object():
"""Test loading a flow from a json file and applying tweaks."""
result = await load_starter_projects()
project = result[0][1]
loaded = await asyncio.to_thread(load_flow_from_json, project)
loaded = await aload_flow_from_json(project)
assert loaded is not None
assert isinstance(loaded, Graph)