"""Component that combines several Data inputs into a single DataFrame."""

from enum import Enum
from typing import cast

from loguru import logger

from langflow.custom import Component
from langflow.io import DataInput, DropdownInput, Output
from langflow.schema import DataFrame


class MergeOperation(str, Enum):
    """Strategies offered by ``MergeDataComponent`` for combining its inputs."""

    # One row; colliding string values are joined with a newline.
    CONCATENATE = "concatenate"
    # One row per input.
    APPEND = "append"
    # One row; colliding string values are collected into a list.
    MERGE = "merge"
    # One row; keys of the 2nd, 3rd, ... input get a ``_doc<N>`` suffix.
    JOIN = "join"


class MergeDataComponent(Component):
    """Combine the ``data`` dicts of several inputs into one DataFrame.

    The way the inputs are combined is selected through the ``operation``
    dropdown; see ``MergeOperation`` for the available strategies.
    """

    display_name = "Merge Data"
    description = "Combines data using merge operations"
    icon = "merge"

    # With fewer inputs than this, merge_data returns an empty DataFrame.
    MIN_INPUTS_REQUIRED = 2

    inputs = [
        # NOTE(review): this help text is Portuguese while the other UI strings are English - confirm intent.
        DataInput(name="data_inputs", display_name="Data Inputs", info="Dados para combinar", is_list=True),
        DropdownInput(
            name="operation",
            display_name="Merge Operation",
            options=[op.value for op in MergeOperation],
            value=MergeOperation.CONCATENATE.value,
        ),
    ]

    outputs = [Output(display_name="DataFrame", name="merged_data", method="merge_data")]

    def merge_data(self) -> DataFrame:
        """Combine ``self.data_inputs`` according to ``self.operation``.

        The result is also stored on ``self.status``.

        Returns:
            DataFrame: The combined data, or an empty DataFrame when fewer than
            ``MIN_INPUTS_REQUIRED`` inputs were provided.

        Raises:
            ValueError: If ``self.operation`` is not a ``MergeOperation`` value.
            Exception: Any error raised while combining is logged and re-raised.
        """
        if not self.data_inputs or len(self.data_inputs) < self.MIN_INPUTS_REQUIRED:
            empty_dataframe = DataFrame()
            self.status = empty_dataframe
            return empty_dataframe

        operation = MergeOperation(self.operation)
        try:
            merged_dataframe = self._process_operation(operation)
            self.status = merged_dataframe
        except Exception as e:
            # logger.exception keeps the traceback; ``.value`` keeps the text identical across
            # Python versions (formatting a str-mixin Enum member changed in Python 3.11).
            logger.exception(f"Erro durante operação {operation.value}: {e!s}")
            raise
        else:
            return merged_dataframe

    def _process_operation(self, operation: MergeOperation) -> DataFrame:
        """Build the DataFrame for ``operation``.

        Args:
            operation: The merge strategy to apply to ``self.data_inputs``.

        Returns:
            DataFrame: One row per input for APPEND, a single combined row for
            CONCATENATE, MERGE and JOIN, and an empty DataFrame for any other value.
        """
        if operation == MergeOperation.CONCATENATE:
            return DataFrame([self._concatenate_payloads(self._payloads())])

        if operation == MergeOperation.APPEND:
            return DataFrame(self._payloads())

        if operation == MergeOperation.MERGE:
            return DataFrame([self._merge_payloads(self._payloads())])

        if operation == MergeOperation.JOIN:
            return DataFrame([self._join_payloads(self._payloads())])

        return DataFrame()

    def _payloads(self) -> list[dict[str, object]]:
        """Return the ``data`` dict of every input, in input order."""
        return [data_input.data for data_input in self.data_inputs]

    @staticmethod
    def _concatenate_payloads(payloads: list[dict[str, object]]) -> dict[str, object]:
        """Fold payloads into one dict, newline-joining colliding string values.

        When a key repeats and either value is not a string, the later value wins.
        """
        combined: dict[str, object] = {}
        for payload in payloads:
            for key, value in payload.items():
                previous = combined.get(key)
                # ``previous`` can only be a str if the key was already stored.
                if isinstance(previous, str) and isinstance(value, str):
                    combined[key] = f"{previous}\n{value}"
                else:
                    combined[key] = value
        return combined

    @staticmethod
    def _merge_payloads(payloads: list[dict[str, object]]) -> dict[str, object]:
        """Fold payloads into one dict, collecting colliding string values into a list.

        A repeated key whose new value is not a string overwrites the stored value.
        """
        merged: dict[str, object] = {}
        for payload in payloads:
            for key, value in payload.items():
                if key in merged and isinstance(value, str):
                    previous = merged[key]
                    if isinstance(previous, list):
                        # Build a new list instead of appending in place: ``previous`` may be
                        # a list owned by one of the inputs, which must not be mutated.
                        merged[key] = [*cast(list[object], previous), value]
                    else:
                        merged[key] = [previous, value]
                else:
                    merged[key] = value
        return merged

    @staticmethod
    def _join_payloads(payloads: list[dict[str, object]]) -> dict[str, object]:
        """Fold payloads into one dict, suffixing keys of later inputs with ``_doc<N>``.

        Keys of the first input are kept unchanged; ``N`` is the 1-based input position.
        A suffixed key that equals an already stored key overwrites it.
        """
        joined: dict[str, object] = {}
        for position, payload in enumerate(payloads, 1):
            for key, value in payload.items():
                joined[f"{key}_doc{position}" if position > 1 else key] = value
        return joined