Add async file operations to LocalStorageService
This commit is contained in:
parent
7d7a980e7c
commit
35087cd2b4
1 changed files with 77 additions and 16 deletions
|
|
@ -1,31 +1,92 @@
|
|||
from pathlib import Path
|
||||
|
||||
import aiofiles
|
||||
from loguru import logger
|
||||
|
||||
from .service import StorageService
|
||||
|
||||
|
||||
class LocalStorageService(StorageService):
|
||||
def __init__(self, session_service, settings_service):
|
||||
super().__init__(session_service, settings_service)
|
||||
self.data_dir = settings_service.settings.CONFIG_DIR
|
||||
"""A service class for handling local storage operations."""
|
||||
|
||||
def __init__(self, session_service, settings_service):
|
||||
"""Initialize the local storage service with session and settings services."""
|
||||
super().__init__(session_service, settings_service)
|
||||
self.data_dir = Path(settings_service.settings.CONFIG_DIR)
|
||||
self.set_ready()
|
||||
|
||||
def save_file(self, flow_id: str, file_name: str, data: bytes):
|
||||
folder_path = Path(f"{self.data_dir}/{flow_id}")
|
||||
async def save_file(self, flow_id: str, file_name: str, data: bytes):
|
||||
"""
|
||||
Save a file in the local storage.
|
||||
|
||||
:param flow_id: The identifier for the flow.
|
||||
:param file_name: The name of the file to be saved.
|
||||
:param data: The byte content of the file.
|
||||
:raises FileNotFoundError: If the specified flow does not exist.
|
||||
:raises IsADirectoryError: If the file name is a directory.
|
||||
:raises PermissionError: If there is no permission to write the file.
|
||||
"""
|
||||
folder_path = self.data_dir / flow_id
|
||||
folder_path.mkdir(parents=True, exist_ok=True)
|
||||
with open(folder_path / file_name, "wb") as f:
|
||||
f.write(data)
|
||||
file_path = folder_path / file_name
|
||||
|
||||
def get_file(self, flow_id: str, file_name: str) -> bytes:
|
||||
with open(f"{self.data_dir}/{flow_id}/{file_name}", "rb") as f:
|
||||
return f.read()
|
||||
try:
|
||||
async with aiofiles.open(file_path, "wb") as f:
|
||||
await f.write(data)
|
||||
logger.info(f"File {file_name} saved successfully in flow {flow_id}.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving file {file_name} in flow {flow_id}: {e}")
|
||||
raise e
|
||||
|
||||
def list_files(self, flow_id: str):
|
||||
folder_path = Path(f"{self.data_dir}/{flow_id}")
|
||||
return [file.name for file in folder_path.iterdir() if file.is_file()]
|
||||
async def get_file(self, flow_id: str, file_name: str) -> bytes:
|
||||
"""
|
||||
Retrieve a file from the local storage.
|
||||
|
||||
def delete_file(self, flow_id: str, file_name: str):
|
||||
Path(f"{self.data_dir}/{flow_id}/{file_name}").unlink()
|
||||
:param flow_id: The identifier for the flow.
|
||||
:param file_name: The name of the file to be retrieved.
|
||||
:return: The byte content of the file.
|
||||
:raises FileNotFoundError: If the file does not exist.
|
||||
"""
|
||||
file_path = self.data_dir / flow_id / file_name
|
||||
if not file_path.exists():
|
||||
logger.warning(f"File {file_name} not found in flow {flow_id}.")
|
||||
raise FileNotFoundError(f"File {file_name} not found in flow {flow_id}")
|
||||
|
||||
async with aiofiles.open(file_path, "rb") as f:
|
||||
logger.info(f"File {file_name} retrieved successfully from flow {flow_id}.")
|
||||
return await f.read()
|
||||
|
||||
async def list_files(self, flow_id: str):
|
||||
"""
|
||||
List all files in a specified flow.
|
||||
|
||||
:param flow_id: The identifier for the flow.
|
||||
:return: A list of file names.
|
||||
:raises FileNotFoundError: If the flow directory does not exist.
|
||||
"""
|
||||
folder_path = self.data_dir / flow_id
|
||||
if not folder_path.exists() or not folder_path.is_dir():
|
||||
logger.warning(f"Flow {flow_id} directory does not exist.")
|
||||
raise FileNotFoundError(f"Flow {flow_id} directory does not exist.")
|
||||
|
||||
files = [file.name for file in folder_path.iterdir() if file.is_file()]
|
||||
logger.info(f"Listed {len(files)} files in flow {flow_id}.")
|
||||
return files
|
||||
|
||||
async def delete_file(self, flow_id: str, file_name: str):
|
||||
"""
|
||||
Delete a file from the local storage.
|
||||
|
||||
:param flow_id: The identifier for the flow.
|
||||
:param file_name: The name of the file to be deleted.
|
||||
"""
|
||||
file_path = self.data_dir / flow_id / file_name
|
||||
if file_path.exists():
|
||||
file_path.unlink()
|
||||
logger.info(f"File {file_name} deleted successfully from flow {flow_id}.")
|
||||
else:
|
||||
logger.warning(f"Attempted to delete non-existent file {file_name} in flow {flow_id}.")
|
||||
|
||||
def teardown(self):
|
||||
pass
|
||||
"""Perform any cleanup operations when the service is being torn down."""
|
||||
pass # No specific teardown actions required for local storage at the moment.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue