From c6f27b5af8a7d6b14904f68595ca85d840f643e8 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Tue, 26 Nov 2024 04:36:17 +0000 Subject: [PATCH] Creating BaseFileComponent, integrating with API Request "to file" feature (#4731) * adding ability for APIRequest to retry and save to a file * [autofix.ci] apply automated fixes * adding ability for APIRequest to retry and save to a file * [autofix.ci] apply automated fixes * initial refactor of FileComponent to handle Data input * shifting potentially common logic into BaseFileComponent * improving readability and fixing problems * [autofix.ci] apply automated fixes * addressing linting * [autofix.ci] apply automated fixes * linting part 2 * [autofix.ci] apply automated fixes * linting part 3 * preserve input fields on data objects * [autofix.ci] apply automated fixes * ensuring processed data is linked to correct file data object * [autofix.ci] apply automated fixes * addressing linting * [autofix.ci] apply automated fixes * fixing edge case * allowing specific failure of missing file without forcing silent_errors * Base_file mypy errors * [autofix.ci] apply automated fixes * Update base_file.py * Update base_file.py * Fix starter templates * Update test_vector_store_rag.py --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Eric Hare --- .../base/langflow/base/data/__init__.py | 5 + .../base/langflow/base/data/base_file.py | 475 ++++++++++++++++++ .../base/langflow/components/data/file.py | 238 +++------ .../starter_projects/Document Q&A.json | 299 ++++++++--- .../starter_projects/Vector Store RAG.json | 90 +++- .../starter_projects/document_qa.py | 2 +- .../starter_projects/vector_store_rag.py | 2 +- .../starter_projects/test_vector_store_rag.py | 2 +- 8 files changed, 863 insertions(+), 250 deletions(-) create mode 100644 src/backend/base/langflow/base/data/base_file.py diff --git a/src/backend/base/langflow/base/data/__init__.py b/src/backend/base/langflow/base/data/__init__.py index e69de29bb..8a92e12b0 100644 --- a/src/backend/base/langflow/base/data/__init__.py +++ b/src/backend/base/langflow/base/data/__init__.py @@ -0,0 +1,5 @@ +from .base_file import BaseFileComponent + +__all__ = [ + "BaseFileComponent", +] diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py new file mode 100644 index 000000000..3dadbe8cf --- /dev/null +++ b/src/backend/base/langflow/base/data/base_file.py @@ -0,0 +1,475 @@ +import shutil +import tarfile +from abc import ABC, abstractmethod +from pathlib import Path +from tempfile import TemporaryDirectory +from zipfile import ZipFile, is_zipfile + +from langflow.custom import Component +from langflow.io import BoolInput, FileInput, HandleInput, Output +from langflow.schema import Data + + +class BaseFileComponent(Component, ABC): + """Base class for handling file processing components. + + This class provides common functionality for resolving, validating, and + processing file paths. Child classes must define valid file extensions + and implement the `process_files` method. + """ + + class BaseFile: + """Internal class to represent a file with additional metadata.""" + + def __init__( + self, + data: Data | list[Data], + path: Path, + *, + delete_after_processing: bool = False, + silent_errors: bool = False, + ): + self._data = data if isinstance(data, list) else [data] + self.path = path + self.delete_after_processing = delete_after_processing + self._silent_errors = silent_errors + + @property + def data(self) -> list[Data]: + return self._data or [] + + @data.setter + def data(self, value: Data | list[Data]): + if isinstance(value, Data): + self._data = [value] + elif isinstance(value, list) and all(isinstance(item, Data) for item in value): + self._data = value + else: + msg = f"data must be a Data object or a list of Data objects. Got: {type(value)}" + if not self._silent_errors: + raise ValueError(msg) + + def merge_data(self, new_data: Data | list[Data] | None) -> list[Data]: + r"""Generate a new list of Data objects by merging `new_data` into the current `data`. + + Args: + new_data (Data | list[Data] | None): The new Data object(s) to merge into each existing Data object. + If None, the current `data` is returned unchanged. + + Returns: + list[Data]: A new list of Data objects with `new_data` merged. + """ + if new_data is None: + return self.data + + if isinstance(new_data, Data): + new_data_list = [new_data] + elif isinstance(new_data, list) and all(isinstance(item, Data) for item in new_data): + new_data_list = new_data + else: + msg = "new_data must be a Data object, a list of Data objects, or None." + if not self._silent_errors: + raise ValueError(msg) + return self.data + + return [ + Data(data={**data.data, **new_data_item.data}) for data in self.data for new_data_item in new_data_list + ] + + def __str__(self): + if len(self.data) == 0: + text_preview = "" + elif len(self.data) == 1: + max_text_length = 50 + text_preview = self.data.get_text()[:max_text_length] + if len(self.data.get_text()) > max_text_length: + text_preview += "..." + text_preview = f"text_preview='{text_preview}'" + else: + text_preview = f"{len(self.data)} data objects" + return ( + f"BaseFile(path={self.path}" + f", delete_after_processing={self.delete_after_processing}" + f", {text_preview}" + ) + + # Subclasses can override these class variables + VALID_EXTENSIONS: list[str] = [] # To be overridden by child classes + IGNORE_STARTS_WITH = [".", "__MACOSX"] + + SERVER_FILE_PATH_FIELDNAME = "file_path" + SUPPORTED_BUNDLE_EXTENSIONS = ["zip", "tar", "tgz", "bz2", "gz"] + + file_path: list[Data] | None = None + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Dynamically update FileInput to include valid extensions and bundles + self._base_inputs[0].file_types = [*self.valid_extensions, *self.SUPPORTED_BUNDLE_EXTENSIONS] + + file_types = ", ".join(self.valid_extensions) + bundles = ", ".join(self.SUPPORTED_BUNDLE_EXTENSIONS) + self._base_inputs[ + 0 + ].info = f"Supported file extensions: {file_types}; optionally bundled in file extensions: {bundles}" + + _base_inputs = [ + FileInput( + name="path", + display_name="Path", + fileTypes=[], # Dynamically set in __init__ + info="", # Dynamically set in __init__ + required=False, + ), + HandleInput( + name="file_path", + display_name="Server File Path", + info=( + f"Data object with a '{SERVER_FILE_PATH_FIELDNAME}' property pointing to server file. " + "Supercedes 'Path'. " + ), + required=False, + input_types=["Data"], + ), + BoolInput( + name="silent_errors", + display_name="Silent Errors", + advanced=True, + info="If true, errors will not raise an exception.", + ), + BoolInput( + name="delete_server_file_after_processing", + display_name="Delete Server File After Processing", + advanced=True, + value=True, + info="If true, the Server File Path will be deleted after processing.", + ), + BoolInput( + name="ignore_unsupported_extensions", + display_name="Ignore Unsupported Extensions", + advanced=True, + value=True, + info="If true, files with unsupported extensions will not be processed.", + ), + BoolInput( + name="ignore_unspecified_files", + display_name="Ignore Unspecified Files", + advanced=True, + value=False, + info=f"If true, Data with no '{SERVER_FILE_PATH_FIELDNAME}' property will be ignored.", + ), + ] + + _base_outputs = [Output(display_name="Data", name="data", method="load_files")] + + @abstractmethod + def process_files(self, file_list: list[BaseFile]) -> list[BaseFile]: + """Processes a list of files. + + Args: + file_list (list[BaseFile]): A list of file objects. + + Returns: + list[BaseFile]: A list of BaseFile objects with updated `data`. + """ + + def load_files(self) -> list[Data]: + """Loads and parses file(s), including unpacked file bundles. + + Returns: + list[Data]: Parsed data from the processed files. + """ + self._temp_dirs: list[TemporaryDirectory] = [] + final_files = [] # Initialize to avoid UnboundLocalError + try: + # Step 1: Validate the provided paths + files = self._validate_and_resolve_paths() + + # Step 2: Handle bundles recursively + all_files = self._unpack_and_collect_files(files) + + # Step 3: Final validation of file types + final_files = self._filter_and_mark_files(all_files) + + # Step 4: Process files + processed_files = self.process_files(final_files) + + # Extract and flatten Data objects to return + return [data for file in processed_files for data in file.data if file.data] + + finally: + # Delete temporary directories + for temp_dir in self._temp_dirs: + temp_dir.cleanup() + + # Delete files marked for deletion + for file in final_files: + if file.delete_after_processing and file.path.exists(): + if file.path.is_dir(): + shutil.rmtree(file.path) + else: + file.path.unlink() + + @property + def valid_extensions(self) -> list[str]: + """Returns valid file extensions for the class. + + This property can be overridden by child classes to provide specific + extensions. + + Returns: + list[str]: A list of valid file extensions without the leading dot. + """ + return self.VALID_EXTENSIONS + + @property + def ignore_starts_with(self) -> list[str]: + """Returns prefixes to ignore when unpacking file bundles. + + Returns: + list[str]: A list of prefixes to ignore when unpacking file bundles. + """ + return self.IGNORE_STARTS_WITH + + def rollup_data( + self, + base_files: list[BaseFile], + data_list: list[Data | None], + path_field: str = SERVER_FILE_PATH_FIELDNAME, + ) -> list[BaseFile]: + r"""Rolls up Data objects into corresponding BaseFile objects in order given by `base_files`. + + Args: + base_files (list[BaseFile]): The original BaseFile objects. + data_list (list[Data | None]): The list of data to be aggregated into the BaseFile objects. + path_field (str): The field name on the data_list objects that holds the file path as a string. + + Returns: + list[BaseFile]: A new list of BaseFile objects with merged `data` attributes. + """ + + def _build_data_dict(data_list: list[Data | None], data_list_field: str) -> dict[str, list[Data]]: + """Builds a dictionary grouping Data objects by a specified field.""" + data_dict: dict[str, list[Data]] = {} + for data in data_list: + if data is None: + continue + key = data.data.get(data_list_field) + if key is None: + msg = f"Data object missing required field '{data_list_field}': {data}" + self.log(msg) + if not self.silent_errors: + msg = f"Data object missing required field '{data_list_field}': {data}" + self.log(msg) + raise ValueError(msg) + continue + data_dict.setdefault(key, []).append(data) + return data_dict + + # Build the data dictionary from the provided data_list + data_dict = _build_data_dict(data_list, path_field) + + # Generate the updated list of BaseFile objects, preserving the order of base_files + updated_base_files = [] + for base_file in base_files: + new_data_list = data_dict.get(str(base_file.path), []) + merged_data_list = base_file.merge_data(new_data_list) + updated_base_files.append( + BaseFileComponent.BaseFile( + data=merged_data_list, + path=base_file.path, + delete_after_processing=base_file.delete_after_processing, + ) + ) + + return updated_base_files + + def _validate_and_resolve_paths(self) -> list[BaseFile]: + """Validate that all input paths exist and are valid, and create BaseFile instances. + + Returns: + list[BaseFile]: A list of valid BaseFile instances. + + Raises: + ValueError: If any path does not exist. + """ + resolved_files = [] + + def add_file(data: Data, path: str | Path, *, delete_after_processing: bool): + resolved_path = Path(self.resolve_path(str(path))) + if not resolved_path.exists(): + msg = f"File or directory not found: {path}" + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + resolved_files.append( + BaseFileComponent.BaseFile(data, resolved_path, delete_after_processing=delete_after_processing) + ) + + if self.path and (not hasattr(self, "file_path") or not self.file_path): + # Wrap self.path into a Data object + data_obj = Data(data={"file_path": self.path}) + add_file(data=data_obj, path=self.path, delete_after_processing=False) + elif hasattr(self, "file_path") and self.file_path: + if isinstance(self.file_path, Data): + self.file_path = [self.file_path] + elif not isinstance(self.file_path, list): + msg = f"Expected list of Data objects in file_path but got {type(self.file_path)}." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + return [] + + for obj in self.file_path: + if not isinstance(obj, Data): + msg = f"Expected Data object in file_path but got {type(obj)}." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + continue + + server_file_path = obj.data.get(self.SERVER_FILE_PATH_FIELDNAME) + if server_file_path: + add_file( + data=obj, + path=server_file_path, + delete_after_processing=self.delete_server_file_after_processing, + ) + elif not self.ignore_unspecified_files: + msg = f"Data object missing '{self.SERVER_FILE_PATH_FIELDNAME}' property." + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + else: + msg = f"Ignoring Data object missing '{self.SERVER_FILE_PATH_FIELDNAME}' property:\n{obj}" + self.log(msg) + + return resolved_files + + def _unpack_and_collect_files(self, files: list[BaseFile]) -> list[BaseFile]: + """Recursively unpack bundles and collect files into BaseFile instances. + + Args: + files (list[BaseFile]): List of BaseFile instances to process. + + Returns: + list[BaseFile]: Updated list of BaseFile instances. + """ + collected_files = [] + + for file in files: + path = file.path + delete_after_processing = file.delete_after_processing + data = file.data + + if path.is_dir(): + # Recurse into directories + collected_files.extend( + [ + BaseFileComponent.BaseFile(data, sub_path, delete_after_processing=delete_after_processing) + for sub_path in path.rglob("*") + if sub_path.is_file() + ] + ) + elif path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS: + # Unpack supported bundles + temp_dir = TemporaryDirectory() + self._temp_dirs.append(temp_dir) + temp_dir_path = Path(temp_dir.name) + self._unpack_bundle(path, temp_dir_path) + subpaths = list(temp_dir_path.iterdir()) + self.log(f"Unpacked bundle {path.name} into {subpaths}") + collected_files.extend( + [ + BaseFileComponent.BaseFile(data, sub_path, delete_after_processing=delete_after_processing) + for sub_path in subpaths + ] + ) + else: + collected_files.append(file) + + # Recurse again if any directories or bundles are left in the list + if any( + file.path.is_dir() or file.path.suffix[1:] in self.SUPPORTED_BUNDLE_EXTENSIONS for file in collected_files + ): + return self._unpack_and_collect_files(collected_files) + + return collected_files + + def _unpack_bundle(self, bundle_path: Path, output_dir: Path): + """Unpack a bundle into a temporary directory. + + Args: + bundle_path (Path): Path to the bundle. + output_dir (Path): Directory where files will be extracted. + + Raises: + ValueError: If the bundle format is unsupported or cannot be read. + """ + + def _safe_extract_zip(bundle: ZipFile, output_dir: Path): + """Safely extract ZIP files.""" + for member in bundle.namelist(): + member_path = output_dir / member + # Ensure no path traversal outside `output_dir` + if not member_path.resolve().is_relative_to(output_dir.resolve()): + msg = f"Attempted Path Traversal in ZIP File: {member}" + raise ValueError(msg) + bundle.extract(member, path=output_dir) + + def _safe_extract_tar(bundle: tarfile.TarFile, output_dir: Path): + """Safely extract TAR files.""" + for member in bundle.getmembers(): + member_path = output_dir / member.name + # Ensure no path traversal outside `output_dir` + if not member_path.resolve().is_relative_to(output_dir.resolve()): + msg = f"Attempted Path Traversal in TAR File: {member.name}" + raise ValueError(msg) + bundle.extract(member, path=output_dir) + + # Check and extract based on file type + if is_zipfile(bundle_path): + with ZipFile(bundle_path, "r") as zip_bundle: + _safe_extract_zip(zip_bundle, output_dir) + elif tarfile.is_tarfile(bundle_path): + with tarfile.open(bundle_path, "r:*") as tar_bundle: + _safe_extract_tar(tar_bundle, output_dir) + else: + msg = f"Unsupported bundle format: {bundle_path.suffix}" + raise ValueError(msg) + + def _filter_and_mark_files(self, files: list[BaseFile]) -> list[BaseFile]: + """Validate file types and mark files for removal. + + Args: + files (list[BaseFile]): List of BaseFile instances. + + Returns: + list[BaseFile]: Validated BaseFile instances. + + Raises: + ValueError: If unsupported files are encountered and `ignore_unsupported_extensions` is False. + """ + final_files = [] + ignored_files = [] + + for file in files: + if not file.path.is_file(): + self.log(f"Not a file: {file.path.name}") + continue + + if file.path.suffix[1:] not in self.valid_extensions: + if self.ignore_unsupported_extensions: + ignored_files.append(file.path.name) + continue + msg = f"Unsupported file extension: {file.path.suffix}" + self.log(msg) + if not self.silent_errors: + raise ValueError(msg) + + final_files.append(file) + + if ignored_files: + self.log(f"Ignored files: {ignored_files}") + + return final_files diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index 8a5088aae..95a5b7722 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -1,25 +1,14 @@ -from pathlib import Path -from tempfile import NamedTemporaryFile -from zipfile import ZipFile, is_zipfile - +from langflow.base.data import BaseFileComponent from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data -from langflow.custom import Component -from langflow.io import BoolInput, FileInput, IntInput, Output +from langflow.io import BoolInput, IntInput from langflow.schema import Data -class FileComponent(Component): - """Handles loading of individual or zipped text files. +class FileComponent(BaseFileComponent): + """Handles loading and processing of individual or zipped text files. - Processes multiple valid files within a zip archive if provided. - - Attributes: - display_name: Display name of the component. - description: Brief component description. - icon: Icon to represent the component. - name: Identifier for the component. - inputs: Inputs required by the component. - outputs: Output of the component after processing files. + This component supports processing multiple valid files within a zip archive, + resolving paths, validating file types, and optionally using multithreading for processing. """ display_name = "File" @@ -27,185 +16,78 @@ class FileComponent(Component): icon = "file-text" name = "File" + VALID_EXTENSIONS = TEXT_FILE_TYPES + inputs = [ - FileInput( - name="path", - display_name="Path", - file_types=[*TEXT_FILE_TYPES, "zip"], - info=f"Supported file types: {', '.join([*TEXT_FILE_TYPES, 'zip'])}", - ), - BoolInput( - name="silent_errors", - display_name="Silent Errors", - advanced=True, - info="If true, errors will not raise an exception.", - ), + *BaseFileComponent._base_inputs, BoolInput( name="use_multithreading", - display_name="Use Multithreading", + display_name="[Deprecated] Use Multithreading", advanced=True, - info="If true, parallel processing will be enabled for zip files.", + value=True, + info="Set 'Processing Concurrency' greater than 1 to enable multithreading.", ), IntInput( name="concurrency_multithreading", - display_name="Multithreading Concurrency", - advanced=True, - info="The maximum number of workers to use, if concurrency is enabled", - value=4, + display_name="Processing Concurrency", + advanced=False, + info="When multiple files are being processed, the number of files to process concurrently.", + value=1, ), ] - outputs = [Output(display_name="Data", name="data", method="load_file")] + outputs = [ + *BaseFileComponent._base_outputs, + ] - def load_file(self) -> Data: - """Load and parse file(s) from a zip archive. - - Raises: - ValueError: If no file is uploaded or file path is invalid. - - Returns: - Data: Parsed data from file(s). - """ - # Check if the file path is provided - if not self.path: - self.log("File path is missing.") - msg = "Please upload a file for processing." - - raise ValueError(msg) - - resolved_path = Path(self.resolve_path(self.path)) - try: - # Check if the file is a zip archive - if is_zipfile(resolved_path): - self.log(f"Processing zip file: {resolved_path.name}.") - - return self._process_zip_file( - resolved_path, - silent_errors=self.silent_errors, - parallel=self.use_multithreading, - ) - - self.log(f"Processing single file: {resolved_path.name}.") - - return self._process_single_file(resolved_path, silent_errors=self.silent_errors) - except FileNotFoundError: - self.log(f"File not found: {resolved_path.name}.") - - raise - - def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, parallel: bool = False) -> Data: - """Process text files within a zip archive. + def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]: + """Processes files either sequentially or in parallel, depending on concurrency settings. Args: - zip_path: Path to the zip file. - silent_errors: Suppresses errors if True. - parallel: Enables parallel processing if True. + file_list (list[BaseFileComponent.BaseFile]): List of files to process. Returns: - list[Data]: Combined data from all valid files. - - Raises: - ValueError: If no valid files found in the archive. + list[BaseFileComponent.BaseFile]: Updated list of files with merged data. """ - data: list[Data] = [] - with ZipFile(zip_path, "r") as zip_file: - # Filter file names based on extensions in TEXT_FILE_TYPES and ignore hidden files - valid_files = [ - name - for name in zip_file.namelist() - if ( - any(name.endswith(ext) for ext in TEXT_FILE_TYPES) - and not name.startswith("__MACOSX") - and not name.startswith(".") - ) - ] - # Raise an error if no valid files found - if not valid_files: - self.log("No valid files in the zip archive.") + def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None: + """Processes a single file and returns its Data object.""" + try: + return parse_text_file_to_data(file_path, silent_errors=silent_errors) + except FileNotFoundError as e: + msg = f"File not found: {file_path}. Error: {e}" + self.log(msg) + if not silent_errors: + raise + return None + except Exception as e: + msg = f"Unexpected error processing {file_path}: {e}" + self.log(msg) + if not silent_errors: + raise + return None - # Return empty data if silent_errors is True - if silent_errors: - return data # type: ignore[return-value] + if not file_list: + self.log("No files to process.") + return file_list - # Raise an error if no valid files found - msg = "No valid files in the zip archive." - raise ValueError(msg) + concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading) + file_count = len(file_list) - # Define a function to process each file - def process_file(file_name, silent_errors=silent_errors): - with NamedTemporaryFile(delete=False) as temp_file: - temp_path = Path(temp_file.name).with_name(file_name) - with zip_file.open(file_name) as file_content: - temp_path.write_bytes(file_content.read()) - try: - return self._process_single_file(temp_path, silent_errors=silent_errors) - finally: - temp_path.unlink() + parallel_processing_threshold = 2 + if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold: + if file_count > 1: + self.log(f"Processing {file_count} files sequentially.") + processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list] + else: + self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.") + file_paths = [str(file.path) for file in file_list] + processed_data = parallel_load_data( + file_paths, + silent_errors=self.silent_errors, + load_function=process_file, + max_concurrency=concurrency, + ) - # Process files in parallel if specified - if parallel: - self.log( - f"Initializing parallel Thread Pool Executor with max workers: " - f"{self.concurrency_multithreading}." - ) - - # Process files in parallel - initial_data = parallel_load_data( - valid_files, - silent_errors=silent_errors, - load_function=process_file, - max_concurrency=self.concurrency_multithreading, - ) - - # Filter out empty data - data = list(filter(None, initial_data)) - else: - # Sequential processing - data = [process_file(file_name) for file_name in valid_files] - - self.log(f"Successfully processed zip file: {zip_path.name}.") - - return data # type: ignore[return-value] - - def _process_single_file(self, file_path: Path, *, silent_errors: bool = False) -> Data: - """Process a single file. - - Args: - file_path: Path to the file. - silent_errors: Suppresses errors if True. - - Returns: - Data: Parsed data from the file. - - Raises: - ValueError: For unsupported file formats. - """ - # Check if the file type is supported - if not any(file_path.suffix == ext for ext in ["." + f for f in TEXT_FILE_TYPES]): - self.log(f"Unsupported file type: {file_path.suffix}") - - # Return empty data if silent_errors is True - if silent_errors: - return Data() - - msg = f"Unsupported file type: {file_path.suffix}" - raise ValueError(msg) - - try: - # Parse the text file as appropriate - data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment] - if not data: - data = Data() - - self.log(f"Successfully processed file: {file_path.name}.") - except Exception as e: - self.log(f"Error processing file {file_path.name}: {e}") - - # Return empty data if silent_errors is True - if not silent_errors: - raise - - data = Data() - - return data + # Use rollup_basefile_data to merge processed data with BaseFile objects + return self.rollup_data(file_list, processed_data) diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json b/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json index 65078b8ec..b4b721b3e 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json @@ -9,12 +9,16 @@ "dataType": "ChatInput", "id": "ChatInput-1lWBj", "name": "message", - "output_types": ["Message"] + "output_types": [ + "Message" + ] }, "targetHandle": { "fieldName": "input_value", "id": "OpenAIModel-HIx8w", - "inputTypes": ["Message"], + "inputTypes": [ + "Message" + ], "type": "str" } }, @@ -32,12 +36,16 @@ "dataType": "OpenAIModel", "id": "OpenAIModel-HIx8w", "name": "text_output", - "output_types": ["Message"] + "output_types": [ + "Message" + ] }, "targetHandle": { "fieldName": "input_value", "id": "ChatOutput-hKFON", - "inputTypes": ["Message"], + "inputTypes": [ + "Message" + ], "type": "str" } }, @@ -54,12 +62,16 @@ "dataType": "File", "id": "File-dlDLp", "name": "data", - "output_types": ["Data"] + "output_types": [ + "Data" + ] }, "targetHandle": { "fieldName": "data", "id": "ParseData-mIiSz", - "inputTypes": ["Data"], + "inputTypes": [ + "Data" + ], "type": "other" } }, @@ -75,12 +87,17 @@ "dataType": "ParseData", "id": "ParseData-mIiSz", "name": "text", - "output_types": ["Message"] + "output_types": [ + "Message" + ] }, "targetHandle": { "fieldName": "Document", "id": "Prompt-L5CiD", - "inputTypes": ["Message", "Text"], + "inputTypes": [ + "Message", + "Text" + ], "type": "str" } }, @@ -96,12 +113,16 @@ "dataType": "Prompt", "id": "Prompt-L5CiD", "name": "prompt", - "output_types": ["Message"] + "output_types": [ + "Message" + ] }, "targetHandle": { "fieldName": "system_message", "id": "OpenAIModel-HIx8w", - "inputTypes": ["Message"], + "inputTypes": [ + "Message" + ], "type": "str" } }, @@ -119,7 +140,9 @@ "display_name": "Chat Input", "id": "ChatInput-1lWBj", "node": { - "base_classes": ["Message"], + "base_classes": [ + "Message" + ], "beta": false, "conditional_paths": [], "custom_fields": {}, @@ -148,7 +171,9 @@ "method": "message_response", "name": "message", "selected": "Message", - "types": ["Message"], + "types": [ + "Message" + ], "value": "__UNDEFINED__" } ], @@ -161,7 +186,9 @@ "display_name": "Background Color", "dynamic": false, "info": "The background color of the icon.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "background_color", @@ -180,7 +207,9 @@ "display_name": "Icon", "dynamic": false, "info": "The icon of the message.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "chat_icon", @@ -257,7 +286,9 @@ "display_name": "Text", "dynamic": false, "info": "Message to be passed as input.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "multiline": true, @@ -277,7 +308,10 @@ "dynamic": false, "info": "Type of sender.", "name": "sender", - "options": ["Machine", "User"], + "options": [ + "Machine", + "User" + ], "placeholder": "", "required": false, "show": true, @@ -291,7 +325,9 @@ "display_name": "Sender Name", "dynamic": false, "info": "Name of the sender.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "sender_name", @@ -309,7 +345,9 @@ "display_name": "Session ID", "dynamic": false, "info": "The session ID of the chat. If empty, the current session ID parameter will be used.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "session_id", @@ -344,7 +382,9 @@ "display_name": "Text Color", "dynamic": false, "info": "The text color of the name", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "text_color", @@ -382,7 +422,9 @@ "display_name": "Chat Output", "id": "ChatOutput-hKFON", "node": { - "base_classes": ["Message"], + "base_classes": [ + "Message" + ], "beta": false, "conditional_paths": [], "custom_fields": {}, @@ -414,7 +456,9 @@ "method": "message_response", "name": "message", "selected": "Message", - "types": ["Message"], + "types": [ + "Message" + ], "value": "__UNDEFINED__" } ], @@ -427,7 +471,9 @@ "display_name": "Background Color", "dynamic": false, "info": "The background color of the icon.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "background_color", @@ -447,7 +493,9 @@ "display_name": "Icon", "dynamic": false, "info": "The icon of the message.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "chat_icon", @@ -485,7 +533,9 @@ "display_name": "Data Template", "dynamic": false, "info": "Template to convert Data to Text. If left empty, it will be dynamically set to the Data's text key.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "data_template", @@ -505,7 +555,9 @@ "display_name": "Text", "dynamic": false, "info": "Message to be passed as output.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "input_value", @@ -526,7 +578,10 @@ "dynamic": false, "info": "Type of sender.", "name": "sender", - "options": ["Machine", "User"], + "options": [ + "Machine", + "User" + ], "placeholder": "", "required": false, "show": true, @@ -542,7 +597,9 @@ "display_name": "Sender Name", "dynamic": false, "info": "Name of the sender.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "sender_name", @@ -562,7 +619,9 @@ "display_name": "Session ID", "dynamic": false, "info": "The session ID of the chat. If empty, the current session ID parameter will be used.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "session_id", @@ -598,7 +657,9 @@ "display_name": "Text Color", "dynamic": false, "info": "The text color of the name", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "text_color", @@ -638,7 +699,9 @@ "display_name": "Parse Data", "id": "ParseData-mIiSz", "node": { - "base_classes": ["Message"], + "base_classes": [ + "Message" + ], "beta": false, "conditional_paths": [], "custom_fields": {}, @@ -646,7 +709,11 @@ "display_name": "Parse Data", "documentation": "", "edited": false, - "field_order": ["data", "template", "sep"], + "field_order": [ + "data", + "template", + "sep" + ], "frozen": false, "icon": "braces", "legacy": false, @@ -660,7 +727,9 @@ "method": "parse_data", "name": "text", "selected": "Message", - "types": ["Message"], + "types": [ + "Message" + ], "value": "__UNDEFINED__" } ], @@ -690,7 +759,9 @@ "display_name": "Data", "dynamic": false, "info": "The data to convert to text.", - "input_types": ["Data"], + "input_types": [ + "Data" + ], "list": false, "name": "data", "placeholder": "", @@ -723,7 +794,9 @@ "display_name": "Template", "dynamic": false, "info": "The template to use for formatting the data. It can contain the keys {text}, {data} or any other key in the Data.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "multiline": true, @@ -788,7 +861,10 @@ "data": { "id": "OpenAIModel-HIx8w", "node": { - "base_classes": ["LanguageModel", "Message"], + "base_classes": [ + "LanguageModel", + "Message" + ], "beta": false, "conditional_paths": [], "custom_fields": {}, @@ -825,7 +901,9 @@ "name": "text_output", "required_inputs": [], "selected": "Message", - "types": ["Message"], + "types": [ + "Message" + ], "value": "__UNDEFINED__" }, { @@ -835,7 +913,9 @@ "name": "model_output", "required_inputs": [], "selected": "LanguageModel", - "types": ["LanguageModel"], + "types": [ + "LanguageModel" + ], "value": "__UNDEFINED__" } ], @@ -848,7 +928,9 @@ "display_name": "OpenAI API Key", "dynamic": false, "info": "The OpenAI API Key to use for the OpenAI model.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "load_from_db": true, "name": "api_key", "password": true, @@ -883,7 +965,9 @@ "display_name": "Input", "dynamic": false, "info": "", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "input_value", @@ -999,7 +1083,9 @@ "display_name": "Output Parser", "dynamic": false, "info": "The parser to use to parse the output of the model", - "input_types": ["OutputParser"], + "input_types": [ + "OutputParser" + ], "list": false, "name": "output_parser", "placeholder": "", @@ -1064,7 +1150,9 @@ "display_name": "System Message", "dynamic": false, "info": "System message to pass to the model.", - "input_types": ["Message"], + "input_types": [ + "Message" + ], "list": false, "load_from_db": false, "name": "system_message", @@ -1149,7 +1237,9 @@ "data": { "id": "File-dlDLp", "node": { - "base_classes": ["Data"], + "base_classes": [ + "Data" + ], "beta": false, "conditional_paths": [], "custom_fields": {}, @@ -1172,10 +1262,13 @@ { "cache": true, "display_name": "Data", - "method": "load_file", + "method": "load_files", "name": "data", + "required_inputs": [], "selected": "Data", - "types": ["Data"], + "types": [ + "Data" + ], "value": "__UNDEFINED__" } ], @@ -1198,14 +1291,14 @@ "show": true, "title_case": false, "type": "code", - "value": "from pathlib import Path\nfrom tempfile import NamedTemporaryFile\nfrom zipfile import ZipFile, is_zipfile\n\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.custom import Component\nfrom langflow.io import BoolInput, FileInput, IntInput, Output\nfrom langflow.schema import Data\n\n\nclass FileComponent(Component):\n \"\"\"Handles loading of individual or zipped text files.\n\n Processes multiple valid files within a zip archive if provided.\n\n Attributes:\n display_name: Display name of the component.\n description: Brief component description.\n icon: Icon to represent the component.\n name: Identifier for the component.\n inputs: Inputs required by the component.\n outputs: Output of the component after processing files.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n inputs = [\n FileInput(\n name=\"path\",\n display_name=\"Path\",\n file_types=[*TEXT_FILE_TYPES, \"zip\"],\n info=f\"Supported file types: {', '.join([*TEXT_FILE_TYPES, 'zip'])}\",\n ),\n BoolInput(\n name=\"silent_errors\",\n display_name=\"Silent Errors\",\n advanced=True,\n info=\"If true, errors will not raise an exception.\",\n ),\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"Use Multithreading\",\n advanced=True,\n info=\"If true, parallel processing will be enabled for zip files.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Multithreading Concurrency\",\n advanced=True,\n info=\"The maximum number of workers to use, if concurrency is enabled\",\n value=4,\n ),\n ]\n\n outputs = [Output(display_name=\"Data\", name=\"data\", method=\"load_file\")]\n\n def load_file(self) -> Data:\n \"\"\"Load and parse file(s) from a zip archive.\n\n Raises:\n ValueError: If no file is uploaded or file path is invalid.\n\n Returns:\n Data: Parsed data from file(s).\n \"\"\"\n # Check if the file path is provided\n if not self.path:\n self.log(\"File path is missing.\")\n msg = \"Please upload a file for processing.\"\n\n raise ValueError(msg)\n\n resolved_path = Path(self.resolve_path(self.path))\n try:\n # Check if the file is a zip archive\n if is_zipfile(resolved_path):\n self.log(f\"Processing zip file: {resolved_path.name}.\")\n\n return self._process_zip_file(\n resolved_path,\n silent_errors=self.silent_errors,\n parallel=self.use_multithreading,\n )\n\n self.log(f\"Processing single file: {resolved_path.name}.\")\n\n return self._process_single_file(resolved_path, silent_errors=self.silent_errors)\n except FileNotFoundError:\n self.log(f\"File not found: {resolved_path.name}.\")\n\n raise\n\n def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, parallel: bool = False) -> Data:\n \"\"\"Process text files within a zip archive.\n\n Args:\n zip_path: Path to the zip file.\n silent_errors: Suppresses errors if True.\n parallel: Enables parallel processing if True.\n\n Returns:\n list[Data]: Combined data from all valid files.\n\n Raises:\n ValueError: If no valid files found in the archive.\n \"\"\"\n data: list[Data] = []\n with ZipFile(zip_path, \"r\") as zip_file:\n # Filter file names based on extensions in TEXT_FILE_TYPES and ignore hidden files\n valid_files = [\n name\n for name in zip_file.namelist()\n if (\n any(name.endswith(ext) for ext in TEXT_FILE_TYPES)\n and not name.startswith(\"__MACOSX\")\n and not name.startswith(\".\")\n )\n ]\n\n # Raise an error if no valid files found\n if not valid_files:\n self.log(\"No valid files in the zip archive.\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return data # type: ignore[return-value]\n\n # Raise an error if no valid files found\n msg = \"No valid files in the zip archive.\"\n raise ValueError(msg)\n\n # Define a function to process each file\n def process_file(file_name, silent_errors=silent_errors):\n with NamedTemporaryFile(delete=False) as temp_file:\n temp_path = Path(temp_file.name).with_name(file_name)\n with zip_file.open(file_name) as file_content:\n temp_path.write_bytes(file_content.read())\n try:\n return self._process_single_file(temp_path, silent_errors=silent_errors)\n finally:\n temp_path.unlink()\n\n # Process files in parallel if specified\n if parallel:\n self.log(\n f\"Initializing parallel Thread Pool Executor with max workers: \"\n f\"{self.concurrency_multithreading}.\"\n )\n\n # Process files in parallel\n initial_data = parallel_load_data(\n valid_files,\n silent_errors=silent_errors,\n load_function=process_file,\n max_concurrency=self.concurrency_multithreading,\n )\n\n # Filter out empty data\n data = list(filter(None, initial_data))\n else:\n # Sequential processing\n data = [process_file(file_name) for file_name in valid_files]\n\n self.log(f\"Successfully processed zip file: {zip_path.name}.\")\n\n return data # type: ignore[return-value]\n\n def _process_single_file(self, file_path: Path, *, silent_errors: bool = False) -> Data:\n \"\"\"Process a single file.\n\n Args:\n file_path: Path to the file.\n silent_errors: Suppresses errors if True.\n\n Returns:\n Data: Parsed data from the file.\n\n Raises:\n ValueError: For unsupported file formats.\n \"\"\"\n # Check if the file type is supported\n if not any(file_path.suffix == ext for ext in [\".\" + f for f in TEXT_FILE_TYPES]):\n self.log(f\"Unsupported file type: {file_path.suffix}\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return Data()\n\n msg = f\"Unsupported file type: {file_path.suffix}\"\n raise ValueError(msg)\n\n try:\n # Parse the text file as appropriate\n data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment]\n if not data:\n data = Data()\n\n self.log(f\"Successfully processed file: {file_path.name}.\")\n except Exception as e:\n self.log(f\"Error processing file {file_path.name}: {e}\")\n\n # Return empty data if silent_errors is True\n if not silent_errors:\n raise\n\n data = Data()\n\n return data\n" + "value": "from langflow.base.data import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import BoolInput, IntInput\nfrom langflow.schema import Data\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"Handles loading and processing of individual or zipped text files.\n\n This component supports processing multiple valid files within a zip archive,\n resolving paths, validating file types, and optionally using multithreading for processing.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n VALID_EXTENSIONS = TEXT_FILE_TYPES\n\n inputs = [\n *BaseFileComponent._base_inputs,\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=False,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n ]\n\n outputs = [\n *BaseFileComponent._base_outputs,\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Processes files either sequentially or in parallel, depending on concurrency settings.\n\n Args:\n file_list (list[BaseFileComponent.BaseFile]): List of files to process.\n\n Returns:\n list[BaseFileComponent.BaseFile]: Updated list of files with merged data.\n \"\"\"\n\n def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Processes a single file and returns its Data object.\"\"\"\n try:\n return parse_text_file_to_data(file_path, silent_errors=silent_errors)\n except FileNotFoundError as e:\n msg = f\"File not found: {file_path}. Error: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n except Exception as e:\n msg = f\"Unexpected error processing {file_path}: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n\n if not file_list:\n self.log(\"No files to process.\")\n return file_list\n\n concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)\n file_count = len(file_list)\n\n parallel_processing_threshold = 2\n if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:\n if file_count > 1:\n self.log(f\"Processing {file_count} files sequentially.\")\n processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]\n else:\n self.log(f\"Starting parallel processing of {file_count} files with concurrency: {concurrency}.\")\n file_paths = [str(file.path) for file in file_list]\n processed_data = parallel_load_data(\n file_paths,\n silent_errors=self.silent_errors,\n load_function=process_file,\n max_concurrency=concurrency,\n )\n\n # Use rollup_basefile_data to merge processed data with BaseFile objects\n return self.rollup_data(file_list, processed_data)\n" }, "concurrency_multithreading": { "_input_type": "IntInput", - "advanced": true, - "display_name": "Multithreading Concurrency", + "advanced": false, + "display_name": "Processing Concurrency", "dynamic": false, - "info": "The maximum number of workers to use, if concurrency is enabled", + "info": "When multiple files are being processed, the number of files to process concurrently.", "list": false, "name": "concurrency_multithreading", "placeholder": "", @@ -1216,6 +1309,73 @@ "type": "int", "value": 4 }, + "delete_server_file_after_processing": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Delete Server File After Processing", + "dynamic": false, + "info": "If true, the Server File Path will be deleted after processing.", + "list": false, + "name": "delete_server_file_after_processing", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "bool", + "value": true + }, + "file_path": { + "_input_type": "HandleInput", + "advanced": false, + "display_name": "Server File Path", + "dynamic": false, + "info": "Data object with a 'file_path' property pointing to server file. Supercedes 'Path'. ", + "input_types": [ + "Data" + ], + "list": false, + "name": "file_path", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "other", + "value": "" + }, + "ignore_unspecified_files": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Ignore Unspecified Files", + "dynamic": false, + "info": "If true, Data with no 'file_path' property will be ignored.", + "list": false, + "name": "ignore_unspecified_files", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "bool", + "value": false + }, + "ignore_unsupported_extensions": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Ignore Unsupported Extensions", + "dynamic": false, + "info": "If true, files with unsupported extensions will not be processed.", + "list": false, + "name": "ignore_unsupported_extensions", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "bool", + "value": true + }, "path": { "_input_type": "FileInput", "advanced": false, @@ -1240,10 +1400,14 @@ "js", "ts", "tsx", - "zip" + "zip", + "tar", + "tgz", + "bz2", + "gz" ], "file_path": "", - "info": "Supported file types: txt, md, mdx, csv, json, yaml, yml, xml, html, htm, pdf, docx, py, sh, sql, js, ts, tsx, zip", + "info": "Supported file extensions: txt, md, mdx, csv, json, yaml, yml, xml, html, htm, pdf, docx, py, sh, sql, js, ts, tsx; optionally bundled in file extensions: zip, tar, tgz, bz2, gz", "list": false, "name": "path", "placeholder": "", @@ -1273,9 +1437,9 @@ "use_multithreading": { "_input_type": "BoolInput", "advanced": true, - "display_name": "Use Multithreading", + "display_name": "[Deprecated] Use Multithreading", "dynamic": false, - "info": "If true, parallel processing will be enabled for zip files.", + "info": "Set 'Processing Concurrency' greater than 1 to enable multithreading.", "list": false, "name": "use_multithreading", "placeholder": "", @@ -1312,18 +1476,24 @@ "display_name": "Prompt", "id": "Prompt-L5CiD", "node": { - "base_classes": ["Message"], + "base_classes": [ + "Message" + ], "beta": false, "conditional_paths": [], "custom_fields": { - "template": ["Document"] + "template": [ + "Document" + ] }, "description": "Create a prompt template with dynamic variables.", "display_name": "Prompt", "documentation": "", "edited": false, "error": null, - "field_order": ["template"], + "field_order": [ + "template" + ], "frozen": false, "full_path": null, "icon": "prompts", @@ -1342,7 +1512,9 @@ "method": "build_prompt", "name": "prompt", "selected": "Message", - "types": ["Message"], + "types": [ + "Message" + ], "value": "__UNDEFINED__" } ], @@ -1356,7 +1528,10 @@ "fileTypes": [], "file_path": "", "info": "", - "input_types": ["Message", "Text"], + "input_types": [ + "Message", + "Text" + ], "list": false, "load_from_db": false, "multiline": true, @@ -1432,11 +1607,15 @@ }, "description": "Integrates PDF reading with a language model to answer document-specific questions. Ideal for small-scale texts, it facilitates direct queries with immediate insights.", "endpoint_name": null, + "gradient": "3", "icon": "FileQuestion", "id": "febba2f9-69b3-484b-8aef-65626810ec8a", - "gradient": "3", "is_component": false, "last_tested_version": "1.0.19.post2", "name": "Document Q&A", - "tags": ["rag", "q-a", "openai"] -} + "tags": [ + "rag", + "q-a", + "openai" + ] +} \ No newline at end of file diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json index 7e4d6423d..4009f4a7b 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json @@ -3788,8 +3788,9 @@ { "cache": true, "display_name": "Data", - "method": "load_file", + "method": "load_files", "name": "data", + "required_inputs": [], "selected": "Data", "types": [ "Data" @@ -3816,14 +3817,14 @@ "show": true, "title_case": false, "type": "code", - "value": "from pathlib import Path\nfrom tempfile import NamedTemporaryFile\nfrom zipfile import ZipFile, is_zipfile\n\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.custom import Component\nfrom langflow.io import BoolInput, FileInput, IntInput, Output\nfrom langflow.schema import Data\n\n\nclass FileComponent(Component):\n \"\"\"Handles loading of individual or zipped text files.\n\n Processes multiple valid files within a zip archive if provided.\n\n Attributes:\n display_name: Display name of the component.\n description: Brief component description.\n icon: Icon to represent the component.\n name: Identifier for the component.\n inputs: Inputs required by the component.\n outputs: Output of the component after processing files.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n inputs = [\n FileInput(\n name=\"path\",\n display_name=\"Path\",\n file_types=[*TEXT_FILE_TYPES, \"zip\"],\n info=f\"Supported file types: {', '.join([*TEXT_FILE_TYPES, 'zip'])}\",\n ),\n BoolInput(\n name=\"silent_errors\",\n display_name=\"Silent Errors\",\n advanced=True,\n info=\"If true, errors will not raise an exception.\",\n ),\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"Use Multithreading\",\n advanced=True,\n info=\"If true, parallel processing will be enabled for zip files.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Multithreading Concurrency\",\n advanced=True,\n info=\"The maximum number of workers to use, if concurrency is enabled\",\n value=4,\n ),\n ]\n\n outputs = [Output(display_name=\"Data\", name=\"data\", method=\"load_file\")]\n\n def load_file(self) -> Data:\n \"\"\"Load and parse file(s) from a zip archive.\n\n Raises:\n ValueError: If no file is uploaded or file path is invalid.\n\n Returns:\n Data: Parsed data from file(s).\n \"\"\"\n # Check if the file path is provided\n if not self.path:\n self.log(\"File path is missing.\")\n msg = \"Please upload a file for processing.\"\n\n raise ValueError(msg)\n\n resolved_path = Path(self.resolve_path(self.path))\n try:\n # Check if the file is a zip archive\n if is_zipfile(resolved_path):\n self.log(f\"Processing zip file: {resolved_path.name}.\")\n\n return self._process_zip_file(\n resolved_path,\n silent_errors=self.silent_errors,\n parallel=self.use_multithreading,\n )\n\n self.log(f\"Processing single file: {resolved_path.name}.\")\n\n return self._process_single_file(resolved_path, silent_errors=self.silent_errors)\n except FileNotFoundError:\n self.log(f\"File not found: {resolved_path.name}.\")\n\n raise\n\n def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, parallel: bool = False) -> Data:\n \"\"\"Process text files within a zip archive.\n\n Args:\n zip_path: Path to the zip file.\n silent_errors: Suppresses errors if True.\n parallel: Enables parallel processing if True.\n\n Returns:\n list[Data]: Combined data from all valid files.\n\n Raises:\n ValueError: If no valid files found in the archive.\n \"\"\"\n data: list[Data] = []\n with ZipFile(zip_path, \"r\") as zip_file:\n # Filter file names based on extensions in TEXT_FILE_TYPES and ignore hidden files\n valid_files = [\n name\n for name in zip_file.namelist()\n if (\n any(name.endswith(ext) for ext in TEXT_FILE_TYPES)\n and not name.startswith(\"__MACOSX\")\n and not name.startswith(\".\")\n )\n ]\n\n # Raise an error if no valid files found\n if not valid_files:\n self.log(\"No valid files in the zip archive.\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return data # type: ignore[return-value]\n\n # Raise an error if no valid files found\n msg = \"No valid files in the zip archive.\"\n raise ValueError(msg)\n\n # Define a function to process each file\n def process_file(file_name, silent_errors=silent_errors):\n with NamedTemporaryFile(delete=False) as temp_file:\n temp_path = Path(temp_file.name).with_name(file_name)\n with zip_file.open(file_name) as file_content:\n temp_path.write_bytes(file_content.read())\n try:\n return self._process_single_file(temp_path, silent_errors=silent_errors)\n finally:\n temp_path.unlink()\n\n # Process files in parallel if specified\n if parallel:\n self.log(\n f\"Initializing parallel Thread Pool Executor with max workers: \"\n f\"{self.concurrency_multithreading}.\"\n )\n\n # Process files in parallel\n initial_data = parallel_load_data(\n valid_files,\n silent_errors=silent_errors,\n load_function=process_file,\n max_concurrency=self.concurrency_multithreading,\n )\n\n # Filter out empty data\n data = list(filter(None, initial_data))\n else:\n # Sequential processing\n data = [process_file(file_name) for file_name in valid_files]\n\n self.log(f\"Successfully processed zip file: {zip_path.name}.\")\n\n return data # type: ignore[return-value]\n\n def _process_single_file(self, file_path: Path, *, silent_errors: bool = False) -> Data:\n \"\"\"Process a single file.\n\n Args:\n file_path: Path to the file.\n silent_errors: Suppresses errors if True.\n\n Returns:\n Data: Parsed data from the file.\n\n Raises:\n ValueError: For unsupported file formats.\n \"\"\"\n # Check if the file type is supported\n if not any(file_path.suffix == ext for ext in [\".\" + f for f in TEXT_FILE_TYPES]):\n self.log(f\"Unsupported file type: {file_path.suffix}\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return Data()\n\n msg = f\"Unsupported file type: {file_path.suffix}\"\n raise ValueError(msg)\n\n try:\n # Parse the text file as appropriate\n data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment]\n if not data:\n data = Data()\n\n self.log(f\"Successfully processed file: {file_path.name}.\")\n except Exception as e:\n self.log(f\"Error processing file {file_path.name}: {e}\")\n\n # Return empty data if silent_errors is True\n if not silent_errors:\n raise\n\n data = Data()\n\n return data\n" + "value": "from langflow.base.data import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import BoolInput, IntInput\nfrom langflow.schema import Data\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"Handles loading and processing of individual or zipped text files.\n\n This component supports processing multiple valid files within a zip archive,\n resolving paths, validating file types, and optionally using multithreading for processing.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n VALID_EXTENSIONS = TEXT_FILE_TYPES\n\n inputs = [\n *BaseFileComponent._base_inputs,\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=False,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n ]\n\n outputs = [\n *BaseFileComponent._base_outputs,\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Processes files either sequentially or in parallel, depending on concurrency settings.\n\n Args:\n file_list (list[BaseFileComponent.BaseFile]): List of files to process.\n\n Returns:\n list[BaseFileComponent.BaseFile]: Updated list of files with merged data.\n \"\"\"\n\n def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Processes a single file and returns its Data object.\"\"\"\n try:\n return parse_text_file_to_data(file_path, silent_errors=silent_errors)\n except FileNotFoundError as e:\n msg = f\"File not found: {file_path}. Error: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n except Exception as e:\n msg = f\"Unexpected error processing {file_path}: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n\n if not file_list:\n self.log(\"No files to process.\")\n return file_list\n\n concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)\n file_count = len(file_list)\n\n parallel_processing_threshold = 2\n if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:\n if file_count > 1:\n self.log(f\"Processing {file_count} files sequentially.\")\n processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]\n else:\n self.log(f\"Starting parallel processing of {file_count} files with concurrency: {concurrency}.\")\n file_paths = [str(file.path) for file in file_list]\n processed_data = parallel_load_data(\n file_paths,\n silent_errors=self.silent_errors,\n load_function=process_file,\n max_concurrency=concurrency,\n )\n\n # Use rollup_basefile_data to merge processed data with BaseFile objects\n return self.rollup_data(file_list, processed_data)\n" }, "concurrency_multithreading": { "_input_type": "IntInput", - "advanced": true, - "display_name": "Multithreading Concurrency", + "advanced": false, + "display_name": "Processing Concurrency", "dynamic": false, - "info": "The maximum number of workers to use, if concurrency is enabled", + "info": "When multiple files are being processed, the number of files to process concurrently.", "list": false, "name": "concurrency_multithreading", "placeholder": "", @@ -3834,6 +3835,73 @@ "type": "int", "value": 4 }, + "delete_server_file_after_processing": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Delete Server File After Processing", + "dynamic": false, + "info": "If true, the Server File Path will be deleted after processing.", + "list": false, + "name": "delete_server_file_after_processing", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "bool", + "value": true + }, + "file_path": { + "_input_type": "HandleInput", + "advanced": false, + "display_name": "Server File Path", + "dynamic": false, + "info": "Data object with a 'file_path' property pointing to server file. Supercedes 'Path'. ", + "input_types": [ + "Data" + ], + "list": false, + "name": "file_path", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "other", + "value": "" + }, + "ignore_unspecified_files": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Ignore Unspecified Files", + "dynamic": false, + "info": "If true, Data with no 'file_path' property will be ignored.", + "list": false, + "name": "ignore_unspecified_files", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "bool", + "value": false + }, + "ignore_unsupported_extensions": { + "_input_type": "BoolInput", + "advanced": true, + "display_name": "Ignore Unsupported Extensions", + "dynamic": false, + "info": "If true, files with unsupported extensions will not be processed.", + "list": false, + "name": "ignore_unsupported_extensions", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "bool", + "value": true + }, "path": { "_input_type": "FileInput", "advanced": false, @@ -3858,10 +3926,14 @@ "js", "ts", "tsx", - "zip" + "zip", + "tar", + "tgz", + "bz2", + "gz" ], "file_path": "", - "info": "Supported file types: txt, md, mdx, csv, json, yaml, yml, xml, html, htm, pdf, docx, py, sh, sql, js, ts, tsx, zip", + "info": "Supported file extensions: txt, md, mdx, csv, json, yaml, yml, xml, html, htm, pdf, docx, py, sh, sql, js, ts, tsx; optionally bundled in file extensions: zip, tar, tgz, bz2, gz", "list": false, "name": "path", "placeholder": "", @@ -3891,9 +3963,9 @@ "use_multithreading": { "_input_type": "BoolInput", "advanced": true, - "display_name": "Use Multithreading", + "display_name": "[Deprecated] Use Multithreading", "dynamic": false, - "info": "If true, parallel processing will be enabled for zip files.", + "info": "Set 'Processing Concurrency' greater than 1 to enable multithreading.", "list": false, "name": "use_multithreading", "placeholder": "", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/document_qa.py b/src/backend/base/langflow/initial_setup/starter_projects/document_qa.py index 562c94e90..3185903dd 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/document_qa.py +++ b/src/backend/base/langflow/initial_setup/starter_projects/document_qa.py @@ -24,7 +24,7 @@ Answer: """ file_component = FileComponent() parse_data_component = ParseDataComponent() - parse_data_component.set(data=file_component.load_file) + parse_data_component.set(data=file_component.load_files) chat_input = ChatInput() prompt_component = PromptComponent() diff --git a/src/backend/base/langflow/initial_setup/starter_projects/vector_store_rag.py b/src/backend/base/langflow/initial_setup/starter_projects/vector_store_rag.py index f7ffdb0eb..c367fd374 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/vector_store_rag.py +++ b/src/backend/base/langflow/initial_setup/starter_projects/vector_store_rag.py @@ -16,7 +16,7 @@ def ingestion_graph(): # Ingestion Graph file_component = FileComponent() text_splitter = SplitTextComponent() - text_splitter.set(data_inputs=file_component.load_file) + text_splitter.set(data_inputs=file_component.load_files) openai_embeddings = OpenAIEmbeddingsComponent() vector_store = AstraVectorStoreComponent() vector_store.set( diff --git a/src/backend/tests/unit/initial_setup/starter_projects/test_vector_store_rag.py b/src/backend/tests/unit/initial_setup/starter_projects/test_vector_store_rag.py index d11014881..b829f5cc8 100644 --- a/src/backend/tests/unit/initial_setup/starter_projects/test_vector_store_rag.py +++ b/src/backend/tests/unit/initial_setup/starter_projects/test_vector_store_rag.py @@ -24,7 +24,7 @@ def ingestion_graph(): file_component.set(path="test.txt") file_component.set_on_output(name="data", value=Data(text="This is a test file."), cache=True) text_splitter = SplitTextComponent(_id="text-splitter-123") - text_splitter.set(data_inputs=file_component.load_file) + text_splitter.set(data_inputs=file_component.load_files) openai_embeddings = OpenAIEmbeddingsComponent(_id="openai-embeddings-123") openai_embeddings.set( openai_api_key="sk-123", openai_api_base="https://api.openai.com/v1", openai_api_type="openai"