diff --git a/src/backend/base/langflow/components/deactivated/MergeData.py b/src/backend/base/langflow/components/deactivated/MergeData.py
index 3d2ac582c..4d6b6083a 100644
--- a/src/backend/base/langflow/components/deactivated/MergeData.py
+++ b/src/backend/base/langflow/components/deactivated/MergeData.py
@@ -1,27 +1,97 @@
-from langflow.custom import CustomComponent
+from loguru import logger
+
+from langflow.custom import Component
+from langflow.io import DataInput, Output
 from langflow.schema import Data
 
 
-class MergeDataComponent(CustomComponent):
+class MergeDataComponent(Component):
+    """
+    MergeDataComponent is responsible for combining multiple Data objects into a unified list of Data objects.
+    It ensures that all keys across the input Data objects are present in each merged Data object.
+    Missing keys are filled with empty strings to maintain consistency.
+    """
+
     display_name = "Merge Data"
-    description = "Merges data."
-    beta: bool = True
-    name = "MergeData"
+    description = (
+        "Combines multiple Data objects into a unified list, ensuring all keys are present in each Data object."
+    )
+    icon = "merge"
 
-    field_config = {
-        "data": {"display_name": "Data"},
-    }
+    inputs = [
+        DataInput(
+            name="data_inputs",
+            display_name="Data Inputs",
+            is_list=True,
+            info="A list of Data inputs objects to be merged.",
+        ),
+    ]
 
-    def build(self, data: list[Data]) -> Data:
-        if not data:
-            return Data()
-        if len(data) == 1:
-            return data[0]
-        merged_data = Data()
-        for value in data:
-            if merged_data is None:
-                merged_data = value
-            else:
-                merged_data += value
-        self.status = merged_data
-        return merged_data
+    outputs = [
+        Output(
+            display_name="Merged Data",
+            name="merged_data",
+            method="merge_data",
+        ),
+    ]
+
+    def merge_data(self) -> list[Data]:
+        """
+        Merges multiple Data objects into a single list of Data objects.
+        Ensures that all keys from the input Data objects are present in each merged Data object.
+        Missing keys are filled with empty strings, and keys keep first-seen order.
+
+        Returns:
+            list[Data]: A list of merged Data objects with consistent keys, or an empty
+                list when no inputs are provided.
+
+        Raises:
+            TypeError: If any item in data_inputs is not a Data object.
+        """
+        logger.info("Initiating the data merging process.")
+
+        try:
+            data_inputs: list[Data] = self.data_inputs
+
+            # Test emptiness before len() so a missing (None) input returns [] instead of raising.
+            if not data_inputs:
+                logger.warning("No data inputs provided. Returning an empty list.")
+                return []
+            logger.debug(f"Received {len(data_inputs)} data input(s) for merging.")
+
+            # Collect keys in a dict: unlike a set, it keeps first-seen order, so output is deterministic
+            all_keys: dict[str, None] = {}
+            for idx, data_input in enumerate(data_inputs):
+                if not isinstance(data_input, Data):
+                    logger.error(f"Data input at index {idx} is not of type Data.")
+                    type_error_message = (
+                        f"All items in data_inputs must be of type Data. Item at index {idx} is {type(data_input)}"
+                    )
+                    raise TypeError(type_error_message)
+                all_keys.update(dict.fromkeys(data_input.data))
+            logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.")
+
+            # Create new list of Data objects with missing keys filled with empty strings
+            merged_data_list = []
+            for idx, data_input in enumerate(data_inputs):
+                merged_data_dict = {}
+                for key in all_keys:
+                    if key in data_input.data:
+                        merged_data_dict[key] = data_input.data[key]
+                    else:
+                        logger.debug(f"Key '{key}' missing in data input at index {idx}. Assigning empty string.")
+                        merged_data_dict[key] = ""
+
+                merged_data = Data(
+                    text_key=data_input.text_key, data=merged_data_dict, default_value=data_input.default_value
+                )
+                merged_data_list.append(merged_data)
+                logger.debug(f"Merged Data object created for input at index {idx}.")
+
+            logger.info("Data merging process completed successfully.")
+            return merged_data_list
+
+        except Exception:
+            logger.exception("An error occurred during the data merging process.")
+            # Bare raise re-raises the active exception with its original traceback.
+            raise
diff --git a/src/backend/base/langflow/components/helpers/MergeData.py b/src/backend/base/langflow/components/helpers/MergeData.py
index 9d4847f08..dd14aa1ef 100644
--- a/src/backend/base/langflow/components/helpers/MergeData.py
+++ b/src/backend/base/langflow/components/helpers/MergeData.py
@@ -1,27 +1,97 @@
-from langflow.custom import CustomComponent
+from loguru import logger
+
+from langflow.custom import Component
+from langflow.io import DataInput, Output
 from langflow.schema import Data
 
 
