diff --git a/src/backend/base/langflow/components/helpers/batch_run.py b/src/backend/base/langflow/components/helpers/batch_run.py index 7e6468faa..9bf329f51 100644 --- a/src/backend/base/langflow/components/helpers/batch_run.py +++ b/src/backend/base/langflow/components/helpers/batch_run.py @@ -1,32 +1,23 @@ from __future__ import annotations -import operator from typing import TYPE_CHECKING, Any +import toml # type: ignore[import-untyped] from loguru import logger from langflow.custom import Component -from langflow.io import ( - BoolInput, - DataFrameInput, - HandleInput, - MessageTextInput, - MultilineInput, - Output, -) +from langflow.io import BoolInput, DataFrameInput, HandleInput, MessageTextInput, MultilineInput, Output from langflow.schema import DataFrame if TYPE_CHECKING: + from collections.abc import Hashable + from langchain_core.runnables import Runnable class BatchRunComponent(Component): display_name = "Batch Run" - description = ( - "Runs a language model over each row of a DataFrame's text column and returns a new " - "DataFrame with three columns: '**text_input**' (the original text), " - "'**model_response**' (the model's response),and '**batch_index**' (the processing order)." - ) + description = "Runs an LLM over each row of a DataFrame's column. If no column is set, the entire row is passed." icon = "List" beta = True @@ -40,7 +31,7 @@ class BatchRunComponent(Component): ), MultilineInput( name="system_message", - display_name="System Message", + display_name="Instructions", info="Multi-line system instruction for all rows in the DataFrame.", required=False, ), @@ -53,16 +44,26 @@ class BatchRunComponent(Component): MessageTextInput( name="column_name", display_name="Column Name", - info="The name of the DataFrame column to treat as text messages. Default='text'.", - value="text", - required=True, + info=( + "The name of the DataFrame column to treat as text messages. " + "If empty, all columns will be formatted in TOML." 
+ ), + required=False, + advanced=False, + ), + MessageTextInput( + name="output_column_name", + display_name="Output Column Name", + info="Name of the column where the model's response will be stored.", + value="model_response", + required=False, advanced=True, ), BoolInput( name="enable_metadata", display_name="Enable Metadata", info="If True, add metadata to the output DataFrame.", - value=True, + value=False, required=False, advanced=True, ), @@ -70,23 +71,29 @@ class BatchRunComponent(Component): outputs = [ Output( - display_name="Batch Results", + display_name="DataFrame", name="batch_results", method="run_batch", - info="A DataFrame with columns: 'text_input', 'model_response', 'batch_index', and 'metadata'.", + info="A DataFrame with all original columns plus the model's response column.", ), ] - def _create_base_row(self, text_input: str = "", model_response: str = "", batch_index: int = -1) -> dict[str, Any]: - """Create a base row with optional metadata.""" - return { - "text_input": text_input, - "model_response": model_response, - "batch_index": batch_index, - } + def _format_row_as_toml(self, row: dict[Hashable, Any]) -> str: + """Convert a dictionary (row) into a TOML-formatted string.""" + formatted_dict = {str(col): {"value": str(val)} for col, val in row.items()} + return toml.dumps(formatted_dict) + + def _create_base_row( + self, original_row: dict[Hashable, Any], model_response: str = "", batch_index: int = -1 + ) -> dict[Hashable, Any]: + """Create a base row with original columns and additional metadata.""" + row = original_row.copy() + row[self.output_column_name] = model_response + row["batch_index"] = batch_index + return row def _add_metadata( - self, row: dict[str, Any], *, success: bool = True, system_msg: str = "", error: str | None = None + self, row: dict[Hashable, Any], *, success: bool = True, system_msg: str = "", error: str | None = None ) -> None: """Add metadata to a row if enabled.""" if not self.enable_metadata: @@ -95,8 
+102,8 @@ class BatchRunComponent(Component): if success: row["metadata"] = { "has_system_message": bool(system_msg), - "input_length": len(row["text_input"]), - "response_length": len(row["model_response"]), + "input_length": len(row.get("text_input", "")), + "response_length": len(row[self.output_column_name]), "processing_status": "success", } else: @@ -110,10 +117,10 @@ class BatchRunComponent(Component): Returns: DataFrame: A new DataFrame containing: - - text_input: The original input text - - model_response: The model's response - - batch_index: The processing order - - metadata: Additional processing information + - All original columns + - The model's response column (customizable name) + - 'batch_index' column for processing order + - 'metadata' (optional) Raises: ValueError: If the specified column is not found in the DataFrame @@ -122,22 +129,25 @@ class BatchRunComponent(Component): model: Runnable = self.model system_msg = self.system_message or "" df: DataFrame = self.df - col_name = self.column_name or "text" + col_name = self.column_name or "" # Validate inputs first if not isinstance(df, DataFrame): msg = f"Expected DataFrame input, got {type(df)}" raise TypeError(msg) - if col_name not in df.columns: + if col_name and col_name not in df.columns: msg = f"Column '{col_name}' not found in the DataFrame. 
Available columns: {', '.join(df.columns)}" raise ValueError(msg) try: - # Convert the specified column to a list of strings - user_texts = df[col_name].astype(str).tolist() - total_rows = len(user_texts) + # Determine text input for each row + if col_name: + user_texts = df[col_name].astype(str).tolist() + else: + user_texts = [self._format_row_as_toml(row) for row in df.to_dict(orient="records")] + total_rows = len(user_texts) logger.info(f"Processing {total_rows} rows with batch run") # Prepare the batch of conversations @@ -166,17 +176,15 @@ class BatchRunComponent(Component): ] # Sort by index to maintain order - responses_with_idx.sort(key=operator.itemgetter(0)) + responses_with_idx.sort(key=lambda x: x[0]) # Build the final data with enhanced metadata - rows: list[dict[str, Any]] = [] - for idx, response in responses_with_idx: - resp_text = response.content if hasattr(response, "content") else str(response) - row = self._create_base_row( - text_input=user_texts[idx], - model_response=resp_text, - batch_index=idx, - ) + rows: list[dict[Hashable, Any]] = [] + for idx, (original_row, response) in enumerate( + zip(df.to_dict(orient="records"), responses_with_idx, strict=False) + ): + response_text = response[1].content if hasattr(response[1], "content") else str(response[1]) + row = self._create_base_row(original_row, model_response=response_text, batch_index=idx) self._add_metadata(row, success=True, system_msg=system_msg) rows.append(row) @@ -190,6 +198,6 @@ class BatchRunComponent(Component): except (KeyError, AttributeError) as e: # Handle data structure and attribute access errors logger.error(f"Data processing error: {e!s}") - error_row = self._create_base_row() + error_row = self._create_base_row({col: "" for col in df.columns}, model_response="", batch_index=-1) self._add_metadata(error_row, success=False, error=str(e)) return DataFrame([error_row]) diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Youtube Analysis.json 
b/src/backend/base/langflow/initial_setup/starter_projects/Youtube Analysis.json index 6ab52efe5..6415202fa 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Youtube Analysis.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Youtube Analysis.json @@ -345,7 +345,7 @@ { "allows_loop": false, "cache": true, - "display_name": "Batch Results", + "display_name": "DataFrame", "method": "run_batch", "name": "batch_results", "selected": "DataFrame", @@ -376,20 +376,20 @@ "show": true, "title_case": false, "type": "code", - "value": "from __future__ import annotations\n\nimport operator\nfrom typing import TYPE_CHECKING, Any\n\nfrom loguru import logger\n\nfrom langflow.custom import Component\nfrom langflow.io import (\n BoolInput,\n DataFrameInput,\n HandleInput,\n MessageTextInput,\n MultilineInput,\n Output,\n)\nfrom langflow.schema import DataFrame\n\nif TYPE_CHECKING:\n from langchain_core.runnables import Runnable\n\n\nclass BatchRunComponent(Component):\n display_name = \"Batch Run\"\n description = (\n \"Runs a language model over each row of a DataFrame's text column and returns a new \"\n \"DataFrame with three columns: '**text_input**' (the original text), \"\n \"'**model_response**' (the model's response),and '**batch_index**' (the processing order).\"\n )\n icon = \"List\"\n beta = True\n\n inputs = [\n HandleInput(\n name=\"model\",\n display_name=\"Language Model\",\n info=\"Connect the 'Language Model' output from your LLM component here.\",\n input_types=[\"LanguageModel\"],\n required=True,\n ),\n MultilineInput(\n name=\"system_message\",\n display_name=\"System Message\",\n info=\"Multi-line system instruction for all rows in the DataFrame.\",\n required=False,\n ),\n DataFrameInput(\n name=\"df\",\n display_name=\"DataFrame\",\n info=\"The DataFrame whose column (specified by 'column_name') we'll treat as text messages.\",\n required=True,\n ),\n MessageTextInput(\n name=\"column_name\",\n display_name=\"Column 
Name\",\n info=\"The name of the DataFrame column to treat as text messages. Default='text'.\",\n value=\"text\",\n required=True,\n advanced=True,\n ),\n BoolInput(\n name=\"enable_metadata\",\n display_name=\"Enable Metadata\",\n info=\"If True, add metadata to the output DataFrame.\",\n value=True,\n required=False,\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"Batch Results\",\n name=\"batch_results\",\n method=\"run_batch\",\n info=\"A DataFrame with columns: 'text_input', 'model_response', 'batch_index', and 'metadata'.\",\n ),\n ]\n\n def _create_base_row(self, text_input: str = \"\", model_response: str = \"\", batch_index: int = -1) -> dict[str, Any]:\n \"\"\"Create a base row with optional metadata.\"\"\"\n return {\n \"text_input\": text_input,\n \"model_response\": model_response,\n \"batch_index\": batch_index,\n }\n\n def _add_metadata(\n self, row: dict[str, Any], *, success: bool = True, system_msg: str = \"\", error: str | None = None\n ) -> None:\n \"\"\"Add metadata to a row if enabled.\"\"\"\n if not self.enable_metadata:\n return\n\n if success:\n row[\"metadata\"] = {\n \"has_system_message\": bool(system_msg),\n \"input_length\": len(row[\"text_input\"]),\n \"response_length\": len(row[\"model_response\"]),\n \"processing_status\": \"success\",\n }\n else:\n row[\"metadata\"] = {\n \"error\": error,\n \"processing_status\": \"failed\",\n }\n\n async def run_batch(self) -> DataFrame:\n \"\"\"Process each row in df[column_name] with the language model asynchronously.\n\n Returns:\n DataFrame: A new DataFrame containing:\n - text_input: The original input text\n - model_response: The model's response\n - batch_index: The processing order\n - metadata: Additional processing information\n\n Raises:\n ValueError: If the specified column is not found in the DataFrame\n TypeError: If the model is not compatible or input types are wrong\n \"\"\"\n model: Runnable = self.model\n system_msg = self.system_message or \"\"\n df: 
DataFrame = self.df\n col_name = self.column_name or \"text\"\n\n # Validate inputs first\n if not isinstance(df, DataFrame):\n msg = f\"Expected DataFrame input, got {type(df)}\"\n raise TypeError(msg)\n\n if col_name not in df.columns:\n msg = f\"Column '{col_name}' not found in the DataFrame. Available columns: {', '.join(df.columns)}\"\n raise ValueError(msg)\n\n try:\n # Convert the specified column to a list of strings\n user_texts = df[col_name].astype(str).tolist()\n total_rows = len(user_texts)\n\n logger.info(f\"Processing {total_rows} rows with batch run\")\n\n # Prepare the batch of conversations\n conversations = [\n [{\"role\": \"system\", \"content\": system_msg}, {\"role\": \"user\", \"content\": text}]\n if system_msg\n else [{\"role\": \"user\", \"content\": text}]\n for text in user_texts\n ]\n\n # Configure the model with project info and callbacks\n model = model.with_config(\n {\n \"run_name\": self.display_name,\n \"project_name\": self.get_project_name(),\n \"callbacks\": self.get_langchain_callbacks(),\n }\n )\n\n # Process batches and track progress\n responses_with_idx = [\n (idx, response)\n for idx, response in zip(\n range(len(conversations)), await model.abatch(list(conversations)), strict=True\n )\n ]\n\n # Sort by index to maintain order\n responses_with_idx.sort(key=operator.itemgetter(0))\n\n # Build the final data with enhanced metadata\n rows: list[dict[str, Any]] = []\n for idx, response in responses_with_idx:\n resp_text = response.content if hasattr(response, \"content\") else str(response)\n row = self._create_base_row(\n text_input=user_texts[idx],\n model_response=resp_text,\n batch_index=idx,\n )\n self._add_metadata(row, success=True, system_msg=system_msg)\n rows.append(row)\n\n # Log progress\n if (idx + 1) % max(1, total_rows // 10) == 0:\n logger.info(f\"Processed {idx + 1}/{total_rows} rows\")\n\n logger.info(\"Batch processing completed successfully\")\n return DataFrame(rows)\n\n except (KeyError, AttributeError) 
as e:\n # Handle data structure and attribute access errors\n logger.error(f\"Data processing error: {e!s}\")\n error_row = self._create_base_row()\n self._add_metadata(error_row, success=False, error=str(e))\n return DataFrame([error_row])\n" + "value": "from __future__ import annotations\n\nfrom collections.abc import Hashable\nfrom typing import TYPE_CHECKING, Any\n\nimport toml # type: ignore[import-untyped]\nfrom loguru import logger\n\nfrom langflow.custom import Component\nfrom langflow.io import BoolInput, DataFrameInput, HandleInput, MessageTextInput, MultilineInput, Output\nfrom langflow.schema import DataFrame\n\nif TYPE_CHECKING:\n from langchain_core.runnables import Runnable\n\n\nclass BatchRunComponent(Component):\n display_name = \"Batch Run\"\n description = \"Runs an LLM over each row of a DataFrame's column. If no column is set, the entire row is passed.\"\n icon = \"List\"\n beta = True\n\n inputs = [\n HandleInput(\n name=\"model\",\n display_name=\"Language Model\",\n info=\"Connect the 'Language Model' output from your LLM component here.\",\n input_types=[\"LanguageModel\"],\n required=True,\n ),\n MultilineInput(\n name=\"system_message\",\n display_name=\"Instructions\",\n info=\"Multi-line system instruction for all rows in the DataFrame.\",\n required=False,\n ),\n DataFrameInput(\n name=\"df\",\n display_name=\"DataFrame\",\n info=\"The DataFrame whose column (specified by 'column_name') we'll treat as text messages.\",\n required=True,\n ),\n MessageTextInput(\n name=\"column_name\",\n display_name=\"Column Name\",\n info=(\n \"The name of the DataFrame column to treat as text messages. 
\"\n \"If empty, all columns will be formatted in TOML.\"\n ),\n required=False,\n advanced=False,\n ),\n MessageTextInput(\n name=\"output_column_name\",\n display_name=\"Output Column Name\",\n info=\"Name of the column where the model's response will be stored.\",\n value=\"model_response\",\n required=False,\n advanced=True,\n ),\n BoolInput(\n name=\"enable_metadata\",\n display_name=\"Enable Metadata\",\n info=\"If True, add metadata to the output DataFrame.\",\n value=False,\n required=False,\n advanced=True,\n ),\n ]\n\n outputs = [\n Output(\n display_name=\"DataFrame\",\n name=\"batch_results\",\n method=\"run_batch\",\n info=\"A DataFrame with all original columns plus the model's response column.\",\n ),\n ]\n\n def _format_row_as_toml(self, row: dict[Hashable, Any]) -> str:\n \"\"\"Convert a dictionary (row) into a TOML-formatted string.\"\"\"\n formatted_dict = {str(col): {\"value\": str(val)} for col, val in row.items()}\n return toml.dumps(formatted_dict)\n\n def _create_base_row(\n self, original_row: dict[Hashable, Any], model_response: str = \"\", batch_index: int = -1\n ) -> dict[Hashable, Any]:\n \"\"\"Create a base row with original columns and additional metadata.\"\"\"\n row = original_row.copy()\n row[self.output_column_name] = model_response\n row[\"batch_index\"] = batch_index\n return row\n\n def _add_metadata(\n self, row: dict[Hashable, Any], *, success: bool = True, system_msg: str = \"\", error: str | None = None\n ) -> None:\n \"\"\"Add metadata to a row if enabled.\"\"\"\n if not self.enable_metadata:\n return\n\n if success:\n row[\"metadata\"] = {\n \"has_system_message\": bool(system_msg),\n \"input_length\": len(row.get(\"text_input\", \"\")),\n \"response_length\": len(row[self.output_column_name]),\n \"processing_status\": \"success\",\n }\n else:\n row[\"metadata\"] = {\n \"error\": error,\n \"processing_status\": \"failed\",\n }\n\n async def run_batch(self) -> DataFrame:\n \"\"\"Process each row in df[column_name] with the 
language model asynchronously.\n\n Returns:\n DataFrame: A new DataFrame containing:\n - All original columns\n - The model's response column (customizable name)\n - 'batch_index' column for processing order\n - 'metadata' (optional)\n\n Raises:\n ValueError: If the specified column is not found in the DataFrame\n TypeError: If the model is not compatible or input types are wrong\n \"\"\"\n model: Runnable = self.model\n system_msg = self.system_message or \"\"\n df: DataFrame = self.df\n col_name = self.column_name or \"\"\n\n # Validate inputs first\n if not isinstance(df, DataFrame):\n msg = f\"Expected DataFrame input, got {type(df)}\"\n raise TypeError(msg)\n\n if col_name and col_name not in df.columns:\n msg = f\"Column '{col_name}' not found in the DataFrame. Available columns: {', '.join(df.columns)}\"\n raise ValueError(msg)\n\n try:\n # Determine text input for each row\n if col_name:\n user_texts = df[col_name].astype(str).tolist()\n else:\n user_texts = [self._format_row_as_toml(row) for row in df.to_dict(orient=\"records\")]\n\n total_rows = len(user_texts)\n logger.info(f\"Processing {total_rows} rows with batch run\")\n\n # Prepare the batch of conversations\n conversations = [\n [{\"role\": \"system\", \"content\": system_msg}, {\"role\": \"user\", \"content\": text}]\n if system_msg\n else [{\"role\": \"user\", \"content\": text}]\n for text in user_texts\n ]\n\n # Configure the model with project info and callbacks\n model = model.with_config(\n {\n \"run_name\": self.display_name,\n \"project_name\": self.get_project_name(),\n \"callbacks\": self.get_langchain_callbacks(),\n }\n )\n\n # Process batches and track progress\n responses_with_idx = [\n (idx, response)\n for idx, response in zip(\n range(len(conversations)), await model.abatch(list(conversations)), strict=True\n )\n ]\n\n # Sort by index to maintain order\n responses_with_idx.sort(key=lambda x: x[0])\n\n # Build the final data with enhanced metadata\n rows: list[dict[Hashable, Any]] = 
[]\n for idx, (original_row, response) in enumerate(\n zip(df.to_dict(orient=\"records\"), responses_with_idx, strict=False)\n ):\n response_text = response[1].content if hasattr(response[1], \"content\") else str(response[1])\n row = self._create_base_row(original_row, model_response=response_text, batch_index=idx)\n self._add_metadata(row, success=True, system_msg=system_msg)\n rows.append(row)\n\n # Log progress\n if (idx + 1) % max(1, total_rows // 10) == 0:\n logger.info(f\"Processed {idx + 1}/{total_rows} rows\")\n\n logger.info(\"Batch processing completed successfully\")\n return DataFrame(rows)\n\n except (KeyError, AttributeError) as e:\n # Handle data structure and attribute access errors\n logger.error(f\"Data processing error: {e!s}\")\n error_row = self._create_base_row({col: \"\" for col in df.columns}, model_response=\"\", batch_index=-1)\n self._add_metadata(error_row, success=False, error=str(e))\n return DataFrame([error_row])\n" }, "column_name": { "_input_type": "StrInput", - "advanced": true, + "advanced": false, "display_name": "Column Name", "dynamic": false, - "info": "The name of the DataFrame column to treat as text messages. Default='text'.", + "info": "The name of the DataFrame column to treat as text messages. 
If empty, all columns will be formatted in TOML.", "list": false, "list_add_label": "Add More", "load_from_db": false, "name": "column_name", "placeholder": "", - "required": true, + "required": false, "show": true, "title_case": false, "tool_mode": false, @@ -457,10 +457,33 @@ "type": "other", "value": "" }, + "output_column_name": { + "_input_type": "MessageTextInput", + "advanced": true, + "display_name": "Output Column Name", + "dynamic": false, + "info": "Name of the column where the model's response will be stored.", + "input_types": [ + "Message" + ], + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "output_column_name", + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_input": true, + "trace_as_metadata": true, + "type": "str", + "value": "model_response" + }, "system_message": { "_input_type": "MultilineInput", "advanced": false, - "display_name": "System Message", + "display_name": "Instructions", "dynamic": false, "info": "Multi-line system instruction for all rows in the DataFrame.", "input_types": [ diff --git a/src/backend/tests/unit/components/helpers/test_batch_run_component.py b/src/backend/tests/unit/components/helpers/test_batch_run_component.py index cb2e33d6d..8e5c78d2b 100644 --- a/src/backend/tests/unit/components/helpers/test_batch_run_component.py +++ b/src/backend/tests/unit/components/helpers/test_batch_run_component.py @@ -46,7 +46,7 @@ class TestBatchRunComponent(ComponentTestBaseWithoutClient): # Verify the results assert isinstance(result, DataFrame) - assert "text_input" in result.columns + assert "text" in result.columns assert "model_response" in result.columns assert "metadata" in result.columns assert len(result) == 3 @@ -121,7 +121,7 @@ class TestBatchRunComponent(ComponentTestBaseWithoutClient): assert error_row["metadata"]["processing_status"] == "failed" assert "Mock error during batch processing" in 
error_row["metadata"]["error"] # Verify base row structure - assert error_row["text_input"] == "" + assert error_row["text"] == "" assert error_row["model_response"] == "" assert error_row["batch_index"] == -1 @@ -149,45 +149,66 @@ class TestBatchRunComponent(ComponentTestBaseWithoutClient): # Verify no metadata assert "metadata" not in error_row # Verify base row structure - assert error_row["text_input"] == "" + assert error_row["text"] == "" assert error_row["model_response"] == "" assert error_row["batch_index"] == -1 def test_create_base_row(self): component = BatchRunComponent() - row = component._create_base_row(text_input="test_input", model_response="test_response", batch_index=1) - - assert row == { - "text_input": "test_input", - "model_response": "test_response", - "batch_index": 1, - } + row = component._create_base_row( + original_row={"text_input": "test_input"}, + model_response="test_response", + batch_index=1, + ) + assert row["text_input"] == "test_input" + assert row["model_response"] == "test_response" + assert row["batch_index"] == 1 def test_add_metadata_success(self): component = BatchRunComponent(enable_metadata=True) - row = component._create_base_row(text_input="test_input", model_response="test_response", batch_index=1) - component._add_metadata(row, success=True, system_msg="test_system") + + # Pass text_input inside the original_row dictionary + original_row = {"text_input": "test_input"} + row = component._create_base_row( + original_row=original_row, + model_response="test_response", + batch_index=1, + ) + + component._add_metadata(row, success=True, system_msg="Instructions here") assert "metadata" in row assert row["metadata"]["has_system_message"] is True - assert row["metadata"]["processing_status"] == "success" assert row["metadata"]["input_length"] == len("test_input") assert row["metadata"]["response_length"] == len("test_response") + assert row["metadata"]["processing_status"] == "success" def test_add_metadata_failure(self): 
component = BatchRunComponent(enable_metadata=True) - row = component._create_base_row() - component._add_metadata(row, success=False, error="test_error") + + # Provide an empty original_row (it could contain other keys if needed) + row = component._create_base_row(original_row={}, model_response="", batch_index=1) + + # Add metadata to simulate a failure + component._add_metadata(row, success=False, error="Simulated error") assert "metadata" in row assert row["metadata"]["processing_status"] == "failed" - assert row["metadata"]["error"] == "test_error" + assert row["metadata"]["error"] == "Simulated error" def test_metadata_disabled(self): component = BatchRunComponent(enable_metadata=False) - row = component._create_base_row(text_input="test") - component._add_metadata(row, success=True) + # Provide text_input inside the original_row dictionary + row = component._create_base_row( + original_row={"text_input": "test"}, + model_response="response", + batch_index=0, + ) + + component._add_metadata(row, success=True, system_msg="test") + + # Since metadata is disabled, it must not exist assert "metadata" not in row async def test_invalid_column_name(self): @@ -229,7 +250,9 @@ class TestBatchRunComponent(ComponentTestBaseWithoutClient): result = await component.run_batch() assert isinstance(result, DataFrame) - assert all(isinstance(text, str) for text in result["text_input"]) - assert all(str(num) in text for num, text in zip(test_df["text"], result["text_input"], strict=False)) + assert all(isinstance(text, int) for text in result["text"]) + assert all( + str(num) in response for num, response in zip(test_df["text"], result["model_response"], strict=False) + ) result_dicts = result.to_dict("records") assert all(row["metadata"]["processing_status"] == "success" for row in result_dicts)