feat(component): enhance merge data with standard operations (#5125)

* feat(component): enhance merge data with standard operations

- Add standard merge operations (concatenate, append, merge, join)
- Add operation selection via dropdown
- Return DataFrame output type
- Implement separate merge strategies

* style(component): improve merge data formatting

- Add MIN_INPUTS_REQUIRED constant
- Use descriptive DataFrame variable names
- Move return statement to else block
- Use list comprehension for better performance
- Fix unused loop variable
- Improve overall code formatting

* [autofix.ci] apply automated fixes

* Update src/backend/base/langflow/components/processing/merge_data.py

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>

* refactor: adjust merge_data operations to use langflow.schema.DataFrame

* Update Merge Data component name and description

* [autofix.ci] apply automated fixes

* refactor: enhance data merging logic in DataMergerComponent

- Improved type hinting for combined data structures to enhance code clarity.
- Streamlined the concatenation and merging operations to ensure consistent handling of string and object types.
- Updated the logic to correctly append values to lists when merging data inputs, improving data integrity in the merging process.

* revert merge data changes

* add data merger component

* refactor: remove DataMergerComponent and clean up imports

- Deleted the DataMergerComponent to streamline the processing components.
- Updated the __init__.py file to reflect the removal of the DataMergerComponent from the exports.

* refactor: enhance MergeDataComponent with new merge operations

- Introduced a new enum, MergeOperation, to define various data merging strategies: CONCATENATE, APPEND, MERGE, and JOIN.
- Updated the merge_data method to return a DataFrame instead of a list of Data objects, improving data handling.
- Enhanced input validation to ensure a minimum number of data inputs are provided.
- Streamlined the merging logic to support different operations, improving flexibility and usability of the component.

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Raphael Valdetaro 2024-12-20 09:17:17 -03:00 committed by GitHub
commit 68c36c415e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@@ -1,94 +1,92 @@
from enum import Enum
from typing import cast
from loguru import logger
from langflow.custom import Component
from langflow.io import DataInput, Output
from langflow.schema import Data
from langflow.io import DataInput, DropdownInput, Output
from langflow.schema import DataFrame
class MergeOperation(str, Enum):
    """Merge strategies selectable in the Merge Data component.

    Members also subclass ``str``, so each one compares equal to its raw value
    and can be built directly from it, e.g. ``MergeOperation("join")``.
    """

    # Fold all inputs into one row; string values sharing a key are joined with a newline.
    CONCATENATE = "concatenate"
    # Produce one row per input.
    APPEND = "append"
    # Fold all inputs into one row; repeated string values for a key are collected in a list.
    MERGE = "merge"
    # Fold all inputs into one row; keys of the 2nd and later inputs get a "_doc<N>" suffix.
    JOIN = "join"
class MergeDataComponent(Component):
    """Combine several Data inputs into a single DataFrame.

    The strategy is picked through the ``operation`` dropdown and must be one of
    the ``MergeOperation`` values. At least ``MIN_INPUTS_REQUIRED`` inputs are
    needed; with fewer, ``merge_data`` returns an empty DataFrame.
    """

    display_name = "Merge Data"
    description = "Combines data using merge operations"
    icon = "merge"

    # Merging is only attempted when at least this many inputs are connected.
    MIN_INPUTS_REQUIRED = 2

    inputs = [
        DataInput(name="data_inputs", display_name="Data Inputs", info="Dados para combinar", is_list=True),
        DropdownInput(
            name="operation",
            display_name="Merge Operation",
            options=[op.value for op in MergeOperation],
            value=MergeOperation.CONCATENATE.value,
        ),
    ]

    outputs = [Output(display_name="DataFrame", name="merged_data", method="merge_data")]

    def merge_data(self) -> DataFrame:
        """Merge ``self.data_inputs`` according to ``self.operation``.

        The result is also stored in ``self.status``.

        Returns:
            DataFrame: The merged data, or an empty DataFrame when fewer than
            ``MIN_INPUTS_REQUIRED`` inputs were provided.

        Raises:
            ValueError: If ``self.operation`` is not a valid ``MergeOperation`` value.
            Exception: Any error raised while merging is logged and re-raised.
        """
        if not self.data_inputs or len(self.data_inputs) < self.MIN_INPUTS_REQUIRED:
            empty_dataframe = DataFrame()
            self.status = empty_dataframe
            return empty_dataframe

        operation = MergeOperation(self.operation)
        try:
            merged_dataframe = self._process_operation(operation)
            self.status = merged_dataframe
        except Exception as e:
            logger.error(f"Erro durante operação {operation}: {e!s}")
            raise
        else:
            return merged_dataframe

    def _process_operation(self, operation: MergeOperation) -> DataFrame:
        """Build the DataFrame for one merge strategy.

        Args:
            operation: The strategy to apply to ``self.data_inputs``.

        Returns:
            DataFrame: A single-row frame for CONCATENATE, MERGE and JOIN, one row
            per input for APPEND, and an empty frame for any other value.
        """
        if operation == MergeOperation.CONCATENATE:
            combined_data: dict[str, str | object] = {}
            for data_input in self.data_inputs:
                for key, value in data_input.data.items():
                    if key in combined_data:
                        if isinstance(combined_data[key], str) and isinstance(value, str):
                            # Two strings under the same key are joined with a newline.
                            combined_data[key] = f"{combined_data[key]}\n{value}"
                        else:
                            # Any other combination: the later value replaces the earlier one.
                            combined_data[key] = value
                    else:
                        combined_data[key] = value
            return DataFrame([combined_data])

        if operation == MergeOperation.APPEND:
            rows = [data_input.data for data_input in self.data_inputs]
            return DataFrame(rows)

        if operation == MergeOperation.MERGE:
            result_data: dict[str, str | list[str] | object] = {}
            for data_input in self.data_inputs:
                for key, value in data_input.data.items():
                    if key in result_data and isinstance(value, str):
                        if isinstance(result_data[key], list):
                            cast(list[str], result_data[key]).append(value)
                        else:
                            result_data[key] = [result_data[key], value]
                    else:
                        # Keep a shallow copy of list values: a later append for the same
                        # key must not mutate the list owned by the input Data object.
                        result_data[key] = list(value) if isinstance(value, list) else value
            return DataFrame([result_data])

        if operation == MergeOperation.JOIN:
            combined_data = {}
            for idx, data_input in enumerate(self.data_inputs, 1):
                for key, value in data_input.data.items():
                    # Keys of the first input stay as-is; later inputs get a "_doc<idx>" suffix.
                    new_key = f"{key}_doc{idx}" if idx > 1 else key
                    combined_data[new_key] = value
            return DataFrame([combined_data])

        # Fallback for an operation not handled above.
        return DataFrame()