From 6f521e17ea6d148237e29225768aed33ec122994 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Wed, 25 Jun 2025 12:12:58 -0300 Subject: [PATCH] feat: improve Listen and Notify related logic and components (#7969) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor: remove unused property and state management from StateVertex class * feat: extend next runnable vertices for activated vertices in state vertices * refactor: simplify ListenComponent by removing unused methods and updating inheritance * refactor: update NotifyComponent to use new input/output structure and improve context handling * feat: add is_state attribute to StateVertex for improved state management * refactor: update input validation to exclude custom attributes from reserved words * refactor: enhance ListenComponent and NotifyComponent with improved input types and output caching - Updated ListenComponent to include additional input types for context_key. - Modified NotifyComponent to use HandleInput for input_value and added input types. - Set cache to False for the output in NotifyComponent to ensure fresh data retrieval. * refactor: update state management in Graph class to improve handling of state vertices - Changed _is_state_vertices to be initialized as None and updated its type hint. - Added a property is_state_vertices to lazily compute state vertices when accessed. - Refactored activate_state_vertices method to utilize the new property for better clarity and performance. * refactor: update ListenComponent to return Data with text instead of data - Modified the build method in ListenComponent to return Data initialized with a text field instead of a data field, enhancing clarity in data handling. * refactor: update NotifyComponent to use input_value for data handling - Modified the build method in NotifyComponent to replace input_data with input_value for improved clarity and consistency in data processing. - Ensured that the context handling and status updates reflect the changes in input handling. * refactor: simplify context key check in Graph class for better readability * refactor: enhance Graph class to include recursive predecessor retrieval and update edge handling * 📝 Add docstrings to `try-to-fix-listen-notify` (#8326) Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * [autofix.ci] apply automated fixes * [autofix.ci] apply automated fixes * refactor: clean up unused imports and improve docstring formatting across components * refactor: rename build method to notify_components and improve input handling in NotifyComponent * refactor: rename build method to listen_for_data in ListenComponent for clarity * fix: correct status assignment logic in NotifyComponent to ensure accurate state updates * fix: handle None input value in NotifyComponent to ensure default Data initialization * Make context_key required Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * 📝 Add docstrings to `try-to-fix-listen-notify` (#8442) Co-authored-by: Jordan Frazier Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Co-authored-by: Edwin Jose Co-authored-by: Gabriel Luiz Freitas Almeida Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> fix: remove duplicated import statement in nvidia_ingest (#8439) * ✨ Implement caching logic for Listen/Notify components and enhance docstring for edge retrieval - Added `_set_cache_if_listen_notify_components` method to disable caching for all vertices if Listen/Notify components are present, ensuring proper real-time communication. - Improved docstring for `_get_edges_as_list_of_tuples` to clarify the return value and structure. * 🔧 Refactor Graph class by removing state management methods - Removed `get_state` and `update_state` methods from the `Graph` class to streamline state management. - This change simplifies the class structure and focuses on enhancing the overall design of the graph handling. * fix(listen): restrict input types to "Message" for ListenComponent * refactor: remove deprecated components from deactivated module --------- Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Jordan Frazier Co-authored-by: Edwin Jose --- .../components/deactivated/__init__.py | 2 - .../base/langflow/components/logic/listen.py | 29 +++ .../base/langflow/components/logic/notify.py | 88 +++++++++ .../custom/custom_component/component.py | 13 +- src/backend/base/langflow/graph/graph/base.py | 179 +++++++++++++----- .../langflow/graph/vertex/vertex_types.py | 19 +- 6 files changed, 272 insertions(+), 58 deletions(-) create mode 100644 src/backend/base/langflow/components/logic/listen.py create mode 100644 src/backend/base/langflow/components/logic/notify.py diff --git a/src/backend/base/langflow/components/deactivated/__init__.py b/src/backend/base/langflow/components/deactivated/__init__.py index 9cf7ac2dc..507e6e2f9 100644 --- a/src/backend/base/langflow/components/deactivated/__init__.py +++ b/src/backend/base/langflow/components/deactivated/__init__.py @@ -9,9 +9,7 @@ __all__ = [ "ExtractKeyFromDataComponent", "FlowToolComponent", "ListFlowsComponent", - "ListenComponent", "MergeDataComponent", - "NotifyComponent", "PythonFunctionComponent", "RunFlowComponent", "SQLExecutorComponent", diff --git a/src/backend/base/langflow/components/logic/listen.py b/src/backend/base/langflow/components/logic/listen.py new file mode 100644 index 000000000..4d49604ab --- /dev/null +++ b/src/backend/base/langflow/components/logic/listen.py @@ -0,0 +1,29 @@ +from langflow.custom import Component +from langflow.io import Output, StrInput +from langflow.schema.data import Data + + +class ListenComponent(Component): + display_name = "Listen" + description = "A component to listen for a notification." + name = "Listen" + beta: bool = True + icon = "Radio" + inputs = [ + StrInput( + name="context_key", + display_name="Context Key", + info="The key of the context to listen for.", + input_types=["Message"], + required=True, + ) + ] + + outputs = [Output(name="data", display_name="Data", method="listen_for_data", cache=False)] + + def listen_for_data(self) -> Data: + """Retrieves a Data object from the component context using the provided context key. + + If the specified context key does not exist in the context, returns an empty Data object. + """ + return self.ctx.get(self.context_key, Data(text="")) diff --git a/src/backend/base/langflow/components/logic/notify.py b/src/backend/base/langflow/components/logic/notify.py new file mode 100644 index 000000000..5f764453c --- /dev/null +++ b/src/backend/base/langflow/components/logic/notify.py @@ -0,0 +1,88 @@ +from typing import cast + +from langflow.custom import Component +from langflow.io import BoolInput, HandleInput, Output, StrInput +from langflow.schema.data import Data + + +class NotifyComponent(Component): + display_name = "Notify" + description = "A component to generate a notification to Get Notified component." + icon = "Notify" + name = "Notify" + beta: bool = True + + inputs = [ + StrInput( + name="context_key", + display_name="Context Key", + info="The key of the context to store the notification.", + required=True, + ), + HandleInput( + name="input_value", + display_name="Input Data", + info="The data to store.", + required=False, + input_types=["Data", "Message", "DataFrame"], + ), + BoolInput( + name="append", + display_name="Append", + info="If True, the record will be appended to the notification.", + value=False, + required=False, + ), + ] + + outputs = [ + Output( + display_name="Data", + name="result", + method="notify_components", + cache=False, + ), + ] + + async def notify_components(self) -> Data: + """Processes and stores a notification in the component's context. + + Normalizes the input value to a `Data` object and stores it under the + specified context key. If `append` is True, adds the value to a list + of notifications; otherwise, replaces the existing value. Updates the + component's status and activates related state vertices in the graph. + + Returns: + The processed `Data` object stored in the context. + + Raises: + ValueError: If the component is not part of a graph. + """ + if not self._vertex: + msg = "Notify component must be used in a graph." + raise ValueError(msg) + input_value: Data | str | dict | None = self.input_value + if input_value is None: + input_value = Data(text="") + elif not isinstance(input_value, Data): + if isinstance(input_value, str): + input_value = Data(text=input_value) + elif isinstance(input_value, dict): + input_value = Data(data=input_value) + else: + input_value = Data(text=str(input_value)) + if input_value: + if self.append: + current_data = self.ctx.get(self.context_key, []) + if not isinstance(current_data, list): + current_data = [current_data] + current_data.append(input_value) + self.update_ctx({self.context_key: current_data}) + else: + self.update_ctx({self.context_key: input_value}) + self.status = input_value + else: + self.status = "No record provided." + self._vertex.is_state = True + self.graph.activate_state_vertices(name=self.context_key, caller=self._id) + return cast(Data, input_value) diff --git a/src/backend/base/langflow/custom/custom_component/component.py b/src/backend/base/langflow/custom/custom_component/component.py index 8006f4d95..1b0e60a9b 100644 --- a/src/backend/base/langflow/custom/custom_component/component.py +++ b/src/backend/base/langflow/custom/custom_component/component.py @@ -873,6 +873,11 @@ class Component(CustomComponent): def _validate_inputs(self, params: dict) -> None: # Params keys are the `name` attribute of the Input objects + """Validates and assigns input values from the provided parameters dictionary. + + For each parameter matching a defined input, sets the input's value and updates the parameter + dictionary with the validated value. + """ for key, value in params.copy().items(): if key not in self._inputs: continue @@ -883,10 +888,16 @@ class Component(CustomComponent): params[input_.name] = input_.value def set_attributes(self, params: dict) -> None: + """Sets component attributes from the given parameters, preventing conflicts with reserved attribute names. + + Raises: + ValueError: If a parameter name matches a reserved attribute not managed in _attributes and its + value differs from the current attribute value. + """ self._validate_inputs(params) attributes = {} for key, value in params.items(): - if key in self.__dict__ and value != getattr(self, key): + if key in self.__dict__ and key not in self._attributes and value != getattr(self, key): msg = ( f"{self.__class__.__name__} defines an input parameter named '{key}' " f"that is a reserved word and cannot be used." diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index bef5cf430..d79db8c6c 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -69,17 +69,12 @@ class Graph: log_config: LogConfig | None = None, context: dict[str, Any] | None = None, ) -> None: - """Initializes a new instance of the Graph class. + """Initializes a new Graph instance. - Args: - start: The start component. - end: The end component. - flow_id: The ID of the flow. Defaults to None. - flow_name: The flow name. - description: The graph description. - user_id: The user ID. - log_config: The log configuration. - context: Additional context for the graph. Defaults to None. + If both start and end components are provided, the graph is initialized and prepared for execution. + If only one is provided, a ValueError is raised. The context must be a dictionary if specified, + otherwise a TypeError is raised. Internal data structures for vertices, edges, state management, + run management, and tracing are set up during initialization. """ if log_config: configure(**log_config) @@ -96,7 +91,7 @@ class Graph: self.user_id = user_id self._is_input_vertices: list[str] = [] self._is_output_vertices: list[str] = [] - self._is_state_vertices: list[str] = [] + self._is_state_vertices: list[str] | None = None self.has_session_id_vertices: list[str] = [] self._sorted_vertices_layers: list[list[str]] = [] self._run_id = "" @@ -491,26 +486,34 @@ class Graph: self.build_graph_maps(self.edges) self.define_vertices_lists() - def activate_state_vertices(self, name: str, caller: str) -> None: - """Activates the state vertices in the graph with the given name and caller. + @property + def is_state_vertices(self) -> list[str]: + """Returns a cached list of vertex IDs for vertices marked as state vertices. - Args: - name (str): The name of the state. - caller (str): The ID of the vertex that is updating the state. + The list is computed on first access by filtering vertices with `is_state` set to True and is + cached for future calls. + """ + if self._is_state_vertices is None: + self._is_state_vertices = [vertex.id for vertex in self.vertices if vertex.is_state] + return self._is_state_vertices + + def activate_state_vertices(self, name: str, caller: str) -> None: + """Activates vertices associated with a given state name. + + Marks vertices with the specified state name, as well as their successors and related + predecessors. The state manager is then updated with the new state record. """ vertices_ids = set() new_predecessor_map = {} - for vertex_id in self._is_state_vertices: + activated_vertices = [] + for vertex_id in self.is_state_vertices: caller_vertex = self.get_vertex(caller) vertex = self.get_vertex(vertex_id) if vertex_id == caller or vertex.display_name == caller_vertex.display_name: continue - if ( - isinstance(vertex.raw_params["name"], str) - and name in vertex.raw_params["name"] - and vertex_id != caller - and isinstance(vertex, StateVertex) - ): + ctx_key = vertex.raw_params.get("context_key") + if isinstance(ctx_key, str) and name in ctx_key and vertex_id != caller and isinstance(vertex, StateVertex): + activated_vertices.append(vertex_id) vertices_ids.add(vertex_id) successors = self.get_all_successors(vertex, flat=True) # Update run_manager.run_predecessors because we are activating vertices @@ -519,8 +522,12 @@ class Graph: # So we need to get all edges of the vertex and successors # and run self.build_adjacency_maps(edges) to get the new predecessor map # that is not complete but we can use to update the run_predecessors + successors_predecessors = set() + for sucessor in successors: + successors_predecessors.update(self.get_all_predecessors(sucessor)) + edges_set = set() - for _vertex in [vertex, *successors]: + for _vertex in [vertex, *successors, *successors_predecessors]: edges_set.update(_vertex.edges) if _vertex.state == VertexStates.INACTIVE: _vertex.set_state("ACTIVE") @@ -533,7 +540,7 @@ class Graph: vertices_ids.update(new_predecessor_map.keys()) vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list) - self.activated_vertices = list(vertices_ids) + self.activated_vertices = activated_vertices self.vertices_to_run.update(vertices_ids) self.run_manager.update_run_state( run_predecessors=new_predecessor_map, @@ -649,17 +656,21 @@ class Graph: @property def sorted_vertices_layers(self) -> list[list[str]]: - """The sorted layers of vertices in the graph. + """Returns the sorted layers of vertex IDs by type. - Returns: - List[List[str]]: The sorted layers of vertices. + Each layer in the returned list contains vertex IDs grouped by their classification, + such as input, output, session, or state vertices. Sorting is performed if not already done. """ if not self._sorted_vertices_layers: self.sort_vertices() return self._sorted_vertices_layers def define_vertices_lists(self) -> None: - """Defines the lists of vertices that are inputs, outputs, and have session_id.""" + """Populates internal lists of input, output, session ID, and state vertex IDs. + + Iterates over all vertices and appends their IDs to the corresponding internal lists + based on their classification. + """ for vertex in self.vertices: if vertex.is_input: self._is_input_vertices.append(vertex.id) @@ -668,9 +679,17 @@ class Graph: if vertex.has_session_id: self.has_session_id_vertices.append(vertex.id) if vertex.is_state: + if self._is_state_vertices is None: + self._is_state_vertices = [] self._is_state_vertices.append(vertex.id) def _set_inputs(self, input_components: list[str], inputs: dict[str, str], input_type: InputType | None) -> None: + """Updates input vertices' parameters with the provided inputs, filtering by component list and input type. + + Only vertices whose IDs or display names match the specified input components and whose IDs contain + the input type (unless input type is 'any' or None) are updated. Raises a ValueError if a specified + vertex is not found. + """ for vertex_id in self._is_input_vertices: vertex = self.get_vertex(vertex_id) # If the vertex is not in the input_components list @@ -1194,6 +1213,7 @@ class Graph: self._build_vertex_params() self._instantiate_components_in_vertices() self._set_cache_to_vertices_in_cycle() + self._set_cache_if_listen_notify_components() for vertex in self.vertices: if vertex.id in self.cycle_vertices: self.run_manager.add_to_cycle_vertices(vertex.id) @@ -1201,9 +1221,29 @@ class Graph: self.assert_streaming_sequence() def _get_edges_as_list_of_tuples(self) -> list[tuple[str, str]]: - """Returns the edges of the graph as a list of tuples.""" + """Returns the edges of the graph as a list of tuples. + + Each tuple contains the source and target handle IDs from the edge data. + + Returns: + list[tuple[str, str]]: List of (source_id, target_id) tuples representing graph edges. + """ return [(e["data"]["sourceHandle"]["id"], e["data"]["targetHandle"]["id"]) for e in self._edges] + def _set_cache_if_listen_notify_components(self) -> None: + """Disables caching for all vertices if Listen/Notify components are present. + + If the graph contains any Listen or Notify components, caching is disabled for all vertices + by setting cache=False on their outputs. This ensures proper handling of real-time + communication between components. + """ + has_listen_or_notify_component = any( + vertex.id.split("-")[0] in {"Listen", "Notify"} for vertex in self.vertices + ) + if has_listen_or_notify_component: + for vertex in self.vertices: + vertex.apply_on_outputs(lambda output_object: setattr(output_object, "cache", False)) + def _set_cache_to_vertices_in_cycle(self) -> None: """Sets the cache to the vertices in cycle.""" edges = self._get_edges_as_list_of_tuples() @@ -1528,6 +1568,11 @@ class Graph: return self def find_next_runnable_vertices(self, vertex_successors_ids: list[str]) -> list[str]: + """Determines the next set of runnable vertices from a list of successor vertex IDs. + + For each successor, if it is not runnable, recursively finds its runnable + predecessors; otherwise, includes the successor itself. Returns a sorted list of all such vertex IDs. + """ next_runnable_vertices = set() for v_id in sorted(vertex_successors_ids): if not self.is_vertex_runnable(v_id): @@ -1538,6 +1583,19 @@ class Graph: return sorted(next_runnable_vertices) async def get_next_runnable_vertices(self, lock: asyncio.Lock, vertex: Vertex, *, cache: bool = True) -> list[str]: + """Determines the next set of runnable vertex IDs after a vertex completes execution. + + If the completed vertex is a state vertex, any recently activated state vertices are also included. + Updates the run manager to reflect the new runnable state and optionally caches the updated graph state. + + Args: + lock: An asyncio lock for thread-safe updates. + vertex: The vertex that has just finished execution. + cache: If True, caches the updated graph state. + + Returns: + A list of vertex IDs that are ready to be executed next. + """ v_id = vertex.id v_successors_ids = vertex.successors_ids self.run_manager.ran_at_least_once.add(v_id) @@ -1553,25 +1611,15 @@ class Graph: if cache and self.flow_id is not None: set_cache_coro = partial(get_chat_service().set_cache, key=self.flow_id) await set_cache_coro(data=self, lock=lock) + if vertex.is_state: + next_runnable_vertices.extend(self.activated_vertices) return next_runnable_vertices async def _log_vertex_build_from_exception(self, vertex_id: str, result: Exception) -> None: - """Log a vertex build failure caused by an exception. + """Logs detailed information about a vertex build exception. - This method handles formatting and logging errors that occur during vertex building. - It creates appropriate error output structures and logs the build failure. - - Args: - vertex_id: The ID of the vertex that failed to build - result: The exception that caused the build failure - - Returns: - None - - Side effects: - - Logs the exception details - - Creates error output structures - - Calls log_vertex_build to record the failure + Formats the exception message and stack trace, constructs an error output, + and records the failure using the vertex build logging system. """ if isinstance(result, ComponentBuildError): params = result.message @@ -1706,6 +1754,17 @@ class Graph: return [self.get_vertex(source_id) for source_id in self.predecessor_map.get(vertex.id, [])] def get_all_successors(self, vertex: Vertex, *, recursive=True, flat=True, visited=None): + """Returns all successors of a given vertex, optionally recursively and as a flat or nested list. + + Args: + vertex: The vertex whose successors are to be retrieved. + recursive: If True, retrieves successors recursively; otherwise, only immediate successors. + flat: If True, returns a flat list of successors; if False, returns a nested list structure. + visited: Internal set used to track visited vertices and prevent cycles. + + Returns: + A list of successor vertices, either flat or nested depending on the `flat` parameter. + """ if visited is None: visited = set() @@ -1739,11 +1798,37 @@ class Graph: return successors_result def get_successors(self, vertex: Vertex) -> list[Vertex]: - """Returns the successors of a vertex.""" + """Returns the immediate successor vertices of the given vertex. + + Args: + vertex: The vertex whose successors are to be retrieved. + + Returns: + A list of vertices that are direct successors of the specified vertex. + """ return [self.get_vertex(target_id) for target_id in self.successor_map.get(vertex.id, set())] + def get_all_predecessors(self, vertex: Vertex, *, recursive: bool = True) -> list[Vertex]: + """Retrieves all predecessor vertices of a given vertex. + + If `recursive` is True, returns both direct and indirect predecessors by + traversing the graph recursively. If False, returns only the immediate predecessors. + """ + _predecessors = self.predecessor_map.get(vertex.id, []) + predecessors = [self.get_vertex(v_id) for v_id in _predecessors] + if recursive: + for predecessor in _predecessors: + predecessors.extend(self.get_all_predecessors(self.get_vertex(predecessor), recursive=recursive)) + else: + predecessors.extend([self.get_vertex(predecessor) for predecessor in _predecessors]) + return predecessors + def get_vertex_neighbors(self, vertex: Vertex) -> dict[Vertex, int]: - """Returns the neighbors of a vertex.""" + """Returns a dictionary mapping each direct neighbor of a vertex to the count of connecting edges. + + A neighbor is any vertex directly connected to the input vertex, either as a source or target. + The count reflects the number of edges between the input vertex and each neighbor. + """ neighbors: dict[Vertex, int] = {} for edge in self.edges: if edge.source_id == vertex.id: diff --git a/src/backend/base/langflow/graph/vertex/vertex_types.py b/src/backend/base/langflow/graph/vertex/vertex_types.py index 3c7a26d84..0a83f6b89 100644 --- a/src/backend/base/langflow/graph/vertex/vertex_types.py +++ b/src/backend/base/langflow/graph/vertex/vertex_types.py @@ -463,18 +463,21 @@ class InterfaceVertex(ComponentVertex): class StateVertex(ComponentVertex): def __init__(self, data: NodeData, graph): + """Initializes a StateVertex with the provided node data and graph. + + Sets up the build steps and marks the vertex as a state vertex. + """ super().__init__(data, graph=graph) self.steps = [self._build] - self.is_state = False - - @property - def successors_ids(self) -> list[str]: - if self._successors_ids is None: - self.is_state = False - return super().successors_ids - return self._successors_ids + self.is_state = True def built_object_repr(self): + """Returns a string representation of the built object from the artifacts if available. + + If the artifacts dictionary contains a non-empty "repr" key, its value is returned. + If the "repr" value is falsy, falls back to the superclass representation. + Returns None if no representation is available. + """ if self.artifacts and "repr" in self.artifacts: return self.artifacts["repr"] or super().built_object_repr() return None