feat: add vertices_being_run set to RunnableVerticesManager (#2589)
* refactor(base.py): remove unnecessary conditional statements for adding edges to predecessor and successor maps in Graph class * refactor(graph/base.py): optimize the process of adding vertices to the set and updating the predecessor map in the Graph class * refactor(graph/base.py): remove unnecessary line that adds vertex_id to vertices_ids set refactor(graph/base.py): fix indentation for predecessor_map and successor_map dictionaries to improve code readability * feat: Add vertices_being_run set to RunnableVerticesManager This commit adds a new set called `vertices_being_run` to the `RunnableVerticesManager` class. This set keeps track of vertices that are currently running. The purpose of this set is to prevent a vertex from being considered runnable if it is already being run. Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org> * chore: Remove unnecessary vertex from next_runnable_vertices in RunnableVerticesManager * feat: Update vertices_to_run logic in retrieve_vertices_order function This commit updates the logic for retrieving the vertices to run in the `retrieve_vertices_order` function in `chat.py`. The previous implementation used the `list` function to convert the `vertices_to_run` set to a list and then concatenated it with the result of the `get_top_level_vertices` function. The updated logic uses the `union` method to combine the two sets directly. This change improves the efficiency and readability of the code. * refactor(graph/base.py): optimize the process of adding vertices to the set and updating the predecessor map in the Graph class
This commit is contained in:
parent
46b7911110
commit
38b10b02df
3 changed files with 24 additions and 14 deletions
|
|
@ -110,7 +110,7 @@ async def retrieve_vertices_order(
|
|||
# We need to get the id of each vertex
|
||||
# and return the same structure but only with the ids
|
||||
components_count = len(graph.vertices)
|
||||
vertices_to_run = list(graph.vertices_to_run) + get_top_level_vertices(graph, graph.vertices_to_run)
|
||||
vertices_to_run = list(graph.vertices_to_run.union(get_top_level_vertices(graph, graph.vertices_to_run)))
|
||||
await chat_service.set_cache(str(flow_id), graph)
|
||||
background_tasks.add_task(
|
||||
telemetry_service.log_package_playground,
|
||||
|
|
|
|||
|
|
@ -134,9 +134,10 @@ class Graph:
|
|||
vertices_ids = set()
|
||||
new_predecessor_map = {}
|
||||
for vertex_id in self._is_state_vertices:
|
||||
if vertex_id == caller:
|
||||
continue
|
||||
caller_vertex = self.get_vertex(caller)
|
||||
vertex = self.get_vertex(vertex_id)
|
||||
if vertex_id == caller or vertex.display_name == caller_vertex.display_name:
|
||||
continue
|
||||
if (
|
||||
isinstance(vertex._raw_params["name"], str)
|
||||
and name in vertex._raw_params["name"]
|
||||
|
|
@ -152,15 +153,17 @@ class Graph:
|
|||
# and run self.build_adjacency_maps(edges) to get the new predecessor map
|
||||
# that is not complete but we can use to update the run_predecessors
|
||||
edges_set = set()
|
||||
for vertex in [vertex] + successors:
|
||||
edges_set.update(vertex.edges)
|
||||
vertices_ids.add(vertex.id)
|
||||
if vertex.state == VertexStates.INACTIVE:
|
||||
vertex.set_state("ACTIVE")
|
||||
for _vertex in [vertex] + successors:
|
||||
edges_set.update(_vertex.edges)
|
||||
if _vertex.state == VertexStates.INACTIVE:
|
||||
_vertex.set_state("ACTIVE")
|
||||
edges = list(edges_set)
|
||||
predecessor_map, _ = self.build_adjacency_maps(edges)
|
||||
new_predecessor_map.update(predecessor_map)
|
||||
|
||||
# vertices_ids.update(new_predecessor_map.keys())
|
||||
# vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list)
|
||||
|
||||
self.activated_vertices = list(vertices_ids)
|
||||
self.vertices_to_run.update(vertices_ids)
|
||||
self.run_manager.update_run_state(
|
||||
|
|
@ -861,6 +864,7 @@ class Graph:
|
|||
ValueError: If no result is found for the vertex.
|
||||
"""
|
||||
vertex = self.get_vertex(vertex_id)
|
||||
self.run_manager.add_to_vertices_being_run(vertex_id)
|
||||
try:
|
||||
params = ""
|
||||
if vertex.frozen:
|
||||
|
|
@ -1494,8 +1498,6 @@ class Graph:
|
|||
predecessor_map: dict[str, list[str]] = defaultdict(list)
|
||||
successor_map: dict[str, list[str]] = defaultdict(list)
|
||||
for edge in edges:
|
||||
if edge.source_id not in predecessor_map[edge.target_id]:
|
||||
predecessor_map[edge.target_id].append(edge.source_id)
|
||||
if edge.target_id not in successor_map[edge.source_id]:
|
||||
successor_map[edge.source_id].append(edge.target_id)
|
||||
predecessor_map[edge.target_id].append(edge.source_id)
|
||||
successor_map[edge.source_id].append(edge.target_id)
|
||||
return predecessor_map, successor_map
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ class RunnableVerticesManager:
|
|||
self.run_map = defaultdict(list) # Tracks successors of each vertex
|
||||
self.run_predecessors = defaultdict(set) # Tracks predecessors for each vertex
|
||||
self.vertices_to_run = set() # Set of vertices that are ready to run
|
||||
self.vertices_being_run = set() # Set of vertices that are currently running
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
|
|
@ -49,7 +50,7 @@ class RunnableVerticesManager:
|
|||
"""Determines if a vertex is runnable."""
|
||||
|
||||
return (
|
||||
vertex_id in self.vertices_to_run
|
||||
vertex_id not in self.vertices_being_run
|
||||
and not self.run_predecessors.get(vertex_id)
|
||||
and vertex_id not in inactivated_vertices
|
||||
)
|
||||
|
|
@ -88,6 +89,7 @@ class RunnableVerticesManager:
|
|||
self.vertices_to_run.add(vertex_id)
|
||||
else:
|
||||
self.vertices_to_run.discard(vertex_id)
|
||||
self.vertices_being_run.discard(vertex_id)
|
||||
|
||||
async def get_next_runnable_vertices(
|
||||
self,
|
||||
|
|
@ -125,7 +127,10 @@ class RunnableVerticesManager:
|
|||
next_runnable_vertices = direct_successors_ready
|
||||
|
||||
for v_id in set(next_runnable_vertices): # Use set to avoid duplicates
|
||||
self.remove_vertex_from_runnables(v_id)
|
||||
if vertex.id == v_id:
|
||||
next_runnable_vertices.remove(v_id)
|
||||
else:
|
||||
self.add_to_vertices_being_run(v_id)
|
||||
if cache:
|
||||
await set_cache_coro(data=graph, lock=lock) # type: ignore
|
||||
return next_runnable_vertices
|
||||
|
|
@ -134,6 +139,9 @@ class RunnableVerticesManager:
|
|||
self.update_vertex_run_state(v_id, is_runnable=False)
|
||||
self.remove_from_predecessors(v_id)
|
||||
|
||||
def add_to_vertices_being_run(self, v_id):
|
||||
self.vertices_being_run.add(v_id)
|
||||
|
||||
@staticmethod
|
||||
def get_top_level_vertices(graph, vertices_ids):
|
||||
"""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue