fix: update condition to run end_all_traces (#2707)

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-07-15 13:13:49 -03:00 committed by GitHub
commit 537e358b65
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 6 additions and 1 deletions

View file

@ -260,7 +260,7 @@ async def build_vertex(
if graph.stop_vertex and graph.stop_vertex in next_runnable_vertices:
next_runnable_vertices = [graph.stop_vertex]
if not graph.run_manager.vertices_to_run and not next_runnable_vertices:
if graph.run_manager.all_predecessors_are_fulfilled() and not next_runnable_vertices:
background_tasks.add_task(graph.end_all_traces)
build_response = VertexBuildResponse(

View file

@ -539,6 +539,8 @@ class Graph:
"""Marks a vertex in the graph."""
vertex = self.get_vertex(vertex_id)
vertex.set_state(state)
if state == VertexStates.INACTIVE:
self.run_manager.remove_from_predecessors(vertex_id)
def mark_branch(self, vertex_id: str, state: str, visited: Optional[set] = None, output_name: Optional[str] = None):
"""Marks a branch of the graph."""

View file

@ -41,6 +41,9 @@ class RunnableVerticesManager:
self.run_predecessors = state["run_predecessors"]
self.vertices_to_run = state["vertices_to_run"]
def all_predecessors_are_fulfilled(self) -> bool:
return all(not value for value in self.run_predecessors.values())
def update_run_state(self, run_predecessors: dict, vertices_to_run: set):
self.run_predecessors.update(run_predecessors)
self.vertices_to_run.update(vertices_to_run)