fix: Enhance vertex runnability logic with loop detection (#6309)
* feat: add is_loop property to Vertex class for detecting looping outputs * feat: improve vertex runnability logic for graph traversal - Update `is_vertex_runnable` to handle loop vertices more robustly - Modify `are_all_predecessors_fulfilled` to better manage cycle dependencies - Change adjacency maps to use sets for more efficient predecessor/successor tracking * refactor: change graph adjacency maps from lists to sets for improved performance - Update graph data structures to use sets instead of lists for predecessor, successor, and parent-child maps - Modify type hints and method signatures to reflect the change from list to set - Improve graph traversal and vertex tracking efficiency by using set operations
This commit is contained in:
parent
f3ddbcf1a8
commit
f5a2c1cb3e
7 changed files with 88 additions and 41 deletions
|
|
@ -116,10 +116,10 @@ class Graph:
|
|||
|
||||
self.top_level_vertices: list[str] = []
|
||||
self.vertex_map: dict[str, Vertex] = {}
|
||||
self.predecessor_map: dict[str, list[str]] = defaultdict(list)
|
||||
self.successor_map: dict[str, list[str]] = defaultdict(list)
|
||||
self.predecessor_map: dict[str, set[str]] = defaultdict(set)
|
||||
self.successor_map: dict[str, set[str]] = defaultdict(set)
|
||||
self.in_degree_map: dict[str, int] = defaultdict(int)
|
||||
self.parent_child_map: dict[str, list[str]] = defaultdict(list)
|
||||
self.parent_child_map: dict[str, set[str]] = defaultdict(set)
|
||||
self._run_queue: deque[str] = deque()
|
||||
self._first_layer: list[str] = []
|
||||
self._lock = asyncio.Lock()
|
||||
|
|
@ -469,10 +469,10 @@ class Graph:
|
|||
self.add_edge(edge)
|
||||
source_id = edge["data"]["sourceHandle"]["id"]
|
||||
target_id = edge["data"]["targetHandle"]["id"]
|
||||
self.predecessor_map[target_id].append(source_id)
|
||||
self.successor_map[source_id].append(target_id)
|
||||
self.predecessor_map[target_id].add(source_id)
|
||||
self.successor_map[source_id].add(target_id)
|
||||
self.in_degree_map[target_id] += 1
|
||||
self.parent_child_map[source_id].append(target_id)
|
||||
self.parent_child_map[source_id].add(target_id)
|
||||
|
||||
def add_node(self, node: NodeData) -> None:
|
||||
self._vertices.append(node)
|
||||
|
|
@ -1554,7 +1554,7 @@ class Graph:
|
|||
logger.debug("Graph processing complete")
|
||||
return self
|
||||
|
||||
def find_next_runnable_vertices(self, vertex_successors_ids: list[str]) -> list[str]:
|
||||
def find_next_runnable_vertices(self, vertex_successors_ids: set[str]) -> list[str]:
|
||||
next_runnable_vertices = set()
|
||||
for v_id in sorted(vertex_successors_ids):
|
||||
if not self.is_vertex_runnable(v_id):
|
||||
|
|
@ -1692,7 +1692,7 @@ class Graph:
|
|||
|
||||
def get_successors(self, vertex: Vertex) -> list[Vertex]:
|
||||
"""Returns the successors of a vertex."""
|
||||
return [self.get_vertex(target_id) for target_id in self.successor_map.get(vertex.id, [])]
|
||||
return [self.get_vertex(target_id) for target_id in self.successor_map.get(vertex.id, set())]
|
||||
|
||||
def get_vertex_neighbors(self, vertex: Vertex) -> dict[Vertex, int]:
|
||||
"""Returns the neighbors of a vertex."""
|
||||
|
|
@ -1952,7 +1952,8 @@ class Graph:
|
|||
def is_vertex_runnable(self, vertex_id: str) -> bool:
|
||||
"""Returns whether a vertex is runnable."""
|
||||
is_active = self.get_vertex(vertex_id).is_active()
|
||||
return self.run_manager.is_vertex_runnable(vertex_id, is_active=is_active)
|
||||
is_loop = self.get_vertex(vertex_id).is_loop
|
||||
return self.run_manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)
|
||||
|
||||
def build_run_map(self) -> None:
|
||||
"""Builds the run map for the graph.
|
||||
|
|
@ -1984,7 +1985,8 @@ class Graph:
|
|||
return
|
||||
visited.add(predecessor_id)
|
||||
is_active = self.get_vertex(predecessor_id).is_active()
|
||||
if self.run_manager.is_vertex_runnable(predecessor_id, is_active=is_active):
|
||||
is_loop = self.get_vertex(predecessor_id).is_loop
|
||||
if self.run_manager.is_vertex_runnable(predecessor_id, is_active=is_active, is_loop=is_loop):
|
||||
runnable_vertices.append(predecessor_id)
|
||||
else:
|
||||
for pred_pred_id in self.run_manager.run_predecessors.get(predecessor_id, []):
|
||||
|
|
@ -2029,13 +2031,13 @@ class Graph:
|
|||
return in_degree
|
||||
|
||||
@staticmethod
|
||||
def build_adjacency_maps(edges: list[CycleEdge]) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
|
||||
def build_adjacency_maps(edges: list[CycleEdge]) -> tuple[dict[str, set[str]], dict[str, set[str]]]:
|
||||
"""Returns the adjacency maps for the graph."""
|
||||
predecessor_map: dict[str, list[str]] = defaultdict(list)
|
||||
successor_map: dict[str, list[str]] = defaultdict(list)
|
||||
predecessor_map: dict[str, set[str]] = defaultdict(set)
|
||||
successor_map: dict[str, set[str]] = defaultdict(set)
|
||||
for edge in edges:
|
||||
predecessor_map[edge.target_id].append(edge.source_id)
|
||||
successor_map[edge.source_id].append(edge.target_id)
|
||||
predecessor_map[edge.target_id].add(edge.source_id)
|
||||
successor_map[edge.source_id].add(edge.target_id)
|
||||
return predecessor_map, successor_map
|
||||
|
||||
def __to_dict(self) -> dict[str, dict[str, list[str]]]:
|
||||
|
|
|
|||
|
|
@ -1,6 +1,14 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from langflow.graph.schema import CHAT_COMPONENTS
|
||||
from langflow.utils.lazy_load import LazyLoadDictBase
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from langflow.graph.vertex.base import Vertex
|
||||
from langflow.graph.vertex.vertex_types import CustomComponentVertex
|
||||
|
||||
|
||||
class Finish:
|
||||
def __bool__(self) -> bool:
|
||||
|
|
@ -22,7 +30,7 @@ class VertexTypesDict(LazyLoadDictBase):
|
|||
self._types = _import_vertex_types
|
||||
|
||||
@property
|
||||
def vertex_type_map(self):
|
||||
def vertex_type_map(self) -> dict[str, type[Vertex]]:
|
||||
return self.all_types_dict
|
||||
|
||||
def _build_dict(self):
|
||||
|
|
@ -32,7 +40,7 @@ class VertexTypesDict(LazyLoadDictBase):
|
|||
"Custom": ["Custom Tool", "Python Function"],
|
||||
}
|
||||
|
||||
def get_type_dict(self):
|
||||
def get_type_dict(self) -> dict[str, type[Vertex]]:
|
||||
types = self._types()
|
||||
return {
|
||||
"CustomComponent": types.CustomComponentVertex,
|
||||
|
|
@ -40,7 +48,7 @@ class VertexTypesDict(LazyLoadDictBase):
|
|||
**dict.fromkeys(CHAT_COMPONENTS, types.InterfaceVertex),
|
||||
}
|
||||
|
||||
def get_custom_component_vertex_type(self):
|
||||
def get_custom_component_vertex_type(self) -> type[CustomComponentVertex]:
|
||||
return self._types().CustomComponentVertex
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -48,18 +48,41 @@ class RunnableVerticesManager:
|
|||
self.vertices_to_run.update(vertices_to_run)
|
||||
self.build_run_map(self.run_predecessors, self.vertices_to_run)
|
||||
|
||||
def is_vertex_runnable(self, vertex_id: str, *, is_active: bool) -> bool:
|
||||
"""Determines if a vertex is runnable."""
|
||||
def is_vertex_runnable(self, vertex_id: str, *, is_active: bool, is_loop: bool = False) -> bool:
|
||||
"""Determines if a vertex is runnable based on its active state and predecessor fulfillment."""
|
||||
if not is_active:
|
||||
return False
|
||||
if vertex_id in self.vertices_being_run:
|
||||
return False
|
||||
if vertex_id not in self.vertices_to_run:
|
||||
return False
|
||||
return self.are_all_predecessors_fulfilled(vertex_id) or vertex_id in self.cycle_vertices
|
||||
|
||||
def are_all_predecessors_fulfilled(self, vertex_id: str) -> bool:
|
||||
return not any(self.run_predecessors.get(vertex_id, []))
|
||||
return self.are_all_predecessors_fulfilled(vertex_id, is_loop=is_loop)
|
||||
|
||||
def are_all_predecessors_fulfilled(self, vertex_id: str, *, is_loop: bool) -> bool:
|
||||
"""Determines if all predecessors for a vertex have been fulfilled.
|
||||
|
||||
This method checks if a vertex is ready to run by verifying that either:
|
||||
1. It has no pending predecessors that need to complete first
|
||||
2. For vertices in cycles, none of its pending predecessors are also cycle vertices
|
||||
(which would create a circular dependency)
|
||||
|
||||
Args:
|
||||
vertex_id (str): The ID of the vertex to check
|
||||
is_loop (bool): Whether the vertex is a loop
|
||||
Returns:
|
||||
bool: True if all predecessor conditions are met, False otherwise
|
||||
"""
|
||||
# Get pending predecessors, return True if none exist
|
||||
if not (pending := self.run_predecessors.get(vertex_id, set())):
|
||||
return True
|
||||
|
||||
# For cycle vertices, check if any pending predecessors are also in cycle
|
||||
# Using set intersection is faster than iteration
|
||||
if vertex_id in self.cycle_vertices:
|
||||
return is_loop or not bool(pending.intersection(self.cycle_vertices))
|
||||
|
||||
return False
|
||||
|
||||
def remove_from_predecessors(self, vertex_id: str) -> None:
|
||||
"""Removes a vertex from the predecessor list of its successors."""
|
||||
|
|
|
|||
|
|
@ -461,8 +461,8 @@ def find_cycle_vertices(edges):
|
|||
def layered_topological_sort(
|
||||
vertices_ids: set[str],
|
||||
in_degree_map: dict[str, int],
|
||||
successor_map: dict[str, list[str]],
|
||||
predecessor_map: dict[str, list[str]],
|
||||
successor_map: dict[str, set[str]],
|
||||
predecessor_map: dict[str, set[str]],
|
||||
start_id: str | None = None,
|
||||
cycle_vertices: set[str] | None = None,
|
||||
is_input_vertex: Callable[[str], bool] | None = None, # noqa: ARG001
|
||||
|
|
@ -780,8 +780,8 @@ def get_sorted_vertices(
|
|||
start_component_id: str | None = None,
|
||||
graph_dict: dict[str, Any] | None = None,
|
||||
in_degree_map: dict[str, int] | None = None,
|
||||
successor_map: dict[str, list[str]] | None = None,
|
||||
predecessor_map: dict[str, list[str]] | None = None,
|
||||
successor_map: dict[str, set[str]] | None = None,
|
||||
predecessor_map: dict[str, set[str]] | None = None,
|
||||
is_input_vertex: Callable[[str], bool] | None = None,
|
||||
get_vertex_predecessors: Callable[[str], list[str]] | None = None,
|
||||
get_vertex_successors: Callable[[str], list[str]] | None = None,
|
||||
|
|
@ -826,18 +826,18 @@ def get_sorted_vertices(
|
|||
successor_map = {}
|
||||
for vertex_id in vertices_ids:
|
||||
if get_vertex_successors is not None:
|
||||
successor_map[vertex_id] = get_vertex_successors(vertex_id)
|
||||
successor_map[vertex_id] = set(get_vertex_successors(vertex_id))
|
||||
else:
|
||||
successor_map[vertex_id] = []
|
||||
successor_map[vertex_id] = set()
|
||||
|
||||
# Build predecessor_map if not provided
|
||||
if predecessor_map is None:
|
||||
predecessor_map = {}
|
||||
for vertex_id in vertices_ids:
|
||||
if get_vertex_predecessors is not None:
|
||||
predecessor_map[vertex_id] = get_vertex_predecessors(vertex_id)
|
||||
predecessor_map[vertex_id] = set(get_vertex_predecessors(vertex_id))
|
||||
else:
|
||||
predecessor_map[vertex_id] = []
|
||||
predecessor_map[vertex_id] = set()
|
||||
|
||||
# If we have a stop component, we need to filter out all vertices
|
||||
# that are not predecessors of the stop component
|
||||
|
|
|
|||
|
|
@ -66,6 +66,7 @@ class Vertex:
|
|||
self.is_state = False
|
||||
self.is_input = any(input_component_name in self.id for input_component_name in INPUT_COMPONENTS)
|
||||
self.is_output = any(output_component_name in self.id for output_component_name in OUTPUT_COMPONENTS)
|
||||
self._is_loop = None
|
||||
self.has_session_id = None
|
||||
self.custom_component = None
|
||||
self.has_external_input = False
|
||||
|
|
@ -78,7 +79,7 @@ class Vertex:
|
|||
self.built_object: Any = UnbuiltObject()
|
||||
self.built_result: Any = None
|
||||
self.built = False
|
||||
self._successors_ids: list[str] | None = None
|
||||
self._successors_ids: set[str] | None = None
|
||||
self.artifacts: dict[str, Any] = {}
|
||||
self.artifacts_raw: dict[str, Any] = {}
|
||||
self.artifacts_type: dict[str, str] = {}
|
||||
|
|
@ -109,6 +110,13 @@ class Vertex:
|
|||
output["name"] for output in self.outputs if isinstance(output, dict) and "name" in output
|
||||
]
|
||||
|
||||
@property
|
||||
def is_loop(self) -> bool:
|
||||
"""Check if any output allows looping."""
|
||||
if self._is_loop is None:
|
||||
self._is_loop = any(output.get("allows_loop", False) for output in self.outputs)
|
||||
return self._is_loop
|
||||
|
||||
def set_input_value(self, name: str, value: Any) -> None:
|
||||
if self.custom_component is None:
|
||||
msg = f"Vertex {self.id} does not have a component instance."
|
||||
|
|
@ -199,8 +207,8 @@ class Vertex:
|
|||
return self.graph.get_successors(self)
|
||||
|
||||
@property
|
||||
def successors_ids(self) -> list[str]:
|
||||
return self.graph.successor_map.get(self.id, [])
|
||||
def successors_ids(self) -> set[str]:
|
||||
return self.graph.successor_map.get(self.id, set())
|
||||
|
||||
def __getstate__(self):
|
||||
state = self.__dict__.copy()
|
||||
|
|
|
|||
|
|
@ -466,7 +466,7 @@ class StateVertex(ComponentVertex):
|
|||
self.is_state = False
|
||||
|
||||
@property
|
||||
def successors_ids(self) -> list[str]:
|
||||
def successors_ids(self) -> set[str]:
|
||||
if self._successors_ids is None:
|
||||
self.is_state = False
|
||||
return super().successors_ids
|
||||
|
|
|
|||
|
|
@ -90,8 +90,9 @@ def test_is_vertex_runnable(data):
|
|||
manager = RunnableVerticesManager.from_dict(data)
|
||||
vertex_id = "A"
|
||||
is_active = True
|
||||
is_loop = False
|
||||
|
||||
result = manager.is_vertex_runnable(vertex_id, is_active=is_active)
|
||||
result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)
|
||||
|
||||
assert result is False
|
||||
|
||||
|
|
@ -100,8 +101,9 @@ def test_is_vertex_runnable__wrong_is_active(data):
|
|||
manager = RunnableVerticesManager.from_dict(data)
|
||||
vertex_id = "A"
|
||||
is_active = False
|
||||
is_loop = False
|
||||
|
||||
result = manager.is_vertex_runnable(vertex_id, is_active=is_active)
|
||||
result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)
|
||||
|
||||
assert result is False
|
||||
|
||||
|
|
@ -110,8 +112,9 @@ def test_is_vertex_runnable__wrong_vertices_to_run(data):
|
|||
manager = RunnableVerticesManager.from_dict(data)
|
||||
vertex_id = "D"
|
||||
is_active = True
|
||||
is_loop = False
|
||||
|
||||
result = manager.is_vertex_runnable(vertex_id, is_active=is_active)
|
||||
result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)
|
||||
|
||||
assert result is False
|
||||
|
||||
|
|
@ -120,8 +123,9 @@ def test_is_vertex_runnable__wrong_run_predecessors(data):
|
|||
manager = RunnableVerticesManager.from_dict(data)
|
||||
vertex_id = "C"
|
||||
is_active = True
|
||||
is_loop = False
|
||||
|
||||
result = manager.is_vertex_runnable(vertex_id, is_active=is_active)
|
||||
result = manager.is_vertex_runnable(vertex_id, is_active=is_active, is_loop=is_loop)
|
||||
|
||||
assert result is False
|
||||
|
||||
|
|
@ -129,8 +133,9 @@ def test_is_vertex_runnable__wrong_run_predecessors(data):
|
|||
def test_are_all_predecessors_fulfilled(data):
|
||||
manager = RunnableVerticesManager.from_dict(data)
|
||||
vertex_id = "A"
|
||||
is_loop = False
|
||||
|
||||
result = manager.are_all_predecessors_fulfilled(vertex_id)
|
||||
result = manager.are_all_predecessors_fulfilled(vertex_id, is_loop=is_loop)
|
||||
|
||||
assert result is True
|
||||
|
||||
|
|
@ -138,8 +143,9 @@ def test_are_all_predecessors_fulfilled(data):
|
|||
def test_are_all_predecessors_fulfilled__wrong(data):
|
||||
manager = RunnableVerticesManager.from_dict(data)
|
||||
vertex_id = "D"
|
||||
is_loop = False
|
||||
|
||||
result = manager.are_all_predecessors_fulfilled(vertex_id)
|
||||
result = manager.are_all_predecessors_fulfilled(vertex_id, is_loop=is_loop)
|
||||
|
||||
assert result is False
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue