ref: Add ruff rules for Pathlib (PTH) (#4035)
parent 32b5da8d1f
commit a021009ee5
22 changed files with 90 additions and 76 deletions
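
Ruff's PTH rules ("flake8-use-pathlib") flag os / os.path calls and the builtin open() in favor of pathlib.Path methods, which is the single recurring rewrite in every hunk below. A minimal sketch of the pattern with rule codes for reference (the config.yaml path is purely illustrative and assumed to exist):

from pathlib import Path

config = Path("config.yaml")       # illustrative file, assumed to exist
with config.open() as f:           # PTH123: builtin open()   -> Path.open()
    text = f.read()
exists = config.exists()           # PTH110: os.path.exists() -> Path.exists()
joined = Path("base") / "subdir"   # PTH118: os.path.join()   -> the / operator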
@@ -94,7 +94,8 @@ def partition_file_to_data(file_path: str, silent_errors: bool) -> Data | None:
 
 
 def read_text_file(file_path: str) -> str:
-    with open(file_path, "rb") as f:
+    _file_path = Path(file_path)
+    with _file_path.open("rb") as f:
         raw_data = f.read()
         result = chardet.detect(raw_data)
         encoding = result["encoding"]
@@ -102,7 +103,7 @@ def read_text_file(file_path: str) -> str:
     if encoding in ["Windows-1252", "Windows-1254", "MacRoman"]:
         encoding = "utf-8"
 
-    with open(file_path, encoding=encoding) as f:
+    with _file_path.open(encoding=encoding) as f:
         return f.read()
 
 
@@ -116,7 +117,7 @@ def read_docx_file(file_path: str) -> str:
 def parse_pdf_to_text(file_path: str) -> str:
     from pypdf import PdfReader  # type: ignore
 
-    with open(file_path, "rb") as f:
+    with Path(file_path).open("rb") as f:
         reader = PdfReader(f)
         return "\n\n".join([page.extract_text() for page in reader.pages])
@@ -22,12 +22,13 @@ class JsonAgentComponent(LCAgentComponent):
     ]
 
     def build_agent(self) -> AgentExecutor:
+        path = Path(self.path)
         if self.path.endswith("yaml") or self.path.endswith("yml"):
-            with open(self.path) as file:
+            with path.open() as file:
                 yaml_dict = yaml.load(file, Loader=yaml.FullLoader)
             spec = JsonSpec(dict_=yaml_dict)
         else:
-            spec = JsonSpec.from_file(Path(self.path))
+            spec = JsonSpec.from_file(path)
         toolkit = JsonToolkit(spec=spec)
 
         return create_json_agent(llm=self.llm, toolkit=toolkit, **self.get_agent_kwargs())
@@ -24,12 +24,13 @@ class OpenAPIAgentComponent(LCAgentComponent):
     ]
 
     def build_agent(self) -> AgentExecutor:
+        path = Path(self.path)
         if self.path.endswith("yaml") or self.path.endswith("yml"):
-            with open(self.path) as file:
+            with path.open() as file:
                 yaml_dict = yaml.load(file, Loader=yaml.FullLoader)
             spec = JsonSpec(dict_=yaml_dict)
         else:
-            spec = JsonSpec.from_file(Path(self.path))
+            spec = JsonSpec.from_file(path)
         requests_wrapper = TextRequestsWrapper()
         toolkit = OpenAPIToolkit.from_llm(
             llm=self.llm,
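
Note: both agent components now build the Path once at the top of build_agent and reuse it for the YAML branch and JsonSpec.from_file. As a hedged aside (not part of this commit), a Path in hand also permits suffix-based dispatch instead of str.endswith; a self-contained sketch with a hypothetical helper name:

import json
from pathlib import Path

import yaml

def load_spec_dict(path_str: str) -> dict:
    # Hypothetical helper illustrating the same dispatch with pathlib idioms.
    path = Path(path_str)
    if path.suffix in {".yaml", ".yml"}:  # Path.suffix instead of str.endswith
        with path.open() as file:
            return yaml.safe_load(file)
    with path.open() as file:
        return json.load(file)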
@@ -1,4 +1,4 @@
-import os
+from pathlib import Path
 
 import assemblyai as aai
 from loguru import logger
@@ -161,7 +161,7 @@ class AssemblyAITranscriptionJobCreator(Component):
             logger.warning("Both an audio file an audio URL were specified. The audio URL was ignored.")
 
         # Check if the file exists
-        if not os.path.exists(self.audio_file):
+        if not Path(self.audio_file).exists():
             self.status = "Error: Audio file not found"
             return Data(data={"error": "Error: Audio file not found"})
         audio = self.audio_file
@@ -64,7 +64,7 @@ class GitLoaderComponent(Component):
         This is necessary because when searches are performed using
         the content_filter, binary files need to be ignored.
         """
-        with open(file_path, "rb") as file:
+        with Path(file_path).open("rb") as file:
             return b"\x00" in file.read(1024)
 
     def build_gitloader(self) -> GitLoader:
@@ -51,7 +51,7 @@ class CSVToDataComponent(Component):
                 if file_path.suffix.lower() != ".csv":
                     msg = "The provided file must be a CSV file."
                     raise ValueError(msg)
-                with open(file_path, newline="", encoding="utf-8") as csvfile:
+                with file_path.open(newline="", encoding="utf-8") as csvfile:
                     csv_data = csvfile.read()
 
             elif self.csv_path:
@@ -59,7 +59,7 @@ class CSVToDataComponent(Component):
                 if file_path.suffix.lower() != ".csv":
                     msg = "The provided file must be a CSV file."
                     raise ValueError(msg)
-                with open(file_path, newline="", encoding="utf-8") as csvfile:
+                with file_path.open(newline="", encoding="utf-8") as csvfile:
                     csv_data = csvfile.read()
 
             elif self.csv_string:
@@ -54,7 +54,7 @@ class JSONToDataComponent(Component):
                 if file_path.suffix.lower() != ".json":
                     msg = "The provided file must be a JSON file."
                     raise ValueError(msg)
-                with open(file_path, encoding="utf-8") as jsonfile:
+                with file_path.open(encoding="utf-8") as jsonfile:
                     json_data = jsonfile.read()
 
             elif self.json_path:
@@ -62,7 +62,7 @@ class JSONToDataComponent(Component):
                 if file_path.suffix.lower() != ".json":
                     msg = "The provided file must be a JSON file."
                     raise ValueError(msg)
-                with open(file_path, encoding="utf-8") as jsonfile:
+                with file_path.open(encoding="utf-8") as jsonfile:
                     json_data = jsonfile.read()
 
             elif self.json_string:
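
Note: Path.open() forwards its arguments to the builtin open(), so csv's newline="" convention and the encoding keyword behave exactly as before in these CSV/JSON loaders. A runnable sketch (demo.csv is a hypothetical file created on the spot):

import csv
from pathlib import Path

demo = Path("demo.csv")  # hypothetical file, written for illustration
demo.write_text("a,b\n1,2\n", encoding="utf-8")
with demo.open(newline="", encoding="utf-8") as csvfile:
    rows = list(csv.reader(csvfile))
print(rows)  # [['a', 'b'], ['1', '2']]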
@@ -1,3 +1,5 @@
+from pathlib import Path
+
 from langchain.text_splitter import CharacterTextSplitter
 from langchain_community.vectorstores.redis import Redis
 
@@ -53,7 +55,7 @@ class RedisVectorStoreComponent(LCVectorStoreComponent):
                 documents.append(_input.to_lc_document())
             else:
                 documents.append(_input)
-        with open("docuemnts.txt", "w") as f:
+        with Path("docuemnts.txt").open("w") as f:
             f.write(str(documents))
 
         if not documents:
@@ -2,6 +2,7 @@ import ast
 import contextlib
 import inspect
 import traceback
+from pathlib import Path
 from typing import Any
 
 from cachetools import TTLCache, keys
@@ -30,7 +31,7 @@ def find_class_ast_node(class_obj):
         return None, []
 
     # Read the source code from the file
-    with open(source_file) as file:
+    with Path(source_file).open() as file:
         source_code = file.read()
 
     # Parse the source code into an AST
@@ -1,6 +1,5 @@
 import ast
 import asyncio
-import os
 import zlib
 from pathlib import Path
 
@@ -58,8 +57,8 @@ class DirectoryReader:
 
     def is_valid_path(self) -> bool:
         """Check if the directory path is valid by comparing it to the base path."""
-        fullpath = os.path.normpath(os.path.join(self.directory_path))
-        return fullpath.startswith(self.base_path)
+        fullpath = Path(self.directory_path).resolve()
+        return not self.base_path or fullpath.is_relative_to(self.base_path)
 
     def is_empty_file(self, file_content):
         """
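
Note: this hunk changes behavior as well as style. The old string-prefix test suffers the classic false positive where a sibling directory merely shares a prefix; Path.is_relative_to() (Python 3.9+) compares whole path components, and the new `not self.base_path or` clause also short-circuits when no base path is configured. Illustrative paths:

from pathlib import Path

"/srv/base-other/x".startswith("/srv/base")            # True  (false positive)
Path("/srv/base-other/x").is_relative_to("/srv/base")  # False (correct)
Path("/srv/base/x").is_relative_to("/srv/base")        # True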
@@ -107,16 +106,17 @@ class DirectoryReader:
         """
         Read and return the content of a file.
         """
-        if not os.path.isfile(file_path):
+        _file_path = Path(file_path)
+        if not _file_path.is_file():
             return None
-        with open(file_path, encoding="utf-8") as file:
+        with _file_path.open(encoding="utf-8") as file:
             # UnicodeDecodeError: 'charmap' codec can't decode byte 0x9d in position 3069: character maps to <undefined>
             try:
                 return file.read()
             except UnicodeDecodeError:
                 # This is happening in Windows, so we need to open the file in binary mode
                 # The file is always just a python file, so we can safely read it as utf-8
                 with open(file_path, "rb") as file:
-                with open(file_path, "rb") as file:
+                with _file_path.open("rb") as file:
                     return file.read().decode("utf-8")
 
     def get_files(self):
@@ -245,15 +245,16 @@ class DirectoryReader:
         logger.debug("-------------------- Building component menu list --------------------")
 
         for file_path in file_paths:
-            menu_name = os.path.basename(os.path.dirname(file_path))
-            filename = os.path.basename(file_path)
+            _file_path = Path(file_path)
+            menu_name = _file_path.parent.name
+            filename = _file_path.name
             validation_result, result_content = self.process_file(file_path)
             if not validation_result:
                 logger.error(f"Error while processing file {file_path}")
 
             menu_result = self.find_menu(response, menu_name) or {
                 "name": menu_name,
-                "path": os.path.dirname(file_path),
+                "path": str(_file_path.parent),
                 "components": [],
             }
             component_name = filename.split(".")[0]
@@ -323,15 +324,16 @@ class DirectoryReader:
         results = await asyncio.gather(*tasks)
 
         for file_path, (validation_result, result_content) in zip(file_paths, results, strict=True):
-            menu_name = os.path.basename(os.path.dirname(file_path))
-            filename = os.path.basename(file_path)
+            _file_path = Path(file_path)
+            menu_name = _file_path.parent.name
+            filename = _file_path.name
 
             if not validation_result:
                 logger.error(f"Error while processing file {file_path}")
 
             menu_result = self.find_menu(response, menu_name) or {
                 "name": menu_name,
-                "path": os.path.dirname(file_path),
+                "path": str(_file_path.parent),
                 "components": [],
             }
             component_name = filename.split(".")[0]
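
Note: the basename/dirname chains collapse into attribute access, and only the "path" value is stringified because the menu payload expects a str. With an illustrative path:

from pathlib import Path

p = Path("components/tools/calculator.py")
p.parent.name  # 'tools'             ~ os.path.basename(os.path.dirname(p))
p.name         # 'calculator.py'     ~ os.path.basename(p)
str(p.parent)  # 'components/tools'  ~ os.path.dirname(p)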
@@ -1,6 +1,5 @@
 import copy
 import json
-import os
 import shutil
 import time
 from collections import defaultdict
@@ -352,7 +351,7 @@ def load_starter_projects(retries=3, delay=1) -> list[tuple[Path, dict]]:
     for file in folder.glob("*.json"):
         attempt = 0
         while attempt < retries:
-            with open(file, encoding="utf-8") as f:
+            with file.open(encoding="utf-8") as f:
                 try:
                     project = orjson.loads(f.read())
                     starter_projects.append((file, project))
@@ -372,12 +371,12 @@ def copy_profile_pictures():
     origin = Path(__file__).parent / "profile_pictures"
     target = Path(config_dir) / "profile_pictures"
 
-    if not os.path.exists(origin):
+    if not origin.exists():
         msg = f"The source folder '{origin}' does not exist."
         raise ValueError(msg)
 
-    if not os.path.exists(target):
-        os.makedirs(target)
+    if not target.exists():
+        target.mkdir(parents=True)
 
     try:
         shutil.copytree(origin, target, dirs_exist_ok=True)
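
Note: target.mkdir(parents=True) mirrors os.makedirs(target), including the creation of intermediate directories. A hedged aside, not part of this commit: exist_ok=True would subsume the exists() guard in a single call.

from pathlib import Path

target = Path("/tmp/profile_pictures")     # illustrative path
target.mkdir(parents=True, exist_ok=True)  # guard + makedirs in one step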
@@ -411,9 +410,9 @@ def get_project_data(project):
     )
 
 
-def update_project_file(project_path, project, updated_project_data):
+def update_project_file(project_path: Path, project: dict, updated_project_data):
     project["data"] = updated_project_data
-    with open(project_path, "w", encoding="utf-8") as f:
+    with project_path.open("w", encoding="utf-8") as f:
         f.write(orjson.dumps(project, option=ORJSON_OPTIONS).decode())
     logger.info(f"Updated starter project {project['name']} file")
 
 
@@ -516,14 +515,15 @@ def load_flows_from_directory():
 
     with session_scope() as session:
         user_id = get_user_by_username(session, settings_service.auth_settings.SUPERUSER).id
-        files = [f for f in os.listdir(flows_path) if os.path.isfile(os.path.join(flows_path, f))]
-        for filename in files:
-            if not filename.endswith(".json"):
+        _flows_path = Path(flows_path)
+        files = [f for f in _flows_path.iterdir() if f.is_file()]
+        for f in files:
+            if f.suffix != ".json":
                 continue
-            logger.info(f"Loading flow from file: {filename}")
-            with open(os.path.join(flows_path, filename), encoding="utf-8") as file:
+            logger.info(f"Loading flow from file: {f.name}")
+            with f.open(encoding="utf-8") as file:
                 flow = orjson.loads(file.read())
-                no_json_name = filename.replace(".json", "")
+                no_json_name = f.stem
                 flow_endpoint_name = flow.get("endpoint_name")
                 if _is_valid_uuid(no_json_name):
                     flow["id"] = no_json_name
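
Note: because Path.iterdir() yields Path objects, the extension check, the log label, and the ".json" stripping all become attribute lookups. With a hypothetical flow file:

from pathlib import Path

f = Path("flows/9a1b2c3d.json")
f.suffix  # '.json'          replaces filename.endswith(".json")
f.name    # '9a1b2c3d.json'  replaces the bare filename
f.stem    # '9a1b2c3d'       replaces filename.replace(".json", "")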
@@ -3,6 +3,7 @@ import json
 import os
 import re
 from io import BytesIO
+from pathlib import Path
 
 import yaml
 from langchain_core.language_models import BaseLanguageModel
@@ -14,12 +15,13 @@ from langflow.services.deps import get_settings_service
 
 
 def load_file_into_dict(file_path: str) -> dict:
-    if not os.path.exists(file_path):
+    _file_path = Path(file_path)
+    if not _file_path.exists():
         msg = f"File not found: {file_path}"
         raise FileNotFoundError(msg)
 
     # Files names are UUID, so we can't find the extension
-    with open(file_path) as file:
+    with _file_path.open() as file:
         try:
             data = json.load(file)
         except json.JSONDecodeError:
@@ -53,7 +53,7 @@ def load_flow_from_json(
     update_settings(cache=cache)
 
     if isinstance(flow, str | Path):
-        with open(flow, encoding="utf-8") as f:
+        with Path(flow).open(encoding="utf-8") as f:
             flow_graph = json.load(f)
     # If input is a dictionary, assume it's a JSON object
     elif isinstance(flow, dict):
@@ -1,9 +1,11 @@
+from pathlib import Path
+
 import httpx
 
 from langflow.services.database.models.flow.model import FlowBase
 
 
-def upload(file_path, host, flow_id):
+def upload(file_path: str, host: str, flow_id: str):
     """
     Upload a file to Langflow and return the file path.
 
@@ -20,7 +22,7 @@ def upload(file_path: str, host: str, flow_id: str):
     """
     try:
         url = f"{host}/api/v1/upload/{flow_id}"
-        with open(file_path, "rb") as file:
+        with Path(file_path).open("rb") as file:
             response = httpx.post(url, files={"file": file})
             if response.status_code == 200 or response.status_code == 201:
                 return response.json()
@@ -1,7 +1,6 @@
 import base64
 import contextlib
 import hashlib
-import os
 import tempfile
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
@@ -33,7 +32,7 @@ def create_cache_folder(func):
         cache_path = Path(CACHE_DIR) / PREFIX
 
         # Create the destination folder if it doesn't exist
-        os.makedirs(cache_path, exist_ok=True)
+        cache_path.mkdir(parents=True, exist_ok=True)
 
         return func(*args, **kwargs)
 
@@ -50,7 +49,7 @@ def clear_old_cache_files(max_cache_size: int = 3):
 
     for cache_file in cache_files_sorted_by_mtime[max_cache_size:]:
         with contextlib.suppress(OSError):
-            os.remove(cache_file)
+            cache_file.unlink()
 
 
 def filter_json(json_data):
@@ -102,13 +101,13 @@ def save_binary_file(content: str, file_name: str, accepted_types: list[str]) ->
     decoded_bytes = base64.b64decode(data)
 
     # Create the full file path
-    file_path = os.path.join(cache_path, file_name)
+    file_path = cache_path / file_name
 
     # Save the binary content to the file
-    with open(file_path, "wb") as file:
+    with file_path.open("wb") as file:
         file.write(decoded_bytes)
 
-    return file_path
+    return str(file_path)
 
 
 @create_cache_folder
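
Note: cache_path / file_name now yields a Path, so save_binary_file stringifies its return value to preserve the function's existing str contract for callers. The / operator is shorthand for Path.joinpath (illustrative directory):

from pathlib import Path

cache_path = Path("/tmp/cache")        # illustrative directory
file_path = cache_path / "avatar.png"  # same as cache_path.joinpath("avatar.png")
print(str(file_path))                  # /tmp/cache/avatar.png, for str-typed callers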
@@ -150,7 +149,8 @@ def save_uploaded_file(file: UploadFile, folder_name):
 
     # Save the file with the hash as its name
     file_path = folder_path / file_name
-    with open(file_path, "wb") as new_file:
+
+    with file_path.open("wb") as new_file:
         while chunk := file_object.read(8192):
             new_file.write(chunk)
@@ -175,7 +175,7 @@ class DatabaseService(Service):
         # which is a buffer
         # I don't want to output anything
         # subprocess.DEVNULL is an int
-        with open(self.script_location / "alembic.log", "w") as buffer:
+        with (self.script_location / "alembic.log").open("w") as buffer:
             alembic_cfg = Config(stdout=buffer)
             # alembic_cfg.attributes["connection"] = session
             alembic_cfg.set_main_option("script_location", str(self.script_location))
@@ -2,7 +2,7 @@ from __future__ import annotations
 
 import importlib
 import inspect
-import os
+from pathlib import Path
 from typing import TYPE_CHECKING
 
 from loguru import logger
@@ -20,15 +20,15 @@ class PluginService(Service):
     def __init__(self, settings_service: SettingsService):
         self.plugins: dict[str, BasePlugin] = {}
         # plugin_dir = settings_service.settings.PLUGIN_DIR
-        self.plugin_dir = os.path.dirname(__file__)
+        self.plugin_dir = Path(__file__).parent
         self.plugins_base_module = "langflow.services.plugins"
         self.load_plugins()
 
     def load_plugins(self):
         base_files = ["base.py", "service.py", "factory.py", "__init__.py"]
-        for module in os.listdir(self.plugin_dir):
-            if module.endswith(".py") and module not in base_files:
-                plugin_name = module[:-3]
+        for module in self.plugin_dir.iterdir():
+            if module.suffix == ".py" and module.name not in base_files:
+                plugin_name = module.stem
                 module_path = f"{self.plugins_base_module}.{plugin_name}"
                 try:
                     mod = importlib.import_module(module_path)
@@ -360,7 +360,7 @@ class Settings(BaseSettings):
 
 
 def save_settings_to_yaml(settings: Settings, file_path: str):
-    with open(file_path, "w") as f:
+    with Path(file_path).open("w") as f:
         settings_dict = settings.model_dump()
         yaml.dump(settings_dict, f)
 
@@ -369,11 +369,12 @@ def load_settings_from_yaml(file_path: str) -> Settings:
     # Check if a string is a valid path or a file name
     if "/" not in file_path:
        # Get current path
-        current_path = os.path.dirname(os.path.abspath(__file__))
+        current_path = Path(__file__).resolve().parent
+        _file_path = Path(current_path) / file_path
+    else:
+        _file_path = Path(file_path)
 
-        file_path = os.path.join(current_path, file_path)
-
-    with open(file_path) as f:
+    with _file_path.open() as f:
         settings_dict = yaml.safe_load(f)
         settings_dict = {k.upper(): v for k, v in settings_dict.items()}
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-import os
+from pathlib import Path
 
 import yaml
 from loguru import logger
@@ -23,11 +23,12 @@ class SettingsService(Service):
     # Check if a string is a valid path or a file name
     if "/" not in file_path:
         # Get current path
-        current_path = os.path.dirname(os.path.abspath(__file__))
+        current_path = Path(__file__).resolve().parent
+        _file_path = Path(current_path) / file_path
+    else:
+        _file_path = Path(file_path)
 
-        file_path = os.path.join(current_path, file_path)
-
-    with open(file_path) as f:
+    with _file_path.open() as f:
         settings_dict = yaml.safe_load(f)
         settings_dict = {k.upper(): v for k, v in settings_dict.items()}
@@ -1,20 +1,19 @@
-import os
 import platform
 from pathlib import Path
 
 from loguru import logger
 
 
-def set_secure_permissions(file_path):
+def set_secure_permissions(file_path: Path):
     if platform.system() in ["Linux", "Darwin"]:  # Unix/Linux/Mac
-        os.chmod(file_path, 0o600)
+        file_path.chmod(0o600)
     elif platform.system() == "Windows":
         import win32api
         import win32con
         import win32security
 
         user, domain, _ = win32security.LookupAccountName("", win32api.GetUserName())
-        sd = win32security.GetFileSecurity(file_path, win32security.DACL_SECURITY_INFORMATION)
+        sd = win32security.GetFileSecurity(str(file_path), win32security.DACL_SECURITY_INFORMATION)
         dacl = win32security.ACL()
 
         # Set the new DACL for the file: read and write access for the owner, no access for everyone else
@@ -24,7 +23,7 @@ def set_secure_permissions(file_path):
             user,
         )
         sd.SetSecurityDescriptorDacl(1, dacl, 0)
-        win32security.SetFileSecurity(file_path, win32security.DACL_SECURITY_INFORMATION, sd)
+        win32security.SetFileSecurity(str(file_path), win32security.DACL_SECURITY_INFORMATION, sd)
     else:
         print("Unsupported OS")
@@ -35,7 +35,7 @@ class LocalStorageService(StorageService):
         file_path = folder_path / file_name
 
         def write_file(file_path: Path, data: bytes) -> None:
-            with open(file_path, "wb") as f:
+            with Path(file_path).open("wb") as f:
                 f.write(data)
 
         try:
@@ -61,7 +61,7 @@ class LocalStorageService(StorageService):
             raise FileNotFoundError(msg)
 
         def read_file(file_path: Path) -> bytes:
-            with open(file_path, "rb") as f:
+            with Path(file_path).open("rb") as f:
                 return f.read()
 
         content = await asyncio.get_event_loop().run_in_executor(None, read_file, file_path)
@@ -64,6 +64,7 @@ select = [
     "PD",
     "PIE",
     "PT",
+    "PTH",
     "PYI",
     "Q",
     "RET",
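
With "PTH" in the select list, ruff enforces the flake8-use-pathlib rules across the repository; running something like `ruff check --select PTH .` locally should reproduce the class of findings this commit fixes.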