feat: add batch run beta component (#5489)

* feat: add batch run beta component

* [autofix.ci] apply automated fixes

* Update batch_run to run async using Runnable

* [autofix.ci] apply automated fixes

* refactor: streamline BatchRunComponent by removing unused synchronous model invocation and enhancing async processing

- Consolidated imports and improved type checking for LanguageModel.
- Simplified the run_batch method by directly using the model's asynchronous capabilities.
- Enhanced error handling for missing DataFrame columns.
- Cleaned up comments and improved code readability.

* refactor: update BatchRunComponent to use Runnable for improved async processing

- Added future annotations for better type hinting.
- Replaced LanguageModel with Runnable in type checking and method implementation.
- Enhanced code clarity and maintainability by consolidating imports.

* test: add unit tests for BatchRunComponent functionality

- Introduced a new test suite for BatchRunComponent to validate its behavior.
- Added tests for successful batch runs with and without system messages.
- Implemented tests for handling invalid column names and empty DataFrames.
- Included a test to ensure non-string columns are converted to strings during processing.

* refactor: rename useful.py to mock_language_model.py and update imports

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Rodrigo Nader 2025-01-08 16:49:11 -03:00 committed by GitHub
commit 75c3c1ce97
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 191 additions and 1 deletions

View file

@ -7,6 +7,7 @@ from .store_message import MessageStoreComponent
from .structured_output import StructuredOutputComponent
__all__ = [
"BatchRunComponent",
"CreateListComponent",
"CurrentDateComponent",
"IDGeneratorComponent",

View file

@ -0,0 +1,101 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from langflow.custom import Component
from langflow.io import DataFrameInput, HandleInput, MultilineInput, Output, StrInput
from langflow.schema import DataFrame
if TYPE_CHECKING:
from langchain_core.runnables import Runnable
class BatchRunComponent(Component):
display_name = "Batch Run"
description = (
"Runs a language model over each row of a DataFrame's text column and returns a new "
"DataFrame with two columns: 'text_input' (the original text) and 'model_response' "
"containing the model's response."
)
icon = "List"
beta = True
inputs = [
HandleInput(
name="model",
display_name="Language Model",
info="Connect the 'Language Model' output from your LLM component here.",
input_types=["LanguageModel"],
),
MultilineInput(
name="system_message",
display_name="System Message",
info="Multi-line system instruction for all rows in the DataFrame.",
required=False,
),
DataFrameInput(
name="df",
display_name="DataFrame",
info="The DataFrame whose column (specified by 'column_name') we'll treat as text messages.",
),
StrInput(
name="column_name",
display_name="Column Name",
info="The name of the DataFrame column to treat as text messages. Default='text'.",
value="text",
),
]
outputs = [
Output(
display_name="Batch Results",
name="batch_results",
method="run_batch",
info="A DataFrame with two columns: 'text_input' and 'model_response'.",
),
]
async def run_batch(self) -> DataFrame:
"""For each row in df[column_name], combine that text with system_message, then invoke the model asynchronously.
Returns a new DataFrame of the same length, with columns 'text_input' and 'model_response'.
"""
model: Runnable = self.model
system_msg = self.system_message or ""
df: DataFrame = self.df
col_name = self.column_name or "text"
if col_name not in df.columns:
msg = f"Column '{col_name}' not found in the DataFrame."
raise ValueError(msg)
# Convert the specified column to a list of strings
user_texts = df[col_name].astype(str).tolist()
# Prepare the batch of conversations
conversations = [
[{"role": "system", "content": system_msg}, {"role": "user", "content": text}]
if system_msg
else [{"role": "user", "content": text}]
for text in user_texts
]
model = model.with_config(
{
"run_name": self.display_name,
"project_name": self.get_project_name(),
"callbacks": self.get_langchain_callbacks(),
}
)
responses = await model.abatch(conversations)
# Build the final data, each row has 'text_input' + 'model_response'
rows = []
for original_text, response in zip(user_texts, responses, strict=False):
resp_text = response.content if hasattr(response, "content") else str(response)
row = {"text_input": original_text, "model_response": resp_text}
rows.append(row)
# Convert to a new DataFrame
return DataFrame(rows) # Langflow DataFrame from a list of dicts

View file

@ -0,0 +1,88 @@
import re
import pytest
from langflow.components.helpers.batch_run import BatchRunComponent
from langflow.schema import DataFrame
from tests.base import ComponentTestBaseWithoutClient
from tests.unit.mock_language_model import MockLanguageModel
class TestBatchRunComponent(ComponentTestBaseWithoutClient):
@pytest.fixture
def component_class(self):
"""Return the component class to test."""
return BatchRunComponent
@pytest.fixture
def default_kwargs(self):
"""Return the default kwargs for the component."""
return {
"model": MockLanguageModel(),
"df": DataFrame({"text": ["Hello"]}),
"column_name": "text",
}
@pytest.fixture
def file_names_mapping(self):
"""Return an empty list since this component doesn't have version-specific files."""
return []
async def test_successful_batch_run_with_system_message(self):
# Create test data
test_df = DataFrame({"text": ["Hello", "World", "Test"]})
component = BatchRunComponent(
model=MockLanguageModel(), system_message="You are a helpful assistant", df=test_df, column_name="text"
)
# Run the batch process
result = await component.run_batch()
# Verify the results
assert isinstance(result, DataFrame)
assert "text_input" in result.columns
assert "model_response" in result.columns
assert len(result) == 3
assert all(isinstance(resp, str) for resp in result["model_response"])
async def test_batch_run_without_system_message(self):
test_df = DataFrame({"text": ["Hello", "World"]})
component = BatchRunComponent(model=MockLanguageModel(), df=test_df, column_name="text")
result = await component.run_batch()
assert isinstance(result, DataFrame)
assert len(result) == 2
assert all(isinstance(resp, str) for resp in result["model_response"])
async def test_invalid_column_name(self):
component = BatchRunComponent(
model=MockLanguageModel(), df=DataFrame({"text": ["Hello"]}), column_name="nonexistent_column"
)
with pytest.raises(ValueError, match=re.escape("Column 'nonexistent_column' not found in the DataFrame.")):
await component.run_batch()
async def test_empty_dataframe(self):
component = BatchRunComponent(model=MockLanguageModel(), df=DataFrame({"text": []}), column_name="text")
result = await component.run_batch()
assert isinstance(result, DataFrame)
assert len(result) == 0
async def test_non_string_column_conversion(self):
test_df = DataFrame(
{
"text": [123, 456, 789] # Numeric values
}
)
component = BatchRunComponent(model=MockLanguageModel(), df=test_df, column_name="text")
result = await component.run_batch()
assert isinstance(result, DataFrame)
assert all(isinstance(text, str) for text in result["text_input"])
assert all(str(num) in text for num, text in zip(test_df["text"], result["text_input"], strict=False))

View file

@ -8,7 +8,7 @@ from langflow.inputs.inputs import TableInput
from langflow.schema.data import Data
from pydantic import BaseModel
from tests.unit.useful import MockLanguageModel
from tests.unit.mock_language_model import MockLanguageModel
class TestStructuredOutputComponent: