From dafa85c7c63a926a5951f65485f359d3c6841717 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 16 Jan 2025 15:01:01 -0300 Subject: [PATCH] fix: remove subscribe call and add unsubscribe method in StateService (#5727) * feat: Implement unsubscribe functionality in state management - Added `unsubscribe` method to `GraphStateManager` for removing observers from the state service. - Introduced `unsubscribe` method in `StateService` with a `NotImplementedError` for future implementation. - Implemented `unsubscribe` in `InMemoryStateService` to allow observers to be removed from the list, ensuring better management of state subscriptions. This enhancement improves the flexibility of state management by allowing observers to unsubscribe, thus preventing potential memory leaks and ensuring cleaner state handling. * fix: remove state manager subscribe call --- src/backend/base/langflow/graph/graph/base.py | 6 ++---- .../base/langflow/graph/graph/state_manager.py | 3 +++ src/backend/base/langflow/services/state/service.py | 13 +++++++++++-- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 9bc07e703..7339c169e 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -644,10 +644,7 @@ class Graph: if run_id is None: run_id = uuid.uuid4() - run_id_str = str(run_id) - for vertex in self.vertices: - self.state_manager.subscribe(run_id_str, vertex.update_graph_state) - self._run_id = run_id_str + self._run_id = str(run_id) if self.tracing_service: self.tracing_service.set_run_id(run_id) @@ -1430,6 +1427,7 @@ class Graph: vertex.results = cached_vertex_dict["results"] try: vertex.finalize_build() + if vertex.result is not None: vertex.result.used_frozen_result = True except Exception: # noqa: BLE001 diff --git a/src/backend/base/langflow/graph/graph/state_manager.py b/src/backend/base/langflow/graph/graph/state_manager.py index 38aed4b90..06e2c4399 100644 --- a/src/backend/base/langflow/graph/graph/state_manager.py +++ b/src/backend/base/langflow/graph/graph/state_manager.py @@ -33,3 +33,6 @@ class GraphStateManager: def subscribe(self, key, observer: Callable) -> None: self.state_service.subscribe(key, observer) + + def unsubscribe(self, key, observer: Callable) -> None: + self.state_service.unsubscribe(key, observer) diff --git a/src/backend/base/langflow/services/state/service.py b/src/backend/base/langflow/services/state/service.py index 100b5442d..fbb3a7103 100644 --- a/src/backend/base/langflow/services/state/service.py +++ b/src/backend/base/langflow/services/state/service.py @@ -23,6 +23,9 @@ class StateService(Service): def subscribe(self, key, observer: Callable) -> None: raise NotImplementedError + def unsubscribe(self, key, observer: Callable) -> None: + raise NotImplementedError + def notify_observers(self, key, new_state) -> None: raise NotImplementedError @@ -30,8 +33,8 @@ class StateService(Service): class InMemoryStateService(StateService): def __init__(self, settings_service: SettingsService): self.settings_service = settings_service - self.states: dict = {} - self.observers: dict = defaultdict(list) + self.states: dict[str, dict] = {} + self.observers: dict[str, list[Callable]] = defaultdict(list) self.lock = Lock() def append_state(self, key, new_state, run_id: str) -> None: @@ -72,3 +75,9 @@ class InMemoryStateService(StateService): except Exception: # noqa: BLE001 logger.exception(f"Error in observer {callback} for key {key}") logger.warning("Callbacks not implemented yet") + + def unsubscribe(self, key, observer: Callable) -> None: + with self.lock: + if observer in self.observers[key]: + # Use list.remove() since observers[key] is a list + self.observers[key].remove(observer)