🐛 fix(manager.py): change threading.Lock to threading.RLock to allow reentrant locking and prevent deadlocks
🔀 merge(manager.py): refactor _get method to extract logic into _get_without_lock method for code reuse and readability 🔀 merge(manager.py): refactor set method to use _get_without_lock method for code reuse and readability 🔀 merge(manager.py): add lock acquisition in delete method to ensure thread safety 🔀 merge(utils.py): add update_cache method to Memoize class for updating cache values
This commit is contained in:
parent
cf2940be05
commit
632eef48b2
2 changed files with 26 additions and 16 deletions
37
src/backend/langflow/services/cache/manager.py
vendored
37
src/backend/langflow/services/cache/manager.py
vendored
|
|
@ -43,7 +43,7 @@ class InMemoryCache(BaseCacheManager):
|
|||
expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
|
||||
"""
|
||||
self._cache = OrderedDict()
|
||||
self._lock = threading.Lock()
|
||||
self._lock = threading.RLock()
|
||||
self.max_size = max_size
|
||||
self.expiration_time = expiration_time
|
||||
|
||||
|
|
@ -58,18 +58,23 @@ class InMemoryCache(BaseCacheManager):
|
|||
The value associated with the key, or None if the key is not found or the item has expired.
|
||||
"""
|
||||
with self._lock:
|
||||
if key in self._cache:
|
||||
item = self._cache.pop(key)
|
||||
if (
|
||||
self.expiration_time is None
|
||||
or time.time() - item["time"] < self.expiration_time
|
||||
):
|
||||
# Move the key to the end to make it recently used
|
||||
self._cache[key] = item
|
||||
return item["value"]
|
||||
else:
|
||||
self.delete(key)
|
||||
return None
|
||||
return self._get_without_lock(key)
|
||||
|
||||
def _get_without_lock(self, key):
|
||||
"""
|
||||
Retrieve an item from the cache without acquiring the lock.
|
||||
"""
|
||||
if item := self._cache.get(key):
|
||||
if (
|
||||
self.expiration_time is None
|
||||
or time.time() - item["time"] < self.expiration_time
|
||||
):
|
||||
# Move the key to the end to make it recently used
|
||||
self._cache.move_to_end(key)
|
||||
return item["value"]
|
||||
else:
|
||||
self.delete(key)
|
||||
return None
|
||||
|
||||
def set(self, key, value):
|
||||
"""
|
||||
|
|
@ -100,7 +105,7 @@ class InMemoryCache(BaseCacheManager):
|
|||
value: The value to insert or update.
|
||||
"""
|
||||
with self._lock:
|
||||
existing_value = self.get(key)
|
||||
existing_value = self._get_without_lock(key)
|
||||
if (
|
||||
existing_value is not None
|
||||
and isinstance(existing_value, dict)
|
||||
|
|
@ -135,8 +140,8 @@ class InMemoryCache(BaseCacheManager):
|
|||
Args:
|
||||
key: The key of the item to remove.
|
||||
"""
|
||||
# with self._lock:
|
||||
self._cache.pop(key, None)
|
||||
with self._lock:
|
||||
self._cache.pop(key, None)
|
||||
|
||||
def clear(self):
|
||||
"""
|
||||
|
|
|
|||
5
src/backend/langflow/services/cache/utils.py
vendored
5
src/backend/langflow/services/cache/utils.py
vendored
|
|
@ -212,6 +212,10 @@ class Memoize:
|
|||
cache_manager = self.get_cache_manager()
|
||||
return cache_manager.get(session_id)
|
||||
|
||||
def update_cache(self, session_id, value):
|
||||
cache_manager = self.get_cache_manager()
|
||||
cache_manager.set(session_id, value)
|
||||
|
||||
def __call__(self, func: Callable[..., Any]):
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
|
|
@ -228,5 +232,6 @@ class Memoize:
|
|||
return result
|
||||
|
||||
wrapper.clear_cache = self.clear_cache
|
||||
wrapper.update_cache = self.update_cache
|
||||
wrapper.get_result_by_session_id = self.get_result_by_session_id
|
||||
return wrapper
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue