feat: improve Listen and Notify related logic and components (#7969)

* refactor: remove unused property and state management from StateVertex class

* feat: extend next runnable vertices for activated vertices in state vertices

* refactor: simplify ListenComponent by removing unused methods and updating inheritance

* refactor: update NotifyComponent to use new input/output structure and improve context handling

* feat: add is_state attribute to StateVertex for improved state management

* refactor: update input validation to exclude custom attributes from reserved words

* refactor: enhance ListenComponent and NotifyComponent with improved input types and output caching

- Updated ListenComponent to include additional input types for context_key.
- Modified NotifyComponent to use HandleInput for input_value and added input types.
- Set cache to False for the output in NotifyComponent to ensure fresh data retrieval.

* refactor: update state management in Graph class to improve handling of state vertices

- Changed _is_state_vertices to be initialized as None and updated its type hint.
- Added a property is_state_vertices to lazily compute state vertices when accessed.
- Refactored activate_state_vertices method to utilize the new property for better clarity and performance.

* refactor: update ListenComponent to return Data with text instead of data

- Modified the build method in ListenComponent to return Data initialized with a text field instead of a data field, enhancing clarity in data handling.

* refactor: update NotifyComponent to use input_value for data handling

- Modified the build method in NotifyComponent to replace input_data with input_value for improved clarity and consistency in data processing.
- Ensured that the context handling and status updates reflect the changes in input handling.

* refactor: simplify context key check in Graph class for better readability

* refactor: enhance Graph class to include recursive predecessor retrieval and update edge handling

* 📝 Add docstrings to `try-to-fix-listen-notify` (#8326)

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes

* refactor: clean up unused imports and improve docstring formatting across components

* refactor: rename build method to notify_components and improve input handling in NotifyComponent

* refactor: rename build method to listen_for_data in ListenComponent for clarity

* fix: correct status assignment logic in NotifyComponent to ensure accurate state updates

* fix: handle None input value in NotifyComponent to ensure default Data initialization

* Make context_key required

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* 📝 Add docstrings to `try-to-fix-listen-notify` (#8442)

Co-authored-by: Jordan Frazier <jordan.frazier@datastax.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: Edwin Jose <edwin.jose@datastax.com>
Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
fix: remove duplicated import statement in nvidia_ingest (#8439)

*  Implement caching logic for Listen/Notify components and enhance docstring for edge retrieval

- Added `_set_cache_if_listen_notify_components` method to disable caching for all vertices if Listen/Notify components are present, ensuring proper real-time communication.
- Improved docstring for `_get_edges_as_list_of_tuples` to clarify the return value and structure.

* 🔧 Refactor Graph class by removing state management methods

- Removed `get_state` and `update_state` methods from the `Graph` class to streamline state management.
- This change simplifies the class structure and focuses on enhancing the overall design of the graph handling.

* fix(listen): restrict input types to "Message" for ListenComponent

* refactor: remove deprecated components from deactivated module

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Jordan Frazier <jordan.frazier@datastax.com>
Co-authored-by: Edwin Jose <edwin.jose@datastax.com>
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-06-25 12:12:58 -03:00 committed by GitHub
commit 6f521e17ea
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 272 additions and 58 deletions

View file

@ -9,9 +9,7 @@ __all__ = [
"ExtractKeyFromDataComponent",
"FlowToolComponent",
"ListFlowsComponent",
"ListenComponent",
"MergeDataComponent",
"NotifyComponent",
"PythonFunctionComponent",
"RunFlowComponent",
"SQLExecutorComponent",

View file

@ -0,0 +1,29 @@
from langflow.custom import Component
from langflow.io import Output, StrInput
from langflow.schema.data import Data
class ListenComponent(Component):
display_name = "Listen"
description = "A component to listen for a notification."
name = "Listen"
beta: bool = True
icon = "Radio"
inputs = [
StrInput(
name="context_key",
display_name="Context Key",
info="The key of the context to listen for.",
input_types=["Message"],
required=True,
)
]
outputs = [Output(name="data", display_name="Data", method="listen_for_data", cache=False)]
def listen_for_data(self) -> Data:
"""Retrieves a Data object from the component context using the provided context key.
If the specified context key does not exist in the context, returns an empty Data object.
"""
return self.ctx.get(self.context_key, Data(text=""))

View file

@ -0,0 +1,88 @@
from typing import cast
from langflow.custom import Component
from langflow.io import BoolInput, HandleInput, Output, StrInput
from langflow.schema.data import Data
class NotifyComponent(Component):
display_name = "Notify"
description = "A component to generate a notification to Get Notified component."
icon = "Notify"
name = "Notify"
beta: bool = True
inputs = [
StrInput(
name="context_key",
display_name="Context Key",
info="The key of the context to store the notification.",
required=True,
),
HandleInput(
name="input_value",
display_name="Input Data",
info="The data to store.",
required=False,
input_types=["Data", "Message", "DataFrame"],
),
BoolInput(
name="append",
display_name="Append",
info="If True, the record will be appended to the notification.",
value=False,
required=False,
),
]
outputs = [
Output(
display_name="Data",
name="result",
method="notify_components",
cache=False,
),
]
async def notify_components(self) -> Data:
"""Processes and stores a notification in the component's context.
Normalizes the input value to a `Data` object and stores it under the
specified context key. If `append` is True, adds the value to a list
of notifications; otherwise, replaces the existing value. Updates the
component's status and activates related state vertices in the graph.
Returns:
The processed `Data` object stored in the context.
Raises:
ValueError: If the component is not part of a graph.
"""
if not self._vertex:
msg = "Notify component must be used in a graph."
raise ValueError(msg)
input_value: Data | str | dict | None = self.input_value
if input_value is None:
input_value = Data(text="")
elif not isinstance(input_value, Data):
if isinstance(input_value, str):
input_value = Data(text=input_value)
elif isinstance(input_value, dict):
input_value = Data(data=input_value)
else:
input_value = Data(text=str(input_value))
if input_value:
if self.append:
current_data = self.ctx.get(self.context_key, [])
if not isinstance(current_data, list):
current_data = [current_data]
current_data.append(input_value)
self.update_ctx({self.context_key: current_data})
else:
self.update_ctx({self.context_key: input_value})
self.status = input_value
else:
self.status = "No record provided."
self._vertex.is_state = True
self.graph.activate_state_vertices(name=self.context_key, caller=self._id)
return cast(Data, input_value)

View file

@ -873,6 +873,11 @@ class Component(CustomComponent):
def _validate_inputs(self, params: dict) -> None:
# Params keys are the `name` attribute of the Input objects
"""Validates and assigns input values from the provided parameters dictionary.
For each parameter matching a defined input, sets the input's value and updates the parameter
dictionary with the validated value.
"""
for key, value in params.copy().items():
if key not in self._inputs:
continue
@ -883,10 +888,16 @@ class Component(CustomComponent):
params[input_.name] = input_.value
def set_attributes(self, params: dict) -> None:
"""Sets component attributes from the given parameters, preventing conflicts with reserved attribute names.
Raises:
ValueError: If a parameter name matches a reserved attribute not managed in _attributes and its
value differs from the current attribute value.
"""
self._validate_inputs(params)
attributes = {}
for key, value in params.items():
if key in self.__dict__ and value != getattr(self, key):
if key in self.__dict__ and key not in self._attributes and value != getattr(self, key):
msg = (
f"{self.__class__.__name__} defines an input parameter named '{key}' "
f"that is a reserved word and cannot be used."

View file

@ -69,17 +69,12 @@ class Graph:
log_config: LogConfig | None = None,
context: dict[str, Any] | None = None,
) -> None:
"""Initializes a new instance of the Graph class.
"""Initializes a new Graph instance.
Args:
start: The start component.
end: The end component.
flow_id: The ID of the flow. Defaults to None.
flow_name: The flow name.
description: The graph description.
user_id: The user ID.
log_config: The log configuration.
context: Additional context for the graph. Defaults to None.
If both start and end components are provided, the graph is initialized and prepared for execution.
If only one is provided, a ValueError is raised. The context must be a dictionary if specified,
otherwise a TypeError is raised. Internal data structures for vertices, edges, state management,
run management, and tracing are set up during initialization.
"""
if log_config:
configure(**log_config)
@ -96,7 +91,7 @@ class Graph:
self.user_id = user_id
self._is_input_vertices: list[str] = []
self._is_output_vertices: list[str] = []
self._is_state_vertices: list[str] = []
self._is_state_vertices: list[str] | None = None
self.has_session_id_vertices: list[str] = []
self._sorted_vertices_layers: list[list[str]] = []
self._run_id = ""
@ -491,26 +486,34 @@ class Graph:
self.build_graph_maps(self.edges)
self.define_vertices_lists()
def activate_state_vertices(self, name: str, caller: str) -> None:
"""Activates the state vertices in the graph with the given name and caller.
@property
def is_state_vertices(self) -> list[str]:
"""Returns a cached list of vertex IDs for vertices marked as state vertices.
Args:
name (str): The name of the state.
caller (str): The ID of the vertex that is updating the state.
The list is computed on first access by filtering vertices with `is_state` set to True and is
cached for future calls.
"""
if self._is_state_vertices is None:
self._is_state_vertices = [vertex.id for vertex in self.vertices if vertex.is_state]
return self._is_state_vertices
def activate_state_vertices(self, name: str, caller: str) -> None:
"""Activates vertices associated with a given state name.
Marks vertices with the specified state name, as well as their successors and related
predecessors. The state manager is then updated with the new state record.
"""
vertices_ids = set()
new_predecessor_map = {}
for vertex_id in self._is_state_vertices:
activated_vertices = []
for vertex_id in self.is_state_vertices:
caller_vertex = self.get_vertex(caller)
vertex = self.get_vertex(vertex_id)
if vertex_id == caller or vertex.display_name == caller_vertex.display_name:
continue
if (
isinstance(vertex.raw_params["name"], str)
and name in vertex.raw_params["name"]
and vertex_id != caller
and isinstance(vertex, StateVertex)
):
ctx_key = vertex.raw_params.get("context_key")
if isinstance(ctx_key, str) and name in ctx_key and vertex_id != caller and isinstance(vertex, StateVertex):
activated_vertices.append(vertex_id)
vertices_ids.add(vertex_id)
successors = self.get_all_successors(vertex, flat=True)
# Update run_manager.run_predecessors because we are activating vertices
@ -519,8 +522,12 @@ class Graph:
# So we need to get all edges of the vertex and successors
# and run self.build_adjacency_maps(edges) to get the new predecessor map
# that is not complete but we can use to update the run_predecessors
successors_predecessors = set()
for sucessor in successors:
successors_predecessors.update(self.get_all_predecessors(sucessor))
edges_set = set()
for _vertex in [vertex, *successors]:
for _vertex in [vertex, *successors, *successors_predecessors]:
edges_set.update(_vertex.edges)
if _vertex.state == VertexStates.INACTIVE:
_vertex.set_state("ACTIVE")
@ -533,7 +540,7 @@ class Graph:
vertices_ids.update(new_predecessor_map.keys())
vertices_ids.update(v_id for value_list in new_predecessor_map.values() for v_id in value_list)
self.activated_vertices = list(vertices_ids)
self.activated_vertices = activated_vertices
self.vertices_to_run.update(vertices_ids)
self.run_manager.update_run_state(
run_predecessors=new_predecessor_map,
@ -649,17 +656,21 @@ class Graph:
@property
def sorted_vertices_layers(self) -> list[list[str]]:
"""The sorted layers of vertices in the graph.
"""Returns the sorted layers of vertex IDs by type.
Returns:
List[List[str]]: The sorted layers of vertices.
Each layer in the returned list contains vertex IDs grouped by their classification,
such as input, output, session, or state vertices. Sorting is performed if not already done.
"""
if not self._sorted_vertices_layers:
self.sort_vertices()
return self._sorted_vertices_layers
def define_vertices_lists(self) -> None:
"""Defines the lists of vertices that are inputs, outputs, and have session_id."""
"""Populates internal lists of input, output, session ID, and state vertex IDs.
Iterates over all vertices and appends their IDs to the corresponding internal lists
based on their classification.
"""
for vertex in self.vertices:
if vertex.is_input:
self._is_input_vertices.append(vertex.id)
@ -668,9 +679,17 @@ class Graph:
if vertex.has_session_id:
self.has_session_id_vertices.append(vertex.id)
if vertex.is_state:
if self._is_state_vertices is None:
self._is_state_vertices = []
self._is_state_vertices.append(vertex.id)
def _set_inputs(self, input_components: list[str], inputs: dict[str, str], input_type: InputType | None) -> None:
"""Updates input vertices' parameters with the provided inputs, filtering by component list and input type.
Only vertices whose IDs or display names match the specified input components and whose IDs contain
the input type (unless input type is 'any' or None) are updated. Raises a ValueError if a specified
vertex is not found.
"""
for vertex_id in self._is_input_vertices:
vertex = self.get_vertex(vertex_id)
# If the vertex is not in the input_components list
@ -1194,6 +1213,7 @@ class Graph:
self._build_vertex_params()
self._instantiate_components_in_vertices()
self._set_cache_to_vertices_in_cycle()
self._set_cache_if_listen_notify_components()
for vertex in self.vertices:
if vertex.id in self.cycle_vertices:
self.run_manager.add_to_cycle_vertices(vertex.id)
@ -1201,9 +1221,29 @@ class Graph:
self.assert_streaming_sequence()
def _get_edges_as_list_of_tuples(self) -> list[tuple[str, str]]:
"""Returns the edges of the graph as a list of tuples."""
"""Returns the edges of the graph as a list of tuples.
Each tuple contains the source and target handle IDs from the edge data.
Returns:
list[tuple[str, str]]: List of (source_id, target_id) tuples representing graph edges.
"""
return [(e["data"]["sourceHandle"]["id"], e["data"]["targetHandle"]["id"]) for e in self._edges]
def _set_cache_if_listen_notify_components(self) -> None:
"""Disables caching for all vertices if Listen/Notify components are present.
If the graph contains any Listen or Notify components, caching is disabled for all vertices
by setting cache=False on their outputs. This ensures proper handling of real-time
communication between components.
"""
has_listen_or_notify_component = any(
vertex.id.split("-")[0] in {"Listen", "Notify"} for vertex in self.vertices
)
if has_listen_or_notify_component:
for vertex in self.vertices:
vertex.apply_on_outputs(lambda output_object: setattr(output_object, "cache", False))
def _set_cache_to_vertices_in_cycle(self) -> None:
"""Sets the cache to the vertices in cycle."""
edges = self._get_edges_as_list_of_tuples()
@ -1528,6 +1568,11 @@ class Graph:
return self
def find_next_runnable_vertices(self, vertex_successors_ids: list[str]) -> list[str]:
"""Determines the next set of runnable vertices from a list of successor vertex IDs.
For each successor, if it is not runnable, recursively finds its runnable
predecessors; otherwise, includes the successor itself. Returns a sorted list of all such vertex IDs.
"""
next_runnable_vertices = set()
for v_id in sorted(vertex_successors_ids):
if not self.is_vertex_runnable(v_id):
@ -1538,6 +1583,19 @@ class Graph:
return sorted(next_runnable_vertices)
async def get_next_runnable_vertices(self, lock: asyncio.Lock, vertex: Vertex, *, cache: bool = True) -> list[str]:
"""Determines the next set of runnable vertex IDs after a vertex completes execution.
If the completed vertex is a state vertex, any recently activated state vertices are also included.
Updates the run manager to reflect the new runnable state and optionally caches the updated graph state.
Args:
lock: An asyncio lock for thread-safe updates.
vertex: The vertex that has just finished execution.
cache: If True, caches the updated graph state.
Returns:
A list of vertex IDs that are ready to be executed next.
"""
v_id = vertex.id
v_successors_ids = vertex.successors_ids
self.run_manager.ran_at_least_once.add(v_id)
@ -1553,25 +1611,15 @@ class Graph:
if cache and self.flow_id is not None:
set_cache_coro = partial(get_chat_service().set_cache, key=self.flow_id)
await set_cache_coro(data=self, lock=lock)
if vertex.is_state:
next_runnable_vertices.extend(self.activated_vertices)
return next_runnable_vertices
async def _log_vertex_build_from_exception(self, vertex_id: str, result: Exception) -> None:
"""Log a vertex build failure caused by an exception.
"""Logs detailed information about a vertex build exception.
This method handles formatting and logging errors that occur during vertex building.
It creates appropriate error output structures and logs the build failure.
Args:
vertex_id: The ID of the vertex that failed to build
result: The exception that caused the build failure
Returns:
None
Side effects:
- Logs the exception details
- Creates error output structures
- Calls log_vertex_build to record the failure
Formats the exception message and stack trace, constructs an error output,
and records the failure using the vertex build logging system.
"""
if isinstance(result, ComponentBuildError):
params = result.message
@ -1706,6 +1754,17 @@ class Graph:
return [self.get_vertex(source_id) for source_id in self.predecessor_map.get(vertex.id, [])]
def get_all_successors(self, vertex: Vertex, *, recursive=True, flat=True, visited=None):
"""Returns all successors of a given vertex, optionally recursively and as a flat or nested list.
Args:
vertex: The vertex whose successors are to be retrieved.
recursive: If True, retrieves successors recursively; otherwise, only immediate successors.
flat: If True, returns a flat list of successors; if False, returns a nested list structure.
visited: Internal set used to track visited vertices and prevent cycles.
Returns:
A list of successor vertices, either flat or nested depending on the `flat` parameter.
"""
if visited is None:
visited = set()
@ -1739,11 +1798,37 @@ class Graph:
return successors_result
def get_successors(self, vertex: Vertex) -> list[Vertex]:
"""Returns the successors of a vertex."""
"""Returns the immediate successor vertices of the given vertex.
Args:
vertex: The vertex whose successors are to be retrieved.
Returns:
A list of vertices that are direct successors of the specified vertex.
"""
return [self.get_vertex(target_id) for target_id in self.successor_map.get(vertex.id, set())]
def get_all_predecessors(self, vertex: Vertex, *, recursive: bool = True) -> list[Vertex]:
"""Retrieves all predecessor vertices of a given vertex.
If `recursive` is True, returns both direct and indirect predecessors by
traversing the graph recursively. If False, returns only the immediate predecessors.
"""
_predecessors = self.predecessor_map.get(vertex.id, [])
predecessors = [self.get_vertex(v_id) for v_id in _predecessors]
if recursive:
for predecessor in _predecessors:
predecessors.extend(self.get_all_predecessors(self.get_vertex(predecessor), recursive=recursive))
else:
predecessors.extend([self.get_vertex(predecessor) for predecessor in _predecessors])
return predecessors
def get_vertex_neighbors(self, vertex: Vertex) -> dict[Vertex, int]:
"""Returns the neighbors of a vertex."""
"""Returns a dictionary mapping each direct neighbor of a vertex to the count of connecting edges.
A neighbor is any vertex directly connected to the input vertex, either as a source or target.
The count reflects the number of edges between the input vertex and each neighbor.
"""
neighbors: dict[Vertex, int] = {}
for edge in self.edges:
if edge.source_id == vertex.id:

View file

@ -463,18 +463,21 @@ class InterfaceVertex(ComponentVertex):
class StateVertex(ComponentVertex):
def __init__(self, data: NodeData, graph):
"""Initializes a StateVertex with the provided node data and graph.
Sets up the build steps and marks the vertex as a state vertex.
"""
super().__init__(data, graph=graph)
self.steps = [self._build]
self.is_state = False
@property
def successors_ids(self) -> list[str]:
if self._successors_ids is None:
self.is_state = False
return super().successors_ids
return self._successors_ids
self.is_state = True
def built_object_repr(self):
"""Returns a string representation of the built object from the artifacts if available.
If the artifacts dictionary contains a non-empty "repr" key, its value is returned.
If the "repr" value is falsy, falls back to the superclass representation.
Returns None if no representation is available.
"""
if self.artifacts and "repr" in self.artifacts:
return self.artifacts["repr"] or super().built_object_repr()
return None