From 6403a8f564d1f390fd471fdc64643d11276f701e Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 3 Jul 2025 11:39:09 -0300 Subject: [PATCH] fix(graph): fixes bug that caused simple flows with Loop to fail (#8809) * fix: improve predecessors check for loop component - Enhanced the handling of cycle vertices to prevent infinite loops by ensuring that a vertex can only run if all pending predecessors have completed. - Updated conditions for the first execution of cycle vertices to allow running only if all pending predecessors are also cycle vertices. - This refactor improves the robustness of the vertex management system in asynchronous workflows. * fix: update _mark_branch method to return visited vertices and refine predecessor mapping * fix: prevent duplicate item dependencies in LoopComponent * feat: add loop connection handling in Component class - Introduced methods to process loop feedback connections, allowing components to connect outputs to loop-enabled inputs. - Implemented checks to validate loop connections and ensure proper handling of callable methods from other components. - Enhanced the edge creation logic to support special loop feedback edges targeting outputs instead of inputs. * fix: enhance name overlap validation in FrontendNode - Updated the validate_name_overlap method to exclude outputs that allow loops from the overlap check. - Improved error message to include the display name of the component, along with detailed lists of input and output names for better debugging. * fix: correct condition for executing cycle vertices in RunnableVerticesManager - Updated the logic to ensure that a cycle vertex can only execute if it is a loop and all pending predecessors are cycle vertices. This change enhances the robustness of the vertex management system in asynchronous workflows. * feat: implement comprehensive loop flow for URL processing - Added a new loop flow that processes multiple URLs through a series of components including URLComponent, SplitTextComponent, LoopComponent, ParserComponent, PromptComponent, OpenAIModelComponent, StructuredOutputComponent, and ChatOutput. - Enhanced the StructuredOutputComponent to include a detailed system prompt and refined output schema to ensure proper JSON formatting. - Introduced a test case to validate the creation and execution of the loop flow, ensuring all components are correctly integrated and the expected execution order is maintained. * refactor: enhance loop target handling in Component and Edge classes - Introduced LoopTargetHandleDict to better manage loop target structures in the Component and Edge classes. - Updated the Component class to utilize type casting for loop target handles, improving type safety. - Refactored the Edge class to accommodate the new loop target handling, ensuring compatibility with existing edge structures. - Removed deprecated message handling methods from the Component class to streamline the codebase and improve maintainability. * test: skip OpenAI model integration test if API key is not set - Added a conditional skip to the test_build_model_integration_reasoning method to prevent execution when the OPENAI_API_KEY environment variable is not set, ensuring tests run only in appropriate environments. * [autofix.ci] apply automated fixes * chore: add required secrets for OpenAI and Anthropic APIs in CI workflows * Updated ci.yml to include OPENAI_API_KEY and ANTHROPIC_API_KEY secrets. * Modified python_test.yml to mark these secrets as required for workflow execution. * fix: update OPENAI_API_KEY check in test_loop.py to handle dummy values * Modified the condition in the pytest skipif decorator to also skip tests when OPENAI_API_KEY is set to "dummy", ensuring more robust test execution. * refactor: streamline component setup in test_loop.py * Removed redundant comments and improved formatting for component initialization in the loop_flow function. * Added missing system_prompt to StructuredOutputComponent to resolve "Multiple structured outputs" error. * Updated test_loop_flow to ensure it tests the graph creation with proper loop feedback connection. --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .github/workflows/ci.yml | 3 + .github/workflows/python_test.yml | 6 + .../base/langflow/components/logic/loop.py | 4 +- .../custom/custom_component/component.py | 68 +++++++++ src/backend/base/langflow/graph/edge/base.py | 6 +- .../base/langflow/graph/edge/schema.py | 11 +- src/backend/base/langflow/graph/graph/base.py | 14 +- .../graph/graph/runnable_vertices_manager.py | 17 ++- .../Research Translation Loop.json | 2 +- .../langflow/template/frontend_node/base.py | 8 +- .../languagemodels/test_openai_model.py | 1 + .../tests/unit/components/logic/test_loop.py | 132 ++++++++++++++++++ 12 files changed, 253 insertions(+), 19 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b7cc1b2b1..b0f43ed42 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -162,6 +162,9 @@ jobs: uses: ./.github/workflows/python_test.yml with: python-versions: ${{ inputs.python-versions || '["3.10"]' }} + secrets: + OPENAI_API_KEY: "${{ secrets.OPENAI_API_KEY }}" + ANTHROPIC_API_KEY: "${{ secrets.ANTHROPIC_API_KEY }}" test-frontend-unit: needs: [path-filter, set-ci-condition] diff --git a/.github/workflows/python_test.yml b/.github/workflows/python_test.yml index 88892d7b1..9fb41a4ba 100644 --- a/.github/workflows/python_test.yml +++ b/.github/workflows/python_test.yml @@ -2,6 +2,11 @@ name: Python tests on: workflow_call: + secrets: + OPENAI_API_KEY: + required: true + ANTHROPIC_API_KEY: + required: true inputs: python-versions: description: "(Optional) Python versions to test" @@ -31,6 +36,7 @@ env: jobs: build: + name: Unit Tests - Python ${{ matrix.python-version }} - Group ${{ matrix.group }} runs-on: ubuntu-latest strategy: diff --git a/src/backend/base/langflow/components/logic/loop.py b/src/backend/base/langflow/components/logic/loop.py index b17daece6..d650de02d 100644 --- a/src/backend/base/langflow/components/logic/loop.py +++ b/src/backend/base/langflow/components/logic/loop.py @@ -86,8 +86,8 @@ class LoopComponent(Component): def update_dependency(self): item_dependency_id = self.get_incoming_edge_by_target_param("item") - - self.graph.run_manager.run_predecessors[self._id].append(item_dependency_id) + if item_dependency_id not in self.graph.run_manager.run_predecessors[self._id]: + self.graph.run_manager.run_predecessors[self._id].append(item_dependency_id) def done_output(self) -> DataFrame: """Trigger the done output when iteration is complete.""" diff --git a/src/backend/base/langflow/custom/custom_component/component.py b/src/backend/base/langflow/custom/custom_component/component.py index 7ad5fa56c..85f2810d1 100644 --- a/src/backend/base/langflow/custom/custom_component/component.py +++ b/src/backend/base/langflow/custom/custom_component/component.py @@ -660,6 +660,11 @@ class Component(CustomComponent): return getattr(value, output.method) def _process_connection_or_parameter(self, key, value) -> None: + # Special handling for Loop components: check if we're setting a loop-enabled output + if self._is_loop_connection(key, value): + self._process_loop_connection(key, value) + return + input_ = self._get_or_create_input(key) # We need to check if callable AND if it is a method from a class that inherits from Component if isinstance(value, Component): @@ -677,6 +682,69 @@ class Component(CustomComponent): else: self._set_parameter_or_attribute(key, value) + def _is_loop_connection(self, key: str, value) -> bool: + """Check if this is a loop feedback connection. + + A loop connection occurs when: + 1. The key matches an output name of this component + 2. That output has allows_loop=True + 3. The value is a callable method from another component + """ + # Check if key matches a loop-enabled output + if key not in self._outputs_map: + return False + + output = self._outputs_map[key] + if not getattr(output, "allows_loop", False): + return False + + # Check if value is a callable method from a Component + return callable(value) and self._inherits_from_component(value) + + def _process_loop_connection(self, key: str, value) -> None: + """Process a loop feedback connection. + + Creates a special edge that connects the source component's output + to this Loop component's loop-enabled output (not an input). + """ + try: + self._method_is_valid_output(value) + except ValueError as e: + msg = f"Method {value.__name__} is not a valid output of {value.__self__.__class__.__name__}" + raise ValueError(msg) from e + + source_component = value.__self__ + self._components.append(source_component) + source_output = source_component.get_output_by_method(value) + target_output = self._outputs_map[key] + + # Create special loop feedback edge + self._add_loop_edge(source_component, source_output, target_output) + + def _add_loop_edge(self, source_component, source_output, target_output) -> None: + """Add a special loop feedback edge that targets an output instead of an input.""" + self._edges.append( + { + "source": source_component._id, + "target": self._id, + "data": { + "sourceHandle": { + "dataType": source_component.name or source_component.__class__.__name__, + "id": source_component._id, + "name": source_output.name, + "output_types": source_output.types, + }, + "targetHandle": { + # Special loop edge structure - targets an output, not an input + "dataType": self.name or self.__class__.__name__, + "id": self._id, + "name": target_output.name, + "output_types": target_output.types, + }, + }, + } + ) + def _process_connection_or_parameters(self, key, value) -> None: # if value is a list of components, we need to process each component # Note this update make sure it is not a list str | int | float | bool | type(None) diff --git a/src/backend/base/langflow/graph/edge/base.py b/src/backend/base/langflow/graph/edge/base.py index 013662e7a..2972fd414 100644 --- a/src/backend/base/langflow/graph/edge/base.py +++ b/src/backend/base/langflow/graph/edge/base.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING, Any, cast from loguru import logger -from langflow.graph.edge.schema import EdgeData, SourceHandle, TargetHandle, TargetHandleDict +from langflow.graph.edge.schema import EdgeData, LoopTargetHandleDict, SourceHandle, TargetHandle, TargetHandleDict from langflow.schema.schema import INPUT_FIELD_NAME if TYPE_CHECKING: @@ -27,7 +27,9 @@ class Edge: if isinstance(self._target_handle, dict): try: if "name" in self._target_handle: - self.target_handle: TargetHandle = TargetHandle.from_loop_target_handle(self._target_handle) + self.target_handle: TargetHandle = TargetHandle.from_loop_target_handle( + cast(LoopTargetHandleDict, self._target_handle) + ) else: self.target_handle = TargetHandle(**self._target_handle) except Exception as e: diff --git a/src/backend/base/langflow/graph/edge/schema.py b/src/backend/base/langflow/graph/edge/schema.py index 72efd8669..40ce47c74 100644 --- a/src/backend/base/langflow/graph/edge/schema.py +++ b/src/backend/base/langflow/graph/edge/schema.py @@ -21,9 +21,16 @@ class TargetHandleDict(TypedDict): type: str +class LoopTargetHandleDict(TypedDict): + dataType: str + id: str + name: str + output_types: list[str] + + class EdgeDataDetails(TypedDict): sourceHandle: SourceHandleDict - targetHandle: TargetHandleDict + targetHandle: TargetHandleDict | LoopTargetHandleDict class EdgeData(TypedDict, total=False): @@ -74,7 +81,7 @@ class TargetHandle(BaseModel): type: str = Field(None, description="Type of the target handle.") @classmethod - def from_loop_target_handle(cls, target_handle: TargetHandleDict) -> "TargetHandle": + def from_loop_target_handle(cls, target_handle: LoopTargetHandleDict) -> "TargetHandle": # The target handle is a loop edge # The target handle is a dict with the following keys: # - name: str diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 2231476ad..b66c22ac2 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -910,14 +910,14 @@ class Graph: def _mark_branch( self, vertex_id: str, state: str, visited: set | None = None, output_name: str | None = None - ) -> None: + ) -> set: """Marks a branch of the graph.""" if visited is None: visited = set() else: self.mark_vertex(vertex_id, state) if vertex_id in visited: - return + return visited visited.add(vertex_id) for child_id in self.parent_child_map[vertex_id]: @@ -928,10 +928,18 @@ class Graph: if edge and edge.source_handle.name != output_name: continue self._mark_branch(child_id, state, visited) + return visited def mark_branch(self, vertex_id: str, state: str, output_name: str | None = None) -> None: - self._mark_branch(vertex_id=vertex_id, state=state, output_name=output_name) + visited = self._mark_branch(vertex_id=vertex_id, state=state, output_name=output_name) new_predecessor_map, _ = self.build_adjacency_maps(self.edges) + new_predecessor_map = {k: v for k, v in new_predecessor_map.items() if k in visited} + if vertex_id in self.cycle_vertices: + # Remove dependencies that are not in the cycle and have run at least once + new_predecessor_map = { + k: [dep for dep in v if dep in self.cycle_vertices and dep in self.run_manager.ran_at_least_once] + for k, v in new_predecessor_map.items() + } self.run_manager.update_run_state( run_predecessors=new_predecessor_map, vertices_to_run=self.vertices_to_run, diff --git a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py index 51a321e29..55272f67a 100644 --- a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py +++ b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py @@ -86,14 +86,17 @@ class RunnableVerticesManager: # For cycle vertices, check if any pending predecessors are also in cycle # Using set intersection is faster than iteration if vertex_id in self.cycle_vertices: - # If this is a loop vertex that has run before and has no pending predecessors, - # it should not run again to prevent infinite loops - if is_loop and vertex_id in self.ran_at_least_once and bool(set(pending)): - return False - # For loop vertices, allow running if it's a loop or if none of its pending - # predecessors are also cycle vertices (preventing circular dependencies) - return is_loop or not bool(set(pending) & self.cycle_vertices) + pending_set = set(pending) + running_predecessors = pending_set & self.vertices_being_run + # If this vertex has already run at least once, be strict: wait until NOTHING is pending or running + if vertex_id in self.ran_at_least_once: + # Wait if there are still pending or running predecessors; otherwise allow. + return not (pending_set or running_predecessors) + + # FIRST execution of a cycle vertex + # Allow running **only** if it's a loop AND *all* pending predecessors are cycle vertices + return is_loop and pending_set <= self.cycle_vertices return False def remove_from_predecessors(self, vertex_id: str) -> None: diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Research Translation Loop.json b/src/backend/base/langflow/initial_setup/starter_projects/Research Translation Loop.json index 827850ae4..32debab3c 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Research Translation Loop.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Research Translation Loop.json @@ -1251,7 +1251,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from langflow.custom.custom_component.component import Component\nfrom langflow.inputs.inputs import HandleInput\nfrom langflow.schema.data import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.template.field.base import Output\n\n\nclass LoopComponent(Component):\n display_name = \"Loop\"\n description = (\n \"Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs.\"\n )\n icon = \"infinity\"\n\n inputs = [\n HandleInput(\n name=\"data\",\n display_name=\"Inputs\",\n info=\"The initial list of Data objects or DataFrame to iterate over.\",\n input_types=[\"DataFrame\"],\n ),\n ]\n\n outputs = [\n Output(display_name=\"Item\", name=\"item\", method=\"item_output\", allows_loop=True, group_outputs=True),\n Output(display_name=\"Done\", name=\"done\", method=\"done_output\", group_outputs=True),\n ]\n\n def initialize_data(self) -> None:\n \"\"\"Initialize the data list, context index, and aggregated list.\"\"\"\n if self.ctx.get(f\"{self._id}_initialized\", False):\n return\n\n # Ensure data is a list of Data objects\n data_list = self._validate_data(self.data)\n\n # Store the initial data and context variables\n self.update_ctx(\n {\n f\"{self._id}_data\": data_list,\n f\"{self._id}_index\": 0,\n f\"{self._id}_aggregated\": [],\n f\"{self._id}_initialized\": True,\n }\n )\n\n def _validate_data(self, data):\n \"\"\"Validate and return a list of Data objects.\"\"\"\n if isinstance(data, DataFrame):\n return data.to_data_list()\n if isinstance(data, Data):\n return [data]\n if isinstance(data, list) and all(isinstance(item, Data) for item in data):\n return data\n msg = \"The 'data' input must be a DataFrame, a list of Data objects, or a single Data object.\"\n raise TypeError(msg)\n\n def evaluate_stop_loop(self) -> bool:\n \"\"\"Evaluate whether to stop item or done output.\"\"\"\n current_index = self.ctx.get(f\"{self._id}_index\", 0)\n data_length = len(self.ctx.get(f\"{self._id}_data\", []))\n return current_index > data_length\n\n def item_output(self) -> Data:\n \"\"\"Output the next item in the list or stop if done.\"\"\"\n self.initialize_data()\n current_item = Data(text=\"\")\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n else:\n # Get data list and current index\n data_list, current_index = self.loop_variables()\n if current_index < len(data_list):\n # Output current item and increment index\n try:\n current_item = data_list[current_index]\n except IndexError:\n current_item = Data(text=\"\")\n self.aggregated_output()\n self.update_ctx({f\"{self._id}_index\": current_index + 1})\n\n # Now we need to update the dependencies for the next run\n self.update_dependency()\n return current_item\n\n def update_dependency(self):\n item_dependency_id = self.get_incoming_edge_by_target_param(\"item\")\n\n self.graph.run_manager.run_predecessors[self._id].append(item_dependency_id)\n\n def done_output(self) -> DataFrame:\n \"\"\"Trigger the done output when iteration is complete.\"\"\"\n self.initialize_data()\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n self.start(\"done\")\n\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n\n return DataFrame(aggregated)\n self.stop(\"done\")\n return DataFrame([])\n\n def loop_variables(self):\n \"\"\"Retrieve loop variables from context.\"\"\"\n return (\n self.ctx.get(f\"{self._id}_data\", []),\n self.ctx.get(f\"{self._id}_index\", 0),\n )\n\n def aggregated_output(self) -> list[Data]:\n \"\"\"Return the aggregated list once all items are processed.\"\"\"\n self.initialize_data()\n\n # Get data list and aggregated list\n data_list = self.ctx.get(f\"{self._id}_data\", [])\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n loop_input = self.item\n if loop_input is not None and not isinstance(loop_input, str) and len(aggregated) <= len(data_list):\n aggregated.append(loop_input)\n self.update_ctx({f\"{self._id}_aggregated\": aggregated})\n return aggregated\n" + "value": "from langflow.custom.custom_component.component import Component\nfrom langflow.inputs.inputs import HandleInput\nfrom langflow.schema.data import Data\nfrom langflow.schema.dataframe import DataFrame\nfrom langflow.template.field.base import Output\n\n\nclass LoopComponent(Component):\n display_name = \"Loop\"\n description = (\n \"Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs.\"\n )\n icon = \"infinity\"\n\n inputs = [\n HandleInput(\n name=\"data\",\n display_name=\"Inputs\",\n info=\"The initial list of Data objects or DataFrame to iterate over.\",\n input_types=[\"DataFrame\"],\n ),\n ]\n\n outputs = [\n Output(display_name=\"Item\", name=\"item\", method=\"item_output\", allows_loop=True, group_outputs=True),\n Output(display_name=\"Done\", name=\"done\", method=\"done_output\", group_outputs=True),\n ]\n\n def initialize_data(self) -> None:\n \"\"\"Initialize the data list, context index, and aggregated list.\"\"\"\n if self.ctx.get(f\"{self._id}_initialized\", False):\n return\n\n # Ensure data is a list of Data objects\n data_list = self._validate_data(self.data)\n\n # Store the initial data and context variables\n self.update_ctx(\n {\n f\"{self._id}_data\": data_list,\n f\"{self._id}_index\": 0,\n f\"{self._id}_aggregated\": [],\n f\"{self._id}_initialized\": True,\n }\n )\n\n def _validate_data(self, data):\n \"\"\"Validate and return a list of Data objects.\"\"\"\n if isinstance(data, DataFrame):\n return data.to_data_list()\n if isinstance(data, Data):\n return [data]\n if isinstance(data, list) and all(isinstance(item, Data) for item in data):\n return data\n msg = \"The 'data' input must be a DataFrame, a list of Data objects, or a single Data object.\"\n raise TypeError(msg)\n\n def evaluate_stop_loop(self) -> bool:\n \"\"\"Evaluate whether to stop item or done output.\"\"\"\n current_index = self.ctx.get(f\"{self._id}_index\", 0)\n data_length = len(self.ctx.get(f\"{self._id}_data\", []))\n return current_index > data_length\n\n def item_output(self) -> Data:\n \"\"\"Output the next item in the list or stop if done.\"\"\"\n self.initialize_data()\n current_item = Data(text=\"\")\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n else:\n # Get data list and current index\n data_list, current_index = self.loop_variables()\n if current_index < len(data_list):\n # Output current item and increment index\n try:\n current_item = data_list[current_index]\n except IndexError:\n current_item = Data(text=\"\")\n self.aggregated_output()\n self.update_ctx({f\"{self._id}_index\": current_index + 1})\n\n # Now we need to update the dependencies for the next run\n self.update_dependency()\n return current_item\n\n def update_dependency(self):\n item_dependency_id = self.get_incoming_edge_by_target_param(\"item\")\n if item_dependency_id not in self.graph.run_manager.run_predecessors[self._id]:\n self.graph.run_manager.run_predecessors[self._id].append(item_dependency_id)\n\n def done_output(self) -> DataFrame:\n \"\"\"Trigger the done output when iteration is complete.\"\"\"\n self.initialize_data()\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n self.start(\"done\")\n\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n\n return DataFrame(aggregated)\n self.stop(\"done\")\n return DataFrame([])\n\n def loop_variables(self):\n \"\"\"Retrieve loop variables from context.\"\"\"\n return (\n self.ctx.get(f\"{self._id}_data\", []),\n self.ctx.get(f\"{self._id}_index\", 0),\n )\n\n def aggregated_output(self) -> list[Data]:\n \"\"\"Return the aggregated list once all items are processed.\"\"\"\n self.initialize_data()\n\n # Get data list and aggregated list\n data_list = self.ctx.get(f\"{self._id}_data\", [])\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n loop_input = self.item\n if loop_input is not None and not isinstance(loop_input, str) and len(aggregated) <= len(data_list):\n aggregated.append(loop_input)\n self.update_ctx({f\"{self._id}_aggregated\": aggregated})\n return aggregated\n" }, "data": { "_input_type": "HandleInput", diff --git a/src/backend/base/langflow/template/frontend_node/base.py b/src/backend/base/langflow/template/frontend_node/base.py index 97238a594..b02920117 100644 --- a/src/backend/base/langflow/template/frontend_node/base.py +++ b/src/backend/base/langflow/template/frontend_node/base.py @@ -127,12 +127,16 @@ class FrontendNode(BaseModel): def validate_name_overlap(self) -> None: # Check if any of the output names overlap with the any of the inputs - output_names = [output.name for output in self.outputs] + output_names = [output.name for output in self.outputs if not output.allows_loop] input_names = [input_.name for input_ in self.template.fields] overlap = set(output_names).intersection(input_names) if overlap: overlap_str = ", ".join(f"'{x}'" for x in overlap) - msg = f"There should be no overlap between input and output names. Names {overlap_str} are duplicated." + msg = ( + "There should be no overlap between input and output names. " + f"Names {overlap_str} are duplicated in component {self.display_name}. " + f"Inputs are {input_names} and outputs are {output_names}." + ) raise ValueError(msg) def validate_attributes(self) -> None: diff --git a/src/backend/tests/unit/components/languagemodels/test_openai_model.py b/src/backend/tests/unit/components/languagemodels/test_openai_model.py index e5e0e3168..60c77c7f2 100644 --- a/src/backend/tests/unit/components/languagemodels/test_openai_model.py +++ b/src/backend/tests/unit/components/languagemodels/test_openai_model.py @@ -186,6 +186,7 @@ class TestOpenAIModelComponent(ComponentTestBaseWithoutClient): assert model.model_name == "gpt-4.1-nano" assert model.openai_api_base == "https://api.openai.com/v1" + @pytest.mark.skipif(os.getenv("OPENAI_API_KEY") is None, reason="OPENAI_API_KEY is not set") def test_build_model_integration_reasoning(self): component = OpenAIModelComponent() component.api_key = os.getenv("OPENAI_API_KEY") diff --git a/src/backend/tests/unit/components/logic/test_loop.py b/src/backend/tests/unit/components/logic/test_loop.py index 7188aad1f..60f25c8cc 100644 --- a/src/backend/tests/unit/components/logic/test_loop.py +++ b/src/backend/tests/unit/components/logic/test_loop.py @@ -1,10 +1,21 @@ import json +import os from uuid import UUID import orjson import pytest from httpx import AsyncClient +from langflow.components.data.url import URLComponent +from langflow.components.input_output import ChatOutput from langflow.components.logic import LoopComponent +from langflow.components.openai.openai_chat_model import OpenAIModelComponent +from langflow.components.processing import ( + ParserComponent, + PromptComponent, + SplitTextComponent, + StructuredOutputComponent, +) +from langflow.graph import Graph from langflow.memory import aget_messages from langflow.schema.data import Data from langflow.services.database.models.flow import FlowCreate @@ -116,3 +127,124 @@ class TestLoopComponentWithAPI(ComponentTestBaseWithClient): assert "outputs" in data assert "session_id" in data assert len(data["outputs"][-1]["outputs"]) > 0 + + +@pytest.mark.skipif(os.getenv("OPENAI_API_KEY") in {None, "dummy"}, reason="OPENAI_API_KEY is not set") +def loop_flow(): + """Complete loop flow that processes multiple URLs through a loop.""" + # Create URL component to fetch content from multiple sources + url_component = URLComponent() + url_component.set(urls=["https://docs.langflow.org/"]) + + # Create SplitText component to chunk the content + split_text_component = SplitTextComponent() + split_text_component.set( + data_inputs=url_component.fetch_content, + chunk_size=1000, + chunk_overlap=200, + separator="\n\n", + ) + + # Create Loop component to iterate through the chunks + loop_component = LoopComponent() + loop_component.set(data=split_text_component.split_text) + + # Create Parser component to format the current loop item + parser_component = ParserComponent() + parser_component.set( + input_data=loop_component.item_output, + pattern="Content: {text}", + sep="\n", + ) + + # Create Prompt component to create processing instructions + prompt_component = PromptComponent() + prompt_component.set( + template="Analyze and summarize this content: {context}", + input_text=parser_component.parse_combined_text, + ) + + # Create OpenAI model component for processing + openai_component = OpenAIModelComponent() + openai_component.set( + api_key=os.getenv("OPENAI_API_KEY"), + model_name="gpt-4.1-mini", + temperature=0.7, + ) + + # Create StructuredOutput component to process content + structured_output = StructuredOutputComponent() + structured_output.set( + llm=openai_component.build_model, + input_value=prompt_component.build_prompt, + schema_name="ProcessedContent", + system_prompt=( # Added missing system_prompt - this was causing the "Multiple structured outputs" error + "You are an AI that extracts one structured JSON object from unstructured text. " + "Use a predefined schema with expected types (str, int, float, bool, dict). " + "If multiple structures exist, extract only the first most complete one. " + "Fill missing or ambiguous values with defaults: null for missing values. " + "Ignore duplicates and partial repeats. " + "Always return one valid JSON, never throw errors or return multiple objects." + "Output: A single well-formed JSON object, and nothing else." + ), + output_schema=[ # Fixed schema types to match expected format + {"name": "summary", "type": "str", "description": "Key summary of the content", "multiple": False}, + {"name": "topics", "type": "list", "description": "Main topics covered", "multiple": False}, + {"name": "source_url", "type": "str", "description": "Source URL of the content", "multiple": False}, + ], + ) + + # Connect the feedback loop - StructuredOutput back to Loop item input + # Note: 'item' is a special dynamic input for LoopComponent feedback loops + loop_component.set(item=structured_output.build_structured_output) + # Create ChatOutput component to display final results + chat_output = ChatOutput() + chat_output.set(input_value=loop_component.done_output) + + return Graph(start=url_component, end=chat_output) + + +@pytest.mark.xfail +async def test_loop_flow(): + """Test that loop_flow creates a working graph with proper loop feedback connection.""" + flow = loop_flow() + assert flow is not None + assert flow._start is not None + assert flow._end is not None + + # Verify all expected components are present + expected_vertices = { + "URLComponent", + "SplitTextComponent", + "LoopComponent", + "ParserComponent", + "PromptComponent", + "OpenAIModelComponent", + "StructuredOutputComponent", + "ChatOutput", + } + + assert all(vertex.id.split("-")[0] in expected_vertices for vertex in flow.vertices) + + expected_execution_order = [ + "OpenAIModelComponent", + "URLComponent", + "SplitTextComponent", + "LoopComponent", + "ParserComponent", + "PromptComponent", + "StructuredOutputComponent", + "LoopComponent", + "ParserComponent", + "PromptComponent", + "StructuredOutputComponent", + "LoopComponent", + "ParserComponent", + "PromptComponent", + "StructuredOutputComponent", + "LoopComponent", + "ChatOutput", + ] + results = [result async for result in flow.async_start()] + result_order = [result.vertex.id.split("-")[0] for result in results if hasattr(result, "vertex")] + assert result_order == expected_execution_order