fix: change LoopComponent data input to DataFrame type and update display name (#8297)

* Update loop.py

* [autofix.ci] apply automated fixes

* Update loop-component.spec.ts

* change dataframe input name to inputs

* [autofix.ci] apply automated fixes

* 🐛 (typescript_test.yml): increase the maximum shard count to 40 to improve test distribution and efficiency

* 🔧 (typescript_test.yml): adjust optimal shard count calculation to use a maximum of 10 shards instead of 40 for better test distribution
🐛 (loop-component.spec.ts): fix getByTestId selector to match the updated element ID for testing purposes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Yuqi Tang <yuqi.tang@datastax.com>
Co-authored-by: cristhianzl <cristhian.lousa@gmail.com>
This commit is contained in:
Edwin Jose 2025-06-03 14:49:24 -05:00 committed by GitHub
commit 68f4905e2a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 6 additions and 8 deletions

View file

@ -14,9 +14,9 @@ class LoopComponent(Component):
inputs = [
HandleInput(
name="data",
display_name="Data or DataFrame",
display_name="Inputs",
info="The initial list of Data objects or DataFrame to iterate over.",
input_types=["Data", "DataFrame"],
input_types=["DataFrame"],
),
]

View file

@ -101,7 +101,6 @@
"fieldName": "data",
"id": "LoopComponent-9BK8B",
"inputTypes": [
"Data",
"DataFrame"
],
"type": "other"
@ -112,7 +111,7 @@
"source": "ArXivComponent-ylKFb",
"sourceHandle": "{œdataTypeœ: œArXivComponentœ, œidœ: œArXivComponent-ylKFbœ, œnameœ: œdataframeœ, œoutput_typesœ: [œDataFrameœ]}",
"target": "LoopComponent-9BK8B",
"targetHandle": "{œfieldNameœ: œdataœ, œidœ: œLoopComponent-9BK8Bœ, œinputTypesœ: [œDataœ, œDataFrameœ], œtypeœ: œotherœ}"
"targetHandle": "{œfieldNameœ: œdataœ, œidœ: œLoopComponent-9BK8Bœ, œinputTypesœ: [œDataFrameœ], œtypeœ: œotherœ}"
},
{
"animated": false,
@ -1728,16 +1727,15 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.custom import Component\nfrom langflow.io import HandleInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.dataframe import DataFrame\n\n\nclass LoopComponent(Component):\n display_name = \"Loop\"\n description = (\n \"Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs.\"\n )\n icon = \"infinity\"\n\n inputs = [\n HandleInput(\n name=\"data\",\n display_name=\"Data or DataFrame\",\n info=\"The initial list of Data objects or DataFrame to iterate over.\",\n input_types=[\"Data\", \"DataFrame\"],\n ),\n ]\n\n outputs = [\n Output(display_name=\"Item\", name=\"item\", method=\"item_output\", allows_loop=True, group_outputs=True),\n Output(display_name=\"Done\", name=\"done\", method=\"done_output\", group_outputs=True),\n ]\n\n def initialize_data(self) -> None:\n \"\"\"Initialize the data list, context index, and aggregated list.\"\"\"\n if self.ctx.get(f\"{self._id}_initialized\", False):\n return\n\n # Ensure data is a list of Data objects\n data_list = self._validate_data(self.data)\n\n # Store the initial data and context variables\n self.update_ctx(\n {\n f\"{self._id}_data\": data_list,\n f\"{self._id}_index\": 0,\n f\"{self._id}_aggregated\": [],\n f\"{self._id}_initialized\": True,\n }\n )\n\n def _validate_data(self, data):\n \"\"\"Validate and return a list of Data objects.\"\"\"\n if isinstance(data, DataFrame):\n return data.to_data_list()\n if isinstance(data, Data):\n return [data]\n if isinstance(data, list) and all(isinstance(item, Data) for item in data):\n return data\n msg = \"The 'data' input must be a DataFrame, a list of Data objects, or a single Data object.\"\n raise TypeError(msg)\n\n def evaluate_stop_loop(self) -> bool:\n \"\"\"Evaluate whether to stop item or done output.\"\"\"\n current_index = self.ctx.get(f\"{self._id}_index\", 0)\n data_length = len(self.ctx.get(f\"{self._id}_data\", []))\n return current_index > data_length\n\n def item_output(self) -> Data:\n \"\"\"Output the next item in the list or stop if done.\"\"\"\n self.initialize_data()\n current_item = Data(text=\"\")\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n else:\n # Get data list and current index\n data_list, current_index = self.loop_variables()\n if current_index < len(data_list):\n # Output current item and increment index\n try:\n current_item = data_list[current_index]\n except IndexError:\n current_item = Data(text=\"\")\n self.aggregated_output()\n self.update_ctx({f\"{self._id}_index\": current_index + 1})\n\n # Now we need to update the dependencies for the next run\n self.update_dependency()\n return current_item\n\n def update_dependency(self):\n item_dependency_id = self.get_incoming_edge_by_target_param(\"item\")\n\n self.graph.run_manager.run_predecessors[self._id].append(item_dependency_id)\n\n def done_output(self) -> DataFrame:\n \"\"\"Trigger the done output when iteration is complete.\"\"\"\n self.initialize_data()\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n self.start(\"done\")\n\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n\n return DataFrame(aggregated)\n self.stop(\"done\")\n return DataFrame([])\n\n def loop_variables(self):\n \"\"\"Retrieve loop variables from context.\"\"\"\n return (\n self.ctx.get(f\"{self._id}_data\", []),\n self.ctx.get(f\"{self._id}_index\", 0),\n )\n\n def aggregated_output(self) -> list[Data]:\n \"\"\"Return the aggregated list once all items are processed.\"\"\"\n self.initialize_data()\n\n # Get data list and aggregated list\n data_list = self.ctx.get(f\"{self._id}_data\", [])\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n loop_input = self.item\n if loop_input is not None and not isinstance(loop_input, str) and len(aggregated) <= len(data_list):\n aggregated.append(loop_input)\n self.update_ctx({f\"{self._id}_aggregated\": aggregated})\n return aggregated\n"
"value": "from langflow.custom import Component\nfrom langflow.io import HandleInput, Output\nfrom langflow.schema import Data\nfrom langflow.schema.dataframe import DataFrame\n\n\nclass LoopComponent(Component):\n display_name = \"Loop\"\n description = (\n \"Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs.\"\n )\n icon = \"infinity\"\n\n inputs = [\n HandleInput(\n name=\"data\",\n display_name=\"Inputs\",\n info=\"The initial list of Data objects or DataFrame to iterate over.\",\n input_types=[\"DataFrame\"],\n ),\n ]\n\n outputs = [\n Output(display_name=\"Item\", name=\"item\", method=\"item_output\", allows_loop=True, group_outputs=True),\n Output(display_name=\"Done\", name=\"done\", method=\"done_output\", group_outputs=True),\n ]\n\n def initialize_data(self) -> None:\n \"\"\"Initialize the data list, context index, and aggregated list.\"\"\"\n if self.ctx.get(f\"{self._id}_initialized\", False):\n return\n\n # Ensure data is a list of Data objects\n data_list = self._validate_data(self.data)\n\n # Store the initial data and context variables\n self.update_ctx(\n {\n f\"{self._id}_data\": data_list,\n f\"{self._id}_index\": 0,\n f\"{self._id}_aggregated\": [],\n f\"{self._id}_initialized\": True,\n }\n )\n\n def _validate_data(self, data):\n \"\"\"Validate and return a list of Data objects.\"\"\"\n if isinstance(data, DataFrame):\n return data.to_data_list()\n if isinstance(data, Data):\n return [data]\n if isinstance(data, list) and all(isinstance(item, Data) for item in data):\n return data\n msg = \"The 'data' input must be a DataFrame, a list of Data objects, or a single Data object.\"\n raise TypeError(msg)\n\n def evaluate_stop_loop(self) -> bool:\n \"\"\"Evaluate whether to stop item or done output.\"\"\"\n current_index = self.ctx.get(f\"{self._id}_index\", 0)\n data_length = len(self.ctx.get(f\"{self._id}_data\", []))\n return current_index > data_length\n\n def item_output(self) -> Data:\n \"\"\"Output the next item in the list or stop if done.\"\"\"\n self.initialize_data()\n current_item = Data(text=\"\")\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n else:\n # Get data list and current index\n data_list, current_index = self.loop_variables()\n if current_index < len(data_list):\n # Output current item and increment index\n try:\n current_item = data_list[current_index]\n except IndexError:\n current_item = Data(text=\"\")\n self.aggregated_output()\n self.update_ctx({f\"{self._id}_index\": current_index + 1})\n\n # Now we need to update the dependencies for the next run\n self.update_dependency()\n return current_item\n\n def update_dependency(self):\n item_dependency_id = self.get_incoming_edge_by_target_param(\"item\")\n\n self.graph.run_manager.run_predecessors[self._id].append(item_dependency_id)\n\n def done_output(self) -> DataFrame:\n \"\"\"Trigger the done output when iteration is complete.\"\"\"\n self.initialize_data()\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n self.start(\"done\")\n\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n\n return DataFrame(aggregated)\n self.stop(\"done\")\n return DataFrame([])\n\n def loop_variables(self):\n \"\"\"Retrieve loop variables from context.\"\"\"\n return (\n self.ctx.get(f\"{self._id}_data\", []),\n self.ctx.get(f\"{self._id}_index\", 0),\n )\n\n def aggregated_output(self) -> list[Data]:\n \"\"\"Return the aggregated list once all items are processed.\"\"\"\n self.initialize_data()\n\n # Get data list and aggregated list\n data_list = self.ctx.get(f\"{self._id}_data\", [])\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n loop_input = self.item\n if loop_input is not None and not isinstance(loop_input, str) and len(aggregated) <= len(data_list):\n aggregated.append(loop_input)\n self.update_ctx({f\"{self._id}_aggregated\": aggregated})\n return aggregated\n"
},
"data": {
"_input_type": "HandleInput",
"advanced": false,
"display_name": "Data or DataFrame",
"display_name": "Inputs",
"dynamic": false,
"info": "The initial list of Data objects or DataFrame to iterate over.",
"input_types": [
"Data",
"DataFrame"
],
"list": false,

View file

@ -132,7 +132,7 @@ test(
.first()
.click();
await page
.getByTestId("handle-loopcomponent-shownode-data or dataframe-left")
.getByTestId("handle-loopcomponent-shownode-inputs-left")
.first()
.click();