From 68c36c415ec8a524ce4b7f9b13691cdb77f972d8 Mon Sep 17 00:00:00 2001 From: Raphael Valdetaro <79842132+raphaelchristi@users.noreply.github.com> Date: Fri, 20 Dec 2024 09:17:17 -0300 Subject: [PATCH] feat(component): enhance merge data with standard operations (#5125) * feat(component): enhance merge data with standard operations - Add standard merge operations (concatenate, append, merge, join) - Add operation selection via dropdown - Return DataFrame output type - Implement separate merge strategies * style(component): improve merge data formatting - Add MIN_INPUTS_REQUIRED constant - Use descriptive DataFrame variable names - Move return statement to else block - Use list comprehension for better performance - Fix unused loop variable - Improve overall code formatting * [autofix.ci] apply automated fixes * Update src/backend/base/langflow/components/processing/merge_data.py Co-authored-by: Gabriel Luiz Freitas Almeida * refactor: adjust merge_data operations to use langflow.schema.DataFrame * Update Merge Data component name and description * [autofix.ci] apply automated fixes * refactor: enhance data merging logic in DataMergerComponent - Improved type hinting for combined data structures to enhance code clarity. - Streamlined the concatenation and merging operations to ensure consistent handling of string and object types. - Updated the logic to correctly append values to lists when merging data inputs, improving data integrity in the merging process. * revert merge data changes * add data merger component * refactor: remove DataMergerComponent and clean up imports - Deleted the DataMergerComponent to streamline the processing components. - Updated the __init__.py file to reflect the removal of the DataMergerComponent from the exports. * refactor: enhance MergeDataComponent with new merge operations - Introduced a new enum, MergeOperation, to define various data merging strategies: CONCATENATE, APPEND, MERGE, and JOIN. - Updated the merge_data method to return a DataFrame instead of a list of Data objects, improving data handling. - Enhanced input validation to ensure a minimum number of data inputs are provided. - Streamlined the merging logic to support different operations, improving flexibility and usability of the component. --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Gabriel Luiz Freitas Almeida --- .../components/processing/merge_data.py | 150 +++++++++--------- 1 file changed, 74 insertions(+), 76 deletions(-) diff --git a/src/backend/base/langflow/components/processing/merge_data.py b/src/backend/base/langflow/components/processing/merge_data.py index 819895ff0..015c6991f 100644 --- a/src/backend/base/langflow/components/processing/merge_data.py +++ b/src/backend/base/langflow/components/processing/merge_data.py @@ -1,94 +1,92 @@ +from enum import Enum +from typing import cast + from loguru import logger from langflow.custom import Component -from langflow.io import DataInput, Output -from langflow.schema import Data +from langflow.io import DataInput, DropdownInput, Output +from langflow.schema import DataFrame + + +class MergeOperation(str, Enum): + CONCATENATE = "concatenate" + APPEND = "append" + MERGE = "merge" + JOIN = "join" class MergeDataComponent(Component): - """MergeDataComponent is responsible for combining multiple Data objects into a unified list of Data objects. - - It ensures that all keys across the input Data objects are present in each merged Data object. - Missing keys are filled with empty strings to maintain consistency. - """ - display_name = "Merge Data" - description = ( - "Combines multiple Data objects into a unified list, ensuring all keys are present in each Data object." - ) + description = "Combines data using merge operations" icon = "merge" + MIN_INPUTS_REQUIRED = 2 + inputs = [ - DataInput( - name="data_inputs", - display_name="Data Inputs", - is_list=True, - info="A list of Data inputs objects to be merged.", + DataInput(name="data_inputs", display_name="Data Inputs", info="Dados para combinar", is_list=True), + DropdownInput( + name="operation", + display_name="Merge Operation", + options=[op.value for op in MergeOperation], + value=MergeOperation.CONCATENATE.value, ), ] - outputs = [ - Output( - display_name="Merged Data", - name="merged_data", - method="merge_data", - ), - ] + outputs = [Output(display_name="DataFrame", name="merged_data", method="merge_data")] - def merge_data(self) -> list[Data]: - """Merges multiple Data objects into a single list of Data objects. - - Ensures that all keys from the input Data objects are present in each merged Data object. - Missing keys are filled with empty strings. - - Returns: - List[Data]: A list of merged Data objects with consistent keys. - """ - logger.info("Initiating the data merging process.") - - data_inputs: list[Data] = self.data_inputs - logger.debug(f"Received {len(data_inputs)} data input(s) for merging.") - - if not data_inputs: - logger.warning("No data inputs provided. Returning an empty list.") - return [] - - # Collect all unique keys from all Data objects - all_keys: set[str] = set() - for idx, data_input in enumerate(data_inputs): - if not isinstance(data_input, Data): - error_message = f"Data input at index {idx} is not of type Data." - logger.error(error_message) - type_error_message = ( - f"All items in data_inputs must be of type Data. Item at index {idx} is {type(data_input)}" - ) - raise TypeError(type_error_message) - all_keys.update(data_input.data.keys()) - logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.") + def merge_data(self) -> DataFrame: + if not self.data_inputs or len(self.data_inputs) < self.MIN_INPUTS_REQUIRED: + empty_dataframe = DataFrame() + self.status = empty_dataframe + return empty_dataframe + operation = MergeOperation(self.operation) try: - # Create new list of Data objects with missing keys filled with empty strings - merged_data_list = [] - for idx, data_input in enumerate(data_inputs): - merged_data_dict = {} - - for key in all_keys: - # Use the existing value if the key exists, otherwise use an empty string - value = data_input.data.get(key, "") - if key not in data_input.data: - log_message = f"Key '{key}' missing in data input at index {idx}. Assigning empty string." - logger.debug(log_message) - merged_data_dict[key] = value - - merged_data = Data( - text_key=data_input.text_key, data=merged_data_dict, default_value=data_input.default_value - ) - merged_data_list.append(merged_data) - logger.debug("Merged Data object created for input at index: " + str(idx)) - - except Exception: - logger.exception("An error occurred during the data merging process.") + merged_dataframe = self._process_operation(operation) + self.status = merged_dataframe + except Exception as e: + logger.error(f"Erro durante operação {operation}: {e!s}") raise + else: + return merged_dataframe - logger.info("Data merging process completed successfully.") - return merged_data_list + def _process_operation(self, operation: MergeOperation) -> DataFrame: + if operation == MergeOperation.CONCATENATE: + combined_data: dict[str, str | object] = {} + for data_input in self.data_inputs: + for key, value in data_input.data.items(): + if key in combined_data: + if isinstance(combined_data[key], str) and isinstance(value, str): + combined_data[key] = f"{combined_data[key]}\n{value}" + else: + combined_data[key] = value + else: + combined_data[key] = value + return DataFrame([combined_data]) + + if operation == MergeOperation.APPEND: + rows = [data_input.data for data_input in self.data_inputs] + return DataFrame(rows) + + if operation == MergeOperation.MERGE: + result_data: dict[str, str | list[str] | object] = {} + for data_input in self.data_inputs: + for key, value in data_input.data.items(): + if key in result_data and isinstance(value, str): + if isinstance(result_data[key], list): + cast(list[str], result_data[key]).append(value) + else: + result_data[key] = [result_data[key], value] + else: + result_data[key] = value + return DataFrame([result_data]) + + if operation == MergeOperation.JOIN: + combined_data = {} + for idx, data_input in enumerate(self.data_inputs, 1): + for key, value in data_input.data.items(): + new_key = f"{key}_doc{idx}" if idx > 1 else key + combined_data[new_key] = value + return DataFrame([combined_data]) + + return DataFrame()