From e545d12c4028a09a4a0d26a2edd1196f2f3a6d52 Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Tue, 10 Dec 2024 02:55:42 +0100 Subject: [PATCH] fix: Use anyio for file operations in a few places (#5167) Use anyio for file operations in a few places --- .../langflow/components/data/api_request.py | 5 +++-- .../directory_reader/directory_reader.py | 9 ++++---- .../base/langflow/initial_setup/setup.py | 7 ++++--- src/backend/base/langflow/main.py | 5 +++-- .../base/langflow/services/storage/local.py | 21 +++++++++---------- src/backend/tests/conftest.py | 3 ++- src/backend/tests/unit/api/v1/test_files.py | 3 ++- src/backend/tests/unit/test_initial_setup.py | 3 ++- src/backend/tests/unit/test_webhook.py | 10 ++++----- 9 files changed, 36 insertions(+), 30 deletions(-) diff --git a/src/backend/base/langflow/components/data/api_request.py b/src/backend/base/langflow/components/data/api_request.py index 867e0a5c1..b05cca43b 100644 --- a/src/backend/base/langflow/components/data/api_request.py +++ b/src/backend/base/langflow/components/data/api_request.py @@ -11,6 +11,7 @@ from zoneinfo import ZoneInfo import httpx import validators +from aiofile import async_open from langflow.base.curl.parse import parse_context from langflow.custom import Component @@ -193,8 +194,8 @@ class APIRequestComponent(Component): mode = "wb" if is_binary else "w" encoding = response.encoding if mode == "w" else None if file_path: - with file_path.open(mode, encoding=encoding) as f: - f.write(response.content if is_binary else response.text) + async with async_open(file_path, mode, encoding=encoding) as f: + await f.write(response.content if is_binary else response.text) if include_httpx_metadata: metadata.update( diff --git a/src/backend/base/langflow/custom/directory_reader/directory_reader.py b/src/backend/base/langflow/custom/directory_reader/directory_reader.py index 458a2e2aa..840652418 100644 --- a/src/backend/base/langflow/custom/directory_reader/directory_reader.py +++ b/src/backend/base/langflow/custom/directory_reader/directory_reader.py @@ -3,6 +3,7 @@ import asyncio import zlib from pathlib import Path +import anyio from aiofile import async_open from loguru import logger @@ -107,18 +108,18 @@ class DirectoryReader: async def aread_file_content(self, file_path): """Read and return the content of a file.""" - file_path_ = Path(file_path) - if not file_path_.is_file(): + file_path_ = anyio.Path(file_path) + if not await file_path_.is_file(): return None try: - async with async_open(file_path_, encoding="utf-8") as file: + async with async_open(str(file_path_), encoding="utf-8") as file: # UnicodeDecodeError: 'charmap' codec can't decode byte 0x9d in position 3069: # character maps to return await file.read() except UnicodeDecodeError: # This is happening in Windows, so we need to open the file in binary mode # The file is always just a python file, so we can safely read it as utf-8 - async with async_open(file_path_, "rb") as f: + async with async_open(str(file_path_), "rb") as f: return (await f.read()).decode("utf-8") def get_files(self): diff --git a/src/backend/base/langflow/initial_setup/setup.py b/src/backend/base/langflow/initial_setup/setup.py index bf2edc106..b3a0e3549 100644 --- a/src/backend/base/langflow/initial_setup/setup.py +++ b/src/backend/base/langflow/initial_setup/setup.py @@ -8,6 +8,7 @@ from datetime import datetime, timezone from pathlib import Path from uuid import UUID +import anyio import orjson from aiofile import async_open from emoji import demojize, purely_emoji @@ -544,13 +545,13 @@ async def load_flows_from_directory() -> None: msg = "Superuser not found in the database" raise NoResultFound(msg) user_id = user.id - flows_path_ = Path(flows_path) - files = [f for f in flows_path_.iterdir() if f.is_file()] + flows_path_ = anyio.Path(flows_path) + files = [f async for f in flows_path_.iterdir() if await f.is_file()] for file_path in files: if file_path.suffix != ".json": continue logger.info(f"Loading flow from file: {file_path.name}") - async with async_open(file_path, "r", encoding="utf-8") as f: + async with async_open(str(file_path), "r", encoding="utf-8") as f: content = await f.read() flow = orjson.loads(content) no_json_name = file_path.stem diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index 56752e73a..b25b4b240 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -8,6 +8,7 @@ from http import HTTPStatus from pathlib import Path from urllib.parse import urlencode +import anyio from fastapi import FastAPI, HTTPException, Request, Response, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse @@ -260,9 +261,9 @@ def setup_static_files(app: FastAPI, static_files_dir: Path) -> None: @app.exception_handler(404) async def custom_404_handler(_request, _exc): - path = static_files_dir / "index.html" + path = anyio.Path(static_files_dir) / "index.html" - if not path.exists(): + if not await path.exists(): msg = f"File at path {path} does not exist." raise RuntimeError(msg) return FileResponse(path) diff --git a/src/backend/base/langflow/services/storage/local.py b/src/backend/base/langflow/services/storage/local.py index bbe037141..7467aefe7 100644 --- a/src/backend/base/langflow/services/storage/local.py +++ b/src/backend/base/langflow/services/storage/local.py @@ -1,5 +1,4 @@ -from pathlib import Path - +import anyio from aiofile import async_open from loguru import logger @@ -12,7 +11,7 @@ class LocalStorageService(StorageService): def __init__(self, session_service, settings_service) -> None: """Initialize the local storage service with session and settings services.""" super().__init__(session_service, settings_service) - self.data_dir = Path(settings_service.settings.config_dir) + self.data_dir = anyio.Path(settings_service.settings.config_dir) self.set_ready() def build_full_path(self, flow_id: str, file_name: str) -> str: @@ -30,11 +29,11 @@ class LocalStorageService(StorageService): :raises PermissionError: If there is no permission to write the file. """ folder_path = self.data_dir / flow_id - folder_path.mkdir(parents=True, exist_ok=True) + await folder_path.mkdir(parents=True, exist_ok=True) file_path = folder_path / file_name try: - async with async_open(file_path, "wb") as f: + async with async_open(str(file_path), "wb") as f: await f.write(data) logger.info(f"File {file_name} saved successfully in flow {flow_id}.") except Exception: @@ -50,12 +49,12 @@ class LocalStorageService(StorageService): :raises FileNotFoundError: If the file does not exist. """ file_path = self.data_dir / flow_id / file_name - if not file_path.exists(): + if not await file_path.exists(): logger.warning(f"File {file_name} not found in flow {flow_id}.") msg = f"File {file_name} not found in flow {flow_id}" raise FileNotFoundError(msg) - async with async_open(file_path, "rb") as f: + async with async_open(str(file_path), "rb") as f: content = await f.read() logger.debug(f"File {file_name} retrieved successfully from flow {flow_id}.") @@ -69,12 +68,12 @@ class LocalStorageService(StorageService): :raises FileNotFoundError: If the flow directory does not exist. """ folder_path = self.data_dir / flow_id - if not folder_path.exists() or not folder_path.is_dir(): + if not await folder_path.exists() or not await folder_path.is_dir(): logger.warning(f"Flow {flow_id} directory does not exist.") msg = f"Flow {flow_id} directory does not exist." raise FileNotFoundError(msg) - files = [file.name for file in folder_path.iterdir() if file.is_file()] + files = [file.name async for file in folder_path.iterdir() if await file.is_file()] logger.info(f"Listed {len(files)} files in flow {flow_id}.") return files @@ -85,8 +84,8 @@ class LocalStorageService(StorageService): :param file_name: The name of the file to be deleted. """ file_path = self.data_dir / flow_id / file_name - if file_path.exists(): - file_path.unlink() + if await file_path.exists(): + await file_path.unlink() logger.info(f"File {file_name} deleted successfully from flow {flow_id}.") else: logger.warning(f"Attempted to delete non-existent file {file_name} in flow {flow_id}.") diff --git a/src/backend/tests/conftest.py b/src/backend/tests/conftest.py index 309bfcebf..391a86999 100644 --- a/src/backend/tests/conftest.py +++ b/src/backend/tests/conftest.py @@ -10,6 +10,7 @@ from pathlib import Path from typing import TYPE_CHECKING from uuid import UUID +import anyio import orjson import pytest from asgi_lifespan import LifespanManager @@ -365,7 +366,7 @@ async def client_fixture( monkeypatch.undo() # clear the temp db with suppress(FileNotFoundError): - db_path.unlink() + await anyio.Path(db_path).unlink() # create a fixture for session_getter above diff --git a/src/backend/tests/unit/api/v1/test_files.py b/src/backend/tests/unit/api/v1/test_files.py index 87add5396..05a67b206 100644 --- a/src/backend/tests/unit/api/v1/test_files.py +++ b/src/backend/tests/unit/api/v1/test_files.py @@ -7,6 +7,7 @@ from io import BytesIO from pathlib import Path from unittest.mock import MagicMock +import anyio import pytest from asgi_lifespan import LifespanManager from httpx import ASGITransport, AsyncClient @@ -76,7 +77,7 @@ async def files_client_fixture( monkeypatch.undo() # clear the temp db with suppress(FileNotFoundError): - db_path.unlink() + await anyio.Path(db_path).unlink() @pytest.fixture diff --git a/src/backend/tests/unit/test_initial_setup.py b/src/backend/tests/unit/test_initial_setup.py index ec1fb4a61..b68124861 100644 --- a/src/backend/tests/unit/test_initial_setup.py +++ b/src/backend/tests/unit/test_initial_setup.py @@ -2,6 +2,7 @@ import asyncio from datetime import datetime from pathlib import Path +import anyio import pytest from langflow.custom.directory_reader.utils import abuild_custom_component_list_from_path from langflow.initial_setup.constants import STARTER_FOLDER_NAME @@ -132,7 +133,7 @@ def add_edge(source, target, from_output, to_input): async def test_refresh_starter_projects(): - data_path = str(Path(__file__).parent.parent.parent.absolute() / "base" / "langflow" / "components") + data_path = str(await anyio.Path(__file__).parent.parent.parent.absolute() / "base" / "langflow" / "components") components = await abuild_custom_component_list_from_path(data_path) chat_input = find_component_by_name(components, "ChatInput") diff --git a/src/backend/tests/unit/test_webhook.py b/src/backend/tests/unit/test_webhook.py index 2c38c8fa7..c8d5a3ab0 100644 --- a/src/backend/tests/unit/test_webhook.py +++ b/src/backend/tests/unit/test_webhook.py @@ -1,6 +1,6 @@ import tempfile -from pathlib import Path +import anyio import pytest @@ -18,21 +18,21 @@ async def test_webhook_endpoint(client, added_webhook_test): endpoint = f"api/v1/webhook/{endpoint_name}" # Create a temporary file with tempfile.TemporaryDirectory() as tmp: - file_path = Path(tmp) / "test_file.txt" + file_path = anyio.Path(tmp) / "test_file.txt" payload = {"path": str(file_path)} response = await client.post(endpoint, json=payload) assert response.status_code == 202 - assert file_path.exists() + assert await file_path.exists() - assert not file_path.exists() + assert not await file_path.exists() # Send an invalid payload payload = {"invalid_key": "invalid_value"} response = await client.post(endpoint, json=payload) assert response.status_code == 202 - assert not file_path.exists() + assert not await file_path.exists() async def test_webhook_flow_on_run_endpoint(client, added_webhook_test, created_api_key):