from pathlib import Path
from typing import Any

import boto3

from langflow.custom import Component
from langflow.io import (
    BoolInput,
    DropdownInput,
    HandleInput,
    Output,
    SecretStrInput,
    StrInput,
)


class S3BucketUploaderComponent(Component):
    """Uploads files to an S3 bucket.

    Two upload strategies are supported:

    * ``"Store Data"`` -- the parsed text content of each ``Data`` item is
      written to the bucket via ``put_object``.
    * ``"Store Original File"`` -- the original file on disk is uploaded
      as-is via ``upload_file``.

    The component requires AWS credentials and bucket details as inputs and is
    designed to work with File and Directory components as inputs. It needs
    the ``boto3`` library to be installed.
    """

    display_name = "S3 Bucket Uploader"
    description = "Uploads files to S3 bucket."
    icon = "Globe"
    name = "s3bucketuploader"

    inputs = [
        SecretStrInput(
            name="aws_access_key_id",
            display_name="AWS Access Key ID",
            required=True,
            password=True,
            info="AWS Access key ID.",
        ),
        SecretStrInput(
            name="aws_secret_access_key",
            display_name="AWS Secret Key",
            required=True,
            password=True,
            info="AWS Secret Key.",
        ),
        StrInput(
            name="bucket_name",
            display_name="Bucket Name",
            info="Enter the name of the bucket.",
            advanced=False,
        ),
        DropdownInput(
            name="strategy",
            display_name="Strategy for file upload",
            options=["Store Data", "Store Original File"],
            # BUG FIX: the default must be one of ``options``. The previous
            # default ("By Data") was not, so ``process_files`` always hit the
            # "Invalid strategy" fallback for the default configuration.
            value="Store Data",
            info=(
                "Choose the strategy to upload the file. Store Data means that the source file "
                "is parsed and stored as LangFlow data. Store Original File means that the "
                "source file is uploaded as is."
            ),
        ),
        HandleInput(
            name="data_inputs",
            display_name="Data Inputs",
            # FIX: previous info text ("The data to split.") was a copy-paste
            # leftover from another component.
            info="The data to upload.",
            input_types=["Data"],
            is_list=True,
            required=True,
        ),
        StrInput(
            name="s3_prefix",
            display_name="S3 Prefix",
            info="Prefix for all files.",
            advanced=True,
        ),
        BoolInput(
            name="strip_path",
            display_name="Strip Path",
            info="Removes path from file path.",
            required=True,
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Writes to AWS Bucket", name="data", method="process_files"),
    ]

    def process_files(self) -> None:
        """Dispatch to the upload method selected by ``self.strategy``.

        ``"Store Data"`` maps to :meth:`process_files_by_data` and
        ``"Store Original File"`` maps to :meth:`process_files_by_name`.
        Any other value is logged as an invalid strategy.

        Returns:
            None
        """
        strategy_methods = {
            "Store Data": self.process_files_by_data,
            "Store Original File": self.process_files_by_name,
        }
        strategy_methods.get(self.strategy, lambda: self.log("Invalid strategy"))()

    def process_files_by_data(self) -> None:
        """Upload the parsed text content of each data input to the bucket.

        Iterates over ``self.data_inputs`` and writes each item's ``text``
        to S3 under the normalized key derived from its ``file_path``.
        Items missing either ``file_path`` or ``text`` are skipped.

        Returns:
            None
        """
        for data_item in self.data_inputs:
            file_path = data_item.data.get("file_path")
            text_content = data_item.data.get("text")

            if file_path and text_content:
                self._s3_client().put_object(
                    Bucket=self.bucket_name, Key=self._normalize_path(file_path), Body=text_content
                )

    def process_files_by_name(self) -> None:
        """Upload the original files referenced by the data inputs.

        Iterates over ``self.data_inputs``, logs each file path, and uploads
        the file from disk when a ``file_path`` is present.

        Returns:
            None
        """
        for data_item in self.data_inputs:
            file_path = data_item.data.get("file_path")
            self.log(f"Uploading file: {file_path}")
            if file_path:
                self._s3_client().upload_file(file_path, Bucket=self.bucket_name, Key=self._normalize_path(file_path))

    def _s3_client(self) -> Any:
        """Return a boto3 S3 client built from the configured credentials.

        Returns:
            Any: A boto3 S3 client instance.
        """
        return boto3.client(
            "s3",
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
        )

    def _normalize_path(self, file_path: str) -> str:
        """Compute the S3 object key for *file_path*.

        When ``self.strip_path`` is true only the file name is kept; when
        ``self.s3_prefix`` is set it is prepended to the result.

        Args:
            file_path: The original file path.

        Returns:
            str: The S3 object key to use.
        """
        processed_path: str = file_path

        if self.strip_path:
            # Keep the file name only, dropping any directory components.
            processed_path = Path(file_path).name

        # Prepend the configured prefix, if any.
        if self.s3_prefix:
            processed_path = str(Path(self.s3_prefix) / processed_path)

        return processed_path
+ """ + prefix = self.s3_prefix + strip_path = self.strip_path + processed_path: str = file_path + + if strip_path: + # Filename only + processed_path = Path(file_path).name + + # Concatenate the s3_prefix if it exists + if prefix: + processed_path = str(Path(prefix) / processed_path) + + return processed_path diff --git a/src/backend/tests/unit/components/data/test_s3_uploader_component.py b/src/backend/tests/unit/components/data/test_s3_uploader_component.py new file mode 100644 index 000000000..a690615fe --- /dev/null +++ b/src/backend/tests/unit/components/data/test_s3_uploader_component.py @@ -0,0 +1,136 @@ +import os +import tempfile +import uuid +from pathlib import Path + +import boto3 +import pytest +from langflow.components.data.s3_bucket_uploader import S3BucketUploaderComponent +from langflow.schema.data import Data + +from tests.base import ComponentTestBaseWithoutClient + + +@pytest.mark.skipif( + not os.environ.get("AWS_ACCESS_KEY_ID") or not os.environ.get("AWS_SECRET_ACCESS_KEY"), + reason="Environment variable AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY is not defined.", +) +class TestS3UploaderComponent(ComponentTestBaseWithoutClient): + """Unit tests for the S3BucketUploaderComponent. + + This test class inherits from ComponentTestBaseWithoutClient and includes several pytest fixtures and a test method + to verify the functionality of the S3BucketUploaderComponent. + + Fixtures: + component_class: Returns the component class to be tested. + file_names_mapping: Returns an empty list since this component doesn't have version-specific files. + default_kwargs: Returns an empty dictionary since this component doesn't have any default arguments. + temp_files: Creates three temporary files with predefined content and yields them as Data objects. + Cleans up the files after the test. + s3_bucket: Creates a unique S3 bucket for testing, yields the bucket name, and deletes the bucket + and its contents after the test. 
+ + Test Methods: + test_upload: Tests the upload functionality of the S3BucketUploaderComponent by uploading temporary files + to the S3 bucket and verifying their content. + """ + + @pytest.fixture + def component_class(self): + """Return the component class to test.""" + return S3BucketUploaderComponent + + @pytest.fixture + def file_names_mapping(self): + """Return an empty list since this component doesn't have version-specific files.""" + + @pytest.fixture + def default_kwargs(self): + """Return an empty dictionary since this component doesn't have any default arguments.""" + return {} + + @pytest.fixture + def temp_files(self): + """Setup: Create three temporary files.""" + temp_files = [] + contents = [ + b"Lorem ipsum dolor sit amet, consectetur adipiscing elit.", + b"Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.", + b"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris " + b"nisi ut aliquip ex ea commodo consequat.", + ] + + for content in contents: + with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as temp_file: + temp_file.write(content) + temp_file.flush() + temp_file.close() + temp_files.append(temp_file.name) + + data = [ + Data(data={"file_path": file_path, "text": Path(file_path).read_text(encoding="utf-8")}) + for file_path in temp_files + ] + + yield data + + # Teardown: Explicitly delete the files + for temp_file in temp_files: + Path(temp_file).unlink() + + @pytest.fixture + def s3_bucket(self) -> str: + """Generate a unique bucket name (AWS requires globally unique names).""" + bucket_name = f"graphrag-test-bucket-{uuid.uuid4().hex[:8]}" + + # Initialize S3 client using environment variables for credentials + s3 = boto3.client("s3") + + try: + # Create an S3 bucket in your default region + s3.create_bucket(Bucket=bucket_name) + + yield bucket_name + + finally: + # Teardown: Delete the bucket and its contents + try: + # List and delete all objects in the bucket + objects = 
s3.list_objects_v2(Bucket=bucket_name).get("Contents", []) + for obj in objects: + s3.delete_object(Bucket=bucket_name, Key=obj["Key"]) + + # Delete the bucket + s3.delete_bucket(Bucket=bucket_name) + except boto3.exceptions.Boto3Error as e: + pytest.fail(f"Error during teardown: {e}") + + def test_upload(self, temp_files, s3_bucket): + """Test uploading files to an S3 bucket.""" + component = S3BucketUploaderComponent() + + # Set AWS credentials from environment variables + aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID") + aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") + component.set_attributes( + { + "aws_access_key_id": aws_access_key_id, + "aws_secret_access_key": aws_secret_access_key, + "bucket_name": s3_bucket, + "strategy": "Store Original File", + "data_inputs": temp_files, + "s3_prefix": "test", + "strip_path": True, + } + ) + + component.process_files() + + # Check if the files were uploaded. Assumes key and secret are set via environment variables + s3 = boto3.client("s3") + + for temp_file in temp_files: + key = f"test/{Path(temp_file.data['file_path']).name}" + response = s3.get_object(Bucket=s3_bucket, Key=key) + with Path(temp_file.data["file_path"]).open("rb") as f: + assert response["Body"].read() == f.read() diff --git a/uv.lock b/uv.lock index d41754036..646a37e07 100644 --- a/uv.lock +++ b/uv.lock @@ -428,6 +428,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/db/48/684c270724bc3f8d12714556d201aa4610623da919505a6a09e56f50ef6a/astrapy-1.5.2-py3-none-any.whl", hash = "sha256:598b86de723727a11ec43e1c7fe682ecb42d63d37a94165fb08de41c20103f56", size = 177128 }, ] +[[package]] +name = "astroid" +version = "3.3.8" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/80/c5/5c83c48bbf547f3dd8b587529db7cf5a265a3368b33e85e76af8ff6061d3/astroid-3.3.8.tar.gz", hash = 
"sha256:a88c7994f914a4ea8572fac479459f4955eeccc877be3f2d959a33273b0cf40b", size = 398196 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/07/28/0bc8a17d6cd4cc3c79ae41b7105a2b9a327c110e5ddd37a8a27b29a5c8a2/astroid-3.3.8-py3-none-any.whl", hash = "sha256:187ccc0c248bfbba564826c26f070494f7bc964fd286b6d9fff4420e55de828c", size = 275153 }, +] + [[package]] name = "asttokens" version = "2.4.1" @@ -4312,6 +4324,7 @@ dependencies = [ { name = "pyarrow" }, { name = "pydantic-ai" }, { name = "pydantic-settings" }, + { name = "pylint" }, { name = "pymongo" }, { name = "pytube" }, { name = "pywin32", marker = "sys_platform == 'win32'" }, @@ -4319,6 +4332,7 @@ dependencies = [ { name = "qianfan" }, { name = "ragstack-ai-knowledge-store" }, { name = "redis" }, + { name = "ruff" }, { name = "scrapegraph-py" }, { name = "smolagents" }, { name = "spider-client" }, @@ -4491,6 +4505,7 @@ requires-dist = [ { name = "pyarrow", specifier = "==19.0.0" }, { name = "pydantic-ai", specifier = ">=0.0.19" }, { name = "pydantic-settings", specifier = "==2.4.0" }, + { name = "pylint", specifier = ">=3.3.4" }, { name = "pymilvus", extras = ["bulk-writer", "model"], marker = "extra == 'nv-ingest'", specifier = "==2.5.0" }, { name = "pymongo", specifier = "==4.10.1" }, { name = "python-pptx", marker = "extra == 'nv-ingest'", specifier = "==0.6.23" }, @@ -4500,6 +4515,7 @@ requires-dist = [ { name = "qianfan", specifier = "==0.3.5" }, { name = "ragstack-ai-knowledge-store", specifier = "==0.2.1" }, { name = "redis", specifier = "==5.2.1" }, + { name = "ruff", specifier = ">=0.9.7" }, { name = "scrapegraph-py", specifier = ">=1.12.0" }, { name = "sentence-transformers", marker = "extra == 'local'", specifier = ">=2.3.1" }, { name = "smolagents", specifier = ">=1.8.0" }, @@ -5309,6 +5325,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8f/8e/9ad090d3553c280a8060fbf6e24dc1c0c29704ee7d1c372f0c174aa59285/matplotlib_inline-0.1.7-py3-none-any.whl", hash = 
"sha256:df192d39a4ff8f21b1895d72e6a13f5fcc5099f00fa84384e0ea28c2cc0653ca", size = 9899 }, ] +[[package]] +name = "mccabe" +version = "0.7.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e7/ff/0ffefdcac38932a54d2b5eed4e0ba8a408f215002cd178ad1df0f2806ff8/mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325", size = 9658 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/27/1a/1f68f9ba0c207934b35b86a8ca3aad8395a3d6dd7921c0686e23853ff5a9/mccabe-0.7.0-py2.py3-none-any.whl", hash = "sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e", size = 7350 }, +] + [[package]] name = "mcp" version = "1.1.3" @@ -5970,6 +5995,7 @@ name = "nvidia-nccl-cu12" version = "2.20.5" source = { registry = "https://pypi.org/simple" } wheels = [ + { url = "https://files.pythonhosted.org/packages/c1/bb/d09dda47c881f9ff504afd6f9ca4f502ded6d8fc2f572cacc5e39da91c28/nvidia_nccl_cu12-2.20.5-py3-none-manylinux2014_aarch64.whl", hash = "sha256:1fc150d5c3250b170b29410ba682384b14581db722b2531b0d8d33c595f33d01", size = 176238458 }, { url = "https://files.pythonhosted.org/packages/4b/2a/0a131f572aa09f741c30ccd45a8e56316e8be8dfc7bc19bf0ab7cfef7b19/nvidia_nccl_cu12-2.20.5-py3-none-manylinux2014_x86_64.whl", hash = "sha256:057f6bf9685f75215d0c53bf3ac4a10b3e6578351de307abad9e18a99182af56", size = 176249402 }, ] @@ -5979,6 +6005,7 @@ version = "12.8.61" source = { registry = "https://pypi.org/simple" } wheels = [ { url = "https://files.pythonhosted.org/packages/03/f8/9d85593582bd99b8d7c65634d2304780aefade049b2b94d96e44084be90b/nvidia_nvjitlink_cu12-12.8.61-py3-none-manylinux2010_x86_64.manylinux_2_12_x86_64.whl", hash = "sha256:45fd79f2ae20bd67e8bc411055939049873bfd8fac70ff13bd4865e0b9bdab17", size = 39243473 }, + { url = 
"https://files.pythonhosted.org/packages/af/53/698f3758f48c5fcb1112721e40cc6714da3980d3c7e93bae5b29dafa9857/nvidia_nvjitlink_cu12-12.8.61-py3-none-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:9b80ecab31085dda3ce3b41d043be0ec739216c3fc633b8abe212d5a30026df0", size = 38374634 }, ] [[package]] @@ -7107,6 +7134,8 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/39/1b/d0b013bf7d1af7cf0a6a4fce13f5fe5813ab225313755367b36e714a63f8/pycryptodome-3.21.0-cp36-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:18caa8cfbc676eaaf28613637a89980ad2fd96e00c564135bf90bc3f0b34dd93", size = 2254397 }, { url = "https://files.pythonhosted.org/packages/14/71/4cbd3870d3e926c34706f705d6793159ac49d9a213e3ababcdade5864663/pycryptodome-3.21.0-cp36-abi3-win32.whl", hash = "sha256:280b67d20e33bb63171d55b1067f61fbd932e0b1ad976b3a184303a3dad22764", size = 1775641 }, { url = "https://files.pythonhosted.org/packages/43/1d/81d59d228381576b92ecede5cd7239762c14001a828bdba30d64896e9778/pycryptodome-3.21.0-cp36-abi3-win_amd64.whl", hash = "sha256:b7aa25fc0baa5b1d95b7633af4f5f1838467f1815442b22487426f94e0d66c53", size = 1812863 }, + { url = "https://files.pythonhosted.org/packages/25/b3/09ff7072e6d96c9939c24cf51d3c389d7c345bf675420355c22402f71b68/pycryptodome-3.21.0-pp27-pypy_73-manylinux2010_x86_64.whl", hash = "sha256:2cb635b67011bc147c257e61ce864879ffe6d03342dc74b6045059dfbdedafca", size = 1691593 }, + { url = "https://files.pythonhosted.org/packages/a8/91/38e43628148f68ba9b68dedbc323cf409e537fd11264031961fd7c744034/pycryptodome-3.21.0-pp27-pypy_73-win32.whl", hash = "sha256:4c26a2f0dc15f81ea3afa3b0c87b87e501f235d332b7f27e2225ecb80c0b1cdd", size = 1765997 }, { url = "https://files.pythonhosted.org/packages/08/16/ae464d4ac338c1dd41f89c41f9488e54f7d2a3acf93bb920bb193b99f8e3/pycryptodome-3.21.0-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:d5ebe0763c982f069d3877832254f64974139f4f9655058452603ff559c482e8", size = 1615855 }, { url = 
"https://files.pythonhosted.org/packages/1e/8c/b0cee957eee1950ce7655006b26a8894cee1dc4b8747ae913684352786eb/pycryptodome-3.21.0-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7ee86cbde706be13f2dec5a42b52b1c1d1cbb90c8e405c68d0755134735c8dc6", size = 1650018 }, { url = "https://files.pythonhosted.org/packages/93/4d/d7138068089b99f6b0368622e60f97a577c936d75f533552a82613060c58/pycryptodome-3.21.0-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0fd54003ec3ce4e0f16c484a10bc5d8b9bd77fa662a12b85779a2d2d85d67ee0", size = 1687977 }, @@ -7310,6 +7339,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/61/ad/689f02752eeec26aed679477e80e632ef1b682313be70793d798c1d5fc8f/PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb", size = 22997 }, ] +[[package]] +name = "pylint" +version = "3.3.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "astroid" }, + { name = "colorama", marker = "sys_platform == 'win32'" }, + { name = "dill" }, + { name = "isort" }, + { name = "mccabe" }, + { name = "platformdirs" }, + { name = "tomli", marker = "python_full_version < '3.11'" }, + { name = "tomlkit" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ab/b9/50be49afc91469f832c4bf12318ab4abe56ee9aa3700a89aad5359ad195f/pylint-3.3.4.tar.gz", hash = "sha256:74ae7a38b177e69a9b525d0794bd8183820bfa7eb68cc1bee6e8ed22a42be4ce", size = 1518905 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0d/8b/eef15df5f4e7aa393de31feb96ca9a3d6639669bd59d589d0685d5ef4e62/pylint-3.3.4-py3-none-any.whl", hash = "sha256:289e6a1eb27b453b08436478391a48cd53bb0efb824873f949e709350f3de018", size = 522280 }, +] + [[package]] name = "pymilvus" version = "2.5.0"