ref: Use pathlib read/write functions (#4177)

Use pathlib read/write functions
This commit is contained in:
Christophe Bornet 2024-10-19 18:05:18 +02:00 committed by GitHub
commit 140cf890e3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 72 additions and 85 deletions

View file

@@ -109,16 +109,14 @@ def partition_file_to_data(file_path: str, *, silent_errors: bool) -> Data | Non
def read_text_file(file_path: str) -> str:
_file_path = Path(file_path)
with _file_path.open("rb") as f:
raw_data = f.read()
result = chardet.detect(raw_data)
encoding = result["encoding"]
raw_data = _file_path.read_bytes()
result = chardet.detect(raw_data)
encoding = result["encoding"]
if encoding in {"Windows-1252", "Windows-1254", "MacRoman"}:
encoding = "utf-8"
if encoding in {"Windows-1252", "Windows-1254", "MacRoman"}:
encoding = "utf-8"
with _file_path.open(encoding=encoding) as f:
return f.read()
return _file_path.read_text(encoding=encoding)
def read_docx_file(file_path: str) -> str:

View file

@@ -23,8 +23,8 @@ class JsonAgentComponent(LCAgentComponent):
def build_agent(self) -> AgentExecutor:
path = Path(self.path)
if self.path.endswith("yaml") or self.path.endswith("yml"):
with path.open() as file:
if path.suffix in ("yaml", "yml"):
with path.open(encoding="utf-8") as file:
yaml_dict = yaml.load(file, Loader=yaml.FullLoader)
spec = JsonSpec(dict_=yaml_dict)
else:

View file

@@ -25,8 +25,8 @@ class OpenAPIAgentComponent(LCAgentComponent):
def build_agent(self) -> AgentExecutor:
path = Path(self.path)
if self.path.endswith("yaml") or self.path.endswith("yml"):
with path.open() as file:
if path.suffix in ("yaml", "yml"):
with path.open(encoding="utf-8") as file:
yaml_dict = yaml.load(file, Loader=yaml.FullLoader)
spec = JsonSpec(dict_=yaml_dict)
else:

View file

@@ -88,9 +88,8 @@ class GitLoaderComponent(Component):
content_regex = re.compile(content_filter_pattern)
def content_filter(file_path: Path) -> bool:
with file_path.open("r", encoding="utf-8", errors="ignore") as file:
content = file.read()
return bool(content_regex.search(content))
content = file_path.read_text(encoding="utf-8", errors="ignore")
return bool(content_regex.search(content))
file_filters.append(content_filter)

View file

@@ -55,16 +55,14 @@ class JSONToDataComponent(Component):
if file_path.suffix.lower() != ".json":
self.status = "The provided file must be a JSON file."
else:
with file_path.open(encoding="utf-8") as jsonfile:
json_data = jsonfile.read()
json_data = file_path.read_text(encoding="utf-8")
elif self.json_path:
file_path = Path(self.json_path)
if file_path.suffix.lower() != ".json":
self.status = "The provided file must be a JSON file."
else:
with file_path.open(encoding="utf-8") as jsonfile:
json_data = jsonfile.read()
json_data = file_path.read_text(encoding="utf-8")
else:
json_data = self.json_string

View file

@@ -53,8 +53,7 @@ class RedisVectorStoreComponent(LCVectorStoreComponent):
documents.append(_input.to_lc_document())
else:
documents.append(_input)
with Path("docuemnts.txt").open("w", encoding="utf-8") as f:
f.write(str(documents))
Path("docuemnts.txt").write_text(str(documents), encoding="utf-8")
if not documents:
if self.schema is None:

View file

@@ -32,8 +32,7 @@ def find_class_ast_node(class_obj):
return None, []
# Read the source code from the file
with Path(source_file).open(encoding="utf-8") as file:
source_code = file.read()
source_code = Path(source_file).read_text(encoding="utf-8")
# Parse the source code into an AST
tree = ast.parse(source_code)

View file

@@ -362,18 +362,18 @@ def load_starter_projects(retries=3, delay=1) -> list[tuple[Path, dict]]:
for file in folder.glob("*.json"):
attempt = 0
while attempt < retries:
with file.open(encoding="utf-8") as f:
try:
project = orjson.loads(f.read())
starter_projects.append((file, project))
logger.info(f"Loaded starter project {file}")
break # Break if load is successful
except orjson.JSONDecodeError as e:
attempt += 1
if attempt >= retries:
msg = f"Error loading starter project {file}: {e}"
raise ValueError(msg) from e
time.sleep(delay) # Wait before retrying
content = file.read_text(encoding="utf-8")
try:
project = orjson.loads(content)
starter_projects.append((file, project))
logger.info(f"Loaded starter project {file}")
break # Break if load is successful
except orjson.JSONDecodeError as e:
attempt += 1
if attempt >= retries:
msg = f"Error loading starter project {file}: {e}"
raise ValueError(msg) from e
time.sleep(delay) # Wait before retrying
return starter_projects
@@ -427,8 +427,7 @@ def get_project_data(project):
def update_project_file(project_path: Path, project: dict, updated_project_data):
project["data"] = updated_project_data
with project_path.open("w", encoding="utf-8") as f:
f.write(orjson.dumps(project, option=ORJSON_OPTIONS).decode())
project_path.write_text(orjson.dumps(project, option=ORJSON_OPTIONS).decode(), encoding="utf-8")
logger.info(f"Updated starter project {project['name']} file")
@@ -539,44 +538,44 @@ def load_flows_from_directory():
if f.suffix != ".json":
continue
logger.info(f"Loading flow from file: {f.name}")
with f.open(encoding="utf-8") as file:
flow = orjson.loads(file.read())
no_json_name = f.stem
flow_endpoint_name = flow.get("endpoint_name")
if _is_valid_uuid(no_json_name):
flow["id"] = no_json_name
flow_id = flow.get("id")
content = f.read_text(encoding="utf-8")
flow = orjson.loads(content)
no_json_name = f.stem
flow_endpoint_name = flow.get("endpoint_name")
if _is_valid_uuid(no_json_name):
flow["id"] = no_json_name
flow_id = flow.get("id")
existing = find_existing_flow(session, flow_id, flow_endpoint_name)
if existing:
logger.debug(f"Found existing flow: {existing.name}")
logger.info(f"Updating existing flow: {flow_id} with endpoint name {flow_endpoint_name}")
for key, value in flow.items():
if hasattr(existing, key):
# flow dict from json and db representation are not 100% the same
setattr(existing, key, value)
existing.updated_at = datetime.now(tz=timezone.utc).astimezone()
existing.user_id = user_id
existing = find_existing_flow(session, flow_id, flow_endpoint_name)
if existing:
logger.debug(f"Found existing flow: {existing.name}")
logger.info(f"Updating existing flow: {flow_id} with endpoint name {flow_endpoint_name}")
for key, value in flow.items():
if hasattr(existing, key):
# flow dict from json and db representation are not 100% the same
setattr(existing, key, value)
existing.updated_at = datetime.now(tz=timezone.utc).astimezone()
existing.user_id = user_id
# Generally, folder_id should not be None, but we must check this due to the previous
# behavior where flows could be added and folder_id was None, orphaning
# them within Langflow.
if existing.folder_id is None:
folder_id = get_default_folder_id(session, user_id)
existing.folder_id = folder_id
session.add(existing)
else:
logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}")
# Current behavior loads all new flows into default folder
# Generally, folder_id should not be None, but we must check this due to the previous
# behavior where flows could be added and folder_id was None, orphaning
# them within Langflow.
if existing.folder_id is None:
folder_id = get_default_folder_id(session, user_id)
existing.folder_id = folder_id
flow["user_id"] = user_id
flow["folder_id"] = folder_id
flow = Flow.model_validate(flow, from_attributes=True)
flow.updated_at = datetime.now(tz=timezone.utc).astimezone()
session.add(flow)
session.add(existing)
else:
logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}")
# Current behavior loads all new flows into default folder
folder_id = get_default_folder_id(session, user_id)
flow["user_id"] = user_id
flow["folder_id"] = folder_id
flow = Flow.model_validate(flow, from_attributes=True)
flow.updated_at = datetime.now(tz=timezone.utc).astimezone()
session.add(flow)
def find_existing_flow(session, flow_id, flow_endpoint_name):

