Fix: Fixes bug with RedisCache init and serialization (#7092)

* Fixes bug with RedisCache init and serialization

* [autofix.ci] apply automated fixes

* Removes duplicate get_service call

---------

Co-authored-by: Garrett George <ggeorge@pros.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Garrett George 2025-04-02 15:28:17 -05:00 committed by GitHub
commit 9ec37f92c9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 38 additions and 18 deletions

View file

@ -167,3 +167,17 @@ class AsyncBaseCacheService(Service, Generic[AsyncLockType]):
Returns:
True if the key is in the cache, False otherwise.
"""
class ExternalAsyncBaseCacheService(AsyncBaseCacheService):
"""Abstract base class for an external async cache."""
name = "cache_service"
@abc.abstractmethod
async def is_connected(self) -> bool:
"""Check if the cache is connected.
Returns:
True if the cache is connected, False otherwise.
"""

View file

@ -24,19 +24,13 @@ class CacheServiceFactory(ServiceFactory):
if settings_service.settings.cache_type == "redis":
logger.debug("Creating Redis cache")
redis_cache: RedisCache = RedisCache(
return RedisCache(
host=settings_service.settings.redis_host,
port=settings_service.settings.redis_port,
db=settings_service.settings.redis_db,
url=settings_service.settings.redis_url,
expiration_time=settings_service.settings.redis_cache_expire,
)
if redis_cache.is_connected():
logger.debug("Redis cache is connected")
return redis_cache
# do not attempt to fallback to another cache type
msg = "Failed to connect to Redis cache"
raise ConnectionError(msg)
if settings_service.settings.cache_type == "memory":
return ThreadingInMemoryCache(expiration_time=settings_service.settings.cache_expire)

View file

@ -5,10 +5,17 @@ import time
from collections import OrderedDict
from typing import Generic, Union
import dill
from loguru import logger
from typing_extensions import override
from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType, CacheService, LockType
from langflow.services.cache.base import (
AsyncBaseCacheService,
AsyncLockType,
CacheService,
ExternalAsyncBaseCacheService,
LockType,
)
from langflow.services.cache.utils import CACHE_MISS
@ -168,7 +175,7 @@ class ThreadingInMemoryCache(CacheService, Generic[LockType]):
return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})"
class RedisCache(AsyncBaseCacheService, Generic[LockType]):
class RedisCache(ExternalAsyncBaseCacheService, Generic[LockType]):
"""A Redis-based cache implementation.
This cache supports setting an expiration time for cached items.
@ -218,15 +225,15 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]):
self._client = StrictRedis(host=host, port=port, db=db)
self.expiration_time = expiration_time
# check connection
def is_connected(self) -> bool:
async def is_connected(self) -> bool:
"""Check if the Redis client is connected."""
import redis
try:
asyncio.run(self._client.ping())
await self._client.ping()
except redis.exceptions.ConnectionError:
logger.exception("RedisCache could not connect to the Redis server")
msg = "RedisCache could not connect to the Redis server"
logger.exception(msg)
return False
return True
@ -235,17 +242,17 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]):
if key is None:
return CACHE_MISS
value = await self._client.get(str(key))
return pickle.loads(value) if value else CACHE_MISS
return dill.loads(value) if value else CACHE_MISS
@override
async def set(self, key, value, lock=None) -> None:
try:
if pickled := pickle.dumps(value):
if pickled := dill.dumps(value, recurse=True):
result = await self._client.setex(str(key), self.expiration_time, pickled)
if not result:
msg = "RedisCache could not set the value."
raise ValueError(msg)
except TypeError as exc:
except pickle.PicklingError as exc:
msg = "RedisCache only accepts values that can be pickled. "
raise TypeError(msg) from exc

View file

@ -9,6 +9,7 @@ from sqlalchemy import exc as sqlalchemy_exc
from sqlmodel import col, select
from langflow.services.auth.utils import create_super_user, verify_password
from langflow.services.cache.base import ExternalAsyncBaseCacheService
from langflow.services.cache.factory import CacheServiceFactory
from langflow.services.database.models.transactions.model import TransactionTable
from langflow.services.database.models.vertex_builds.model import VertexBuildTable
@ -228,8 +229,12 @@ async def clean_vertex_builds(settings_service: SettingsService, session: AsyncS
async def initialize_services(*, fix_migration: bool = False) -> None:
"""Initialize all the services needed."""
# Test cache connection
get_service(ServiceType.CACHE_SERVICE, default=CacheServiceFactory())
cache_service = get_service(ServiceType.CACHE_SERVICE, default=CacheServiceFactory())
# Test external cache connection
if isinstance(cache_service, ExternalAsyncBaseCacheService) and not (await cache_service.is_connected()):
msg = "Cache service failed to connect to external database"
raise ConnectionError(msg)
# Setup the superuser
await initialize_database(fix_migration=fix_migration)
db_service = get_db_service()