diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index f7b3eec89..7e57c274c 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -116,10 +116,10 @@ class Graph: self.top_level_vertices: list[str] = [] self.vertex_map: dict[str, Vertex] = {} - self.predecessor_map: dict[str, list[str]] = defaultdict(list) - self.successor_map: dict[str, list[str]] = defaultdict(list) + self.predecessor_map: dict[str, set[str]] = defaultdict(set) + self.successor_map: dict[str, set[str]] = defaultdict(set) self.in_degree_map: dict[str, int] = defaultdict(int) - self.parent_child_map: dict[str, list[str]] = defaultdict(list) + self.parent_child_map: dict[str, set[str]] = defaultdict(set) self._run_queue: deque[str] = deque() self._first_layer: list[str] = [] self._lock = asyncio.Lock() @@ -469,10 +469,10 @@ class Graph: self.add_edge(edge) source_id = edge["data"]["sourceHandle"]["id"] target_id = edge["data"]["targetHandle"]["id"] - self.predecessor_map[target_id].append(source_id) - self.successor_map[source_id].append(target_id) + self.predecessor_map[target_id].add(source_id) + self.successor_map[source_id].add(target_id) self.in_degree_map[target_id] += 1 - self.parent_child_map[source_id].append(target_id) + self.parent_child_map[source_id].add(target_id) def add_node(self, node: NodeData) -> None: self._vertices.append(node) @@ -1554,7 +1554,7 @@ class Graph: logger.debug("Graph processing complete") return self - def find_next_runnable_vertices(self, vertex_successors_ids: list[str]) -> list[str]: + def find_next_runnable_vertices(self, vertex_successors_ids: set[str]) -> list[str]: next_runnable_vertices = set() for v_id in sorted(vertex_successors_ids): if not self.is_vertex_runnable(v_id): @@ -1692,7 +1692,7 @@ class Graph: def get_successors(self, vertex: Vertex) -> list[Vertex]: """Returns the successors of a vertex.""" - return [self.get_vertex(target_id) for target_id in self.successor_map.get(vertex.id, [])] + return [self.get_vertex(target_id) for target_id in self.successor_map.get(vertex.id, set())] def get_vertex_neighbors(self, vertex: Vertex) -> dict[Vertex, int]: """Returns the neighbors of a vertex.""" @@ -1952,7 +1952,8 @@ class Graph: def is_vertex_runnable(self, vertex_id: str) -> bool: """Returns whether a vertex is runnable.""" is_active = self.get_vertex(vertex_id).is_active() - return self.run_manager.is_vertex_runnable(vertex_id, is_active=is_active) + is_loop = self.get_vertex(vertex_id).is_loop + return self.run_manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop) def build_run_map(self) -> None: """Builds the run map for the graph. @@ -1984,7 +1985,8 @@ class Graph: return visited.add(predecessor_id) is_active = self.get_vertex(predecessor_id).is_active() - if self.run_manager.is_vertex_runnable(predecessor_id, is_active=is_active): + is_loop = self.get_vertex(predecessor_id).is_loop + if self.run_manager.is_vertex_runnable(predecessor_id, is_active=is_active, is_loop=is_loop): runnable_vertices.append(predecessor_id) else: for pred_pred_id in self.run_manager.run_predecessors.get(predecessor_id, []): @@ -2029,13 +2031,13 @@ class Graph: return in_degree @staticmethod - def build_adjacency_maps(edges: list[CycleEdge]) -> tuple[dict[str, list[str]], dict[str, list[str]]]: + def build_adjacency_maps(edges: list[CycleEdge]) -> tuple[dict[str, set[str]], dict[str, set[str]]]: """Returns the adjacency maps for the graph.""" - predecessor_map: dict[str, list[str]] = defaultdict(list) - successor_map: dict[str, list[str]] = defaultdict(list) + predecessor_map: dict[str, set[str]] = defaultdict(set) + successor_map: dict[str, set[str]] = defaultdict(set) for edge in edges: - predecessor_map[edge.target_id].append(edge.source_id) - successor_map[edge.source_id].append(edge.target_id) + predecessor_map[edge.target_id].add(edge.source_id) + successor_map[edge.source_id].add(edge.target_id) return predecessor_map, successor_map def __to_dict(self) -> dict[str, dict[str, list[str]]]: diff --git a/src/backend/base/langflow/graph/graph/constants.py b/src/backend/base/langflow/graph/graph/constants.py index fd9306d9d..127c055e1 100644 --- a/src/backend/base/langflow/graph/graph/constants.py +++ b/src/backend/base/langflow/graph/graph/constants.py @@ -1,6 +1,14 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + from langflow.graph.schema import CHAT_COMPONENTS from langflow.utils.lazy_load import LazyLoadDictBase +if TYPE_CHECKING: + from langflow.graph.vertex.base import Vertex + from langflow.graph.vertex.vertex_types import CustomComponentVertex + class Finish: def __bool__(self) -> bool: @@ -22,7 +30,7 @@ class VertexTypesDict(LazyLoadDictBase): self._types = _import_vertex_types @property - def vertex_type_map(self): + def vertex_type_map(self) -> dict[str, type[Vertex]]: return self.all_types_dict def _build_dict(self): @@ -32,7 +40,7 @@ class VertexTypesDict(LazyLoadDictBase): "Custom": ["Custom Tool", "Python Function"], } - def get_type_dict(self): + def get_type_dict(self) -> dict[str, type[Vertex]]: types = self._types() return { "CustomComponent": types.CustomComponentVertex, @@ -40,7 +48,7 @@ class VertexTypesDict(LazyLoadDictBase): **dict.fromkeys(CHAT_COMPONENTS, types.InterfaceVertex), } - def get_custom_component_vertex_type(self): + def get_custom_component_vertex_type(self) -> type[CustomComponentVertex]: return self._types().CustomComponentVertex diff --git a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py index 9abc227e9..2b11d16fa 100644 --- a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py +++ b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py @@ -48,18 +48,41 @@ class RunnableVerticesManager: self.vertices_to_run.update(vertices_to_run) self.build_run_map(self.run_predecessors, self.vertices_to_run) - def is_vertex_runnable(self, vertex_id: str, *, is_active: bool) -> bool: - """Determines if a vertex is runnable.""" + def is_vertex_runnable(self, vertex_id: str, *, is_active: bool, is_loop: bool = False) -> bool: + """Determines if a vertex is runnable based on its active state and predecessor fulfillment.""" if not is_active: return False if vertex_id in self.vertices_being_run: return False if vertex_id not in self.vertices_to_run: return False - return self.are_all_predecessors_fulfilled(vertex_id) or vertex_id in self.cycle_vertices - def are_all_predecessors_fulfilled(self, vertex_id: str) -> bool: - return not any(self.run_predecessors.get(vertex_id, [])) + return self.are_all_predecessors_fulfilled(vertex_id, is_loop=is_loop) + + def are_all_predecessors_fulfilled(self, vertex_id: str, *, is_loop: bool) -> bool: + """Determines if all predecessors for a vertex have been fulfilled. + + This method checks if a vertex is ready to run by verifying that either: + 1. It has no pending predecessors that need to complete first + 2. For vertices in cycles, none of its pending predecessors are also cycle vertices + (which would create a circular dependency) + + Args: + vertex_id (str): The ID of the vertex to check + is_loop (bool): Whether the vertex is a loop + Returns: + bool: True if all predecessor conditions are met, False otherwise + """ + # Get pending predecessors, return True if none exist + if not (pending := self.run_predecessors.get(vertex_id, set())): + return True + + # For cycle vertices, check if any pending predecessors are also in cycle + # Using set intersection is faster than iteration + if vertex_id in self.cycle_vertices: + return is_loop or not bool(pending.intersection(self.cycle_vertices)) + + return False def remove_from_predecessors(self, vertex_id: str) -> None: """Removes a vertex from the predecessor list of its successors.""" diff --git a/src/backend/base/langflow/graph/graph/utils.py b/src/backend/base/langflow/graph/graph/utils.py index 26650f343..d0a8ea174 100644 --- a/src/backend/base/langflow/graph/graph/utils.py +++ b/src/backend/base/langflow/graph/graph/utils.py @@ -461,8 +461,8 @@ def find_cycle_vertices(edges): def layered_topological_sort( vertices_ids: set[str], in_degree_map: dict[str, int], - successor_map: dict[str, list[str]], - predecessor_map: dict[str, list[str]], + successor_map: dict[str, set[str]], + predecessor_map: dict[str, set[str]], start_id: str | None = None, cycle_vertices: set[str] | None = None, is_input_vertex: Callable[[str], bool] | None = None, # noqa: ARG001 @@ -780,8 +780,8 @@ def get_sorted_vertices( start_component_id: str | None = None, graph_dict: dict[str, Any] | None = None, in_degree_map: dict[str, int] | None = None, - successor_map: dict[str, list[str]] | None = None, - predecessor_map: dict[str, list[str]] | None = None, + successor_map: dict[str, set[str]] | None = None, + predecessor_map: dict[str, set[str]] | None = None, is_input_vertex: Callable[[str], bool] | None = None, get_vertex_predecessors: Callable[[str], list[str]] | None = None, get_vertex_successors: Callable[[str], list[str]] | None = None, @@ -826,18 +826,18 @@ def get_sorted_vertices( successor_map = {} for vertex_id in vertices_ids: if get_vertex_successors is not None: - successor_map[vertex_id] = get_vertex_successors(vertex_id) + successor_map[vertex_id] = set(get_vertex_successors(vertex_id)) else: - successor_map[vertex_id] = [] + successor_map[vertex_id] = set() # Build predecessor_map if not provided if predecessor_map is None: predecessor_map = {} for vertex_id in vertices_ids: if get_vertex_predecessors is not None: - predecessor_map[vertex_id] = get_vertex_predecessors(vertex_id) + predecessor_map[vertex_id] = set(get_vertex_predecessors(vertex_id)) else: - predecessor_map[vertex_id] = [] + predecessor_map[vertex_id] = set() # If we have a stop component, we need to filter out all vertices # that are not predecessors of the stop component diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index bcc11f7c4..a354812e3 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -66,6 +66,7 @@ class Vertex: self.is_state = False self.is_input = any(input_component_name in self.id for input_component_name in INPUT_COMPONENTS) self.is_output = any(output_component_name in self.id for output_component_name in OUTPUT_COMPONENTS) + self._is_loop = None self.has_session_id = None self.custom_component = None self.has_external_input = False @@ -78,7 +79,7 @@ class Vertex: self.built_object: Any = UnbuiltObject() self.built_result: Any = None self.built = False - self._successors_ids: list[str] | None = None + self._successors_ids: set[str] | None = None self.artifacts: dict[str, Any] = {} self.artifacts_raw: dict[str, Any] = {} self.artifacts_type: dict[str, str] = {} @@ -109,6 +110,13 @@ class Vertex: output["name"] for output in self.outputs if isinstance(output, dict) and "name" in output ] + @property + def is_loop(self) -> bool: + """Check if any output allows looping.""" + if self._is_loop is None: + self._is_loop = any(output.get("allows_loop", False) for output in self.outputs) + return self._is_loop + def set_input_value(self, name: str, value: Any) -> None: if self.custom_component is None: msg = f"Vertex {self.id} does not have a component instance." @@ -199,8 +207,8 @@ class Vertex: return self.graph.get_successors(self) @property - def successors_ids(self) -> list[str]: - return self.graph.successor_map.get(self.id, []) + def successors_ids(self) -> set[str]: + return self.graph.successor_map.get(self.id, set()) def __getstate__(self): state = self.__dict__.copy() diff --git a/src/backend/base/langflow/graph/vertex/vertex_types.py b/src/backend/base/langflow/graph/vertex/vertex_types.py index cebc0cb9c..c6885000b 100644 --- a/src/backend/base/langflow/graph/vertex/vertex_types.py +++ b/src/backend/base/langflow/graph/vertex/vertex_types.py @@ -466,7 +466,7 @@ class StateVertex(ComponentVertex): self.is_state = False @property - def successors_ids(self) -> list[str]: + def successors_ids(self) -> set[str]: if self._successors_ids is None: self.is_state = False return super().successors_ids diff --git a/src/backend/tests/unit/graph/graph/test_runnable_vertices_manager.py b/src/backend/tests/unit/graph/graph/test_runnable_vertices_manager.py index 188d19b91..e775b11f8 100644 --- a/src/backend/tests/unit/graph/graph/test_runnable_vertices_manager.py +++ b/src/backend/tests/unit/graph/graph/test_runnable_vertices_manager.py @@ -90,8 +90,9 @@ def test_is_vertex_runnable(data): manager = RunnableVerticesManager.from_dict(data) vertex_id = "A" is_active = True + is_loop = False - result = manager.is_vertex_runnable(vertex_id, is_active=is_active) + result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop) assert result is False @@ -100,8 +101,9 @@ def test_is_vertex_runnable__wrong_is_active(data): manager = RunnableVerticesManager.from_dict(data) vertex_id = "A" is_active = False + is_loop = False - result = manager.is_vertex_runnable(vertex_id, is_active=is_active) + result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop) assert result is False @@ -110,8 +112,9 @@ def test_is_vertex_runnable__wrong_vertices_to_run(data): manager = RunnableVerticesManager.from_dict(data) vertex_id = "D" is_active = True + is_loop = False - result = manager.is_vertex_runnable(vertex_id, is_active=is_active) + result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop) assert result is False @@ -120,8 +123,9 @@ def test_is_vertex_runnable__wrong_run_predecessors(data): manager = RunnableVerticesManager.from_dict(data) vertex_id = "C" is_active = True + is_loop = False - result = manager.is_vertex_runnable(vertex_id, is_active=is_active) + result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop) assert result is False @@ -129,8 +133,9 @@ def test_is_vertex_runnable__wrong_run_predecessors(data): def test_are_all_predecessors_fulfilled(data): manager = RunnableVerticesManager.from_dict(data) vertex_id = "A" + is_loop = False - result = manager.are_all_predecessors_fulfilled(vertex_id) + result = manager.are_all_predecessors_fulfilled(vertex_id, is_loop=is_loop) assert result is True @@ -138,8 +143,9 @@ def test_are_all_predecessors_fulfilled(data): def test_are_all_predecessors_fulfilled__wrong(data): manager = RunnableVerticesManager.from_dict(data) vertex_id = "D" + is_loop = False - result = manager.are_all_predecessors_fulfilled(vertex_id) + result = manager.are_all_predecessors_fulfilled(vertex_id, is_loop=is_loop) assert result is False