diff --git a/pyproject.toml b/pyproject.toml index 7a95b96db..b585983e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,7 +113,6 @@ dependencies = [ "langchain-elasticsearch>=0.2.0", "opensearch-py>=2.7.1", "langchain-ollama>=0.2.0", - "pymupdf~=1.24.13", "sqlalchemy[aiosqlite,postgresql_psycopg2binary,postgresql_psycopgbinary]>=2.0.36", "atlassian-python-api>=3.41.16", ] diff --git a/src/backend/base/langflow/components/data/file.py b/src/backend/base/langflow/components/data/file.py index 67d47c8f7..8a5088aae 100644 --- a/src/backend/base/langflow/components/data/file.py +++ b/src/backend/base/langflow/components/data/file.py @@ -1,13 +1,10 @@ -from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from tempfile import NamedTemporaryFile from zipfile import ZipFile, is_zipfile -import fitz - -from langflow.base.data.utils import TEXT_FILE_TYPES, parse_text_file_to_data +from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data from langflow.custom import Component -from langflow.io import BoolInput, FileInput, Output +from langflow.io import BoolInput, FileInput, IntInput, Output from langflow.schema import Data @@ -49,6 +46,13 @@ class FileComponent(Component): advanced=True, info="If true, parallel processing will be enabled for zip files.", ), + IntInput( + name="concurrency_multithreading", + display_name="Multithreading Concurrency", + advanced=True, + info="The maximum number of workers to use, if concurrency is enabled", + value=4, + ), ] outputs = [Output(display_name="Data", name="data", method="load_file")] @@ -74,6 +78,7 @@ class FileComponent(Component): # Check if the file is a zip archive if is_zipfile(resolved_path): self.log(f"Processing zip file: {resolved_path.name}.") + return self._process_zip_file( resolved_path, silent_errors=self.silent_errors, @@ -81,9 +86,11 @@ class FileComponent(Component): ) self.log(f"Processing single file: {resolved_path.name}.") + return self._process_single_file(resolved_path, silent_errors=self.silent_errors) except FileNotFoundError: self.log(f"File not found: {resolved_path.name}.") + raise def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, parallel: bool = False) -> Data: @@ -126,7 +133,7 @@ class FileComponent(Component): raise ValueError(msg) # Define a function to process each file - def process_file(file_name): + def process_file(file_name, silent_errors=silent_errors): with NamedTemporaryFile(delete=False) as temp_file: temp_path = Path(temp_file.name).with_name(file_name) with zip_file.open(file_name) as file_content: @@ -138,19 +145,24 @@ class FileComponent(Component): # Process files in parallel if specified if parallel: - self.log("Initializing parallel Thread Pool Executor.") - with ThreadPoolExecutor() as executor: - futures = {executor.submit(process_file, file): file for file in valid_files} - for future in as_completed(futures): - try: - data.append(future.result()) - except Exception as e: - self.log(f"Error processing file {futures[future]}: {e}") - if not silent_errors: - raise + self.log( + f"Initializing parallel Thread Pool Executor with max workers: " + f"{self.concurrency_multithreading}." + ) + + # Process files in parallel + initial_data = parallel_load_data( + valid_files, + silent_errors=silent_errors, + load_function=process_file, + max_concurrency=self.concurrency_multithreading, + ) + + # Filter out empty data + data = list(filter(None, initial_data)) else: # Sequential processing - data.extend([process_file(file_name) for file_name in valid_files]) + data = [process_file(file_name) for file_name in valid_files] self.log(f"Successfully processed zip file: {zip_path.name}.") @@ -169,20 +181,8 @@ class FileComponent(Component): Raises: ValueError: For unsupported file formats. """ - - # Define a function to extract text from a PDF file - def pdf_to_text(filepath): - text = "" - - # Open the PDF file - with fitz.open(filepath) as pdf: - for page in pdf: - text += page.get_text() + "\n" - - return text - # Check if the file type is supported - if not any(file_path.suffix == ext for ext in ["." + f for f in [*TEXT_FILE_TYPES, "pdf"]]): + if not any(file_path.suffix == ext for ext in ["." + f for f in TEXT_FILE_TYPES]): self.log(f"Unsupported file type: {file_path.suffix}") # Return empty data if silent_errors is True @@ -193,13 +193,10 @@ class FileComponent(Component): raise ValueError(msg) try: - # Parse the file based on the file type - if file_path.suffix == ".pdf": - data = Data(data={"file_path": file_path, "text": pdf_to_text(file_path)}) - else: - data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment] - if not data: - data = Data() + # Parse the text file as appropriate + data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment] + if not data: + data = Data() self.log(f"Successfully processed file: {file_path.name}.") except Exception as e: diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json b/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json index 5191bfb86..2ee343195 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json @@ -1333,7 +1333,23 @@ "show": true, "title_case": false, "type": "code", - "value": "from concurrent.futures import ThreadPoolExecutor, as_completed\nfrom pathlib import Path\nfrom tempfile import NamedTemporaryFile\nfrom zipfile import ZipFile, is_zipfile\n\nimport fitz\n\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parse_text_file_to_data\nfrom langflow.custom import Component\nfrom langflow.io import BoolInput, FileInput, Output\nfrom langflow.schema import Data\n\n\nclass FileComponent(Component):\n \"\"\"Handles loading of individual or zipped text files.\n\n Processes multiple valid files within a zip archive if provided.\n\n Attributes:\n display_name: Display name of the component.\n description: Brief component description.\n icon: Icon to represent the component.\n name: Identifier for the component.\n inputs: Inputs required by the component.\n outputs: Output of the component after processing files.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n inputs = [\n FileInput(\n name=\"path\",\n display_name=\"Path\",\n file_types=[*TEXT_FILE_TYPES, \"zip\"],\n info=f\"Supported file types: {', '.join([*TEXT_FILE_TYPES, 'zip'])}\",\n ),\n BoolInput(\n name=\"silent_errors\",\n display_name=\"Silent Errors\",\n advanced=True,\n info=\"If true, errors will not raise an exception.\",\n ),\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"Use Multithreading\",\n advanced=True,\n info=\"If true, parallel processing will be enabled for zip files.\",\n ),\n ]\n\n outputs = [Output(display_name=\"Data\", name=\"data\", method=\"load_file\")]\n\n def load_file(self) -> Data:\n \"\"\"Load and parse file(s) from a zip archive.\n\n Raises:\n ValueError: If no file is uploaded or file path is invalid.\n\n Returns:\n Data: Parsed data from file(s).\n \"\"\"\n # Check if the file path is provided\n if not self.path:\n self.log(\"File path is missing.\")\n msg = \"Please upload a file for processing.\"\n\n raise ValueError(msg)\n\n resolved_path = Path(self.resolve_path(self.path))\n try:\n # Check if the file is a zip archive\n if is_zipfile(resolved_path):\n self.log(f\"Processing zip file: {resolved_path.name}.\")\n return self._process_zip_file(\n resolved_path,\n silent_errors=self.silent_errors,\n parallel=self.use_multithreading,\n )\n\n self.log(f\"Processing single file: {resolved_path.name}.\")\n return self._process_single_file(resolved_path, silent_errors=self.silent_errors)\n except FileNotFoundError:\n self.log(f\"File not found: {resolved_path.name}.\")\n raise\n\n def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, parallel: bool = False) -> Data:\n \"\"\"Process text files within a zip archive.\n\n Args:\n zip_path: Path to the zip file.\n silent_errors: Suppresses errors if True.\n parallel: Enables parallel processing if True.\n\n Returns:\n list[Data]: Combined data from all valid files.\n\n Raises:\n ValueError: If no valid files found in the archive.\n \"\"\"\n data: list[Data] = []\n with ZipFile(zip_path, \"r\") as zip_file:\n # Filter file names based on extensions in TEXT_FILE_TYPES and ignore hidden files\n valid_files = [\n name\n for name in zip_file.namelist()\n if (\n any(name.endswith(ext) for ext in TEXT_FILE_TYPES)\n and not name.startswith(\"__MACOSX\")\n and not name.startswith(\".\")\n )\n ]\n\n # Raise an error if no valid files found\n if not valid_files:\n self.log(\"No valid files in the zip archive.\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return data # type: ignore[return-value]\n\n # Raise an error if no valid files found\n msg = \"No valid files in the zip archive.\"\n raise ValueError(msg)\n\n # Define a function to process each file\n def process_file(file_name):\n with NamedTemporaryFile(delete=False) as temp_file:\n temp_path = Path(temp_file.name).with_name(file_name)\n with zip_file.open(file_name) as file_content:\n temp_path.write_bytes(file_content.read())\n try:\n return self._process_single_file(temp_path, silent_errors=silent_errors)\n finally:\n temp_path.unlink()\n\n # Process files in parallel if specified\n if parallel:\n self.log(\"Initializing parallel Thread Pool Executor.\")\n with ThreadPoolExecutor() as executor:\n futures = {executor.submit(process_file, file): file for file in valid_files}\n for future in as_completed(futures):\n try:\n data.append(future.result())\n except Exception as e:\n self.log(f\"Error processing file {futures[future]}: {e}\")\n if not silent_errors:\n raise\n else:\n # Sequential processing\n data.extend([process_file(file_name) for file_name in valid_files])\n\n self.log(f\"Successfully processed zip file: {zip_path.name}.\")\n\n return data # type: ignore[return-value]\n\n def _process_single_file(self, file_path: Path, *, silent_errors: bool = False) -> Data:\n \"\"\"Process a single file.\n\n Args:\n file_path: Path to the file.\n silent_errors: Suppresses errors if True.\n\n Returns:\n Data: Parsed data from the file.\n\n Raises:\n ValueError: For unsupported file formats.\n \"\"\"\n\n # Define a function to extract text from a PDF file\n def pdf_to_text(filepath):\n text = \"\"\n\n # Open the PDF file\n with fitz.open(filepath) as pdf:\n for page in pdf:\n text += page.get_text() + \"\\n\"\n\n return text\n\n # Check if the file type is supported\n if not any(file_path.suffix == ext for ext in [\".\" + f for f in [*TEXT_FILE_TYPES, \"pdf\"]]):\n self.log(f\"Unsupported file type: {file_path.suffix}\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return Data()\n\n msg = f\"Unsupported file type: {file_path.suffix}\"\n raise ValueError(msg)\n\n try:\n # Parse the file based on the file type\n if file_path.suffix == \".pdf\":\n data = Data(data={\"file_path\": file_path, \"text\": pdf_to_text(file_path)})\n else:\n data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment]\n if not data:\n data = Data()\n\n self.log(f\"Successfully processed file: {file_path.name}.\")\n except Exception as e:\n self.log(f\"Error processing file {file_path.name}: {e}\")\n\n # Return empty data if silent_errors is True\n if not silent_errors:\n raise\n\n data = Data()\n\n return data\n" + "value": "from pathlib import Path\nfrom tempfile import NamedTemporaryFile\nfrom zipfile import ZipFile, is_zipfile\n\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.custom import Component\nfrom langflow.io import BoolInput, FileInput, IntInput, Output\nfrom langflow.schema import Data\n\n\nclass FileComponent(Component):\n \"\"\"Handles loading of individual or zipped text files.\n\n Processes multiple valid files within a zip archive if provided.\n\n Attributes:\n display_name: Display name of the component.\n description: Brief component description.\n icon: Icon to represent the component.\n name: Identifier for the component.\n inputs: Inputs required by the component.\n outputs: Output of the component after processing files.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n inputs = [\n FileInput(\n name=\"path\",\n display_name=\"Path\",\n file_types=[*TEXT_FILE_TYPES, \"zip\"],\n info=f\"Supported file types: {', '.join([*TEXT_FILE_TYPES, 'zip'])}\",\n ),\n BoolInput(\n name=\"silent_errors\",\n display_name=\"Silent Errors\",\n advanced=True,\n info=\"If true, errors will not raise an exception.\",\n ),\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"Use Multithreading\",\n advanced=True,\n info=\"If true, parallel processing will be enabled for zip files.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Multithreading Concurrency\",\n advanced=True,\n info=\"The maximum number of workers to use, if concurrency is enabled\",\n value=4,\n ),\n ]\n\n outputs = [Output(display_name=\"Data\", name=\"data\", method=\"load_file\")]\n\n def load_file(self) -> Data:\n \"\"\"Load and parse file(s) from a zip archive.\n\n Raises:\n ValueError: If no file is uploaded or file path is invalid.\n\n Returns:\n Data: Parsed data from file(s).\n \"\"\"\n # Check if the file path is provided\n if not self.path:\n self.log(\"File path is missing.\")\n msg = \"Please upload a file for processing.\"\n\n raise ValueError(msg)\n\n resolved_path = Path(self.resolve_path(self.path))\n try:\n # Check if the file is a zip archive\n if is_zipfile(resolved_path):\n self.log(f\"Processing zip file: {resolved_path.name}.\")\n return self._process_zip_file(\n resolved_path,\n silent_errors=self.silent_errors,\n parallel=self.use_multithreading,\n )\n\n self.log(f\"Processing single file: {resolved_path.name}.\")\n return self._process_single_file(resolved_path, silent_errors=self.silent_errors)\n except FileNotFoundError:\n self.log(f\"File not found: {resolved_path.name}.\")\n raise\n\n def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, parallel: bool = False) -> Data:\n \"\"\"Process text files within a zip archive.\n\n Args:\n zip_path: Path to the zip file.\n silent_errors: Suppresses errors if True.\n parallel: Enables parallel processing if True.\n\n Returns:\n list[Data]: Combined data from all valid files.\n\n Raises:\n ValueError: If no valid files found in the archive.\n \"\"\"\n data: list[Data] = []\n with ZipFile(zip_path, \"r\") as zip_file:\n # Filter file names based on extensions in TEXT_FILE_TYPES and ignore hidden files\n valid_files = [\n name\n for name in zip_file.namelist()\n if (\n any(name.endswith(ext) for ext in TEXT_FILE_TYPES)\n and not name.startswith(\"__MACOSX\")\n and not name.startswith(\".\")\n )\n ]\n\n # Raise an error if no valid files found\n if not valid_files:\n self.log(\"No valid files in the zip archive.\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return data # type: ignore[return-value]\n\n # Raise an error if no valid files found\n msg = \"No valid files in the zip archive.\"\n raise ValueError(msg)\n\n # Define a function to process each file\n def process_file(file_name, silent_errors=silent_errors):\n with NamedTemporaryFile(delete=False) as temp_file:\n temp_path = Path(temp_file.name).with_name(file_name)\n with zip_file.open(file_name) as file_content:\n temp_path.write_bytes(file_content.read())\n try:\n return self._process_single_file(temp_path, silent_errors=silent_errors)\n finally:\n temp_path.unlink()\n\n # Process files in parallel if specified\n if parallel:\n self.log(\n f\"Initializing parallel Thread Pool Executor with max workers: \"\n f\"{self.concurrency_multithreading}.\"\n )\n\n # Process files in parallel\n initial_data = parallel_load_data(\n valid_files,\n silent_errors=silent_errors,\n load_function=process_file,\n max_concurrency=self.concurrency_multithreading,\n )\n\n # Filter out empty data\n data = list(filter(None, initial_data))\n else:\n # Sequential processing\n data = [process_file(file_name) for file_name in valid_files]\n\n self.log(f\"Successfully processed zip file: {zip_path.name}.\")\n\n return data # type: ignore[return-value]\n\n def _process_single_file(self, file_path: Path, *, silent_errors: bool = False) -> Data:\n \"\"\"Process a single file.\n\n Args:\n file_path: Path to the file.\n silent_errors: Suppresses errors if True.\n\n Returns:\n Data: Parsed data from the file.\n\n Raises:\n ValueError: For unsupported file formats.\n \"\"\"\n # Check if the file type is supported\n if not any(file_path.suffix == ext for ext in [\".\" + f for f in TEXT_FILE_TYPES]):\n self.log(f\"Unsupported file type: {file_path.suffix}\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return Data()\n\n msg = f\"Unsupported file type: {file_path.suffix}\"\n raise ValueError(msg)\n\n try:\n # Parse the text file as appropriate\n data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment]\n if not data:\n data = Data()\n\n self.log(f\"Successfully processed file: {file_path.name}.\")\n except Exception as e:\n self.log(f\"Error processing file {file_path.name}: {e}\")\n\n # Return empty data if silent_errors is True\n if not silent_errors:\n raise\n\n data = Data()\n\n return data\n" + }, + "concurrency_multithreading": { + "_input_type": "IntInput", + "advanced": true, + "display_name": "Multithreading Concurrency", + "dynamic": false, + "info": "The maximum number of workers to use, if concurrency is enabled", + "list": false, + "name": "concurrency_multithreading", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "int", + "value": 4 }, "path": { "advanced": false, diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json index dc7f5428d..349091c3e 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json @@ -1788,7 +1788,23 @@ "show": true, "title_case": false, "type": "code", - "value": "from concurrent.futures import ThreadPoolExecutor, as_completed\nfrom pathlib import Path\nfrom tempfile import NamedTemporaryFile\nfrom zipfile import ZipFile, is_zipfile\n\nimport fitz\n\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parse_text_file_to_data\nfrom langflow.custom import Component\nfrom langflow.io import BoolInput, FileInput, Output\nfrom langflow.schema import Data\n\n\nclass FileComponent(Component):\n \"\"\"Handles loading of individual or zipped text files.\n\n Processes multiple valid files within a zip archive if provided.\n\n Attributes:\n display_name: Display name of the component.\n description: Brief component description.\n icon: Icon to represent the component.\n name: Identifier for the component.\n inputs: Inputs required by the component.\n outputs: Output of the component after processing files.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n inputs = [\n FileInput(\n name=\"path\",\n display_name=\"Path\",\n file_types=[*TEXT_FILE_TYPES, \"zip\"],\n info=f\"Supported file types: {', '.join([*TEXT_FILE_TYPES, 'zip'])}\",\n ),\n BoolInput(\n name=\"silent_errors\",\n display_name=\"Silent Errors\",\n advanced=True,\n info=\"If true, errors will not raise an exception.\",\n ),\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"Use Multithreading\",\n advanced=True,\n info=\"If true, parallel processing will be enabled for zip files.\",\n ),\n ]\n\n outputs = [Output(display_name=\"Data\", name=\"data\", method=\"load_file\")]\n\n def load_file(self) -> Data:\n \"\"\"Load and parse file(s) from a zip archive.\n\n Raises:\n ValueError: If no file is uploaded or file path is invalid.\n\n Returns:\n Data: Parsed data from file(s).\n \"\"\"\n # Check if the file path is provided\n if not self.path:\n self.log(\"File path is missing.\")\n msg = \"Please upload a file for processing.\"\n\n raise ValueError(msg)\n\n resolved_path = Path(self.resolve_path(self.path))\n try:\n # Check if the file is a zip archive\n if is_zipfile(resolved_path):\n self.log(f\"Processing zip file: {resolved_path.name}.\")\n return self._process_zip_file(\n resolved_path,\n silent_errors=self.silent_errors,\n parallel=self.use_multithreading,\n )\n\n self.log(f\"Processing single file: {resolved_path.name}.\")\n return self._process_single_file(resolved_path, silent_errors=self.silent_errors)\n except FileNotFoundError:\n self.log(f\"File not found: {resolved_path.name}.\")\n raise\n\n def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, parallel: bool = False) -> Data:\n \"\"\"Process text files within a zip archive.\n\n Args:\n zip_path: Path to the zip file.\n silent_errors: Suppresses errors if True.\n parallel: Enables parallel processing if True.\n\n Returns:\n list[Data]: Combined data from all valid files.\n\n Raises:\n ValueError: If no valid files found in the archive.\n \"\"\"\n data: list[Data] = []\n with ZipFile(zip_path, \"r\") as zip_file:\n # Filter file names based on extensions in TEXT_FILE_TYPES and ignore hidden files\n valid_files = [\n name\n for name in zip_file.namelist()\n if (\n any(name.endswith(ext) for ext in TEXT_FILE_TYPES)\n and not name.startswith(\"__MACOSX\")\n and not name.startswith(\".\")\n )\n ]\n\n # Raise an error if no valid files found\n if not valid_files:\n self.log(\"No valid files in the zip archive.\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return data # type: ignore[return-value]\n\n # Raise an error if no valid files found\n msg = \"No valid files in the zip archive.\"\n raise ValueError(msg)\n\n # Define a function to process each file\n def process_file(file_name):\n with NamedTemporaryFile(delete=False) as temp_file:\n temp_path = Path(temp_file.name).with_name(file_name)\n with zip_file.open(file_name) as file_content:\n temp_path.write_bytes(file_content.read())\n try:\n return self._process_single_file(temp_path, silent_errors=silent_errors)\n finally:\n temp_path.unlink()\n\n # Process files in parallel if specified\n if parallel:\n self.log(\"Initializing parallel Thread Pool Executor.\")\n with ThreadPoolExecutor() as executor:\n futures = {executor.submit(process_file, file): file for file in valid_files}\n for future in as_completed(futures):\n try:\n data.append(future.result())\n except Exception as e:\n self.log(f\"Error processing file {futures[future]}: {e}\")\n if not silent_errors:\n raise\n else:\n # Sequential processing\n data.extend([process_file(file_name) for file_name in valid_files])\n\n self.log(f\"Successfully processed zip file: {zip_path.name}.\")\n\n return data # type: ignore[return-value]\n\n def _process_single_file(self, file_path: Path, *, silent_errors: bool = False) -> Data:\n \"\"\"Process a single file.\n\n Args:\n file_path: Path to the file.\n silent_errors: Suppresses errors if True.\n\n Returns:\n Data: Parsed data from the file.\n\n Raises:\n ValueError: For unsupported file formats.\n \"\"\"\n\n # Define a function to extract text from a PDF file\n def pdf_to_text(filepath):\n text = \"\"\n\n # Open the PDF file\n with fitz.open(filepath) as pdf:\n for page in pdf:\n text += page.get_text() + \"\\n\"\n\n return text\n\n # Check if the file type is supported\n if not any(file_path.suffix == ext for ext in [\".\" + f for f in [*TEXT_FILE_TYPES, \"pdf\"]]):\n self.log(f\"Unsupported file type: {file_path.suffix}\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return Data()\n\n msg = f\"Unsupported file type: {file_path.suffix}\"\n raise ValueError(msg)\n\n try:\n # Parse the file based on the file type\n if file_path.suffix == \".pdf\":\n data = Data(data={\"file_path\": file_path, \"text\": pdf_to_text(file_path)})\n else:\n data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment]\n if not data:\n data = Data()\n\n self.log(f\"Successfully processed file: {file_path.name}.\")\n except Exception as e:\n self.log(f\"Error processing file {file_path.name}: {e}\")\n\n # Return empty data if silent_errors is True\n if not silent_errors:\n raise\n\n data = Data()\n\n return data\n" + "value": "from pathlib import Path\nfrom tempfile import NamedTemporaryFile\nfrom zipfile import ZipFile, is_zipfile\n\nfrom langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom langflow.custom import Component\nfrom langflow.io import BoolInput, FileInput, IntInput, Output\nfrom langflow.schema import Data\n\n\nclass FileComponent(Component):\n \"\"\"Handles loading of individual or zipped text files.\n\n Processes multiple valid files within a zip archive if provided.\n\n Attributes:\n display_name: Display name of the component.\n description: Brief component description.\n icon: Icon to represent the component.\n name: Identifier for the component.\n inputs: Inputs required by the component.\n outputs: Output of the component after processing files.\n \"\"\"\n\n display_name = \"File\"\n description = \"Load a file to be used in your project.\"\n icon = \"file-text\"\n name = \"File\"\n\n inputs = [\n FileInput(\n name=\"path\",\n display_name=\"Path\",\n file_types=[*TEXT_FILE_TYPES, \"zip\"],\n info=f\"Supported file types: {', '.join([*TEXT_FILE_TYPES, 'zip'])}\",\n ),\n BoolInput(\n name=\"silent_errors\",\n display_name=\"Silent Errors\",\n advanced=True,\n info=\"If true, errors will not raise an exception.\",\n ),\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"Use Multithreading\",\n advanced=True,\n info=\"If true, parallel processing will be enabled for zip files.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Multithreading Concurrency\",\n advanced=True,\n info=\"The maximum number of workers to use, if concurrency is enabled\",\n value=4,\n ),\n ]\n\n outputs = [Output(display_name=\"Data\", name=\"data\", method=\"load_file\")]\n\n def load_file(self) -> Data:\n \"\"\"Load and parse file(s) from a zip archive.\n\n Raises:\n ValueError: If no file is uploaded or file path is invalid.\n\n Returns:\n Data: Parsed data from file(s).\n \"\"\"\n # Check if the file path is provided\n if not self.path:\n self.log(\"File path is missing.\")\n msg = \"Please upload a file for processing.\"\n\n raise ValueError(msg)\n\n resolved_path = Path(self.resolve_path(self.path))\n try:\n # Check if the file is a zip archive\n if is_zipfile(resolved_path):\n self.log(f\"Processing zip file: {resolved_path.name}.\")\n return self._process_zip_file(\n resolved_path,\n silent_errors=self.silent_errors,\n parallel=self.use_multithreading,\n )\n\n self.log(f\"Processing single file: {resolved_path.name}.\")\n return self._process_single_file(resolved_path, silent_errors=self.silent_errors)\n except FileNotFoundError:\n self.log(f\"File not found: {resolved_path.name}.\")\n raise\n\n def _process_zip_file(self, zip_path: Path, *, silent_errors: bool = False, parallel: bool = False) -> Data:\n \"\"\"Process text files within a zip archive.\n\n Args:\n zip_path: Path to the zip file.\n silent_errors: Suppresses errors if True.\n parallel: Enables parallel processing if True.\n\n Returns:\n list[Data]: Combined data from all valid files.\n\n Raises:\n ValueError: If no valid files found in the archive.\n \"\"\"\n data: list[Data] = []\n with ZipFile(zip_path, \"r\") as zip_file:\n # Filter file names based on extensions in TEXT_FILE_TYPES and ignore hidden files\n valid_files = [\n name\n for name in zip_file.namelist()\n if (\n any(name.endswith(ext) for ext in TEXT_FILE_TYPES)\n and not name.startswith(\"__MACOSX\")\n and not name.startswith(\".\")\n )\n ]\n\n # Raise an error if no valid files found\n if not valid_files:\n self.log(\"No valid files in the zip archive.\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return data # type: ignore[return-value]\n\n # Raise an error if no valid files found\n msg = \"No valid files in the zip archive.\"\n raise ValueError(msg)\n\n # Define a function to process each file\n def process_file(file_name, silent_errors=silent_errors):\n with NamedTemporaryFile(delete=False) as temp_file:\n temp_path = Path(temp_file.name).with_name(file_name)\n with zip_file.open(file_name) as file_content:\n temp_path.write_bytes(file_content.read())\n try:\n return self._process_single_file(temp_path, silent_errors=silent_errors)\n finally:\n temp_path.unlink()\n\n # Process files in parallel if specified\n if parallel:\n self.log(\n f\"Initializing parallel Thread Pool Executor with max workers: \"\n f\"{self.concurrency_multithreading}.\"\n )\n\n # Process files in parallel\n initial_data = parallel_load_data(\n valid_files,\n silent_errors=silent_errors,\n load_function=process_file,\n max_concurrency=self.concurrency_multithreading,\n )\n\n # Filter out empty data\n data = list(filter(None, initial_data))\n else:\n # Sequential processing\n data = [process_file(file_name) for file_name in valid_files]\n\n self.log(f\"Successfully processed zip file: {zip_path.name}.\")\n\n return data # type: ignore[return-value]\n\n def _process_single_file(self, file_path: Path, *, silent_errors: bool = False) -> Data:\n \"\"\"Process a single file.\n\n Args:\n file_path: Path to the file.\n silent_errors: Suppresses errors if True.\n\n Returns:\n Data: Parsed data from the file.\n\n Raises:\n ValueError: For unsupported file formats.\n \"\"\"\n # Check if the file type is supported\n if not any(file_path.suffix == ext for ext in [\".\" + f for f in TEXT_FILE_TYPES]):\n self.log(f\"Unsupported file type: {file_path.suffix}\")\n\n # Return empty data if silent_errors is True\n if silent_errors:\n return Data()\n\n msg = f\"Unsupported file type: {file_path.suffix}\"\n raise ValueError(msg)\n\n try:\n # Parse the text file as appropriate\n data = parse_text_file_to_data(str(file_path), silent_errors=silent_errors) # type: ignore[assignment]\n if not data:\n data = Data()\n\n self.log(f\"Successfully processed file: {file_path.name}.\")\n except Exception as e:\n self.log(f\"Error processing file {file_path.name}: {e}\")\n\n # Return empty data if silent_errors is True\n if not silent_errors:\n raise\n\n data = Data()\n\n return data\n" + }, + "concurrency_multithreading": { + "_input_type": "IntInput", + "advanced": true, + "display_name": "Multithreading Concurrency", + "dynamic": false, + "info": "The maximum number of workers to use, if concurrency is enabled", + "list": false, + "name": "concurrency_multithreading", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "trace_as_metadata": true, + "type": "int", + "value": 4 }, "path": { "advanced": false, diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index f432e123d..b7e95c5c0 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -135,7 +135,6 @@ dependencies = [ "duckdb>=1.0.0", "python-docx>=1.1.0", "jq>=1.7.0; sys_platform != 'win32'", - "pypdf>=4.2.0", "nest-asyncio>=1.6.0", "emoji>=2.12.0", "cryptography>=42.0.5,<44.0.0", @@ -162,6 +161,7 @@ dependencies = [ "assemblyai>=0.33.0", "fastapi-pagination>=0.12.29", "defusedxml>=0.7.1", + "pypdf~=5.1.0", ] [project.urls] diff --git a/uv.lock b/uv.lock index c35ac1d3a..b9f456271 100644 --- a/uv.lock +++ b/uv.lock @@ -3697,7 +3697,6 @@ dependencies = [ { name = "pyautogen" }, { name = "pydantic-settings" }, { name = "pymongo" }, - { name = "pymupdf" }, { name = "pytube" }, { name = "pywin32", marker = "sys_platform == 'win32'" }, { name = "qdrant-client" }, @@ -3847,7 +3846,6 @@ requires-dist = [ { name = "pyautogen", specifier = ">=0.2.0" }, { name = "pydantic-settings", specifier = "==2.4.0" }, { name = "pymongo", specifier = ">=4.6.0" }, - { name = "pymupdf", specifier = "~=1.24.13" }, { name = "pytube", specifier = ">=15.0.0" }, { name = "pywin32", marker = "sys_platform == 'win32'", specifier = ">=306" }, { name = "qdrant-client", specifier = "~=1.9.2" }, @@ -4088,7 +4086,7 @@ requires-dist = [ { name = "prometheus-client", specifier = ">=0.20.0" }, { name = "pydantic", specifier = ">=2.7.0" }, { name = "pydantic-settings", specifier = ">=2.2.0" }, - { name = "pypdf", specifier = ">=4.2.0" }, + { name = "pypdf", specifier = "~=5.1.0" }, { name = "pyperclip", specifier = ">=1.8.2" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.2.0" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23.0" }, @@ -6324,21 +6322,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0d/2a/7c24a6144eaa06d18ed52822ea2b0f119fd9267cd1abbb75dae4d89a3803/pymongo-4.10.1-cp313-cp313-win_amd64.whl", hash = "sha256:45ee87a4e12337353242bc758accc7fb47a2f2d9ecc0382a61e64c8f01e86708", size = 976873 }, ] -[[package]] -name = "pymupdf" -version = "1.24.13" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/22/39/84efca63af4e5a014c1d4c21686469f99c3d1c160a3a0b902ac676f6ffd9/PyMuPDF-1.24.13.tar.gz", hash = "sha256:6ec3ab3c6d5cba60bfcf58daaa2d1a5b700b0366ce52be666445007351461fa4", size = 53655596 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/ce/79/8d31a98ebeb329000406d6c36fb2ad42264d5a4a6915ebabbde332642204/PyMuPDF-1.24.13-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:c11bb9198af69d490b4b346421db827d875a28fbc760d239e691d4b3ed12b5ad", size = 19147116 }, - { url = "https://files.pythonhosted.org/packages/ea/fe/ff2bb633c0934ba43c36184b8ed025092e946994dc6b4c764a0079f0ab3c/PyMuPDF-1.24.13-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:240d5c43daa9278db50d609162b48f673ab256d7e5c73eea67af517c1fc2d47c", size = 18406545 }, - { url = "https://files.pythonhosted.org/packages/5b/5f/916bb534fd498d069d68c7a52289ba78d27823c2d6f8c693889e288e31e4/PyMuPDF-1.24.13-cp39-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:e4c8808e62afbbde0f7b9c4151c4b1a5735911c2d39c34332860df600dba76f8", size = 19284324 }, - { url = "https://files.pythonhosted.org/packages/85/48/e4630eb58f4daed22a078e19db8a709d407d2e19316089675f6ed185f01a/PyMuPDF-1.24.13-cp39-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:c830610e4fde237fcf0532f1f8c1381453f48c164a5eadd0c6e5fd0bea1ca8e3", size = 19812221 }, - { url = "https://files.pythonhosted.org/packages/6d/22/5aa9e01747518878a54866b4d925abdc663c64c75f5fbc6a9706957a7a30/PyMuPDF-1.24.13-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:4520558580ac6b5a7164fda29fbc14e39d3114fd803420721500edbf47d04872", size = 20942140 }, - { url = "https://files.pythonhosted.org/packages/07/a4/2e545217436e7717642809c7392bd7d7156ba102e7a47acb22659bfd41de/PyMuPDF-1.24.13-cp39-abi3-win32.whl", hash = "sha256:ab22828d4fc205791ef1332a64893cbfc38cd9c331c5f46ae4537372ffee6fc1", size = 14943060 }, - { url = "https://files.pythonhosted.org/packages/38/80/f8d8ae555b237574005faef8a181a5c6a1d983e16a982b65ccc56a42faa2/PyMuPDF-1.24.13-cp39-abi3-win_amd64.whl", hash = "sha256:ec17914e4a560f4070212a2e84db5cc8b561d85d1ead193605a22f9561b03148", size = 16242035 }, -] - [[package]] name = "pynacl" version = "1.5.0"