diff --git a/src/backend/base/langflow/components/processing/dataframe_operations.py b/src/backend/base/langflow/components/processing/dataframe_operations.py new file mode 100644 index 000000000..be7a13033 --- /dev/null +++ b/src/backend/base/langflow/components/processing/dataframe_operations.py @@ -0,0 +1,212 @@ +from langflow.custom import Component +from langflow.io import BoolInput, DataFrameInput, DropdownInput, IntInput, MessageTextInput, Output, StrInput +from langflow.schema import DataFrame + + +class DataFrameOperationsComponent(Component): + display_name = "DataFrame Operations" + description = "Perform various operations on a DataFrame." + icon = "table" + + # Available operations + OPERATION_CHOICES = [ + "Add Column", + "Drop Column", + "Filter", + "Head", + "Rename Column", + "Replace Value", + "Select Columns", + "Sort", + "Tail", + ] + + inputs = [ + DataFrameInput( + name="df", + display_name="DataFrame", + info="The input DataFrame to operate on.", + ), + DropdownInput( + name="operation", + display_name="Operation", + options=OPERATION_CHOICES, + info="Select the DataFrame operation to perform.", + real_time_refresh=True, + ), + StrInput( + name="column_name", + display_name="Column Name", + info="The column name to use for the operation.", + dynamic=True, + show=False, + ), + MessageTextInput( + name="filter_value", + display_name="Filter Value", + info="The value to filter rows by.", + dynamic=True, + show=False, + ), + BoolInput( + name="ascending", + display_name="Sort Ascending", + info="Whether to sort in ascending order.", + dynamic=True, + show=False, + value=True, + ), + StrInput( + name="new_column_name", + display_name="New Column Name", + info="The new column name when renaming or adding a column.", + dynamic=True, + show=False, + ), + MessageTextInput( + name="new_column_value", + display_name="New Column Value", + info="The value to populate the new column with.", + dynamic=True, + show=False, + ), + StrInput( + name="columns_to_select", + display_name="Columns to Select", + dynamic=True, + is_list=True, + show=False, + ), + IntInput( + name="num_rows", + display_name="Number of Rows", + info="Number of rows to return (for head/tail).", + dynamic=True, + show=False, + value=5, + ), + MessageTextInput( + name="replace_value", + display_name="Value to Replace", + info="The value to replace in the column.", + dynamic=True, + show=False, + ), + MessageTextInput( + name="replacement_value", + display_name="Replacement Value", + info="The value to replace with.", + dynamic=True, + show=False, + ), + ] + + outputs = [ + Output( + display_name="DataFrame", + name="output", + method="perform_operation", + info="The resulting DataFrame after the operation.", + ) + ] + + def update_build_config(self, build_config, field_value, field_name=None): + # Hide all dynamic fields by default + dynamic_fields = [ + "column_name", + "filter_value", + "ascending", + "new_column_name", + "new_column_value", + "columns_to_select", + "num_rows", + "replace_value", + "replacement_value", + ] + for field in dynamic_fields: + build_config[field]["show"] = False + + # Show relevant fields based on the selected operation + if field_name == "operation": + if field_value == "Filter": + build_config["column_name"]["show"] = True + build_config["filter_value"]["show"] = True + elif field_value == "Sort": + build_config["column_name"]["show"] = True + build_config["ascending"]["show"] = True + elif field_value == "Drop Column": + build_config["column_name"]["show"] = True + elif field_value == "Rename Column": + build_config["column_name"]["show"] = True + build_config["new_column_name"]["show"] = True + elif field_value == "Add Column": + build_config["new_column_name"]["show"] = True + build_config["new_column_value"]["show"] = True + elif field_value == "Select Columns": + build_config["columns_to_select"]["show"] = True + elif field_value in ["Head", "Tail"]: + build_config["num_rows"]["show"] = True + elif field_value == "Replace Value": + build_config["column_name"]["show"] = True + build_config["replace_value"]["show"] = True + build_config["replacement_value"]["show"] = True + + return build_config + + def perform_operation(self) -> DataFrame: + dataframe_copy = self.df.copy() + operation = self.operation + + if operation == "Filter": + return self.filter_rows_by_value(dataframe_copy) + if operation == "Sort": + return self.sort_by_column(dataframe_copy) + if operation == "Drop Column": + return self.drop_column(dataframe_copy) + if operation == "Rename Column": + return self.rename_column(dataframe_copy) + if operation == "Add Column": + return self.add_column(dataframe_copy) + if operation == "Select Columns": + return self.select_columns(dataframe_copy) + if operation == "Head": + return self.head(dataframe_copy) + if operation == "Tail": + return self.tail(dataframe_copy) + if operation == "Replace Value": + return self.replace_values(dataframe_copy) + msg = f"Unsupported operation: {operation}" + + raise ValueError(msg) + + # Existing methods + def filter_rows_by_value(self, df: DataFrame) -> DataFrame: + return DataFrame(df[df[self.column_name] == self.filter_value]) + + def sort_by_column(self, df: DataFrame) -> DataFrame: + return DataFrame(df.sort_values(by=self.column_name, ascending=self.ascending)) + + def drop_column(self, df: DataFrame) -> DataFrame: + return DataFrame(df.drop(columns=[self.column_name])) + + def rename_column(self, df: DataFrame) -> DataFrame: + return DataFrame(df.rename(columns={self.column_name: self.new_column_name})) + + def add_column(self, df: DataFrame) -> DataFrame: + df[self.new_column_name] = [self.new_column_value] * len(df) + return DataFrame(df) + + def select_columns(self, df: DataFrame) -> DataFrame: + columns = [col.strip() for col in self.columns_to_select] + return DataFrame(df[columns]) + + # New methods + def head(self, df: DataFrame) -> DataFrame: + return DataFrame(df.head(self.num_rows)) + + def tail(self, df: DataFrame) -> DataFrame: + return DataFrame(df.tail(self.num_rows)) + + def replace_values(self, df: DataFrame) -> DataFrame: + df[self.column_name] = df[self.column_name].replace(self.replace_value, self.replacement_value) + return DataFrame(df) diff --git a/src/backend/tests/unit/components/processing/__init__.py b/src/backend/tests/unit/components/processing/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/backend/tests/unit/components/processing/test_dataframe_operations.py b/src/backend/tests/unit/components/processing/test_dataframe_operations.py new file mode 100644 index 000000000..df8fdab22 --- /dev/null +++ b/src/backend/tests/unit/components/processing/test_dataframe_operations.py @@ -0,0 +1,84 @@ +import pandas as pd +import pytest +from langflow.components.processing.dataframe_operations import DataFrameOperationsComponent + + +@pytest.fixture +def sample_dataframe(): + data = {"A": [1, 2, 3, 4, 5], "B": [5, 4, 3, 2, 1], "C": ["a", "b", "c", "d", "e"]} + return pd.DataFrame(data) + + +@pytest.mark.parametrize( + ("operation", "expected_columns", "expected_values"), + [ + ("Add Column", ["A", "B", "C", "D"], [1, 5, "a", 10]), + ("Drop Column", ["A", "C"], None), + ("Filter", ["A", "B", "C"], [3, 3, "c"]), + ("Sort", ["A", "B", "C"], [5, 1, "e"]), + ("Rename Column", ["Z", "B", "C"], None), + ("Select Columns", ["A", "C"], None), + ("Head", ["A", "B", "C"], [1, 5, "a"]), + ("Tail", ["A", "B", "C"], [5, 1, "e"]), + ("Replace Value", ["A", "B", "C"], [1, 5, "z"]), + ], +) +def test_operations(sample_dataframe, operation, expected_columns, expected_values): + component = DataFrameOperationsComponent() + component.df = sample_dataframe + component.operation = operation + + if operation == "Add Column": + component.new_column_name = "D" + component.new_column_value = 10 + elif operation == "Drop Column": + component.column_name = "B" + elif operation == "Filter": + component.column_name = "A" + component.filter_value = 3 + elif operation == "Sort": + component.column_name = "A" + component.ascending = False + elif operation == "Rename Column": + component.column_name = "A" + component.new_column_name = "Z" + elif operation == "Select Columns": + component.columns_to_select = ["A", "C"] + elif operation in ("Head", "Tail"): + component.num_rows = 1 + elif operation == "Replace Value": + component.column_name = "C" + component.replace_value = "a" + component.replacement_value = "z" + + result = component.perform_operation() + + assert list(result.columns) == expected_columns + if expected_values is not None and isinstance(expected_values, list): + assert list(result.iloc[0]) == expected_values + + +def test_empty_dataframe(): + component = DataFrameOperationsComponent() + component.df = pd.DataFrame() + component.operation = "Head" + component.num_rows = 3 + result = component.perform_operation() + assert result.empty + + +def test_non_existent_column(): + component = DataFrameOperationsComponent() + component.df = pd.DataFrame({"A": [1, 2, 3]}) + component.operation = "Drop Column" + component.column_name = "B" + with pytest.raises(KeyError): + component.perform_operation() + + +def test_invalid_operation(): + component = DataFrameOperationsComponent() + component.df = pd.DataFrame({"A": [1, 2, 3]}) + component.operation = "Invalid Operation" + with pytest.raises(ValueError, match="Unsupported operation: Invalid Operation"): + component.perform_operation()