Refactor graph building and running logic
This commit is contained in:
parent
f95b20322e
commit
74fc520a7d
2 changed files with 9 additions and 7 deletions
|
|
@ -387,7 +387,6 @@ class Graph:
|
|||
|
||||
self.in_degree_map = self.build_in_degree()
|
||||
self.parent_child_map = self.build_parent_child_map()
|
||||
self.run_manager.build_run_map(self)
|
||||
|
||||
def reset_inactivated_vertices(self):
|
||||
"""
|
||||
|
|
@ -983,6 +982,8 @@ class Graph:
|
|||
successors_result.append(successor)
|
||||
return successors_result
|
||||
|
||||
stop_or_start_vertex = self.get_vertex(vertex_id)
|
||||
stop_predecessors = [pre.id for pre in stop_or_start_vertex.predecessors]
|
||||
# DFS to collect all vertices that can reach the specified vertex
|
||||
while stack:
|
||||
current_id = stack.pop()
|
||||
|
|
@ -1009,7 +1010,7 @@ class Graph:
|
|||
stack.append(successor.id)
|
||||
else:
|
||||
excluded.add(successor.id)
|
||||
else:
|
||||
elif current_id not in stop_predecessors:
|
||||
# If the current vertex is not the target vertex, we should add all its successors
|
||||
# to the stack if they are not in visited
|
||||
for successor in current_vertex.successors:
|
||||
|
|
@ -1149,7 +1150,7 @@ class Graph:
|
|||
# save the only the rest
|
||||
self.vertices_layers = vertices_layers[1:]
|
||||
self.vertices_to_run = {vertex_id for vertex_id in chain.from_iterable(vertices_layers)}
|
||||
self.build_graph_maps()
|
||||
self.build_run_map()
|
||||
# Return just the first layer
|
||||
return first_layer
|
||||
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ class RunnableVerticesManager:
|
|||
for predecessor in predecessors:
|
||||
self.run_map[predecessor].append(vertex_id)
|
||||
self.run_predecessors = graph.predecessor_map.copy()
|
||||
self.vertices_to_run = graph.vertices_to_run
|
||||
|
||||
def update_vertex_run_state(self, vertex_id: str, is_runnable: bool):
|
||||
"""Updates the runnable state of a vertex."""
|
||||
|
|
@ -73,8 +74,8 @@ class RunnableVerticesManager:
|
|||
|
||||
"""
|
||||
async with lock:
|
||||
graph.remove_from_predecessors(vertex.id)
|
||||
direct_successors_ready = [v for v in vertex.successors_ids if graph.is_vertex_runnable(v)]
|
||||
self.remove_from_predecessors(vertex.id)
|
||||
direct_successors_ready = [v for v in vertex.successors_ids if self.is_vertex_runnable(v)]
|
||||
if not direct_successors_ready:
|
||||
# No direct successors ready, look for runnable predecessors of successors
|
||||
next_runnable_vertices = self.find_runnable_predecessors_for_successors(vertex.id)
|
||||
|
|
@ -82,8 +83,8 @@ class RunnableVerticesManager:
|
|||
next_runnable_vertices = direct_successors_ready
|
||||
|
||||
for v_id in set(next_runnable_vertices): # Use set to avoid duplicates
|
||||
graph.vertices_to_run.remove(v_id)
|
||||
graph.remove_from_predecessors(v_id)
|
||||
self.update_vertex_run_state(v_id, is_runnable=False)
|
||||
self.remove_from_predecessors(v_id)
|
||||
await set_cache_coro(data=graph, lock=lock)
|
||||
return next_runnable_vertices
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue