fix: Use anyio for file operations in a few places (#5167)

Use anyio for file operations in a few places
This commit is contained in:
Christophe Bornet 2024-12-10 02:55:42 +01:00 committed by GitHub
commit e545d12c40
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 36 additions and 30 deletions

View file

@ -11,6 +11,7 @@ from zoneinfo import ZoneInfo
import httpx
import validators
from aiofile import async_open
from langflow.base.curl.parse import parse_context
from langflow.custom import Component
@ -193,8 +194,8 @@ class APIRequestComponent(Component):
mode = "wb" if is_binary else "w"
encoding = response.encoding if mode == "w" else None
if file_path:
with file_path.open(mode, encoding=encoding) as f:
f.write(response.content if is_binary else response.text)
async with async_open(file_path, mode, encoding=encoding) as f:
await f.write(response.content if is_binary else response.text)
if include_httpx_metadata:
metadata.update(

View file

@ -3,6 +3,7 @@ import asyncio
import zlib
from pathlib import Path
import anyio
from aiofile import async_open
from loguru import logger
@ -107,18 +108,18 @@ class DirectoryReader:
async def aread_file_content(self, file_path):
"""Read and return the content of a file."""
file_path_ = Path(file_path)
if not file_path_.is_file():
file_path_ = anyio.Path(file_path)
if not await file_path_.is_file():
return None
try:
async with async_open(file_path_, encoding="utf-8") as file:
async with async_open(str(file_path_), encoding="utf-8") as file:
# UnicodeDecodeError: 'charmap' codec can't decode byte 0x9d in position 3069:
# character maps to <undefined>
return await file.read()
except UnicodeDecodeError:
# This is happening in Windows, so we need to open the file in binary mode
# The file is always just a python file, so we can safely read it as utf-8
async with async_open(file_path_, "rb") as f:
async with async_open(str(file_path_), "rb") as f:
return (await f.read()).decode("utf-8")
def get_files(self):

View file

@ -8,6 +8,7 @@ from datetime import datetime, timezone
from pathlib import Path
from uuid import UUID
import anyio
import orjson
from aiofile import async_open
from emoji import demojize, purely_emoji
@ -544,13 +545,13 @@ async def load_flows_from_directory() -> None:
msg = "Superuser not found in the database"
raise NoResultFound(msg)
user_id = user.id
flows_path_ = Path(flows_path)
files = [f for f in flows_path_.iterdir() if f.is_file()]
flows_path_ = anyio.Path(flows_path)
files = [f async for f in flows_path_.iterdir() if await f.is_file()]
for file_path in files:
if file_path.suffix != ".json":
continue
logger.info(f"Loading flow from file: {file_path.name}")
async with async_open(file_path, "r", encoding="utf-8") as f:
async with async_open(str(file_path), "r", encoding="utf-8") as f:
content = await f.read()
flow = orjson.loads(content)
no_json_name = file_path.stem

View file

@ -8,6 +8,7 @@ from http import HTTPStatus
from pathlib import Path
from urllib.parse import urlencode
import anyio
from fastapi import FastAPI, HTTPException, Request, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
@ -260,9 +261,9 @@ def setup_static_files(app: FastAPI, static_files_dir: Path) -> None:
@app.exception_handler(404)
async def custom_404_handler(_request, _exc):
path = static_files_dir / "index.html"
path = anyio.Path(static_files_dir) / "index.html"
if not path.exists():
if not await path.exists():
msg = f"File at path {path} does not exist."
raise RuntimeError(msg)
return FileResponse(path)

View file

@ -1,5 +1,4 @@
from pathlib import Path
import anyio
from aiofile import async_open
from loguru import logger
@ -12,7 +11,7 @@ class LocalStorageService(StorageService):
def __init__(self, session_service, settings_service) -> None:
"""Initialize the local storage service with session and settings services."""
super().__init__(session_service, settings_service)
self.data_dir = Path(settings_service.settings.config_dir)
self.data_dir = anyio.Path(settings_service.settings.config_dir)
self.set_ready()
def build_full_path(self, flow_id: str, file_name: str) -> str:
@ -30,11 +29,11 @@ class LocalStorageService(StorageService):
:raises PermissionError: If there is no permission to write the file.
"""
folder_path = self.data_dir / flow_id
folder_path.mkdir(parents=True, exist_ok=True)
await folder_path.mkdir(parents=True, exist_ok=True)
file_path = folder_path / file_name
try:
async with async_open(file_path, "wb") as f:
async with async_open(str(file_path), "wb") as f:
await f.write(data)
logger.info(f"File {file_name} saved successfully in flow {flow_id}.")
except Exception:
@ -50,12 +49,12 @@ class LocalStorageService(StorageService):
:raises FileNotFoundError: If the file does not exist.
"""
file_path = self.data_dir / flow_id / file_name
if not file_path.exists():
if not await file_path.exists():
logger.warning(f"File {file_name} not found in flow {flow_id}.")
msg = f"File {file_name} not found in flow {flow_id}"
raise FileNotFoundError(msg)
async with async_open(file_path, "rb") as f:
async with async_open(str(file_path), "rb") as f:
content = await f.read()
logger.debug(f"File {file_name} retrieved successfully from flow {flow_id}.")
@ -69,12 +68,12 @@ class LocalStorageService(StorageService):
:raises FileNotFoundError: If the flow directory does not exist.
"""
folder_path = self.data_dir / flow_id
if not folder_path.exists() or not folder_path.is_dir():
if not await folder_path.exists() or not await folder_path.is_dir():
logger.warning(f"Flow {flow_id} directory does not exist.")
msg = f"Flow {flow_id} directory does not exist."
raise FileNotFoundError(msg)
files = [file.name for file in folder_path.iterdir() if file.is_file()]
files = [file.name async for file in folder_path.iterdir() if await file.is_file()]
logger.info(f"Listed {len(files)} files in flow {flow_id}.")
return files
@ -85,8 +84,8 @@ class LocalStorageService(StorageService):
:param file_name: The name of the file to be deleted.
"""
file_path = self.data_dir / flow_id / file_name
if file_path.exists():
file_path.unlink()
if await file_path.exists():
await file_path.unlink()
logger.info(f"File {file_name} deleted successfully from flow {flow_id}.")
else:
logger.warning(f"Attempted to delete non-existent file {file_name} in flow {flow_id}.")

View file

@ -10,6 +10,7 @@ from pathlib import Path
from typing import TYPE_CHECKING
from uuid import UUID
import anyio
import orjson
import pytest
from asgi_lifespan import LifespanManager
@ -365,7 +366,7 @@ async def client_fixture(
monkeypatch.undo()
# clear the temp db
with suppress(FileNotFoundError):
db_path.unlink()
await anyio.Path(db_path).unlink()
# create a fixture for session_getter above

View file

@ -7,6 +7,7 @@ from io import BytesIO
from pathlib import Path
from unittest.mock import MagicMock
import anyio
import pytest
from asgi_lifespan import LifespanManager
from httpx import ASGITransport, AsyncClient
@ -76,7 +77,7 @@ async def files_client_fixture(
monkeypatch.undo()
# clear the temp db
with suppress(FileNotFoundError):
db_path.unlink()
await anyio.Path(db_path).unlink()
@pytest.fixture

View file

@ -2,6 +2,7 @@ import asyncio
from datetime import datetime
from pathlib import Path
import anyio
import pytest
from langflow.custom.directory_reader.utils import abuild_custom_component_list_from_path
from langflow.initial_setup.constants import STARTER_FOLDER_NAME
@ -132,7 +133,7 @@ def add_edge(source, target, from_output, to_input):
async def test_refresh_starter_projects():
data_path = str(Path(__file__).parent.parent.parent.absolute() / "base" / "langflow" / "components")
data_path = str(await anyio.Path(__file__).parent.parent.parent.absolute() / "base" / "langflow" / "components")
components = await abuild_custom_component_list_from_path(data_path)
chat_input = find_component_by_name(components, "ChatInput")

View file

@ -1,6 +1,6 @@
import tempfile
from pathlib import Path
import anyio
import pytest
@ -18,21 +18,21 @@ async def test_webhook_endpoint(client, added_webhook_test):
endpoint = f"api/v1/webhook/{endpoint_name}"
# Create a temporary file
with tempfile.TemporaryDirectory() as tmp:
file_path = Path(tmp) / "test_file.txt"
file_path = anyio.Path(tmp) / "test_file.txt"
payload = {"path": str(file_path)}
response = await client.post(endpoint, json=payload)
assert response.status_code == 202
assert file_path.exists()
assert await file_path.exists()
assert not file_path.exists()
assert not await file_path.exists()
# Send an invalid payload
payload = {"invalid_key": "invalid_value"}
response = await client.post(endpoint, json=payload)
assert response.status_code == 202
assert not file_path.exists()
assert not await file_path.exists()
async def test_webhook_flow_on_run_endpoint(client, added_webhook_test, created_api_key):