fix: refactoring Merge Data component (#4059)

* fix: refactoring Merge Data component to properly merge different Data objects

* fix: refactoring Merge Data component to properly merge different Data objects

* fix: refactoring MergeData active component
This commit is contained in:
João 2024-10-08 18:40:45 -03:00 committed by GitHub
commit 04fa6351a2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 176 additions and 42 deletions

View file

@ -1,27 +1,94 @@
from langflow.custom import CustomComponent
from loguru import logger
from langflow.custom import Component
from langflow.io import DataInput, Output
from langflow.schema import Data
class MergeDataComponent(CustomComponent):
class MergeDataComponent(Component):
"""
MergeDataComponent is responsible for combining multiple Data objects into a unified list of Data objects.
It ensures that all keys across the input Data objects are present in each merged Data object.
Missing keys are filled with empty strings to maintain consistency.
"""
display_name = "Merge Data"
description = "Merges data."
beta: bool = True
name = "MergeData"
description = (
"Combines multiple Data objects into a unified list, ensuring all keys are present in each Data object."
)
icon = "merge"
field_config = {
"data": {"display_name": "Data"},
}
inputs = [
DataInput(
name="data_inputs",
display_name="Data Inputs",
is_list=True,
info="A list of Data inputs objects to be merged.",
),
]
def build(self, data: list[Data]) -> Data:
    """
    Merge a list of Data objects into a single Data object.

    Args:
        data: The Data objects to merge.

    Returns:
        A new empty Data when ``data`` is empty, the sole element when it holds
        exactly one item, otherwise the result of adding every item with ``+=``
        onto an initially empty Data.
    """
    if not data:
        merged_data = Data()
    elif len(data) == 1:
        merged_data = data[0]
    else:
        # Start from an empty Data and fold every input into it. The accumulator
        # is never None, so no "first element" special case is needed.
        merged_data = Data()
        for value in data:
            merged_data += value
    # Record the result on every path, not only the multi-item one.
    self.status = merged_data
    return merged_data
outputs = [
Output(
display_name="Merged Data",
name="merged_data",
method="merge_data",
),
]
def merge_data(self) -> list[Data]:
    """
    Merge the component's Data inputs into a list of Data objects sharing one key set.

    Every key found in any input is present in every output object; keys an
    input lacks are filled with an empty string. Each output keeps the
    ``text_key`` and ``default_value`` of the input it was built from.

    Returns:
        One Data object per input, in input order. An empty list when no
        inputs were provided.

    Raises:
        TypeError: If any item in ``self.data_inputs`` is not a Data instance.
    """
    logger.info("Initiating the data merging process.")

    try:
        # Fall back to an empty list so a None value is treated like "no inputs"
        # instead of failing inside len() below.
        data_inputs: list[Data] = self.data_inputs or []
        logger.debug(f"Received {len(data_inputs)} data input(s) for merging.")

        if not data_inputs:
            logger.warning("No data inputs provided. Returning an empty list.")
            return []

        # Collect every key in first-seen order. A dict is used instead of a set
        # so the key order of the merged objects is the same on every run.
        all_keys: dict[str, None] = {}
        for idx, data_input in enumerate(data_inputs):
            if not isinstance(data_input, Data):
                error_message = f"Data input at index {idx} is not of type Data."
                logger.error(error_message)
                type_error_message = (
                    f"All items in data_inputs must be of type Data. Item at index {idx} is {type(data_input)}"
                )
                raise TypeError(type_error_message)
            all_keys.update(dict.fromkeys(data_input.data))
        logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.")

        # Rebuild each input with the full key set, filling gaps with empty strings.
        merged_data_list = []
        for idx, data_input in enumerate(data_inputs):
            merged_data_dict = {}
            for key in all_keys:
                if key not in data_input.data:
                    log_message = f"Key '{key}' missing in data input at index {idx}. Assigning empty string."
                    logger.debug(log_message)
                merged_data_dict[key] = data_input.data.get(key, "")

            merged_data = Data(
                text_key=data_input.text_key, data=merged_data_dict, default_value=data_input.default_value
            )
            merged_data_list.append(merged_data)
            logger.debug(f"Merged Data object created for input at index {idx}.")

        logger.info("Data merging process completed successfully.")
        return merged_data_list

    except Exception:
        logger.exception("An error occurred during the data merging process.")
        # Bare raise re-raises the active exception with its original traceback.
        raise

View file

@ -1,27 +1,94 @@
from langflow.custom import CustomComponent
from loguru import logger
from langflow.custom import Component
from langflow.io import DataInput, Output
from langflow.schema import Data
class MergeDataComponent(CustomComponent):
class MergeDataComponent(Component):
"""
MergeDataComponent is responsible for combining multiple Data objects into a unified list of Data objects.
It ensures that all keys across the input Data objects are present in each merged Data object.
Missing keys are filled with empty strings to maintain consistency.
"""
display_name = "Merge Data"
description = "Combines multiple data sources into a single unified Data object."
beta: bool = True
name = "MergeData"
description = (
"Combines multiple Data objects into a unified list, ensuring all keys are present in each Data object."
)
icon = "merge"
field_config = {
"data": {"display_name": "Data"},
}
inputs = [
DataInput(
name="data_inputs",
display_name="Data Inputs",
is_list=True,
info="A list of Data inputs objects to be merged.",
),
]
def build(self, data: list[Data]) -> Data:
    """
    Merge a list of Data objects into a single Data object.

    Args:
        data: The Data objects to merge.

    Returns:
        A new empty Data when ``data`` is empty, the sole element when it holds
        exactly one item, otherwise the result of adding every item with ``+=``
        onto an initially empty Data.
    """
    if not data:
        merged_data = Data()
    elif len(data) == 1:
        merged_data = data[0]
    else:
        # Start from an empty Data and fold every input into it. The accumulator
        # is never None, so no "first element" special case is needed.
        merged_data = Data()
        for value in data:
            merged_data += value
    # Record the result on every path, not only the multi-item one.
    self.status = merged_data
    return merged_data
outputs = [
Output(
display_name="Merged Data",
name="merged_data",
method="merge_data",
),
]
def merge_data(self) -> list[Data]:
    """
    Merge the component's Data inputs into a list of Data objects sharing one key set.

    Every key found in any input is present in every output object; keys an
    input lacks are filled with an empty string. Each output keeps the
    ``text_key`` and ``default_value`` of the input it was built from.

    Returns:
        One Data object per input, in input order. An empty list when no
        inputs were provided.

    Raises:
        TypeError: If any item in ``self.data_inputs`` is not a Data instance.
    """
    logger.info("Initiating the data merging process.")

    try:
        # Fall back to an empty list so a None value is treated like "no inputs"
        # instead of failing inside len() below.
        data_inputs: list[Data] = self.data_inputs or []
        logger.debug(f"Received {len(data_inputs)} data input(s) for merging.")

        if not data_inputs:
            logger.warning("No data inputs provided. Returning an empty list.")
            return []

        # Collect every key in first-seen order. A dict is used instead of a set
        # so the key order of the merged objects is the same on every run.
        all_keys: dict[str, None] = {}
        for idx, data_input in enumerate(data_inputs):
            if not isinstance(data_input, Data):
                error_message = f"Data input at index {idx} is not of type Data."
                logger.error(error_message)
                type_error_message = (
                    f"All items in data_inputs must be of type Data. Item at index {idx} is {type(data_input)}"
                )
                raise TypeError(type_error_message)
            all_keys.update(dict.fromkeys(data_input.data))
        logger.debug(f"Collected {len(all_keys)} unique key(s) from input data.")

        # Rebuild each input with the full key set, filling gaps with empty strings.
        merged_data_list = []
        for idx, data_input in enumerate(data_inputs):
            merged_data_dict = {}
            for key in all_keys:
                if key not in data_input.data:
                    log_message = f"Key '{key}' missing in data input at index {idx}. Assigning empty string."
                    logger.debug(log_message)
                merged_data_dict[key] = data_input.data.get(key, "")

            merged_data = Data(
                text_key=data_input.text_key, data=merged_data_dict, default_value=data_input.default_value
            )
            merged_data_list.append(merged_data)
            logger.debug(f"Merged Data object created for input at index: {idx}")

        logger.info("Data merging process completed successfully.")
        return merged_data_list

    except Exception:
        logger.exception("An error occurred during the data merging process.")
        # Bare raise re-raises the active exception with its original traceback.
        raise