Update cache service references

This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-03-25 10:17:06 -03:00
commit 7a8f10287d
7 changed files with 23 additions and 63 deletions

View file

@ -1,6 +1,6 @@
from langflow.services.cache.service import (
AsyncInMemoryCache,
BaseCacheService,
CacheService,
RedisCache,
ThreadingInMemoryCache,
)
@ -12,6 +12,6 @@ __all__ = [
"service",
"ThreadingInMemoryCache",
"AsyncInMemoryCache",
"BaseCacheService",
"CacheService",
"RedisCache",
]

View file

@ -1,11 +1,6 @@
from typing import TYPE_CHECKING
from langflow.services.cache.service import (
AsyncInMemoryCache,
BaseCacheService,
RedisCache,
ThreadingInMemoryCache,
)
from langflow.services.cache.service import AsyncInMemoryCache, CacheService, RedisCache, ThreadingInMemoryCache
from langflow.services.factory import ServiceFactory
from langflow.utils.logger import logger

View file

@ -8,10 +8,10 @@ from typing import Optional
from loguru import logger
from langflow.services.base import Service
from langflow.services.cache.base import AsyncBaseCacheService, BaseCacheService
from langflow.services.cache.base import AsyncBaseCacheService, CacheService
class ThreadingInMemoryCache(BaseCacheService, Service):
class ThreadingInMemoryCache(CacheService, Service):
"""
A simple in-memory cache using an OrderedDict.
@ -68,10 +68,7 @@ class ThreadingInMemoryCache(BaseCacheService, Service):
Retrieve an item from the cache without acquiring the lock.
"""
if item := self._cache.get(key):
if (
self.expiration_time is None
or time.time() - item["time"] < self.expiration_time
):
if self.expiration_time is None or time.time() - item["time"] < self.expiration_time:
# Move the key to the end to make it recently used
self._cache.move_to_end(key)
# Check if the value is pickled
@ -116,11 +113,7 @@ class ThreadingInMemoryCache(BaseCacheService, Service):
"""
with lock or self._lock:
existing_value = self._get_without_lock(key)
if (
existing_value is not None
and isinstance(existing_value, dict)
and isinstance(value, dict)
):
if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict):
existing_value.update(value)
value = existing_value
@ -209,9 +202,7 @@ class RedisCache(CacheService):
b = cache["b"]
"""
def __init__(
self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60
):
def __init__(self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60):
"""
Initialize a new RedisCache instance.
@ -279,9 +270,7 @@ class RedisCache(CacheService):
if not result:
raise ValueError("RedisCache could not set the value.")
except TypeError as exc:
raise TypeError(
"RedisCache only accepts values that can be pickled. "
) from exc
raise TypeError("RedisCache only accepts values that can be pickled. ") from exc
def upsert(self, key, value):
"""
@ -293,11 +282,7 @@ class RedisCache(CacheService):
value: The value to insert or update.
"""
existing_value = self.get(key)
if (
existing_value is not None
and isinstance(existing_value, dict)
and isinstance(value, dict)
):
if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict):
existing_value.update(value)
value = existing_value

View file

@ -79,7 +79,7 @@ def session_scope():
session.close()
def get_cache_service() -> "BaseCacheService":
def get_cache_service() -> "CacheService":
return service_manager.get(ServiceType.CACHE_SERVICE) # type: ignore

View file

@ -1,12 +1,10 @@
from typing import TYPE_CHECKING, Coroutine, Optional
from typing import Coroutine, Optional
from langflow.interface.run import build_sorted_vertices
from langflow.services.base import Service
from langflow.services.cache.base import CacheService
from langflow.services.session.utils import compute_dict_hash, session_id_generator
if TYPE_CHECKING:
from langflow.services.cache.base import CacheService
class SessionService(Service):
name = "session_service"

View file

@ -1,13 +1,9 @@
from typing import TYPE_CHECKING
from langflow.services.factory import ServiceFactory
from langflow.services.schema import ServiceType
from langflow.services.storage.service import StorageService
from loguru import logger
if TYPE_CHECKING:
from langflow.services.session.service import SessionService
from langflow.services.settings.service import SettingsService
from langflow.services.factory import ServiceFactory
from langflow.services.session.service import SessionService
from langflow.services.settings.service import SettingsService
from langflow.services.storage.service import StorageService
class StorageServiceFactory(ServiceFactory):
@ -16,9 +12,7 @@ class StorageServiceFactory(ServiceFactory):
StorageService,
)
def create(
self, session_service: "SessionService", settings_service: "SettingsService"
):
def create(self, session_service: SessionService, settings_service: SettingsService):
storage_type = settings_service.settings.STORAGE_TYPE
if storage_type.lower() == "local":
from .local import LocalStorageService
@ -29,9 +23,7 @@ class StorageServiceFactory(ServiceFactory):
return S3StorageService(session_service, settings_service)
else:
logger.warning(
f"Storage type {storage_type} not supported. Using local storage."
)
logger.warning(f"Storage type {storage_type} not supported. Using local storage.")
from .local import LocalStorageService
return LocalStorageService(session_service, settings_service)

View file

@ -9,20 +9,14 @@ from langflow.services.database.utils import initialize_database
from langflow.services.factory import ServiceFactory
from langflow.services.manager import service_manager
from langflow.services.schema import ServiceType
from langflow.services.settings.constants import (
DEFAULT_SUPERUSER,
DEFAULT_SUPERUSER_PASSWORD,
)
from langflow.services.settings.constants import DEFAULT_SUPERUSER, DEFAULT_SUPERUSER_PASSWORD
from langflow.services.socket.utils import set_socketio_server
from .deps import get_db_service, get_session, get_settings_service
def get_factories():
service_names = [
ServiceType(service_type).value.replace("_service", "")
for service_type in ServiceType
]
service_names = [ServiceType(service_type).value.replace("_service", "") for service_type in ServiceType]
base_module = "langflow.services"
factories = []
@ -39,9 +33,7 @@ def get_factories():
except Exception as exc:
logger.exception(exc)
raise RuntimeError(
f"Could not initialize services. Please check your settings."
) from exc
raise RuntimeError(f"Could not initialize services. Please check your settings. Error in {name}.") from exc
return factories
@ -172,9 +164,7 @@ def initialize_session_service():
Initialize the session manager.
"""
from langflow.services.cache import factory as cache_factory
from langflow.services.session import (
factory as session_service_factory,
) # type: ignore
from langflow.services.session import factory as session_service_factory # type: ignore
initialize_settings_service()