diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index eadc06da7..6f9909ed7 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -387,7 +387,6 @@ class Graph: self.in_degree_map = self.build_in_degree() self.parent_child_map = self.build_parent_child_map() - self.run_manager.build_run_map(self) def reset_inactivated_vertices(self): """ @@ -983,6 +982,8 @@ class Graph: successors_result.append(successor) return successors_result + stop_or_start_vertex = self.get_vertex(vertex_id) + stop_predecessors = [pre.id for pre in stop_or_start_vertex.predecessors] # DFS to collect all vertices that can reach the specified vertex while stack: current_id = stack.pop() @@ -1009,7 +1010,7 @@ class Graph: stack.append(successor.id) else: excluded.add(successor.id) - else: + elif current_id not in stop_predecessors: # If the current vertex is not the target vertex, we should add all its successors # to the stack if they are not in visited for successor in current_vertex.successors: @@ -1149,7 +1150,7 @@ class Graph: # save the only the rest self.vertices_layers = vertices_layers[1:] self.vertices_to_run = {vertex_id for vertex_id in chain.from_iterable(vertices_layers)} - self.build_graph_maps() + self.build_run_map() # Return just the first layer return first_layer diff --git a/src/backend/langflow/graph/graph/runnable_vertices_manager.py b/src/backend/langflow/graph/graph/runnable_vertices_manager.py index 3987bd37a..3fa9d3e91 100644 --- a/src/backend/langflow/graph/graph/runnable_vertices_manager.py +++ b/src/backend/langflow/graph/graph/runnable_vertices_manager.py @@ -43,6 +43,7 @@ class RunnableVerticesManager: for predecessor in predecessors: self.run_map[predecessor].append(vertex_id) self.run_predecessors = graph.predecessor_map.copy() + self.vertices_to_run = graph.vertices_to_run def update_vertex_run_state(self, vertex_id: str, is_runnable: bool): """Updates the runnable state of a vertex.""" @@ -73,8 +74,8 @@ class RunnableVerticesManager: """ async with lock: - graph.remove_from_predecessors(vertex.id) - direct_successors_ready = [v for v in vertex.successors_ids if graph.is_vertex_runnable(v)] + self.remove_from_predecessors(vertex.id) + direct_successors_ready = [v for v in vertex.successors_ids if self.is_vertex_runnable(v)] if not direct_successors_ready: # No direct successors ready, look for runnable predecessors of successors next_runnable_vertices = self.find_runnable_predecessors_for_successors(vertex.id) @@ -82,8 +83,8 @@ class RunnableVerticesManager: next_runnable_vertices = direct_successors_ready for v_id in set(next_runnable_vertices): # Use set to avoid duplicates - graph.vertices_to_run.remove(v_id) - graph.remove_from_predecessors(v_id) + self.update_vertex_run_state(v_id, is_runnable=False) + self.remove_from_predecessors(v_id) await set_cache_coro(data=graph, lock=lock) return next_runnable_vertices