View file

@@ -21,7 +21,7 @@ def load_file_into_dict(file_path: str) -> dict:
raise FileNotFoundError(msg)
# Files names are UUID, so we can't find the extension
with _file_path.open() as file:
with _file_path.open(encoding="utf-8") as file:
try:
data = json.load(file)
except json.JSONDecodeError:

View file

@@ -104,8 +104,7 @@ def save_binary_file(content: str, file_name: str, accepted_types: list[str]) ->
file_path = cache_path / file_name
# Save the binary content to the file
with file_path.open("wb") as file:
file.write(decoded_bytes)
file_path.write_bytes(decoded_bytes)
return str(file_path)

View file

@@ -178,7 +178,7 @@ class DatabaseService(Service):
# which is a buffer
# I don't want to output anything
# subprocess.DEVNULL is an int
with (self.script_location / "alembic.log").open("w") as buffer:
with (self.script_location / "alembic.log").open("w", encoding="utf-8") as buffer:
alembic_cfg = Config(stdout=buffer)
# alembic_cfg.attributes["connection"] = session
alembic_cfg.set_main_option("script_location", str(self.script_location))

View file

@@ -382,7 +382,7 @@ def load_settings_from_yaml(file_path: str) -> Settings:
else:
_file_path = Path(file_path)
with _file_path.open() as f:
with _file_path.open(encoding="utf-8") as f:
settings_dict = yaml.safe_load(f)
settings_dict = {k.upper(): v for k, v in settings_dict.items()}

View file

@@ -28,7 +28,7 @@ class SettingsService(Service):
else:
_file_path = Path(file_path)
with _file_path.open() as f:
with _file_path.open(encoding="utf-8") as f:
settings_dict = yaml.safe_load(f)
settings_dict = {k.upper(): v for k, v in settings_dict.items()}

View file

@@ -29,8 +29,7 @@ def set_secure_permissions(file_path: Path):
def write_secret_to_file(path: Path, value: str) -> None:
with path.open("wb") as f:
f.write(value.encode("utf-8"))
path.write_text(value, encoding="utf-8")
try:
set_secure_permissions(path)
except Exception: # noqa: BLE001
@@ -38,5 +37,4 @@ def write_secret_to_file(path: Path, value: str) -> None:
def read_secret_from_file(path: Path) -> str:
with path.open("r") as f:
return f.read()
return path.read_text(encoding="utf-8")

View file

@@ -34,8 +34,7 @@ class LocalStorageService(StorageService):
file_path = folder_path / file_name
def write_file(file_path: Path, data: bytes) -> None:
with Path(file_path).open("wb") as f:
f.write(data)
file_path.write_bytes(data)
try:
await asyncio.get_event_loop().run_in_executor(None, write_file, file_path, data)
@@ -59,8 +58,7 @@ class LocalStorageService(StorageService):
raise FileNotFoundError(msg)
def read_file(file_path: Path) -> bytes:
with Path(file_path).open("rb") as f:
return f.read()
return file_path.read_bytes()
content = await asyncio.get_event_loop().run_in_executor(None, read_file, file_path)
logger.debug(f"File {file_name} retrieved successfully from flow {flow_id}.")