-class MergeDataComponent(CustomComponent):
+class MergeDataComponent(Component):
+    """
+    MergeDataComponent is responsible for combining multiple Data objects into a unified list of Data objects.
+    It ensures that all keys across the input Data objects are present in each merged Data object.
+    Missing keys are filled with empty strings to maintain consistency.
+    """
+
     display_name = "Merge Data"
-    description = "Combines multiple data sources into a single unified Data object."
-    beta: bool = True
-    name = "MergeData"
+    description = (
+        "Combines multiple Data objects into a unified list, ensuring all keys are present in each Data object."
+    )
+    icon = "merge"
 
-    field_config = {
-        "data": {"display_name": "Data"},
-    }
+    inputs = [
+        DataInput(
+            name="data_inputs",
+            display_name="Data Inputs",
+            is_list=True,
+            info="A list of Data inputs objects to be merged.",
+        ),
+    ]
 
-    def build(self, data: list[Data]) -> Data:
-        if not data:
-            return Data()
-        if len(data) == 1:
-            return data[0]
-        merged_data = Data()
-        for value in data:
-            if merged_data is None:
-                merged_data = value
-            else:
-                merged_data += value
-        self.status = merged_data
-        return merged_data
+    outputs = [
+        Output(
+            display_name="Merged Data",
+            name="merged_data",
+            method="merge_data",
+        ),
+    ]
+
+    def merge_data(self) -> list[Data]:
+        """
+        Merges multiple Data objects into a single list of Data objects.
+        Ensures that all keys from the input Data objects are present in each merged Data object.
+        Missing keys are filled with empty strings, and keys keep first-seen order.
+
+        Returns:
+            list[Data]: A list of merged Data objects with consistent keys, or an empty
+                list when no inputs are provided.
+
+        Raises:
+            TypeError: If any item in data_inputs is not a Data object.
+        """
+        logger.info("Initiating the data merging process.")
+
+        try:
+            data_inputs: list[Data] = self.data_inputs
+
+            # Test emptiness before len() so a missing (None) input returns [] instead of raising.
+            if not data_inputs:
+                logger.warning("No data inputs provided. Returning an empty list.")
+                return []
+            logger.debug(f"Received {len(data_inputs)} data input(s) for merging.")
+
+            # Collect keys in a dict: unlike a set, it keeps first-seen order, so output is deterministic
+            all_keys: dict[str, None] = {}
+            for idx, data_input in enumerate(data_inputs):
+                if not isinstance(data_input, Data):
+                    logger.error(f"Data input at index {idx} is not of type Data.")
+                    type_error_message = (
+                        f"All items in data_inputs must be of type Data. Item at index {idx} is {type(data_input)}"
+                    )
+                    raise TypeError(type_error_message)
+                all_keys.update(dict.fromkeys(data_input.data))
+            logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.")
+
+            # Create new list of Data objects with missing keys filled with empty strings
+            merged_data_list = []
+            for idx, data_input in enumerate(data_inputs):
+                merged_data_dict = {}
+                for key in all_keys:
+                    if key in data_input.data:
+                        merged_data_dict[key] = data_input.data[key]
+                    else:
+                        logger.debug(f"Key '{key}' missing in data input at index {idx}. Assigning empty string.")
+                        merged_data_dict[key] = ""
+
+                merged_data = Data(
+                    text_key=data_input.text_key, data=merged_data_dict, default_value=data_input.default_value
+                )
+                merged_data_list.append(merged_data)
+                logger.debug(f"Merged Data object created for input at index {idx}.")
+
+            logger.info("Data merging process completed successfully.")
+            return merged_data_list
+
+        except Exception:
+            logger.exception("An error occurred during the data merging process.")
+            # Bare raise re-raises the active exception with its original traceback.
+            raise