diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 9bc07e703..7339c169e 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -644,10 +644,7 @@ class Graph: if run_id is None: run_id = uuid.uuid4() - run_id_str = str(run_id) - for vertex in self.vertices: - self.state_manager.subscribe(run_id_str, vertex.update_graph_state) - self._run_id = run_id_str + self._run_id = str(run_id) if self.tracing_service: self.tracing_service.set_run_id(run_id) @@ -1430,6 +1427,7 @@ class Graph: vertex.results = cached_vertex_dict["results"] try: vertex.finalize_build() + if vertex.result is not None: vertex.result.used_frozen_result = True except Exception: # noqa: BLE001 diff --git a/src/backend/base/langflow/graph/graph/state_manager.py b/src/backend/base/langflow/graph/graph/state_manager.py index 38aed4b90..06e2c4399 100644 --- a/src/backend/base/langflow/graph/graph/state_manager.py +++ b/src/backend/base/langflow/graph/graph/state_manager.py @@ -33,3 +33,6 @@ class GraphStateManager: def subscribe(self, key, observer: Callable) -> None: self.state_service.subscribe(key, observer) + + def unsubscribe(self, key, observer: Callable) -> None: + self.state_service.unsubscribe(key, observer) diff --git a/src/backend/base/langflow/services/state/service.py b/src/backend/base/langflow/services/state/service.py index 100b5442d..fbb3a7103 100644 --- a/src/backend/base/langflow/services/state/service.py +++ b/src/backend/base/langflow/services/state/service.py @@ -23,6 +23,9 @@ class StateService(Service): def subscribe(self, key, observer: Callable) -> None: raise NotImplementedError + def unsubscribe(self, key, observer: Callable) -> None: + raise NotImplementedError + def notify_observers(self, key, new_state) -> None: raise NotImplementedError @@ -30,8 +33,8 @@ class StateService(Service): class InMemoryStateService(StateService): def __init__(self, settings_service: SettingsService): self.settings_service = settings_service - self.states: dict = {} - self.observers: dict = defaultdict(list) + self.states: dict[str, dict] = {} + self.observers: dict[str, list[Callable]] = defaultdict(list) self.lock = Lock() def append_state(self, key, new_state, run_id: str) -> None: @@ -72,3 +75,9 @@ class InMemoryStateService(StateService): except Exception: # noqa: BLE001 logger.exception(f"Error in observer {callback} for key {key}") logger.warning("Callbacks not implemented yet") + + def unsubscribe(self, key, observer: Callable) -> None: + with self.lock: + if observer in self.observers[key]: + # Use list.remove() since observers[key] is a list + self.observers[key].remove(observer)