From 9ec37f92c91d32a3ca5c9ebaef7a7e886f370a9f Mon Sep 17 00:00:00 2001 From: Garrett George Date: Wed, 2 Apr 2025 15:28:17 -0500 Subject: [PATCH] Fix: Fixes bug with RedisCache init and serialization (#7092) * Fixes bug with RedisCache init and serialization * [autofix.ci] apply automated fixes * Removes duplicate get_service call --------- Co-authored-by: Garrett George Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../base/langflow/services/cache/base.py | 14 +++++++++++ .../base/langflow/services/cache/factory.py | 8 +----- .../base/langflow/services/cache/service.py | 25 ++++++++++++------- src/backend/base/langflow/services/utils.py | 9 +++++-- 4 files changed, 38 insertions(+), 18 deletions(-) diff --git a/src/backend/base/langflow/services/cache/base.py b/src/backend/base/langflow/services/cache/base.py index 744476783..514380812 100644 --- a/src/backend/base/langflow/services/cache/base.py +++ b/src/backend/base/langflow/services/cache/base.py @@ -167,3 +167,17 @@ class AsyncBaseCacheService(Service, Generic[AsyncLockType]): Returns: True if the key is in the cache, False otherwise. """ + + +class ExternalAsyncBaseCacheService(AsyncBaseCacheService): + """Abstract base class for an external async cache.""" + + name = "cache_service" + + @abc.abstractmethod + async def is_connected(self) -> bool: + """Check if the cache is connected. + + Returns: + True if the cache is connected, False otherwise. + """ diff --git a/src/backend/base/langflow/services/cache/factory.py b/src/backend/base/langflow/services/cache/factory.py index ea432812c..3e8c3abe0 100644 --- a/src/backend/base/langflow/services/cache/factory.py +++ b/src/backend/base/langflow/services/cache/factory.py @@ -24,19 +24,13 @@ class CacheServiceFactory(ServiceFactory): if settings_service.settings.cache_type == "redis": logger.debug("Creating Redis cache") - redis_cache: RedisCache = RedisCache( + return RedisCache( host=settings_service.settings.redis_host, port=settings_service.settings.redis_port, db=settings_service.settings.redis_db, url=settings_service.settings.redis_url, expiration_time=settings_service.settings.redis_cache_expire, ) - if redis_cache.is_connected(): - logger.debug("Redis cache is connected") - return redis_cache - # do not attempt to fallback to another cache type - msg = "Failed to connect to Redis cache" - raise ConnectionError(msg) if settings_service.settings.cache_type == "memory": return ThreadingInMemoryCache(expiration_time=settings_service.settings.cache_expire) diff --git a/src/backend/base/langflow/services/cache/service.py b/src/backend/base/langflow/services/cache/service.py index 8f55d8f0e..efe5c13a9 100644 --- a/src/backend/base/langflow/services/cache/service.py +++ b/src/backend/base/langflow/services/cache/service.py @@ -5,10 +5,17 @@ import time from collections import OrderedDict from typing import Generic, Union +import dill from loguru import logger from typing_extensions import override -from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType, CacheService, LockType +from langflow.services.cache.base import ( + AsyncBaseCacheService, + AsyncLockType, + CacheService, + ExternalAsyncBaseCacheService, + LockType, +) from langflow.services.cache.utils import CACHE_MISS @@ -168,7 +175,7 @@ class ThreadingInMemoryCache(CacheService, Generic[LockType]): return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})" -class RedisCache(AsyncBaseCacheService, Generic[LockType]): +class RedisCache(ExternalAsyncBaseCacheService, Generic[LockType]): """A Redis-based cache implementation. This cache supports setting an expiration time for cached items. @@ -218,15 +225,15 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]): self._client = StrictRedis(host=host, port=port, db=db) self.expiration_time = expiration_time - # check connection - def is_connected(self) -> bool: + async def is_connected(self) -> bool: """Check if the Redis client is connected.""" import redis try: - asyncio.run(self._client.ping()) + await self._client.ping() except redis.exceptions.ConnectionError: - logger.exception("RedisCache could not connect to the Redis server") + msg = "RedisCache could not connect to the Redis server" + logger.exception(msg) return False return True @@ -235,17 +242,17 @@ class RedisCache(AsyncBaseCacheService, Generic[LockType]): if key is None: return CACHE_MISS value = await self._client.get(str(key)) - return pickle.loads(value) if value else CACHE_MISS + return dill.loads(value) if value else CACHE_MISS @override async def set(self, key, value, lock=None) -> None: try: - if pickled := pickle.dumps(value): + if pickled := dill.dumps(value, recurse=True): result = await self._client.setex(str(key), self.expiration_time, pickled) if not result: msg = "RedisCache could not set the value." raise ValueError(msg) - except TypeError as exc: + except pickle.PicklingError as exc: msg = "RedisCache only accepts values that can be pickled. " raise TypeError(msg) from exc diff --git a/src/backend/base/langflow/services/utils.py b/src/backend/base/langflow/services/utils.py index d207e3c53..b83509a21 100644 --- a/src/backend/base/langflow/services/utils.py +++ b/src/backend/base/langflow/services/utils.py @@ -9,6 +9,7 @@ from sqlalchemy import exc as sqlalchemy_exc from sqlmodel import col, select from langflow.services.auth.utils import create_super_user, verify_password +from langflow.services.cache.base import ExternalAsyncBaseCacheService from langflow.services.cache.factory import CacheServiceFactory from langflow.services.database.models.transactions.model import TransactionTable from langflow.services.database.models.vertex_builds.model import VertexBuildTable @@ -228,8 +229,12 @@ async def clean_vertex_builds(settings_service: SettingsService, session: AsyncS async def initialize_services(*, fix_migration: bool = False) -> None: """Initialize all the services needed.""" - # Test cache connection - get_service(ServiceType.CACHE_SERVICE, default=CacheServiceFactory()) + cache_service = get_service(ServiceType.CACHE_SERVICE, default=CacheServiceFactory()) + # Test external cache connection + if isinstance(cache_service, ExternalAsyncBaseCacheService) and not (await cache_service.is_connected()): + msg = "Cache service failed to connect to external database" + raise ConnectionError(msg) + # Setup the superuser await initialize_database(fix_migration=fix_migration) db_service = get_db_service()