fix: resolve potential deadlock in AsyncInMemoryCache (#4464)

* Fix potential lock misuse and deadlock in AsyncInMemoryCache.

* Recover async lock handling logic.

* Remove unused lock parameter in upsert.

* Fix potential lock misuse and deadlock in AsyncInMemoryCache.

* Recover async lock handling logic.

* Remove unused lock parameter in upsert.

* Add lock parameter to prevent errors.

* Fix ARG002 rule error.

* Lock passed to get and set method.

---------

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
dhlidongming 2024-11-14 19:44:21 +08:00 committed by GitHub
commit af546551bc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -298,10 +298,7 @@ class AsyncInMemoryCache(AsyncBaseCacheService, Generic[AsyncLockType]):
self.expiration_time = expiration_time
async def get(self, key, lock: asyncio.Lock | None = None):
if not lock:
async with self.lock:
return await self._get(key)
else:
async with lock or self.lock:
return await self._get(key)
async def _get(self, key):
@ -315,13 +312,7 @@ class AsyncInMemoryCache(AsyncBaseCacheService, Generic[AsyncLockType]):
return CACHE_MISS
async def set(self, key, value, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._set(
key,
value,
)
else:
async with lock or self.lock:
await self._set(
key,
value,
@ -334,10 +325,7 @@ class AsyncInMemoryCache(AsyncBaseCacheService, Generic[AsyncLockType]):
self.cache.move_to_end(key)
async def delete(self, key, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._delete(key)
else:
async with lock or self.lock:
await self._delete(key)
async def _delete(self, key) -> None:
@ -345,28 +333,21 @@ class AsyncInMemoryCache(AsyncBaseCacheService, Generic[AsyncLockType]):
del self.cache[key]
async def clear(self, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._clear()
else:
async with lock or self.lock:
await self._clear()
async def _clear(self) -> None:
self.cache.clear()
async def upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._upsert(key, value)
else:
await self._upsert(key, value)
await self._upsert(key, value, lock)
async def _upsert(self, key, value) -> None:
existing_value = await self.get(key)
async def _upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
existing_value = await self.get(key, lock)
if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict):
existing_value.update(value)
value = existing_value
await self.set(key, value)
await self.set(key, value, lock)
async def contains(self, key) -> bool:
return key in self.cache