chore: Refactor RunnableVerticesManager to consider inactivated vertices in is_vertex_runnable and find_runnable_predecessors_for_successors

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-06-26 10:57:04 -03:00
commit fb3c61f3b3

View file

@ -1,6 +1,6 @@
import asyncio
from collections import defaultdict
from typing import TYPE_CHECKING, Callable, List, Coroutine
from typing import TYPE_CHECKING, Callable, Coroutine, List
if TYPE_CHECKING:
from langflow.graph.graph.base import Graph
@ -40,19 +40,23 @@ class RunnableVerticesManager:
self.run_predecessors = state["run_predecessors"]
self.vertices_to_run = state["vertices_to_run"]
def is_vertex_runnable(self, vertex_id: str) -> bool:
def is_vertex_runnable(self, vertex_id: str, inactivated_vertices: set[str]) -> bool:
"""Determines if a vertex is runnable."""
return vertex_id in self.vertices_to_run and not self.run_predecessors.get(vertex_id)
return (
vertex_id in self.vertices_to_run
and not self.run_predecessors.get(vertex_id)
and vertex_id not in inactivated_vertices
)
def find_runnable_predecessors_for_successors(self, vertex_id: str) -> List[str]:
def find_runnable_predecessors_for_successors(self, vertex_id: str, inactivated_vertices: set[str]) -> List[str]:
"""Finds runnable predecessors for the successors of a given vertex."""
runnable_vertices = []
visited = set()
for successor_id in self.run_map.get(vertex_id, []):
for predecessor_id in self.run_predecessors.get(successor_id, []):
if predecessor_id not in visited and self.is_vertex_runnable(predecessor_id):
if predecessor_id not in visited and self.is_vertex_runnable(predecessor_id, inactivated_vertices):
runnable_vertices.append(predecessor_id)
visited.add(predecessor_id)
return runnable_vertices
@ -104,10 +108,14 @@ class RunnableVerticesManager:
"""
async with lock:
self.remove_from_predecessors(vertex.id)
direct_successors_ready = [v for v in vertex.successors_ids if self.is_vertex_runnable(v)]
direct_successors_ready = [
v for v in vertex.successors_ids if self.is_vertex_runnable(v, graph.inactivated_vertices)
]
if not direct_successors_ready:
# No direct successors ready, look for runnable predecessors of successors
next_runnable_vertices = self.find_runnable_predecessors_for_successors(vertex.id)
next_runnable_vertices = self.find_runnable_predecessors_for_successors(
vertex.id, graph.inactivated_vertices
)
else:
next_runnable_vertices = direct_successors_ready