diff --git a/src/backend/base/langflow/base/data/base_file.py b/src/backend/base/langflow/base/data/base_file.py index 4e3ff00ec..26d3300c8 100644 --- a/src/backend/base/langflow/base/data/base_file.py +++ b/src/backend/base/langflow/base/data/base_file.py @@ -127,6 +127,7 @@ class BaseFileComponent(Component, ABC): required=False, input_types=["Data", "Message"], is_list=True, + advanced=True, ), BoolInput( name="silent_errors", diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index ff970b67a..b797451a8 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -30,7 +30,7 @@ class FileComponent(BaseFileComponent): IntInput( name="concurrency_multithreading", display_name="Processing Concurrency", - advanced=False, + advanced=True, info="When multiple files are being processed, the number of files to process concurrently.", value=1, ), diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json b/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json index 647a0f99a..151cd42cd 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Document Q&A.json @@ -977,11 +977,11 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.base.data import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import BoolInput, IntInput\nfrom langflow.schema import Data\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"Handles loading and processing of individual or zipped text files.\n\n This component supports processing multiple valid files within a zip archive,\n resolving paths, validating file types, and optionally using multithreading for processing.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n VALID_EXTENSIONS = TEXT_FILE_TYPES\n\n inputs = [\n *BaseFileComponent._base_inputs,\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=False,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n ]\n\n outputs = [\n *BaseFileComponent._base_outputs,\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Processes files either sequentially or in parallel, depending on concurrency settings.\n\n Args:\n file_list (list[BaseFileComponent.BaseFile]): List of files to process.\n\n Returns:\n list[BaseFileComponent.BaseFile]: Updated list of files with merged data.\n \"\"\"\n\n def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Processes a single file and returns its Data object.\"\"\"\n try:\n return parse_text_file_to_data(file_path, silent_errors=silent_errors)\n except FileNotFoundError as e:\n msg = f\"File not found: {file_path}. Error: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n except Exception as e:\n msg = f\"Unexpected error processing {file_path}: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n\n if not file_list:\n msg = \"No files to process.\"\n raise ValueError(msg)\n\n concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)\n file_count = len(file_list)\n\n parallel_processing_threshold = 2\n if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:\n if file_count > 1:\n self.log(f\"Processing {file_count} files sequentially.\")\n processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]\n else:\n self.log(f\"Starting parallel processing of {file_count} files with concurrency: {concurrency}.\")\n file_paths = [str(file.path) for file in file_list]\n processed_data = parallel_load_data(\n file_paths,\n silent_errors=self.silent_errors,\n load_function=process_file,\n max_concurrency=concurrency,\n )\n\n # Use rollup_basefile_data to merge processed data with BaseFile objects\n return self.rollup_data(file_list, processed_data)\n" + "value": "from langflow.base.data import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import BoolInput, IntInput\nfrom langflow.schema import Data\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"Handles loading and processing of individual or zipped text files.\n\n This component supports processing multiple valid files within a zip archive,\n resolving paths, validating file types, and optionally using multithreading for processing.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n VALID_EXTENSIONS = TEXT_FILE_TYPES\n\n inputs = [\n *BaseFileComponent._base_inputs,\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=True,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n ]\n\n outputs = [\n *BaseFileComponent._base_outputs,\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Processes files either sequentially or in parallel, depending on concurrency settings.\n\n Args:\n file_list (list[BaseFileComponent.BaseFile]): List of files to process.\n\n Returns:\n list[BaseFileComponent.BaseFile]: Updated list of files with merged data.\n \"\"\"\n\n def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Processes a single file and returns its Data object.\"\"\"\n try:\n return parse_text_file_to_data(file_path, silent_errors=silent_errors)\n except FileNotFoundError as e:\n msg = f\"File not found: {file_path}. Error: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n except Exception as e:\n msg = f\"Unexpected error processing {file_path}: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n\n if not file_list:\n msg = \"No files to process.\"\n raise ValueError(msg)\n\n concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)\n file_count = len(file_list)\n\n parallel_processing_threshold = 2\n if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:\n if file_count > 1:\n self.log(f\"Processing {file_count} files sequentially.\")\n processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]\n else:\n self.log(f\"Starting parallel processing of {file_count} files with concurrency: {concurrency}.\")\n file_paths = [str(file.path) for file in file_list]\n processed_data = parallel_load_data(\n file_paths,\n silent_errors=self.silent_errors,\n load_function=process_file,\n max_concurrency=concurrency,\n )\n\n # Use rollup_basefile_data to merge processed data with BaseFile objects\n return self.rollup_data(file_list, processed_data)\n" }, "concurrency_multithreading": { "_input_type": "IntInput", - "advanced": false, + "advanced": true, "display_name": "Processing Concurrency", "dynamic": false, "info": "When multiple files are being processed, the number of files to process concurrently.", @@ -1013,7 +1013,7 @@ }, "file_path": { "_input_type": "HandleInput", - "advanced": false, + "advanced": true, "display_name": "Server File Path", "dynamic": false, "info": "Data object with a 'file_path' property pointing to server file or a Message object with a path to the file. Supercedes 'Path' but supports same file types.", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json index ac4d6ffc7..a09921c00 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json @@ -2493,11 +2493,11 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.base.data import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import BoolInput, IntInput\nfrom langflow.schema import Data\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"Handles loading and processing of individual or zipped text files.\n\n This component supports processing multiple valid files within a zip archive,\n resolving paths, validating file types, and optionally using multithreading for processing.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n VALID_EXTENSIONS = TEXT_FILE_TYPES\n\n inputs = [\n *BaseFileComponent._base_inputs,\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=False,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n ]\n\n outputs = [\n *BaseFileComponent._base_outputs,\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Processes files either sequentially or in parallel, depending on concurrency settings.\n\n Args:\n file_list (list[BaseFileComponent.BaseFile]): List of files to process.\n\n Returns:\n list[BaseFileComponent.BaseFile]: Updated list of files with merged data.\n \"\"\"\n\n def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Processes a single file and returns its Data object.\"\"\"\n try:\n return parse_text_file_to_data(file_path, silent_errors=silent_errors)\n except FileNotFoundError as e:\n msg = f\"File not found: {file_path}. Error: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n except Exception as e:\n msg = f\"Unexpected error processing {file_path}: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n\n if not file_list:\n msg = \"No files to process.\"\n raise ValueError(msg)\n\n concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)\n file_count = len(file_list)\n\n parallel_processing_threshold = 2\n if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:\n if file_count > 1:\n self.log(f\"Processing {file_count} files sequentially.\")\n processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]\n else:\n self.log(f\"Starting parallel processing of {file_count} files with concurrency: {concurrency}.\")\n file_paths = [str(file.path) for file in file_list]\n processed_data = parallel_load_data(\n file_paths,\n silent_errors=self.silent_errors,\n load_function=process_file,\n max_concurrency=concurrency,\n )\n\n # Use rollup_basefile_data to merge processed data with BaseFile objects\n return self.rollup_data(file_list, processed_data)\n" + "value": "from langflow.base.data import BaseFileComponent\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.io import BoolInput, IntInput\nfrom langflow.schema import Data\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"Handles loading and processing of individual or zipped text files.\n\n This component supports processing multiple valid files within a zip archive,\n resolving paths, validating file types, and optionally using multithreading for processing.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n VALID_EXTENSIONS = TEXT_FILE_TYPES\n\n inputs = [\n *BaseFileComponent._base_inputs,\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=True,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n ]\n\n outputs = [\n *BaseFileComponent._base_outputs,\n ]\n\n def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Processes files either sequentially or in parallel, depending on concurrency settings.\n\n Args:\n file_list (list[BaseFileComponent.BaseFile]): List of files to process.\n\n Returns:\n list[BaseFileComponent.BaseFile]: Updated list of files with merged data.\n \"\"\"\n\n def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:\n \"\"\"Processes a single file and returns its Data object.\"\"\"\n try:\n return parse_text_file_to_data(file_path, silent_errors=silent_errors)\n except FileNotFoundError as e:\n msg = f\"File not found: {file_path}. Error: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n except Exception as e:\n msg = f\"Unexpected error processing {file_path}: {e}\"\n self.log(msg)\n if not silent_errors:\n raise\n return None\n\n if not file_list:\n msg = \"No files to process.\"\n raise ValueError(msg)\n\n concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)\n file_count = len(file_list)\n\n parallel_processing_threshold = 2\n if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:\n if file_count > 1:\n self.log(f\"Processing {file_count} files sequentially.\")\n processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]\n else:\n self.log(f\"Starting parallel processing of {file_count} files with concurrency: {concurrency}.\")\n file_paths = [str(file.path) for file in file_list]\n processed_data = parallel_load_data(\n file_paths,\n silent_errors=self.silent_errors,\n load_function=process_file,\n max_concurrency=concurrency,\n )\n\n # Use rollup_basefile_data to merge processed data with BaseFile objects\n return self.rollup_data(file_list, processed_data)\n" }, "concurrency_multithreading": { "_input_type": "IntInput", - "advanced": false, + "advanced": true, "display_name": "Processing Concurrency", "dynamic": false, "info": "When multiple files are being processed, the number of files to process concurrently.", @@ -2529,7 +2529,7 @@ }, "file_path": { "_input_type": "HandleInput", - "advanced": false, + "advanced": true, "display_name": "Server File Path", "dynamic": false, "info": "Data object with a 'file_path' property pointing to server file or a Message object with a path to the file. Supercedes 'Path' but supports same file types.",