Creating BaseFileComponent, integrating with API Request "to file" feature (#4731)
* adding ability for APIRequest to retry and save to a file * [autofix.ci] apply automated fixes * adding ability for APIRequest to retry and save to a file * [autofix.ci] apply automated fixes * initial refactor of FileComponent to handle Data input * shifting potentially common logic into BaseFileComponent * improving readability and fixing problems * [autofix.ci] apply automated fixes * addressing linting * [autofix.ci] apply automated fixes * linting part 2 * [autofix.ci] apply automated fixes * linting part 3 * preserve input fields on data objects * [autofix.ci] apply automated fixes * ensuring processed data is linked to correct file data object * [autofix.ci] apply automated fixes * addressing linting * [autofix.ci] apply automated fixes * fixing edge case * allowing specific failure of missing file without forcing silent_errors * Base_file mypy errors * [autofix.ci] apply automated fixes * Update base_file.py * Update base_file.py * Fix starter templates * Update test_vector_store_rag.py --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Eric Hare <ericrhare@gmail.com>
This commit is contained in:
parent
654b44347b
commit
c6f27b5af8
8 changed files with 863 additions and 250 deletions
|
|
@ -0,0 +1,5 @@
|
|||
from .base_file import BaseFileComponent
|
||||
|
||||
__all__ = [
|
||||
"BaseFileComponent",
|
||||
]
|
||||
475
src/backend/base/langflow/base/data/base_file.py
Normal file
475
src/backend/base/langflow/base/data/base_file.py
Normal file
|
|
@ -0,0 +1,475 @@
|
|||
import shutil
|
||||
import tarfile
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from zipfile import ZipFile, is_zipfile
|
||||
|
||||
from langflow.custom import Component
|
||||
from langflow.io import BoolInput, FileInput, HandleInput, Output
|
||||
from langflow.schema import Data
|
||||
|
||||
|
||||
class BaseFileComponent(Component, ABC):
    """Base class for handling file processing components.

    This class provides common functionality for resolving, validating, and
    processing file paths. Child classes must define valid file extensions
    and implement the `process_files` method.

    The processing pipeline implemented by `load_files` is:

    1. resolve the configured paths into `BaseFile` objects,
    2. recursively expand directories and supported bundles (zip/tar),
    3. filter the result down to regular files with a valid extension,
    4. hand the remaining files to the subclass' `process_files`,
    5. clean up temporary directories and files marked for deletion.
    """

    class BaseFile:
        """Internal class to represent a file with additional metadata."""

        def __init__(
            self,
            data: Data | list[Data],
            path: Path,
            *,
            delete_after_processing: bool = False,
            silent_errors: bool = False,
        ):
            """Create a file record.

            Args:
                data (Data | list[Data]): Data object(s) associated with the file.
                    A single Data object is wrapped into a list.
                path (Path): Location of the file on disk.
                delete_after_processing (bool): Whether `load_files` should delete the
                    path once processing has finished.
                silent_errors (bool): If true, invalid values passed to the `data`
                    setter or to `merge_data` are ignored instead of raising.
            """
            self._data = data if isinstance(data, list) else [data]
            self.path = path
            self.delete_after_processing = delete_after_processing
            self._silent_errors = silent_errors

        @property
        def data(self) -> list[Data]:
            """The Data objects attached to this file (never None)."""
            return self._data or []

        @data.setter
        def data(self, value: Data | list[Data]):
            if isinstance(value, Data):
                self._data = [value]
            elif isinstance(value, list) and all(isinstance(item, Data) for item in value):
                self._data = value
            else:
                msg = f"data must be a Data object or a list of Data objects. Got: {type(value)}"
                if not self._silent_errors:
                    raise ValueError(msg)

        def merge_data(self, new_data: Data | list[Data] | None) -> list[Data]:
            r"""Generate a new list of Data objects by merging `new_data` into the current `data`.

            Every existing Data object is combined with every new Data object, so the
            result has `len(self.data) * len(new_data)` entries. Keys present in both
            take the value from `new_data`.

            Args:
                new_data (Data | list[Data] | None): The new Data object(s) to merge into each existing Data object.
                    If None, the current `data` is returned unchanged.

            Returns:
                list[Data]: A new list of Data objects with `new_data` merged.

            Raises:
                ValueError: If `new_data` has an unsupported type and silent errors are disabled.
            """
            if new_data is None:
                return self.data

            if isinstance(new_data, Data):
                new_data_list = [new_data]
            elif isinstance(new_data, list) and all(isinstance(item, Data) for item in new_data):
                new_data_list = new_data
            else:
                msg = "new_data must be a Data object, a list of Data objects, or None."
                if not self._silent_errors:
                    raise ValueError(msg)
                return self.data

            return [
                Data(data={**data.data, **new_data_item.data}) for data in self.data for new_data_item in new_data_list
            ]

        def __str__(self):
            """Return a short, human-readable description of the file."""
            if len(self.data) == 0:
                text_preview = ""
            elif len(self.data) == 1:
                max_text_length = 50
                # `self.data` is a list: the preview must come from its single element.
                full_text = self.data[0].get_text() or ""
                text_preview = full_text[:max_text_length]
                if len(full_text) > max_text_length:
                    text_preview += "..."
                text_preview = f"text_preview='{text_preview}'"
            else:
                text_preview = f"{len(self.data)} data objects"
            return (
                f"BaseFile(path={self.path}"
                f", delete_after_processing={self.delete_after_processing}"
                f", {text_preview})"
            )

    # Subclasses can override these class variables
    VALID_EXTENSIONS: list[str] = []  # To be overridden by child classes
    IGNORE_STARTS_WITH = [".", "__MACOSX"]

    SERVER_FILE_PATH_FIELDNAME = "file_path"
    SUPPORTED_BUNDLE_EXTENSIONS = ["zip", "tar", "tgz", "bz2", "gz"]

    file_path: list[Data] | None = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Dynamically update FileInput to include valid extensions and bundles
        # NOTE(review): `_base_inputs[0]` is a class-level object, so this write is
        # visible to every class that reuses `_base_inputs` - confirm this is intended.
        self._base_inputs[0].file_types = [*self.valid_extensions, *self.SUPPORTED_BUNDLE_EXTENSIONS]

        file_types = ", ".join(self.valid_extensions)
        bundles = ", ".join(self.SUPPORTED_BUNDLE_EXTENSIONS)
        self._base_inputs[
            0
        ].info = f"Supported file extensions: {file_types}; optionally bundled in file extensions: {bundles}"

    _base_inputs = [
        FileInput(
            name="path",
            display_name="Path",
            fileTypes=[],  # Dynamically set in __init__
            info="",  # Dynamically set in __init__
            required=False,
        ),
        HandleInput(
            name="file_path",
            display_name="Server File Path",
            info=(
                f"Data object with a '{SERVER_FILE_PATH_FIELDNAME}' property pointing to server file. "
                "Supercedes 'Path'. "
            ),
            required=False,
            input_types=["Data"],
        ),
        BoolInput(
            name="silent_errors",
            display_name="Silent Errors",
            advanced=True,
            info="If true, errors will not raise an exception.",
        ),
        BoolInput(
            name="delete_server_file_after_processing",
            display_name="Delete Server File After Processing",
            advanced=True,
            value=True,
            info="If true, the Server File Path will be deleted after processing.",
        ),
        BoolInput(
            name="ignore_unsupported_extensions",
            display_name="Ignore Unsupported Extensions",
            advanced=True,
            value=True,
            info="If true, files with unsupported extensions will not be processed.",
        ),
        BoolInput(
            name="ignore_unspecified_files",
            display_name="Ignore Unspecified Files",
            advanced=True,
            value=False,
            info=f"If true, Data with no '{SERVER_FILE_PATH_FIELDNAME}' property will be ignored.",
        ),
    ]

    _base_outputs = [Output(display_name="Data", name="data", method="load_files")]

    @abstractmethod
    def process_files(self, file_list: list[BaseFile]) -> list[BaseFile]:
        """Processes a list of files.

        Args:
            file_list (list[BaseFile]): A list of file objects.

        Returns:
            list[BaseFile]: A list of BaseFile objects with updated `data`.
        """

    def load_files(self) -> list[Data]:
        """Loads and parses file(s), including unpacked file bundles.

        Temporary directories created while unpacking bundles, and files marked
        with `delete_after_processing`, are removed even when processing fails.

        Returns:
            list[Data]: Parsed data from the processed files.
        """
        self._temp_dirs: list[TemporaryDirectory] = []
        final_files = []  # Initialize so the `finally` block can always iterate it
        try:
            # Step 1: Validate the provided paths
            files = self._validate_and_resolve_paths()

            # Step 2: Handle bundles recursively
            all_files = self._unpack_and_collect_files(files)

            # Step 3: Final validation of file types
            final_files = self._filter_and_mark_files(all_files)

            # Step 4: Process files
            processed_files = self.process_files(final_files)

            # Extract and flatten Data objects to return
            return [data for file in processed_files for data in file.data]

        finally:
            # Delete temporary directories
            for temp_dir in self._temp_dirs:
                temp_dir.cleanup()

            # Delete files marked for deletion (files that lived inside a temporary
            # directory are already gone at this point and are skipped by `exists()`)
            for file in final_files:
                if file.delete_after_processing and file.path.exists():
                    if file.path.is_dir():
                        shutil.rmtree(file.path)
                    else:
                        file.path.unlink()

    @property
    def valid_extensions(self) -> list[str]:
        """Returns valid file extensions for the class.

        This property can be overridden by child classes to provide specific
        extensions.

        Returns:
            list[str]: A list of valid file extensions without the leading dot.
        """
        return self.VALID_EXTENSIONS

    @property
    def ignore_starts_with(self) -> list[str]:
        """Returns prefixes to ignore when unpacking file bundles.

        NOTE(review): nothing in this class consults this property yet - confirm
        whether bundle unpacking is expected to apply it.

        Returns:
            list[str]: A list of prefixes to ignore when unpacking file bundles.
        """
        return self.IGNORE_STARTS_WITH

    def rollup_data(
        self,
        base_files: list[BaseFile],
        data_list: list[Data | None],
        path_field: str = SERVER_FILE_PATH_FIELDNAME,
    ) -> list[BaseFile]:
        r"""Rolls up Data objects into corresponding BaseFile objects in order given by `base_files`.

        Args:
            base_files (list[BaseFile]): The original BaseFile objects.
            data_list (list[Data | None]): The list of data to be aggregated into the BaseFile objects.
                `None` entries are skipped.
            path_field (str): The field name on the data_list objects that holds the file path as a string.

        Returns:
            list[BaseFile]: A new list of BaseFile objects with merged `data` attributes.

        Raises:
            ValueError: If a Data object lacks `path_field` and `silent_errors` is disabled.
        """

        def _build_data_dict(data_list: list[Data | None], data_list_field: str) -> dict[str, list[Data]]:
            """Builds a dictionary grouping Data objects by a specified field."""
            data_dict: dict[str, list[Data]] = {}
            for data in data_list:
                if data is None:
                    continue
                key = data.data.get(data_list_field)
                if key is None:
                    msg = f"Data object missing required field '{data_list_field}': {data}"
                    self.log(msg)
                    if not self.silent_errors:
                        raise ValueError(msg)
                    continue
                # Lookups below use `str(base_file.path)`, so normalise keys to str too.
                data_dict.setdefault(str(key), []).append(data)
            return data_dict

        # Build the data dictionary from the provided data_list
        data_dict = _build_data_dict(data_list, path_field)

        # Generate the updated list of BaseFile objects, preserving the order of base_files
        updated_base_files = []
        for base_file in base_files:
            new_data_list = data_dict.get(str(base_file.path), [])
            merged_data_list = base_file.merge_data(new_data_list)
            updated_base_files.append(
                BaseFileComponent.BaseFile(
                    data=merged_data_list,
                    path=base_file.path,
                    delete_after_processing=base_file.delete_after_processing,
                )
            )

        return updated_base_files

    def _validate_and_resolve_paths(self) -> list[BaseFile]:
        """Validate that all input paths exist and are valid, and create BaseFile instances.

        The `file_path` input (Data objects carrying a server file path) takes
        precedence over the `path` input when both are set.

        Returns:
            list[BaseFile]: A list of valid BaseFile instances.

        Raises:
            ValueError: If any path does not exist, or an input has an unexpected
                type, and `silent_errors` is disabled.
        """
        resolved_files: list[BaseFileComponent.BaseFile] = []

        def add_file(data: Data, path: str | Path, *, delete_after_processing: bool):
            resolved_path = Path(self.resolve_path(str(path)))

            if not resolved_path.exists():
                msg = f"File or directory not found: {path}"
                self.log(msg)
                if not self.silent_errors:
                    raise ValueError(msg)
                # Silent mode: skip the missing path instead of passing it on, where
                # bundle detection would fail on a file that does not exist.
                return
            resolved_files.append(
                BaseFileComponent.BaseFile(data, resolved_path, delete_after_processing=delete_after_processing)
            )

        server_files = getattr(self, "file_path", None)

        if self.path and not server_files:
            # Wrap self.path into a Data object
            data_obj = Data(data={self.SERVER_FILE_PATH_FIELDNAME: self.path})
            add_file(data=data_obj, path=self.path, delete_after_processing=False)
        elif server_files:
            if isinstance(server_files, Data):
                self.file_path = [server_files]
            elif not isinstance(server_files, list):
                msg = f"Expected list of Data objects in file_path but got {type(server_files)}."
                self.log(msg)
                if not self.silent_errors:
                    raise ValueError(msg)
                return []

            for obj in self.file_path:
                if not isinstance(obj, Data):
                    msg = f"Expected Data object in file_path but got {type(obj)}."
                    self.log(msg)
                    if not self.silent_errors:
                        raise ValueError(msg)
                    continue

                server_file_path = obj.data.get(self.SERVER_FILE_PATH_FIELDNAME)
                if server_file_path:
                    add_file(
                        data=obj,
                        path=server_file_path,
                        delete_after_processing=self.delete_server_file_after_processing,
                    )
                elif not self.ignore_unspecified_files:
                    msg = f"Data object missing '{self.SERVER_FILE_PATH_FIELDNAME}' property."
                    self.log(msg)
                    if not self.silent_errors:
                        raise ValueError(msg)
                else:
                    msg = f"Ignoring Data object missing '{self.SERVER_FILE_PATH_FIELDNAME}' property:\n{obj}"
                    self.log(msg)

        return resolved_files

    def _unpack_and_collect_files(self, files: list[BaseFile]) -> list[BaseFile]:
        """Recursively unpack bundles and collect files into BaseFile instances.

        Directories are expanded into the regular files they contain; bundles are
        extracted into a temporary directory that `load_files` cleans up later.
        Every resulting BaseFile inherits the `data` and `delete_after_processing`
        of the entry it came from.

        Args:
            files (list[BaseFile]): List of BaseFile instances to process.

        Returns:
            list[BaseFile]: Updated list of BaseFile instances.
        """

        def is_bundle(path: Path) -> bool:
            return path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS

        collected_files = []

        for file in files:
            path = file.path
            delete_after_processing = file.delete_after_processing
            data = file.data

            if path.is_dir():
                # Recurse into directories
                collected_files.extend(
                    [
                        BaseFileComponent.BaseFile(data, sub_path, delete_after_processing=delete_after_processing)
                        for sub_path in path.rglob("*")
                        if sub_path.is_file()
                    ]
                )
            elif is_bundle(path):
                # Unpack supported bundles
                temp_dir = TemporaryDirectory()
                self._temp_dirs.append(temp_dir)
                temp_dir_path = Path(temp_dir.name)
                self._unpack_bundle(path, temp_dir_path)
                subpaths = list(temp_dir_path.iterdir())
                self.log(f"Unpacked bundle {path.name} into {subpaths}")
                collected_files.extend(
                    [
                        BaseFileComponent.BaseFile(data, sub_path, delete_after_processing=delete_after_processing)
                        for sub_path in subpaths
                    ]
                )
            else:
                collected_files.append(file)

        # Recurse again if any directories or bundles are left in the list
        if any(file.path.is_dir() or is_bundle(file.path) for file in collected_files):
            return self._unpack_and_collect_files(collected_files)

        return collected_files

    def _unpack_bundle(self, bundle_path: Path, output_dir: Path):
        """Unpack a bundle into a temporary directory.

        Args:
            bundle_path (Path): Path to the bundle.
            output_dir (Path): Directory where files will be extracted.

        Raises:
            ValueError: If the bundle format is unsupported or cannot be read, or
                if a member (or the target of a link member) would land outside
                `output_dir`.
        """

        def _safe_extract_zip(bundle: ZipFile, output_dir: Path):
            """Safely extract ZIP files."""
            root = output_dir.resolve()
            for member in bundle.namelist():
                member_path = output_dir / member
                # Ensure no path traversal outside `output_dir`
                if not member_path.resolve().is_relative_to(root):
                    msg = f"Attempted Path Traversal in ZIP File: {member}"
                    raise ValueError(msg)
                bundle.extract(member, path=output_dir)

        def _safe_extract_tar(bundle: tarfile.TarFile, output_dir: Path):
            """Safely extract TAR files."""
            root = output_dir.resolve()
            for member in bundle.getmembers():
                member_path = output_dir / member.name
                # Ensure no path traversal outside `output_dir`
                if not member_path.resolve().is_relative_to(root):
                    msg = f"Attempted Path Traversal in TAR File: {member.name}"
                    raise ValueError(msg)
                # A link member can have a harmless name but point outside the
                # extraction directory; the directory walk done after unpacking would
                # follow it. Symlink targets are relative to the link's own directory,
                # hardlink targets to the archive root.
                if member.issym() or member.islnk():
                    link_base = member_path.parent if member.issym() else output_dir
                    if not (link_base / member.linkname).resolve().is_relative_to(root):
                        msg = f"Attempted Path Traversal in TAR File: {member.name}"
                        raise ValueError(msg)
                bundle.extract(member, path=output_dir)

        # Check and extract based on file type
        if is_zipfile(bundle_path):
            with ZipFile(bundle_path, "r") as zip_bundle:
                _safe_extract_zip(zip_bundle, output_dir)
        elif tarfile.is_tarfile(bundle_path):
            with tarfile.open(bundle_path, "r:*") as tar_bundle:
                _safe_extract_tar(tar_bundle, output_dir)
        else:
            msg = f"Unsupported bundle format: {bundle_path.suffix}"
            raise ValueError(msg)

    def _filter_and_mark_files(self, files: list[BaseFile]) -> list[BaseFile]:
        """Validate file types and drop entries that must not be processed.

        Args:
            files (list[BaseFile]): List of BaseFile instances.

        Returns:
            list[BaseFile]: Validated BaseFile instances.

        Raises:
            ValueError: If unsupported files are encountered, `ignore_unsupported_extensions`
                is False and `silent_errors` is disabled.
        """
        final_files = []
        ignored_files = []

        for file in files:
            if not file.path.is_file():
                self.log(f"Not a file: {file.path.name}")
                continue

            if file.path.suffix[1:] not in self.valid_extensions:
                if self.ignore_unsupported_extensions:
                    ignored_files.append(file.path.name)
                    continue
                msg = f"Unsupported file extension: {file.path.suffix}"
                self.log(msg)
                if not self.silent_errors:
                    raise ValueError(msg)
                # Silent mode: the file is still unsupported, so leave it out.
                continue

            final_files.append(file)

        if ignored_files:
            self.log(f"Ignored files: {ignored_files}")

        return final_files
|
||||
|
|
@ -1,25 +1,14 @@
|
|||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
from zipfile import ZipFile, is_zipfile
|
||||
|
||||
from langflow.base.data import BaseFileComponent
|
||||
from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data
|
||||
from langflow.custom import Component
|
||||
from langflow.io import BoolInput, FileInput, IntInput, Output
|
||||
from langflow.io import BoolInput, IntInput
|
||||
from langflow.schema import Data
|
||||
|
||||
|
||||
class FileComponent(Component):
|
||||
"""Handles loading of individual or zipped text files.
|
||||
class FileComponent(BaseFileComponent):
|
||||
"""Handles loading and processing of individual or zipped text files.
|
||||
|
||||
Processes multiple valid files within a zip archive if provided.
|
||||
|
||||
Attributes:
|
||||
display_name: Display name of the component.
|
||||
description: Brief component description.
|
||||
icon: Icon to represent the component.
|
||||
name: Identifier for the component.
|
||||
inputs: Inputs required by the component.
|
||||
outputs: Output of the component after processing files.
|
||||
This component supports processing multiple valid files within a zip archive,
|
||||
resolving paths, validating file types, and optionally using multithreading for processing.
|
||||
"""
|
||||
|
||||
display_name = "File"
|
||||
|
|
@ -27,185 +16,78 @@ class FileComponent(Component):
|
|||
icon = "file-text"
|
||||
name = "File"
|
||||
|
||||
VALID_EXTENSIONS = TEXT_FILE_TYPES
|
||||
|
||||
inputs = [
|
||||
FileInput(
|
||||
name="path",
|
||||
display_name="Path",
|
||||
file_types=[*TEXT_FILE_TYPES, "zip"],
|
||||
info=f"Supported file types: {', '.join([*TEXT_FILE_TYPES, 'zip'])}",
|
||||
),
|
||||
BoolInput(
|
||||
name="silent_errors",
|
||||
display_name="Silent Errors",
|
||||
advanced=True,
|
||||
info="If true, errors will not raise an exception.",
|
||||
),
|
||||
*BaseFileComponent._base_inputs,
|
||||
BoolInput(
|
||||
name="use_multithreading",
|
||||
display_name="Use Multithreading",
|
||||
display_name="[Deprecated] Use Multithreading",
|
||||
advanced=True,
|
||||
info="If true, parallel processing will be enabled for zip files.",
|
||||
value=True,
|
||||
info="Set 'Processing Concurrency' greater than 1 to enable multithreading.",
|
||||
),
|
||||
IntInput(
|
||||
name="concurrency_multithreading",
|
||||
display_name="Multithreading Concurrency",
|
||||
advanced=True,
|
||||
info="The maximum number of workers to use, if concurrency is enabled",
|
||||
value=4,
|
||||
display_name="Processing Concurrency",
|
||||
advanced=False,
|
||||
info="When multiple files are being processed, the number of files to process concurrently.",
|
||||
value=1,
|
||||
),
|
||||
]
|
||||
|
||||
outputs = [Output(display_name="Data", name="data", method="load_file")]
|
||||
outputs = [
|
||||
*BaseFileComponent._base_outputs,
|
||||
]
|
||||
|
||||
def load_file(self) -> Data:
|
||||
"""Load and parse file(s) from a zip archive.
|
||||
|
||||
Raises:
|
||||
ValueError: If no file is uploaded or file path is invalid.
|
||||
|
||||
Returns:
|
||||
Data: Parsed data from file(s).
|
||||
"""
|
||||
# Check if the file path is provided
|
||||
if not self.path:
|
||||
self.log("File path is missing.")
|
||||
msg = "Please upload a file for processing."
|
||||
|
||||
raise ValueError(msg)
|
||||
|
||||
resolved_path = Path(self.resolve_path(self.path))
|
||||
try:
|
||||
# Check if the file is a zip archive
|
||||
if is_zipfile(resolved_path):
|
||||
self.log(f"Processing zip file: {resolved_path.name}.")
|
||||
|
||||
return self._process_zip_file(
|
||||
resolved_path,
|
||||
silent_errors=self.silent_errors,
|
||||
parallel=self.use_multithreading,
|
||||
)
|
||||
|
||||
self.log(f"Processing single file: {resolved_path.name}.")
|
||||
|
||||
return self._process_single_file(resolved_path, silent_errors=self.silent_errors)
|
||||
except FileNotFoundError:
|
||||
self.log(f"File not found: {resolved_path.name}.")
|
||||
|
||||
raise
|
||||
|
||||
def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, parallel: bool = False) -> Data:
|
||||
"""Process text files within a zip archive.
|
||||
def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
|
||||
"""Processes files either sequentially or in parallel, depending on concurrency settings.
|
||||
|
||||
Args:
|
||||
zip_path: Path to the zip file.
|
||||
silent_errors: Suppresses errors if True.
|
||||
parallel: Enables parallel processing if True.
|
||||
file_list (list[BaseFileComponent.BaseFile]): List of files to process.
|
||||
|
||||
Returns:
|
||||
list[Data]: Combined data from all valid files.
|
||||
|
||||
Raises:
|
||||
ValueError: If no valid files found in the archive.
|
||||
list[BaseFileComponent.BaseFile]: Updated list of files with merged data.
|
||||
"""
|
||||
data: list[Data] = []
|
||||
with ZipFile(zip_path, "r") as zip_file:
|
||||
# Filter file names based on extensions in TEXT_FILE_TYPES and ignore hidden files
|
||||
valid_files = [
|
||||
name
|
||||
for name in zip_file.namelist()
|
||||
if (
|
||||
any(name.endswith(ext) for ext in TEXT_FILE_TYPES)
|
||||
and not name.startswith("__MACOSX")
|
||||
and not name.startswith(".")
|
||||
)
|
||||
]
|
||||
|
||||
# Raise an error if no valid files found
|
||||
if not valid_files:
|
||||
self.log("No valid files in the zip archive.")
|
||||
def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:
|
||||
"""Processes a single file and returns its Data object."""
|
||||
try:
|
||||
return parse_text_file_to_data(file_path, silent_errors=silent_errors)
|
||||
except FileNotFoundError as e:
|
||||
msg = f"File not found: {file_path}. Error: {e}"
|
||||
self.log(msg)
|
||||
if not silent_errors:
|
||||
raise
|
||||
return None
|
||||
except Exception as e:
|
||||
msg = f"Unexpected error processing {file_path}: {e}"
|
||||
self.log(msg)
|
||||
if not silent_errors:
|
||||
raise
|
||||
return None
|
||||
|
||||
# Return empty data if silent_errors is True
|
||||
if silent_errors:
|
||||
return data # type: ignore[return-value]
|
||||
if not file_list:
|
||||
self.log("No files to process.")
|
||||
return file_list
|
||||
|
||||
# Raise an error if no valid files found
|
||||
msg = "No valid files in the zip archive."
|
||||
raise ValueError(msg)
|
||||
concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)
|
||||
file_count = len(file_list)
|
||||
|
||||
# Define a function to process each file
|
||||
def process_file(file_name, silent_errors=silent_errors):
|
||||
with NamedTemporaryFile(delete=False) as temp_file:
|
||||
temp_path = Path(temp_file.name).with_name(file_name)
|
||||
with zip_file.open(file_name) as file_content:
|
||||
temp_path.write_bytes(file_content.read())
|
||||
try:
|
||||
return self._process_single_file(temp_path, silent_errors=silent_errors)
|
||||
finally:
|
||||
temp_path.unlink()
|
||||
parallel_processing_threshold = 2
|
||||
if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:
|
||||
if file_count > 1:
|
||||
self.log(f"Processing {file_count} files sequentially.")
|
||||
processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]
|
||||
else:
|
||||
self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.")
|
||||
file_paths = [str(file.path) for file in file_list]
|
||||
processed_data = parallel_load_data(
|
||||
file_paths,
|
||||
silent_errors=self.silent_errors,
|
||||
load_function=process_file,
|
||||
max_concurrency=concurrency,
|
||||
)
|
||||
|
||||
# Process files in parallel if specified
|
||||
if parallel:
|
||||
self.log(
|
||||
f"Initializing parallel Thread Pool Executor with max workers: "
|
||||
f"{self.concurrency_multithreading}."
|
||||
)
|
||||
|
||||
# Process files in parallel
|
||||
initial_data = parallel_load_data(
|
||||
valid_files,
|
||||
silent_errors=silent_errors,
|
||||
load_function=process_file,
|
||||
max_concurrency=self.concurrency_multithreading,
|
||||
)
|
||||
|
||||
# Filter out empty data
|
||||
data = list(filter(None, initial_data))
|
||||
else:
|
||||
# Sequential processing
|
||||
data = [process_file(file_name) for file_name in valid_files]
|
||||
|
||||
self.log(f"Successfully processed zip file: {zip_path.name}.")
|
||||
|
||||
return data # type: ignore[return-value]
|
||||
|
||||
def _process_single_file(self, file_path: Path, *, silent_errors: bool = False) -> Data:
|
||||
"""Process a single file.
|
||||
|
||||
Args:
|
||||
file_path: Path to the file.
|
||||
silent_errors: Suppresses errors if True.
|
||||
|
||||
Returns:
|
||||
Data: Parsed data from the file.
|
||||
|
||||
Raises:
|
||||
ValueError: For unsupported file formats.
|
||||
"""
|
||||
# Check if the file type is supported
|
||||
if not any(file_path.suffix == ext for ext in ["." + f for f in TEXT_FILE_TYPES]):
|
||||
self.log(f"Unsupported file type: {file_path.suffix}")
|
||||
|
||||
# Return empty data if silent_errors is True
|
||||
if silent_errors:
|
||||
return Data()
|
||||
|
||||
msg = f"Unsupported file type: {file_path.suffix}"
|
||||
raise ValueError(msg)
|
||||
|
||||
try:
|
||||
# Parse the text file as appropriate
|
||||
data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment]
|
||||
if not data:
|
||||
data = Data()
|
||||
|
||||
self.log(f"Successfully processed file: {file_path.name}.")
|
||||
except Exception as e:
|
||||
self.log(f"Error processing file {file_path.name}: {e}")
|
||||
|
||||
# Return empty data if silent_errors is True
|
||||
if not silent_errors:
|
||||
raise
|
||||
|
||||
data = Data()
|
||||
|
||||
return data
|
||||
# Use rollup_basefile_data to merge processed data with BaseFile objects
|
||||
return self.rollup_data(file_list, processed_data)
|
||||
|
|
|
|||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
|
@ -24,7 +24,7 @@ Answer:
|
|||
"""
|
||||
file_component = FileComponent()
|
||||
parse_data_component = ParseDataComponent()
|
||||
parse_data_component.set(data=file_component.load_file)
|
||||
parse_data_component.set(data=file_component.load_files)
|
||||
|
||||
chat_input = ChatInput()
|
||||
prompt_component = PromptComponent()
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ def ingestion_graph():
|
|||
# Ingestion Graph
|
||||
file_component = FileComponent()
|
||||
text_splitter = SplitTextComponent()
|
||||
text_splitter.set(data_inputs=file_component.load_file)
|
||||
text_splitter.set(data_inputs=file_component.load_files)
|
||||
openai_embeddings = OpenAIEmbeddingsComponent()
|
||||
vector_store = AstraVectorStoreComponent()
|
||||
vector_store.set(
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ def ingestion_graph():
|
|||
file_component.set(path="test.txt")
|
||||
file_component.set_on_output(name="data", value=Data(text="This is a test file."), cache=True)
|
||||
text_splitter = SplitTextComponent(_id="text-splitter-123")
|
||||
text_splitter.set(data_inputs=file_component.load_file)
|
||||
text_splitter.set(data_inputs=file_component.load_files)
|
||||
openai_embeddings = OpenAIEmbeddingsComponent(_id="openai-embeddings-123")
|
||||
openai_embeddings.set(
|
||||
openai_api_key="sk-123", openai_api_base="https://api.openai.com/v1", openai_api_type="openai"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue