feat: implement S3 bucket uploader component and unit test (#6146)
* S3 Bucket Uploader, unit test and module init * [autofix.ci] apply automated fixes * Updated UT as per feedback * Added version control fixture * [autofix.ci] apply automated fixes * Style changes requested by reviewer * Really, 1 extra character? * Ruf styling --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
parent
e5f654a752
commit
92d4fd0d52
5 changed files with 395 additions and 0 deletions
|
|
@ -3,6 +3,7 @@ from .csv_to_data import CSVToDataComponent
|
|||
from .directory import DirectoryComponent
|
||||
from .file import FileComponent
|
||||
from .json_to_data import JSONToDataComponent
|
||||
from .s3_bucket_uploader import S3BucketUploaderComponent
|
||||
from .sql_executor import SQLExecutorComponent
|
||||
from .url import URLComponent
|
||||
from .webhook import WebhookComponent
|
||||
|
|
@ -13,6 +14,7 @@ __all__ = [
|
|||
"DirectoryComponent",
|
||||
"FileComponent",
|
||||
"JSONToDataComponent",
|
||||
"S3BucketUploaderComponent",
|
||||
"SQLExecutorComponent",
|
||||
"URLComponent",
|
||||
"WebhookComponent",
|
||||
|
|
|
|||
207
src/backend/base/langflow/components/data/s3_bucket_uploader.py
Normal file
207
src/backend/base/langflow/components/data/s3_bucket_uploader.py
Normal file
|
|
@ -0,0 +1,207 @@
|
|||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import boto3
|
||||
|
||||
from langflow.custom import Component
|
||||
from langflow.io import (
|
||||
BoolInput,
|
||||
DropdownInput,
|
||||
HandleInput,
|
||||
Output,
|
||||
SecretStrInput,
|
||||
StrInput,
|
||||
)
|
||||
|
||||
|
||||
class S3BucketUploaderComponent(Component):
|
||||
"""S3BucketUploaderComponent is a component responsible for uploading files to an S3 bucket.
|
||||
|
||||
It provides two strategies for file upload: "By Data" and "By File Name". The component
|
||||
requires AWS credentials and bucket details as inputs and processes files accordingly.
|
||||
|
||||
Attributes:
|
||||
display_name (str): The display name of the component.
|
||||
description (str): A brief description of the components functionality.
|
||||
icon (str): The icon representing the component.
|
||||
name (str): The internal name of the component.
|
||||
inputs (list): A list of input configurations required by the component.
|
||||
outputs (list): A list of output configurations provided by the component.
|
||||
|
||||
Methods:
|
||||
process_files() -> None:
|
||||
Processes files based on the selected strategy. Calls the appropriate method
|
||||
based on the strategy attribute.
|
||||
process_files_by_data() -> None:
|
||||
Processes and uploads files to an S3 bucket based on the data inputs. Iterates
|
||||
over the data inputs, logs the file path and text content, and uploads each file
|
||||
to the specified S3 bucket if both file path and text content are available.
|
||||
process_files_by_name() -> None:
|
||||
Processes and uploads files to an S3 bucket based on their names. Iterates through
|
||||
the list of data inputs, retrieves the file path from each data item, and uploads
|
||||
the file to the specified S3 bucket if the file path is available. Logs the file
|
||||
path being uploaded.
|
||||
_s3_client() -> Any:
|
||||
Creates and returns an S3 client using the provided AWS access key ID and secret
|
||||
access key.
|
||||
|
||||
Please note that this component requires the boto3 library to be installed. It is designed
|
||||
to work with File and Director components as inputs
|
||||
"""
|
||||
|
||||
display_name = "S3 Bucket Uploader"
|
||||
description = "Uploads files to S3 bucket."
|
||||
icon = "Globe"
|
||||
name = "s3bucketuploader"
|
||||
|
||||
inputs = [
|
||||
SecretStrInput(
|
||||
name="aws_access_key_id",
|
||||
display_name="AWS Access Key ID",
|
||||
required=True,
|
||||
password=True,
|
||||
info="AWS Access key ID.",
|
||||
),
|
||||
SecretStrInput(
|
||||
name="aws_secret_access_key",
|
||||
display_name="AWS Secret Key",
|
||||
required=True,
|
||||
password=True,
|
||||
info="AWS Secret Key.",
|
||||
),
|
||||
StrInput(
|
||||
name="bucket_name",
|
||||
display_name="Bucket Name",
|
||||
info="Enter the name of the bucket.",
|
||||
advanced=False,
|
||||
),
|
||||
DropdownInput(
|
||||
name="strategy",
|
||||
display_name="Strategy for file upload",
|
||||
options=["Store Data", "Store Original File"],
|
||||
value="By Data",
|
||||
info=(
|
||||
"Choose the strategy to upload the file. By Data means that the source file "
|
||||
"is parsed and stored as LangFlow data. By File Name means that the source "
|
||||
"file is uploaded as is."
|
||||
),
|
||||
),
|
||||
HandleInput(
|
||||
name="data_inputs",
|
||||
display_name="Data Inputs",
|
||||
info="The data to split.",
|
||||
input_types=["Data"],
|
||||
is_list=True,
|
||||
required=True,
|
||||
),
|
||||
StrInput(
|
||||
name="s3_prefix",
|
||||
display_name="S3 Prefix",
|
||||
info="Prefix for all files.",
|
||||
advanced=True,
|
||||
),
|
||||
BoolInput(
|
||||
name="strip_path",
|
||||
display_name="Strip Path",
|
||||
info="Removes path from file path.",
|
||||
required=True,
|
||||
advanced=True,
|
||||
),
|
||||
]
|
||||
|
||||
outputs = [
|
||||
Output(display_name="Writes to AWS Bucket", name="data", method="process_files"),
|
||||
]
|
||||
|
||||
def process_files(self) -> None:
|
||||
"""Process files based on the selected strategy.
|
||||
|
||||
This method uses a strategy pattern to process files. The strategy is determined
|
||||
by the `self.strategy` attribute, which can be either "By Data" or "By File Name".
|
||||
Depending on the strategy, the corresponding method (`process_files_by_data` or
|
||||
`process_files_by_name`) is called. If an invalid strategy is provided, an error
|
||||
is logged.
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
strategy_methods = {
|
||||
"Store Data": self.process_files_by_data,
|
||||
"Store Original File": self.process_files_by_name,
|
||||
}
|
||||
strategy_methods.get(self.strategy, lambda: self.log("Invalid strategy"))()
|
||||
|
||||
def process_files_by_data(self) -> None:
|
||||
"""Processes and uploads files to an S3 bucket based on the data inputs.
|
||||
|
||||
This method iterates over the data inputs, logs the file path and text content,
|
||||
and uploads each file to the specified S3 bucket if both file path and text content
|
||||
are available.
|
||||
|
||||
Args:
|
||||
None
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
for data_item in self.data_inputs:
|
||||
file_path = data_item.data.get("file_path")
|
||||
text_content = data_item.data.get("text")
|
||||
|
||||
if file_path and text_content:
|
||||
self._s3_client().put_object(
|
||||
Bucket=self.bucket_name, Key=self._normalize_path(file_path), Body=text_content
|
||||
)
|
||||
|
||||
def process_files_by_name(self) -> None:
|
||||
"""Processes and uploads files to an S3 bucket based on their names.
|
||||
|
||||
Iterates through the list of data inputs, retrieves the file path from each data item,
|
||||
and uploads the file to the specified S3 bucket if the file path is available.
|
||||
Logs the file path being uploaded.
|
||||
|
||||
Returns:
|
||||
None
|
||||
"""
|
||||
for data_item in self.data_inputs:
|
||||
file_path = data_item.data.get("file_path")
|
||||
self.log(f"Uploading file: {file_path}")
|
||||
if file_path:
|
||||
self._s3_client().upload_file(file_path, Bucket=self.bucket_name, Key=self._normalize_path(file_path))
|
||||
|
||||
def _s3_client(self) -> Any:
|
||||
"""Creates and returns an S3 client using the provided AWS access key ID and secret access key.
|
||||
|
||||
Returns:
|
||||
Any: A boto3 S3 client instance.
|
||||
"""
|
||||
return boto3.client(
|
||||
"s3",
|
||||
aws_access_key_id=self.aws_access_key_id,
|
||||
aws_secret_access_key=self.aws_secret_access_key,
|
||||
)
|
||||
|
||||
def _normalize_path(self, file_path) -> str:
|
||||
"""Process the file path based on the s3_prefix and path_as_prefix.
|
||||
|
||||
Args:
|
||||
file_path (str): The original file path.
|
||||
s3_prefix (str): The S3 prefix to use.
|
||||
path_as_prefix (bool): Whether to use the file path as the S3 prefix.
|
||||
|
||||
Returns:
|
||||
str: The processed file path.
|
||||
"""
|
||||
prefix = self.s3_prefix
|
||||
strip_path = self.strip_path
|
||||
processed_path: str = file_path
|
||||
|
||||
if strip_path:
|
||||
# Filename only
|
||||
processed_path = Path(file_path).name
|
||||
|
||||
# Concatenate the s3_prefix if it exists
|
||||
if prefix:
|
||||
processed_path = str(Path(prefix) / processed_path)
|
||||
|
||||
return processed_path
|
||||
|
|
@ -0,0 +1,136 @@
|
|||
import os
|
||||
import tempfile
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
import boto3
|
||||
import pytest
|
||||
from langflow.components.data.s3_bucket_uploader import S3BucketUploaderComponent
|
||||
from langflow.schema.data import Data
|
||||
|
||||
from tests.base import ComponentTestBaseWithoutClient
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not os.environ.get("AWS_ACCESS_KEY_ID") or not os.environ.get("AWS_SECRET_ACCESS_KEY"),
|
||||
reason="Environment variable AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY is not defined.",
|
||||
)
|
||||
class TestS3UploaderComponent(ComponentTestBaseWithoutClient):
|
||||
"""Unit tests for the S3BucketUploaderComponent.
|
||||
|
||||
This test class inherits from ComponentTestBaseWithoutClient and includes several pytest fixtures and a test method
|
||||
to verify the functionality of the S3BucketUploaderComponent.
|
||||
|
||||
Fixtures:
|
||||
component_class: Returns the component class to be tested.
|
||||
file_names_mapping: Returns an empty list since this component doesn't have version-specific files.
|
||||
default_kwargs: Returns an empty dictionary since this component doesn't have any default arguments.
|
||||
temp_files: Creates three temporary files with predefined content and yields them as Data objects.
|
||||
Cleans up the files after the test.
|
||||
s3_bucket: Creates a unique S3 bucket for testing, yields the bucket name, and deletes the bucket
|
||||
and its contents after the test.
|
||||
|
||||
Test Methods:
|
||||
test_upload: Tests the upload functionality of the S3BucketUploaderComponent by uploading temporary files
|
||||
to the S3 bucket and verifying their content.
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
def component_class(self):
|
||||
"""Return the component class to test."""
|
||||
return S3BucketUploaderComponent
|
||||
|
||||
@pytest.fixture
|
||||
def file_names_mapping(self):
|
||||
"""Return an empty list since this component doesn't have version-specific files."""
|
||||
|
||||
@pytest.fixture
|
||||
def default_kwargs(self):
|
||||
"""Return an empty dictionary since this component doesn't have any default arguments."""
|
||||
return {}
|
||||
|
||||
@pytest.fixture
|
||||
def temp_files(self):
|
||||
"""Setup: Create three temporary files."""
|
||||
temp_files = []
|
||||
contents = [
|
||||
b"Lorem ipsum dolor sit amet, consectetur adipiscing elit.",
|
||||
b"Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
|
||||
b"Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris "
|
||||
b"nisi ut aliquip ex ea commodo consequat.",
|
||||
]
|
||||
|
||||
for content in contents:
|
||||
with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as temp_file:
|
||||
temp_file.write(content)
|
||||
temp_file.flush()
|
||||
temp_file.close()
|
||||
temp_files.append(temp_file.name)
|
||||
|
||||
data = [
|
||||
Data(data={"file_path": file_path, "text": Path(file_path).read_text(encoding="utf-8")})
|
||||
for file_path in temp_files
|
||||
]
|
||||
|
||||
yield data
|
||||
|
||||
# Teardown: Explicitly delete the files
|
||||
for temp_file in temp_files:
|
||||
Path(temp_file).unlink()
|
||||
|
||||
@pytest.fixture
|
||||
def s3_bucket(self) -> str:
|
||||
"""Generate a unique bucket name (AWS requires globally unique names)."""
|
||||
bucket_name = f"graphrag-test-bucket-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# Initialize S3 client using environment variables for credentials
|
||||
s3 = boto3.client("s3")
|
||||
|
||||
try:
|
||||
# Create an S3 bucket in your default region
|
||||
s3.create_bucket(Bucket=bucket_name)
|
||||
|
||||
yield bucket_name
|
||||
|
||||
finally:
|
||||
# Teardown: Delete the bucket and its contents
|
||||
try:
|
||||
# List and delete all objects in the bucket
|
||||
objects = s3.list_objects_v2(Bucket=bucket_name).get("Contents", [])
|
||||
for obj in objects:
|
||||
s3.delete_object(Bucket=bucket_name, Key=obj["Key"])
|
||||
|
||||
# Delete the bucket
|
||||
s3.delete_bucket(Bucket=bucket_name)
|
||||
except boto3.exceptions.Boto3Error as e:
|
||||
pytest.fail(f"Error during teardown: {e}")
|
||||
|
||||
def test_upload(self, temp_files, s3_bucket):
|
||||
"""Test uploading files to an S3 bucket."""
|
||||
component = S3BucketUploaderComponent()
|
||||
|
||||
# Set AWS credentials from environment variables
|
||||
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
|
||||
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
|
||||
component.set_attributes(
|
||||
{
|
||||
"aws_access_key_id": aws_access_key_id,
|
||||
"aws_secret_access_key": aws_secret_access_key,
|
||||
"bucket_name": s3_bucket,
|
||||
"strategy": "Store Original File",
|
||||
"data_inputs": temp_files,
|
||||
"s3_prefix": "test",
|
||||
"strip_path": True,
|
||||
}
|
||||
)
|
||||
|
||||
component.process_files()
|
||||
|
||||
# Check if the files were uploaded. Assumes key and secret are set via environment variables
|
||||
s3 = boto3.client("s3")
|
||||
|
||||
for temp_file in temp_files:
|
||||
key = f"test/{Path(temp_file.data['file_path']).name}"
|
||||
response = s3.get_object(Bucket=s3_bucket, Key=key)
|
||||
with Path(temp_file.data["file_path"]).open("rb") as f:
|
||||
assert response["Body"].read() == f.read()
|
||||
Loading…
Add table
Add a link
Reference in a new issue