diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index bbc183da8..7e933d1df 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -110,7 +110,7 @@ async def retrieve_vertices_order( # We need to get the id of each vertex # and return the same structure but only with the ids components_count = len(graph.vertices) - vertices_to_run = list(graph.vertices_to_run) + get_top_level_vertices(graph, graph.vertices_to_run) + vertices_to_run = list(graph.vertices_to_run.union(get_top_level_vertices(graph, graph.vertices_to_run))) await chat_service.set_cache(str(flow_id), graph) background_tasks.add_task( telemetry_service.log_package_playground, diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 1b383f7e9..b67b2733c 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -134,9 +134,10 @@ class Graph: vertices_ids = set() new_predecessor_map = {} for vertex_id in self._is_state_vertices: - if vertex_id == caller: - continue + caller_vertex = self.get_vertex(caller) vertex = self.get_vertex(vertex_id) + if vertex_id == caller or vertex.display_name == caller_vertex.display_name: + continue if ( isinstance(vertex._raw_params["name"], str) and name in vertex._raw_params["name"] @@ -152,15 +153,17 @@ class Graph: # and run self.build_adjacency_maps(edges) to get the new predecessor map # that is not complete but we can use to update the run_predecessors edges_set = set() - for vertex in [vertex] + successors: - edges_set.update(vertex.edges) - vertices_ids.add(vertex.id) - if vertex.state == VertexStates.INACTIVE: - vertex.set_state("ACTIVE") + for _vertex in [vertex] + successors: + edges_set.update(_vertex.edges) + if _vertex.state == VertexStates.INACTIVE: + _vertex.set_state("ACTIVE") edges = list(edges_set) predecessor_map, _ = self.build_adjacency_maps(edges) new_predecessor_map.update(predecessor_map) + # vertices_ids.update(new_predecessor_map.keys()) + # vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list) + self.activated_vertices = list(vertices_ids) self.vertices_to_run.update(vertices_ids) self.run_manager.update_run_state( @@ -861,6 +864,7 @@ class Graph: ValueError: If no result is found for the vertex. """ vertex = self.get_vertex(vertex_id) + self.run_manager.add_to_vertices_being_run(vertex_id) try: params = "" if vertex.frozen: @@ -1494,8 +1498,6 @@ class Graph: predecessor_map: dict[str, list[str]] = defaultdict(list) successor_map: dict[str, list[str]] = defaultdict(list) for edge in edges: - if edge.source_id not in predecessor_map[edge.target_id]: - predecessor_map[edge.target_id].append(edge.source_id) - if edge.target_id not in successor_map[edge.source_id]: - successor_map[edge.source_id].append(edge.target_id) + predecessor_map[edge.target_id].append(edge.source_id) + successor_map[edge.source_id].append(edge.target_id) return predecessor_map, successor_map diff --git a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py index fc9970554..01c80e06a 100644 --- a/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py +++ b/src/backend/base/langflow/graph/graph/runnable_vertices_manager.py @@ -12,6 +12,7 @@ class RunnableVerticesManager: self.run_map = defaultdict(list) # Tracks successors of each vertex self.run_predecessors = defaultdict(set) # Tracks predecessors for each vertex self.vertices_to_run = set() # Set of vertices that are ready to run + self.vertices_being_run = set() # Set of vertices that are currently running def to_dict(self) -> dict: return { @@ -49,7 +50,7 @@ class RunnableVerticesManager: """Determines if a vertex is runnable.""" return ( - vertex_id in self.vertices_to_run + vertex_id not in self.vertices_being_run and not self.run_predecessors.get(vertex_id) and vertex_id not in inactivated_vertices ) @@ -88,6 +89,7 @@ class RunnableVerticesManager: self.vertices_to_run.add(vertex_id) else: self.vertices_to_run.discard(vertex_id) + self.vertices_being_run.discard(vertex_id) async def get_next_runnable_vertices( self, @@ -125,7 +127,10 @@ class RunnableVerticesManager: next_runnable_vertices = direct_successors_ready for v_id in set(next_runnable_vertices): # Use set to avoid duplicates - self.remove_vertex_from_runnables(v_id) + if vertex.id == v_id: + next_runnable_vertices.remove(v_id) + else: + self.add_to_vertices_being_run(v_id) if cache: await set_cache_coro(data=graph, lock=lock) # type: ignore return next_runnable_vertices @@ -134,6 +139,9 @@ class RunnableVerticesManager: self.update_vertex_run_state(v_id, is_runnable=False) self.remove_from_predecessors(v_id) + def add_to_vertices_being_run(self, v_id): + self.vertices_being_run.add(v_id) + @staticmethod def get_top_level_vertices(graph, vertices_ids): """