From 4a195f73c581c39d19ab7b4c01aeb68b5814c604 Mon Sep 17 00:00:00 2001 From: Edwin Jose Date: Fri, 16 May 2025 10:43:39 -0400 Subject: [PATCH] feat: Add new data operations component (#7755) * new data ops component * update actions * component add on feature * add copy of old data processing * Delete data_operations copy.py * [autofix.ci] apply automated fixes * lint and format error fixes * [autofix.ci] apply automated fixes * add test for the Processing component * Update data_operations.py * Update data_operations.py * [autofix.ci] apply automated fixes * Update data_operations.py * Update data_operations.py * Update data_operations.py * Update data_operations.py * update tests --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../components/processing/__init__.py | 2 + .../components/processing/data_operations.py | 435 ++++++++++++++++++ src/backend/base/langflow/inputs/inputs.py | 1 + .../base/langflow/utils/component_utils.py | 24 +- .../test_data_operations_component.py | 193 ++++++++ 5 files changed, 646 insertions(+), 9 deletions(-) create mode 100644 src/backend/base/langflow/components/processing/data_operations.py create mode 100644 src/backend/tests/unit/components/processing/test_data_operations_component.py diff --git a/src/backend/base/langflow/components/processing/__init__.py b/src/backend/base/langflow/components/processing/__init__.py index de249865e..f947d7341 100644 --- a/src/backend/base/langflow/components/processing/__init__.py +++ b/src/backend/base/langflow/components/processing/__init__.py @@ -1,6 +1,7 @@ from .alter_metadata import AlterMetadataComponent from .combine_text import CombineTextComponent from .create_data import CreateDataComponent +from .data_operations import DataOperationsComponent from .extract_key import ExtractDataKeyComponent from .filter_data_values import DataFilterComponent from .json_cleaner import JSONCleaner @@ -21,6 +22,7 @@ __all__ = [ "CombineTextComponent", "CreateDataComponent", "DataFilterComponent", + "DataOperationsComponent", "ExtractDataKeyComponent", "JSONCleaner", "LLMRouterComponent", diff --git a/src/backend/base/langflow/components/processing/data_operations.py b/src/backend/base/langflow/components/processing/data_operations.py new file mode 100644 index 000000000..852429fbf --- /dev/null +++ b/src/backend/base/langflow/components/processing/data_operations.py @@ -0,0 +1,435 @@ +import ast +from typing import TYPE_CHECKING, Any + +from langflow.custom import Component +from langflow.inputs import DictInput, DropdownInput, MessageTextInput, SortableListInput +from langflow.io import DataInput, Output +from langflow.logging import logger +from langflow.schema import Data +from langflow.schema.dotdict import dotdict +from langflow.utils.component_utils import set_current_fields + +if TYPE_CHECKING: + from collections.abc import Callable + +ACTION_CONFIG = { + "Select Keys": {"is_list": False, "log_msg": "setting filter fields"}, + "Literal Eval": {"is_list": False, "log_msg": "setting evaluate fields"}, + "Combine": {"is_list": True, "log_msg": "setting combine fields"}, + "Filter Values": {"is_list": False, "log_msg": "setting filter values fields"}, + "Append or Update": {"is_list": False, "log_msg": "setting Append or Update fields"}, + "Remove Keys": {"is_list": False, "log_msg": "setting remove keys fields"}, + "Rename Keys": {"is_list": False, "log_msg": "setting rename keys fields"}, +} +OPERATORS = { + "equals": lambda a, b: str(a) == str(b), + "not equals": lambda a, b: str(a) != str(b), + "contains": lambda a, b: str(b) in str(a), + "starts with": lambda a, b: str(a).startswith(str(b)), + "ends with": lambda a, b: str(a).endswith(str(b)), +} + + +class DataOperationsComponent(Component): + display_name = "Data Operations" + description = "Perform various operations on a Data object." + icon = "file-json" + name = "DataOperations" + default_keys = ["operations", "data"] + metadata = { + "keywords": [ + "data", + "operations", + "filter values", + "Append or Update", + "remove keys", + "rename keys", + "select keys", + "literal eval", + "combine", + "filter", + "append", + "update", + "remove", + "rename", + "data operations", + "data manipulation", + "data transformation", + "data filtering", + "data selection", + "data combination", + ], + } + actions_data = { + "Select Keys": ["select_keys_input", "operations"], + "Literal Eval": [], + "Combine": [], + "Filter Values": ["filter_values", "operations", "operator", "filter_key"], + "Append or Update": ["append_update_data", "operations"], + "Remove Keys": ["remove_keys_input", "operations"], + "Rename Keys": ["rename_keys_input", "operations"], + } + + inputs = [ + DataInput(name="data", display_name="Data", info="Data object to filter.", required=True, is_list=True), + SortableListInput( + name="operations", + display_name="Operations", + placeholder="Select Operation", + info="List of operations to perform on the data.", + options=[ + {"name": "Select Keys", "icon": "lasso-select"}, + {"name": "Literal Eval", "icon": "braces"}, + {"name": "Combine", "icon": "merge"}, + {"name": "Filter Values", "icon": "filter"}, + {"name": "Append or Update", "icon": "circle-plus"}, + {"name": "Remove Keys", "icon": "eraser"}, + {"name": "Rename Keys", "icon": "pencil-line"}, + ], + real_time_refresh=True, + limit=1, + ), + # select keys inputs + MessageTextInput( + name="select_keys_input", + display_name="Select Keys", + info="List of keys to select from the data.", + show=False, + is_list=True, + ), + # filter values inputs + MessageTextInput( + name="filter_key", + display_name="Filter Key", + info="Key to filter by.", + is_list=True, + show=False, + ), + DropdownInput( + name="operator", + display_name="Comparison Operator", + options=["equals", "not equals", "contains", "starts with", "ends with"], + info="The operator to apply for comparing the values.", + value="equals", + advanced=False, + show=False, + ), + DictInput( + name="filter_values", + display_name="Filter Values", + info="List of values to filter by.", + show=False, + is_list=True, + ), + # update/ Append data inputs + DictInput( + name="append_update_data", + display_name="Append or Update", + info="Data to Append or Updatethe existing data with.", + show=False, + value={"key": "value"}, + is_list=True, + ), + # remove keys inputs + MessageTextInput( + name="remove_keys_input", + display_name="Remove Keys", + info="List of keys to remove from the data.", + show=False, + is_list=True, + ), + # rename keys inputs + DictInput( + name="rename_keys_input", + display_name="Rename Keys", + info="List of keys to rename in the data.", + show=False, + is_list=True, + value={"old_key": "new_key"}, + ), + ] + outputs = [ + Output(display_name="Data", name="data_output", method="as_data"), + ] + + # Helper methods for data operations + def get_data_dict(self) -> dict: + """Extract data dictionary from Data object.""" + # TODO: rasie error if it s list of data objects + data = self.data[0] if isinstance(self.data, list) and len(self.data) == 1 else self.data + return data.model_dump() + + def get_normalized_data(self) -> dict: + """Get normalized data dictionary, handling the 'data' key if present.""" + data_dict = self.get_data_dict() + return data_dict.get("data", data_dict) + + def data_is_list(self) -> bool: + """Check if data contains multiple items.""" + return isinstance(self.data, list) and len(self.data) > 1 + + def validate_single_data(self, operation: str) -> None: + """Validate that the operation is being performed on a single data object.""" + if self.data_is_list(): + msg = f"{operation} operation is not supported for multiple data objects." + raise ValueError(msg) + + def operation_exception(self, operations: list[str]) -> None: + """Raise exception for incompatible operations.""" + msg = f"{operations} operations are not supported in combination with each other." + raise ValueError(msg) + + # Data transformation operations + def select_keys(self, evaluate: bool | None = None) -> Data: + """Select specific keys from the data dictionary.""" + self.validate_single_data("Select Keys") + data_dict = self.get_normalized_data() + filter_criteria: list[str] = self.select_keys_input + + # Filter the data + if len(filter_criteria) == 1 and filter_criteria[0] == "data": + filtered = data_dict["data"] + else: + if not all(key in data_dict for key in filter_criteria): + msg = f"Select key not found in data. Available keys: {list(data_dict.keys())}" + raise ValueError(msg) + filtered = {key: value for key, value in data_dict.items() if key in filter_criteria} + + # Create a new Data object with the filtered data + if evaluate: + filtered = self.recursive_eval(filtered) + + # Return a new Data object with the filtered data directly in the data attribute + return Data(data=filtered) + + def remove_keys(self) -> Data: + """Remove specified keys from the data dictionary.""" + self.validate_single_data("Remove Keys") + data_dict = self.get_normalized_data() + remove_keys_input: list[str] = self.remove_keys_input + + for key in remove_keys_input: + if key in data_dict: + data_dict.pop(key) + else: + logger.warning(f"Key '{key}' not found in data. Skipping removal.") + + return Data(**data_dict) + + def rename_keys(self) -> Data: + """Rename keys in the data dictionary.""" + self.validate_single_data("Rename Keys") + data_dict = self.get_normalized_data() + rename_keys_input: dict[str, str] = self.rename_keys_input + + for old_key, new_key in rename_keys_input.items(): + if old_key in data_dict: + data_dict[new_key] = data_dict[old_key] + data_dict.pop(old_key) + else: + msg = f"Key '{old_key}' not found in data. Skipping rename." + raise ValueError(msg) + + return Data(**data_dict) + + def recursive_eval(self, data: Any) -> Any: + """Recursively evaluate string values in a dictionary or list. + + If the value is a string that can be evaluated, it will be evaluated. + Otherwise, the original value is returned. + """ + if isinstance(data, dict): + return {k: self.recursive_eval(v) for k, v in data.items()} + if isinstance(data, list): + return [self.recursive_eval(item) for item in data] + if isinstance(data, str): + try: + # Only attempt to evaluate strings that look like Python literals + if ( + data.strip().startswith(("{", "[", "(", "'", '"')) + or data.strip().lower() in ("true", "false", "none") + or data.strip().replace(".", "").isdigit() + ): + return ast.literal_eval(data) + # return data + except (ValueError, SyntaxError, TypeError, MemoryError): + # If evaluation fails for any reason, return the original string + return data + else: + return data + return data + + def evaluate_data(self) -> Data: + """Evaluate string values in the data dictionary.""" + self.validate_single_data("Literal Eval") + logger.info("evaluating data") + return Data(**self.recursive_eval(self.get_data_dict())) + + def combine_data(self, evaluate: bool | None = None) -> Data: + """Combine multiple data objects into one.""" + logger.info("combining data") + if not self.data_is_list(): + return self.data[0] if self.data else Data(data={}) + + if len(self.data) == 1: + msg = "Combine operation requires multiple data inputs." + raise ValueError(msg) + + data_dicts = [data.model_dump().get("data", data.model_dump()) for data in self.data] + combined_data = {} + + for data_dict in data_dicts: + for key, value in data_dict.items(): + if key not in combined_data: + combined_data[key] = value + elif isinstance(combined_data[key], list): + if isinstance(value, list): + combined_data[key].extend(value) + else: + combined_data[key].append(value) + else: + # If current value is not a list, convert it to list and add new value + combined_data[key] = ( + [combined_data[key], value] if not isinstance(value, list) else [combined_data[key], *value] + ) + + if evaluate: + combined_data = self.recursive_eval(combined_data) + + return Data(**combined_data) + + def compare_values(self, item_value: Any, filter_value: str, operator: str) -> bool: + """Compare values based on the specified operator.""" + comparison_func = OPERATORS.get(operator) + if comparison_func: + return comparison_func(item_value, filter_value) + return False + + def filter_data(self, input_data: list[dict[str, Any]], filter_key: str, filter_value: str, operator: str) -> list: + """Filter list data based on key, value, and operator.""" + # Validate inputs + if not input_data: + self.status = "Input data is empty." + return [] + + if not filter_key or not filter_value: + self.status = "Filter key or value is missing." + return input_data + + # Filter the data + filtered_data = [] + for item in input_data: + if isinstance(item, dict) and filter_key in item: + if self.compare_values(item[filter_key], filter_value, operator): + filtered_data.append(item) + else: + self.status = f"Warning: Some items don't have the key '{filter_key}' or are not dictionaries." + + return filtered_data + + def multi_filter_data(self) -> Data: + """Apply multiple filters to the data.""" + self.validate_single_data("Filter Values") + data_filtered = self.get_normalized_data() + + for filter_key in self.filter_key: + if filter_key not in data_filtered: + msg = f"Filter key '{filter_key}' not found in data. Available keys: {list(data_filtered.keys())}" + raise ValueError(msg) + + if isinstance(data_filtered[filter_key], list): + for filter_data in self.filter_values: + filter_value = self.filter_values.get(filter_data) + if filter_value is not None: + data_filtered[filter_key] = self.filter_data( + input_data=data_filtered[filter_key], + filter_key=filter_data, + filter_value=filter_value, + operator=self.operator, + ) + else: + msg = f"Filter key '{filter_key}' is not a list." + raise TypeError(msg) + + return Data(**data_filtered) + + def append_update(self) -> Data: + """Append or Update with new key-value pairs.""" + self.validate_single_data("Append or Update") + data_filtered = self.get_normalized_data() + + for key, value in self.append_update_data.items(): + data_filtered[key] = value + + return Data(**data_filtered) + + # Configuration and execution methods + def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None) -> dotdict: + """Update build configuration based on selected action.""" + if field_name != "operations": + return build_config + + build_config["operations"]["value"] = field_value + selected_actions = [action["name"] for action in field_value] + + # Handle single action case + if len(selected_actions) == 1 and selected_actions[0] in ACTION_CONFIG: + action = selected_actions[0] + config = ACTION_CONFIG[action] + + build_config["data"]["is_list"] = config["is_list"] + logger.info(config["log_msg"]) + + return set_current_fields( + build_config=build_config, + action_fields=self.actions_data, + selected_action=action, + default_fields=self.default_keys, + ) + + # Handle no operations case + if not selected_actions: + logger.info("setting default fields") + return set_current_fields( + build_config=build_config, + action_fields=self.actions_data, + selected_action=None, + default_fields=self.default_keys, + ) + + return build_config + + def as_data(self) -> Data: + """Execute the selected action on the data.""" + if not hasattr(self, "operations") or not self.operations: + return Data(data={}) + + selected_actions = [action["name"] for action in self.operations] + logger.info(f"selected_actions: {selected_actions}") + + # Only handle single action case for now + if len(selected_actions) != 1: + return Data(data={}) + + action = selected_actions[0] + + # Explicitly type the action_map + action_map: dict[str, Callable[[], Data]] = { + "Select Keys": self.select_keys, + "Literal Eval": self.evaluate_data, + "Combine": self.combine_data, + "Filter Values": self.multi_filter_data, + "Append or Update": self.append_update, + "Remove Keys": self.remove_keys, + "Rename Keys": self.rename_keys, + } + + handler: Callable[[], Data] | None = action_map.get(action) + if handler: + try: + return handler() + except Exception as e: + logger.error(f"Error executing {action}: {e!s}") + raise + + return Data(data={}) diff --git a/src/backend/base/langflow/inputs/inputs.py b/src/backend/base/langflow/inputs/inputs.py index 1255b42de..04bff2b32 100644 --- a/src/backend/base/langflow/inputs/inputs.py +++ b/src/backend/base/langflow/inputs/inputs.py @@ -458,6 +458,7 @@ class DictInput(BaseInputMixin, ListableInputMixin, InputTraceMixin, ToolModeMix """ field_type: SerializableFieldTypes = FieldTypes.DICT + # value: dict | None = {"key": "value"} # Note do not set value to an empty dict, it will break the component in dynamic update build config # value: dict | None = {} value: dict = Field(default_factory=dict) diff --git a/src/backend/base/langflow/utils/component_utils.py b/src/backend/base/langflow/utils/component_utils.py index 6740bab99..6f9c2ee4e 100644 --- a/src/backend/base/langflow/utils/component_utils.py +++ b/src/backend/base/langflow/utils/component_utils.py @@ -68,10 +68,10 @@ def set_multiple_field_display( """Set display property for multiple fields at once.""" if fields is not None: for field, visibility in fields.items(): - set_field_display(build_config, field, visibility) + build_config = set_field_display(build_config, field, visibility) elif field_list is not None: for field in field_list: - set_field_display(build_config, field, is_visible) + build_config = set_field_display(build_config, field, is_visible) return build_config @@ -91,10 +91,10 @@ def set_multiple_field_advanced( """Set advanced property for multiple fields at once.""" if fields is not None: for field, advanced in fields.items(): - set_field_advanced(build_config, field, advanced) + build_config = set_field_advanced(build_config, field, advanced) elif field_list is not None: for field in field_list: - set_field_advanced(build_config, field, is_advanced) + build_config = set_field_advanced(build_config, field, is_advanced) return build_config @@ -114,7 +114,7 @@ def merge_build_configs(base_config: dotdict, override_config: dotdict) -> dotdi def set_current_fields( build_config: dotdict, action_fields: dict[str, list[str]], - selected_action: str, + selected_action: str | None = None, default_fields: list[str] = DEFAULT_FIELDS, ) -> dotdict: """Set the current fields for a selected action.""" @@ -122,10 +122,16 @@ def set_current_fields( # we need to show action of one field and disable the rest if selected_action in action_fields: for field in action_fields[selected_action]: - set_field_display(build_config=build_config, field=field, is_visible=True) - for field in action_fields[selected_action]: - set_field_display(build_config=build_config, field=field, is_visible=False) + build_config = set_field_display(build_config=build_config, field=field, is_visible=True) + for key, value in action_fields.items(): + if key != selected_action: + for field in value: + build_config = set_field_display(build_config=build_config, field=field, is_visible=False) + if selected_action is None: + for value in action_fields.values(): + for field in value: + build_config = set_field_display(build_config=build_config, field=field, is_visible=False) if default_fields is not None: for field in default_fields: - set_field_display(build_config=build_config, field=field, is_visible=True) + build_config = set_field_display(build_config=build_config, field=field, is_visible=True) return build_config diff --git a/src/backend/tests/unit/components/processing/test_data_operations_component.py b/src/backend/tests/unit/components/processing/test_data_operations_component.py new file mode 100644 index 000000000..aa873dfc3 --- /dev/null +++ b/src/backend/tests/unit/components/processing/test_data_operations_component.py @@ -0,0 +1,193 @@ +import pytest +from langflow.components.processing.data_operations import DataOperationsComponent +from langflow.schema import Data + +from tests.base import ComponentTestBaseWithoutClient + + +class TestDataOperationsComponent(ComponentTestBaseWithoutClient): + @pytest.fixture + def component_class(self): + """Return the component class to test.""" + return DataOperationsComponent + + @pytest.fixture + def default_kwargs(self): + """Return the default kwargs for the component.""" + return { + "data": Data(data={"key1": "value1", "key2": "value2", "key3": "value3"}), + "actions": [{"name": "Select Keys"}], + "select_keys_input": ["key1", "key2"], + } + + @pytest.fixture + def file_names_mapping(self): + """Return an empty list since this component doesn't have version-specific files.""" + return [] + + def test_select_keys(self): + """Test the Select Keys operation.""" + component = DataOperationsComponent( + data=Data(data={"key1": "value1", "key2": "value2", "key3": "value3"}), + operations=[{"name": "Select Keys"}], + select_keys_input=["key1", "key2"], + ) + + result = component.as_data() + assert isinstance(result, Data) + assert "key1" in result.data + assert "key2" in result.data + assert "key3" not in result.data + assert result.data["key1"] == "value1" + assert result.data["key2"] == "value2" + + def test_remove_keys(self): + """Test the Remove Keys operation.""" + component = DataOperationsComponent( + data=Data(data={"key1": "value1", "key2": "value2", "key3": "value3"}), + operations=[{"name": "Remove Keys"}], + remove_keys_input=["key3"], + ) + + result = component.as_data() + assert isinstance(result, Data) + assert "key1" in result.data + assert "key2" in result.data + assert "key3" not in result.data + + def test_rename_keys(self): + """Test the Rename Keys operation.""" + component = DataOperationsComponent( + data=Data(data={"key1": "value1", "key2": "value2"}), + operations=[{"name": "Rename Keys"}], + rename_keys_input={"key1": "new_key1"}, + ) + + result = component.as_data() + assert isinstance(result, Data) + assert "new_key1" in result.data + assert "key1" not in result.data + assert result.data["new_key1"] == "value1" + + def test_literal_eval(self): + """Test the Literal Eval operation.""" + component = DataOperationsComponent( + data=Data(data={"list_as_string": "[1, 2, 3]", "dict_as_string": "{'a': 1, 'b': 2}"}), + operations=[{"name": "Literal Eval"}], + ) + + result = component.as_data() + assert isinstance(result, Data) + assert isinstance(result.data["list_as_string"], list) + assert result.data["list_as_string"] == [1, 2, 3] + assert isinstance(result.data["dict_as_string"], dict) + assert result.data["dict_as_string"] == {"a": 1, "b": 2} + + def test_combine(self): + """Test the Combine operation.""" + data1 = Data(data={"key1": "value1"}) + data2 = Data(data={"key2": "value2"}) + + component = DataOperationsComponent( + data=[data1, data2], + operations=[{"name": "Combine"}], + ) + + result = component.as_data() + assert isinstance(result, Data) + assert "key1" in result.data + assert "key2" in result.data + assert result.data["key1"] == "value1" + assert result.data["key2"] == "value2" + + def test_combine_with_overlapping_keys(self): + """Test the Combine operation with overlapping keys.""" + data1 = Data(data={"common_key": "value1", "key1": "value1"}) + data2 = Data(data={"common_key": "value2", "key2": "value2"}) + + component = DataOperationsComponent( + data=[data1, data2], + operations=[{"name": "Combine"}], + ) + + result = component.as_data() + assert isinstance(result, Data) + assert result.data["common_key"] == ["value1", "value2"] # Combined string values + assert result.data["key1"] == "value1" + assert result.data["key2"] == "value2" + + def test_append_update(self): + """Test the Append or Update Data operation.""" + component = DataOperationsComponent( + data=Data(data={"existing_key": "existing_value"}), + operations=[{"name": "Append or Update"}], + append_update_data={"new_key": "new_value", "existing_key": "updated_value"}, + ) + + result = component.as_data() + assert isinstance(result, Data) + assert result.data["existing_key"] == "updated_value" + assert result.data["new_key"] == "new_value" + + def test_filter_values(self): + """Test the Filter Values operation.""" + nested_data = { + "items": [ + {"id": 1, "name": "Item 1"}, + {"id": 2, "name": "Item 2"}, + {"id": 3, "name": "Different Item"}, + ] + } + + component = DataOperationsComponent( + data=Data(data=nested_data), + operations=[{"name": "Filter Values"}], + filter_key=["items"], + filter_values={"name": "Item"}, + operator="contains", + ) + + result = component.as_data() + assert isinstance(result, Data) + assert len(result.data["items"]) == 3 + assert result.data["items"][0]["id"] == 1 + assert result.data["items"][1]["id"] == 2 + + def test_no_actions(self): + """Test behavior when no actions are specified.""" + component = DataOperationsComponent( + data=Data(data={"key1": "value1"}), + operations=[], + ) + + result = component.as_data() + assert isinstance(result, Data) + assert result.data == {} + + def test_get_normalized_data(self): + """Test the get_normalized_data helper method.""" + component = DataOperationsComponent( + data=Data(data={"key1": "value1"}), + operations=[], + ) + + # Add data under the "data" key + component.data = Data(data={"test": {"key2": "value2"}}) + normalized = component.get_normalized_data() + assert normalized == {"test": {"key2": "value2"}} + + # Test without the "data" key + component.data = Data(data={"key3": "value3"}) + normalized = component.get_normalized_data() + assert normalized == {"key3": "value3"} + + def test_validate_single_data_with_multiple_data(self): + """Test that operations that don't support multiple data objects raise an error.""" + component = DataOperationsComponent( + data=[Data(data={"key1": "value1"}), Data(data={"key2": "value2"})], + operations=[{"name": "Select Keys"}], + select_keys_input=["key1"], + ) + + with pytest.raises(ValueError, match="Select Keys operation is not supported for multiple data objects"): + component.as_data()