From 38b10b02df498523ee6ae03439b7c8dc551af687 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 8 Jul 2024 17:48:20 -0300 Subject: [PATCH] feat: add vertices_being_run set to RunnableVerticesManager (#2589) * refactor(base.py): remove unnecessary conditional statements for adding edges to predecessor and successor maps in Graph class * refactor(graph/base.py): optimize the process of adding vertices to the set and updating the predecessor map in the Graph class * refactor(graph/base.py): remove unnecessary line that adds vertex_id to vertices_ids set refactor(graph/base.py): fix indentation for predecessor_map and successor_map dictionaries to improve code readability * feat: Add vertices_being_run set to RunnableVerticesManager This commit adds a new set called `vertices_being_run` to the `RunnableVerticesManager` class. This set keeps track of vertices that are currently running. The purpose of this set is to prevent a vertex from being considered runnable if it is already being run. Co-authored-by: Gabriel Luiz Freitas Almeida * chore: Remove unnecessary vertex from next_runnable_vertices in RunnableVerticesManager * feat: Update vertices_to_run logic in retrieve_vertices_order function This commit updates the logic for retrieving the vertices to run in the `retrieve_vertices_order` function in `chat.py`. The previous implementation used the `list` function to convert the `vertices_to_run` set to a list and then concatenated it with the result of the `get_top_level_vertices` function. The updated logic uses the `union` method to combine the two sets directly. This change improves the efficiency and readability of the code. * refactor(graph/base.py): optimize the process of adding vertices to the set and updating the predecessor map in the Graph class --- src/backend/base/langflow/api/v1/chat.py | 2 +- src/backend/base/langflow/graph/graph/base.py | 24 ++++++++++--------- .../graph/graph/runnable_vertices_manager.py | 12 ++++++++-- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index bbc183da8..7e933d1df 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -110,7 +110,7 @@ async def retrieve_vertices_order( # We need to get the id of each vertex # and return the same structure but only with the ids components_count = len(graph.vertices) - vertices_to_run = list(graph.vertices_to_run) + get_top_level_vertices(graph, graph.vertices_to_run) + vertices_to_run = list(graph.vertices_to_run.union(get_top_level_vertices(graph, graph.vertices_to_run))) await chat_service.set_cache(str(flow_id), graph) background_tasks.add_task( telemetry_service.log_package_playground, diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 1b383f7e9..b67b2733c 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -134,9 +134,10 @@ class Graph: vertices_ids = set() new_predecessor_map = {} for vertex_id in self._is_state_vertices: - if vertex_id == caller: - continue + caller_vertex = self.get_vertex(caller) vertex = self.get_vertex(vertex_id) + if vertex_id == caller or vertex.display_name == caller_vertex.display_name: + continue if ( isinstance(vertex._raw_params["name"], str) and name in vertex._raw_params["name"] @@ -152,15 +153,17 @@ class Graph: # and run self.build_adjacency_maps(edges) to get the new predecessor map # that is not complete but we can use to update the run_predecessors edges_set = set() - for vertex in [vertex] + successors: - edges_set.update(vertex.edges) - vertices_ids.add(vertex.id) - if vertex.state == VertexStates.INACTIVE: - vertex.set_state("ACTIVE") + for _vertex in [vertex] + successors: + edges_set.update(_vertex.edges) + if _vertex.state == VertexStates.INACTIVE: + _vertex.set_state("ACTIVE") edges = list(edges_set) predecessor_map, _ = self.build_adjacency_maps(edges) new_predecessor_map.update(predecessor_map) + # vertices_ids.update(new_predecessor_map.keys()) + # vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list) + self.activated_vertices = list(vertices_ids) self.vertices_to_run.update(vertices_ids) self.run_manager.update_run_state( @@ -861,6 +864,7 @@ class Graph: ValueError: If no result is found for the vertex. """ vertex = self.get_vertex(vertex_id) + self.run_manager.add_to_vertices_being_run(vertex_id) try: params = "" if vertex.frozen: @@ -1494,8 +1498,6 @@ class Graph: predecessor_map: dict[str, list[str]] = defaultdict(list) successor_map: dict[str, list[str]] = defaultdict(list) for edge in edges: - if edge.source_id not in predecessor_map[edge.target_id]: - predecessor_map[edge.target_id].append(edge.source_id) - if edge.target_id not in successor_map[edge.source_id]: - successor_map[edge.source_id].append(edge.target_id) + predecessor_map[edge.target_id].append(edge.source_id) + successor_map[edge.source_id].append(edge.target_id) return predecessor_map, successor_map diff --git a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py index fc9970554..01c80e06a 100644 --- a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py +++ b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py @@ -12,6 +12,7 @@ class RunnableVerticesManager: self.run_map = defaultdict(list) # Tracks successors of each vertex self.run_predecessors = defaultdict(set) # Tracks predecessors for each vertex self.vertices_to_run = set() # Set of vertices that are ready to run + self.vertices_being_run = set() # Set of vertices that are currently running def to_dict(self) -> dict: return { @@ -49,7 +50,7 @@ class RunnableVerticesManager: """Determines if a vertex is runnable.""" return ( - vertex_id in self.vertices_to_run + vertex_id not in self.vertices_being_run and not self.run_predecessors.get(vertex_id) and vertex_id not in inactivated_vertices ) @@ -88,6 +89,7 @@ class RunnableVerticesManager: self.vertices_to_run.add(vertex_id) else: self.vertices_to_run.discard(vertex_id) + self.vertices_being_run.discard(vertex_id) async def get_next_runnable_vertices( self, @@ -125,7 +127,10 @@ class RunnableVerticesManager: next_runnable_vertices = direct_successors_ready for v_id in set(next_runnable_vertices): # Use set to avoid duplicates - self.remove_vertex_from_runnables(v_id) + if vertex.id == v_id: + next_runnable_vertices.remove(v_id) + else: + self.add_to_vertices_being_run(v_id) if cache: await set_cache_coro(data=graph, lock=lock) # type: ignore return next_runnable_vertices @@ -134,6 +139,9 @@ class RunnableVerticesManager: self.update_vertex_run_state(v_id, is_runnable=False) self.remove_from_predecessors(v_id) + def add_to_vertices_being_run(self, v_id): + self.vertices_being_run.add(v_id) + @staticmethod def get_top_level_vertices(graph, vertices_ids): """