diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index 3514d23aa..c12c2f470 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import time from http import HTTPStatus from typing import TYPE_CHECKING, Annotated @@ -556,7 +557,7 @@ async def create_upload_file( """ try: flow_id_str = str(flow_id) - file_path = save_uploaded_file(file, folder_name=flow_id_str) + file_path = await asyncio.to_thread(save_uploaded_file, file, folder_name=flow_id_str) return UploadFileResponse( flow_id=flow_id_str, diff --git a/src/backend/base/langflow/custom/directory_reader/directory_reader.py b/src/backend/base/langflow/custom/directory_reader/directory_reader.py index 7b60ebf5a..ffc95d580 100644 --- a/src/backend/base/langflow/custom/directory_reader/directory_reader.py +++ b/src/backend/base/langflow/custom/directory_reader/directory_reader.py @@ -118,7 +118,7 @@ class DirectoryReader: except UnicodeDecodeError: # This is happening in Windows, so we need to open the file in binary mode # The file is always just a python file, so we can safely read it as utf-8 - with async_open(_file_path, "rb") as f: + async with async_open(_file_path, "rb") as f: return (await f.read()).decode("utf-8") def get_files(self): diff --git a/src/backend/base/langflow/initial_setup/setup.py b/src/backend/base/langflow/initial_setup/setup.py index 153bb35f5..b2ed742ee 100644 --- a/src/backend/base/langflow/initial_setup/setup.py +++ b/src/backend/base/langflow/initial_setup/setup.py @@ -9,6 +9,7 @@ from pathlib import Path from uuid import UUID import orjson +from aiofile import async_open from emoji import demojize, purely_emoji from loguru import logger from sqlalchemy.exc import NoResultFound @@ -545,13 +546,14 @@ async def load_flows_from_directory() -> None: user_id = user.id _flows_path = Path(flows_path) files = [f for f in _flows_path.iterdir() if f.is_file()] - for f in files: - if f.suffix != ".json": + for file_path in files: + if file_path.suffix != ".json": continue - logger.info(f"Loading flow from file: {f.name}") - content = f.read_text(encoding="utf-8") + logger.info(f"Loading flow from file: {file_path.name}") + async with async_open(file_path, "r", encoding="utf-8") as f: + content = await f.read() flow = orjson.loads(content) - no_json_name = f.stem + no_json_name = file_path.stem flow_endpoint_name = flow.get("endpoint_name") if _is_valid_uuid(no_json_name): flow["id"] = no_json_name diff --git a/src/backend/base/langflow/services/storage/local.py b/src/backend/base/langflow/services/storage/local.py index 5a112294c..bbe037141 100644 --- a/src/backend/base/langflow/services/storage/local.py +++ b/src/backend/base/langflow/services/storage/local.py @@ -1,6 +1,6 @@ -import asyncio from pathlib import Path +from aiofile import async_open from loguru import logger from .service import StorageService @@ -33,11 +33,9 @@ class LocalStorageService(StorageService): folder_path.mkdir(parents=True, exist_ok=True) file_path = folder_path / file_name - def write_file(file_path: Path, data: bytes) -> None: - file_path.write_bytes(data) - try: - await asyncio.to_thread(write_file, file_path, data) + async with async_open(file_path, "wb") as f: + await f.write(data) logger.info(f"File {file_name} saved successfully in flow {flow_id}.") except Exception: logger.exception(f"Error saving file {file_name} in flow {flow_id}") @@ -57,10 +55,9 @@ class LocalStorageService(StorageService): msg = f"File {file_name} not found in flow {flow_id}" raise FileNotFoundError(msg) - def read_file(file_path: Path) -> bytes: - return file_path.read_bytes() + async with async_open(file_path, "rb") as f: + content = await f.read() - content = await asyncio.to_thread(read_file, file_path) logger.debug(f"File {file_name} retrieved successfully from flow {flow_id}.") return content