diff --git a/src/backend/base/langflow/components/helpers/__init__.py b/src/backend/base/langflow/components/helpers/__init__.py
index 176282a92..917b2c176 100644
--- a/src/backend/base/langflow/components/helpers/__init__.py
+++ b/src/backend/base/langflow/components/helpers/__init__.py
@@ -7,6 +7,7 @@ from .store_message import MessageStoreComponent
 from .structured_output import StructuredOutputComponent
 
 __all__ = [
+    "BatchRunComponent",
     "CreateListComponent",
     "CurrentDateComponent",
     "IDGeneratorComponent",
diff --git a/src/backend/base/langflow/components/helpers/batch_run.py b/src/backend/base/langflow/components/helpers/batch_run.py
new file mode 100644
index 000000000..6c9140060
--- /dev/null
+++ b/src/backend/base/langflow/components/helpers/batch_run.py
@@ -0,0 +1,110 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from langflow.custom import Component
+from langflow.io import DataFrameInput, HandleInput, MultilineInput, Output, StrInput
+from langflow.schema import DataFrame
+
+if TYPE_CHECKING:
+    from langchain_core.runnables import Runnable
+
+
+class BatchRunComponent(Component):
+    """Run a language model over every value of one DataFrame column."""
+
+    display_name = "Batch Run"
+    description = (
+        "Runs a language model over each row of a DataFrame's text column and returns a new "
+        "DataFrame with two columns: 'text_input' (the original text) and 'model_response' "
+        "containing the model's response."
+    )
+    icon = "List"
+    beta = True
+
+    inputs = [
+        HandleInput(
+            name="model",
+            display_name="Language Model",
+            info="Connect the 'Language Model' output from your LLM component here.",
+            input_types=["LanguageModel"],
+        ),
+        MultilineInput(
+            name="system_message",
+            display_name="System Message",
+            info="Multi-line system instruction for all rows in the DataFrame.",
+            required=False,
+        ),
+        DataFrameInput(
+            name="df",
+            display_name="DataFrame",
+            info="The DataFrame whose column (specified by 'column_name') we'll treat as text messages.",
+        ),
+        StrInput(
+            name="column_name",
+            display_name="Column Name",
+            info="The name of the DataFrame column to treat as text messages. Default='text'.",
+            value="text",
+        ),
+    ]
+
+    outputs = [
+        Output(
+            display_name="Batch Results",
+            name="batch_results",
+            method="run_batch",
+            info="A DataFrame with two columns: 'text_input' and 'model_response'.",
+        ),
+    ]
+
+    async def run_batch(self) -> DataFrame:
+        """Invoke the model once per row of df[column_name] and collect the responses.
+
+        Each value of the column is converted to a string and sent as a user message, preceded by
+        the system message when one is set. All conversations are submitted in a single `abatch`
+        call on the configured model.
+
+        Returns:
+            DataFrame: One row per input row, with columns 'text_input' and 'model_response'.
+                Both columns are present even when the input DataFrame has no rows.
+
+        Raises:
+            ValueError: If column_name is not a column of the input DataFrame.
+        """
+        model: Runnable = self.model
+        system_msg = self.system_message or ""
+        df: DataFrame = self.df
+        col_name = self.column_name or "text"
+
+        if col_name not in df.columns:
+            msg = f"Column '{col_name}' not found in the DataFrame."
+            raise ValueError(msg)
+
+        # Convert the specified column to a list of strings
+        user_texts = df[col_name].astype(str).tolist()
+
+        # Prepare the batch of conversations
+        conversations = [
+            [{"role": "system", "content": system_msg}, {"role": "user", "content": text}]
+            if system_msg
+            else [{"role": "user", "content": text}]
+            for text in user_texts
+        ]
+        model = model.with_config(
+            {
+                "run_name": self.display_name,
+                "project_name": self.get_project_name(),
+                "callbacks": self.get_langchain_callbacks(),
+            }
+        )
+
+        responses = await model.abatch(conversations)
+
+        # Use the response's `.content` when it has one; anything else is stringified.
+        response_texts = [
+            response.content if hasattr(response, "content") else str(response) for response in responses
+        ]
+
+        # Build the result column-wise: a list of zero row-dicts would yield a DataFrame with no
+        # columns at all, whereas a dict of (possibly empty) columns keeps the documented schema.
+        return DataFrame({"text_input": user_texts, "model_response": response_texts})
diff --git a/src/backend/tests/unit/components/helpers/test_batch_run_component.py b/src/backend/tests/unit/components/helpers/test_batch_run_component.py
new file mode 100644
index 000000000..6f19a6e56
--- /dev/null
+++ b/src/backend/tests/unit/components/helpers/test_batch_run_component.py
@@ -0,0 +1,90 @@
+import re
+
+import pytest
+from langflow.components.helpers.batch_run import BatchRunComponent
+from langflow.schema import DataFrame
+
+from tests.base import ComponentTestBaseWithoutClient
+from tests.unit.mock_language_model import MockLanguageModel
+
+
+class TestBatchRunComponent(ComponentTestBaseWithoutClient):
+    @pytest.fixture
+    def component_class(self):
+        """Return the component class to test."""
+        return BatchRunComponent
+
+    @pytest.fixture
+    def default_kwargs(self):
+        """Return the default kwargs for the component."""
+        return {
+            "model": MockLanguageModel(),
+            "df": DataFrame({"text": ["Hello"]}),
+            "column_name": "text",
+        }
+
+    @pytest.fixture
+    def file_names_mapping(self):
+        """Return an empty list since this component doesn't have version-specific files."""
+        return []
+
+    async def test_successful_batch_run_with_system_message(self):
+        # Create test data
+        test_df = DataFrame({"text": ["Hello", "World", "Test"]})
+
+        component = BatchRunComponent(
+            model=MockLanguageModel(), system_message="You are a helpful assistant", df=test_df, column_name="text"
+        )
+
+        # Run the batch process
+        result = await component.run_batch()
+
+        # Verify the results
+        assert isinstance(result, DataFrame)
+        assert "text_input" in result.columns
+        assert "model_response" in result.columns
+        assert len(result) == 3
+        assert all(isinstance(resp, str) for resp in result["model_response"])
+
+    async def test_batch_run_without_system_message(self):
+        test_df = DataFrame({"text": ["Hello", "World"]})
+
+        component = BatchRunComponent(model=MockLanguageModel(), df=test_df, column_name="text")
+
+        result = await component.run_batch()
+
+        assert isinstance(result, DataFrame)
+        assert len(result) == 2
+        assert all(isinstance(resp, str) for resp in result["model_response"])
+
+    async def test_invalid_column_name(self):
+        component = BatchRunComponent(
+            model=MockLanguageModel(), df=DataFrame({"text": ["Hello"]}), column_name="nonexistent_column"
+        )
+
+        with pytest.raises(ValueError, match=re.escape("Column 'nonexistent_column' not found in the DataFrame.")):
+            await component.run_batch()
+
+    async def test_empty_dataframe(self):
+        component = BatchRunComponent(model=MockLanguageModel(), df=DataFrame({"text": []}), column_name="text")
+
+        result = await component.run_batch()
+        assert isinstance(result, DataFrame)
+        assert len(result) == 0
+        assert "text_input" in result.columns
+        assert "model_response" in result.columns
+
+    async def test_non_string_column_conversion(self):
+        test_df = DataFrame(
+            {
+                "text": [123, 456, 789]  # Numeric values
+            }
+        )
+
+        component = BatchRunComponent(model=MockLanguageModel(), df=test_df, column_name="text")
+
+        result = await component.run_batch()
+
+        assert isinstance(result, DataFrame)
+        assert all(isinstance(text, str) for text in result["text_input"])
+        assert all(str(num) in text for num, text in zip(test_df["text"], result["text_input"], strict=False))
diff --git a/src/backend/tests/unit/components/helpers/test_structured_output_component.py b/src/backend/tests/unit/components/helpers/test_structured_output_component.py
index 14d028235..be2d07d79 100644
--- a/src/backend/tests/unit/components/helpers/test_structured_output_component.py
+++ b/src/backend/tests/unit/components/helpers/test_structured_output_component.py
@@ -8,7 +8,7 @@ from langflow.inputs.inputs import TableInput
 from langflow.schema.data import Data
 from pydantic import BaseModel
 
-from tests.unit.useful import MockLanguageModel
+from tests.unit.mock_language_model import MockLanguageModel
 
 
 class TestStructuredOutputComponent:
diff --git a/src/backend/tests/unit/useful.py b/src/backend/tests/unit/mock_language_model.py
similarity index 100%
rename from src/backend/tests/unit/useful.py
rename to src/backend/tests/unit/mock_language_model.py