From 56ecb18ef5feea651e85894c9803c090441f9131 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 9 Aug 2024 09:42:21 -0300 Subject: [PATCH] feat: Improve caching logic and add disk caching option (#3246) * feat: add diskcache package version 5.6.3 to poetry.lock and pyproject.toml for improved caching functionality * refactor: simplify CacheMiss import in cache service files for better clarity and maintainability * feat: Add AsyncDiskCache class for disk-based caching * feat: Add disk caching option in CacheServiceFactory with AsyncDiskCache * feat: Restrict cache_type to specific literals: async, redis, memory, disk for enhanced type safety and clarity * feat: Change get_requester_result to async await for proper async handling in Vertex class * fix: Update outputs dictionary in ResultData class to use key-value pairs for better readability and maintainability * fix: Improve caching logic in Graph class by ensuring vertex builds properly handle exceptions and cache updates more reliably * feat: Add teardown method to AsyncDiskCache for clearing cache directory during cleanup process * fix: Correct variable name in Graph class to ensure proper handling of vertex results in caching logic * feat: Clear AsyncDiskCache on initialization to align behavior with in-memory cache until frontend handling is implemented --- src/backend/base/langflow/graph/graph/base.py | 58 ++++++----- src/backend/base/langflow/graph/schema.py | 2 +- .../base/langflow/graph/vertex/base.py | 2 +- .../base/langflow/services/cache/disk.py | 96 +++++++++++++++++++ .../base/langflow/services/cache/factory.py | 6 ++ .../base/langflow/services/cache/service.py | 4 +- .../base/langflow/services/cache/utils.py | 3 + .../base/langflow/services/settings/base.py | 4 +- src/backend/base/poetry.lock | 13 ++- src/backend/base/pyproject.toml | 1 + 10 files changed, 157 insertions(+), 32 deletions(-) create mode 100644 src/backend/base/langflow/services/cache/disk.py diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index d1dbe1c71..918ce23de 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -1126,41 +1126,51 @@ class Graph: self.run_manager.add_to_vertices_being_run(vertex_id) try: params = "" - if vertex.frozen: + should_build = False + if not vertex.frozen: + should_build = True + else: # Check the cache for the vertex if get_cache is not None: cached_result = await get_cache(key=vertex.id) else: cached_result = None if isinstance(cached_result, CacheMiss): - await vertex.build( - user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files - ) - if set_cache is not None: - await set_cache(key=vertex.id, data=vertex) - if cached_result and not isinstance(cached_result, CacheMiss): - cached_vertex = cached_result["result"] - # Now set update the vertex with the cached vertex - vertex._built = cached_vertex._built - vertex.result = cached_vertex.result - vertex.results = cached_vertex.results - vertex.artifacts = cached_vertex.artifacts - vertex._built_object = cached_vertex._built_object - vertex._custom_component = cached_vertex._custom_component - if vertex.result is not None: - vertex.result.used_frozen_result = True + should_build = True else: - await vertex.build( - user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files - ) - if set_cache is not None: - await set_cache(key=vertex.id, data=vertex) - else: + try: + cached_vertex_dict = cached_result["result"] + # Now set update the vertex with the cached vertex + vertex._built = cached_vertex_dict["_built"] + vertex.artifacts = cached_vertex_dict["artifacts"] + vertex._built_object = cached_vertex_dict["_built_object"] + vertex._built_result = cached_vertex_dict["_built_result"] + vertex._data = cached_vertex_dict["_data"] + vertex.results = cached_vertex_dict["results"] + try: + vertex._finalize_build() + if vertex.result is not None: + vertex.result.used_frozen_result = True + except Exception: + should_build = True + except KeyError: + should_build = True + + if should_build: await vertex.build( user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files ) if set_cache is not None: - await set_cache(key=vertex.id, data=vertex) + vertex_dict = { + "_built": vertex._built, + "results": vertex.results, + "artifacts": vertex.artifacts, + "_built_object": vertex._built_object, + "_built_result": vertex._built_result, + "_data": vertex._data, + } + + await set_cache(key=vertex.id, data=vertex_dict) if vertex.result is not None: params = f"{vertex._built_object_repr()}{params}" diff --git a/src/backend/base/langflow/graph/schema.py b/src/backend/base/langflow/graph/schema.py index eab0040c6..fdabcdaaa 100644 --- a/src/backend/base/langflow/graph/schema.py +++ b/src/backend/base/langflow/graph/schema.py @@ -43,7 +43,7 @@ class ResultData(BaseModel): stream_url = StreamURL(location=message["stream_url"]) values["outputs"].update({key: OutputValue(message=stream_url, type=message["type"])}) elif "type" in message: - values["outputs"].update({OutputValue(message=message, type=message["type"])}) + values["outputs"].update({key: OutputValue(message=message, type=message["type"])}) return values diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index 8284736ac..0b7941734 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -753,7 +753,7 @@ class Vertex: return if self.frozen and self._built: - return self.get_requester_result(requester) + return await self.get_requester_result(requester) elif self._built and requester is not None: # This means that the vertex has already been built # and we are just getting the result for the requester diff --git a/src/backend/base/langflow/services/cache/disk.py b/src/backend/base/langflow/services/cache/disk.py new file mode 100644 index 000000000..dbbd85f13 --- /dev/null +++ b/src/backend/base/langflow/services/cache/disk.py @@ -0,0 +1,96 @@ +import asyncio +import pickle +import time +from typing import Generic, Optional + +from diskcache import Cache +from loguru import logger + +from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType +from langflow.services.cache.utils import CACHE_MISS + + +class AsyncDiskCache(AsyncBaseCacheService, Generic[AsyncLockType]): # type: ignore + def __init__(self, cache_dir, max_size=None, expiration_time=3600): + self.cache = Cache(cache_dir) + # Let's clear the cache for now to maintain a similar + # behavior as the in-memory cache + # Later we should implement endpoints for the frontend to grab + # output logs from the cache + if len(self.cache) > 0: + self.cache.clear() + self.lock = asyncio.Lock() + self.max_size = max_size + self.expiration_time = expiration_time + + async def get(self, key, lock: Optional[asyncio.Lock] = None): + if not lock: + async with self.lock: + return await self._get(key) + else: + return await self._get(key) + + async def _get(self, key): + item = await asyncio.to_thread(self.cache.get, key, default=None) + if item: + if time.time() - item["time"] < self.expiration_time: + await asyncio.to_thread(self.cache.touch, key) # Refresh the expiry time + return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"] + else: + logger.info(f"Cache item for key '{key}' has expired and will be deleted.") + await self._delete(key) # Log before deleting the expired item + return CACHE_MISS + + async def set(self, key, value, lock: Optional[asyncio.Lock] = None): + if not lock: + async with self.lock: + await self._set(key, value) + else: + await self._set(key, value) + + async def _set(self, key, value): + if self.max_size and len(self.cache) >= self.max_size: + await asyncio.to_thread(self.cache.cull) + item = {"value": pickle.dumps(value) if not isinstance(value, (str, bytes)) else value, "time": time.time()} + await asyncio.to_thread(self.cache.set, key, item) + + async def delete(self, key, lock: Optional[asyncio.Lock] = None): + if not lock: + async with self.lock: + await self._delete(key) + else: + await self._delete(key) + + async def _delete(self, key): + await asyncio.to_thread(self.cache.delete, key) + + async def clear(self, lock: Optional[asyncio.Lock] = None): + if not lock: + async with self.lock: + await self._clear() + else: + await self._clear() + + async def _clear(self): + await asyncio.to_thread(self.cache.clear) + + async def upsert(self, key, value, lock: Optional[asyncio.Lock] = None): + if not lock: + async with self.lock: + await self._upsert(key, value) + else: + await self._upsert(key, value) + + async def _upsert(self, key, value): + existing_value = await self.get(key) + if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict): + existing_value.update(value) + value = existing_value + await self.set(key, value) + + def __contains__(self, key): + return asyncio.run(asyncio.to_thread(self.cache.__contains__, key)) + + async def teardown(self): + # Clean up the cache directory + self.cache.clear(retry=True) diff --git a/src/backend/base/langflow/services/cache/factory.py b/src/backend/base/langflow/services/cache/factory.py index 5cc6b12af..74364dbfc 100644 --- a/src/backend/base/langflow/services/cache/factory.py +++ b/src/backend/base/langflow/services/cache/factory.py @@ -1,5 +1,6 @@ from typing import TYPE_CHECKING +from langflow.services.cache.disk import AsyncDiskCache from langflow.services.cache.service import AsyncInMemoryCache, CacheService, RedisCache, ThreadingInMemoryCache from langflow.services.factory import ServiceFactory from langflow.utils.logger import logger @@ -36,3 +37,8 @@ class CacheServiceFactory(ServiceFactory): return ThreadingInMemoryCache(expiration_time=settings_service.settings.cache_expire) elif settings_service.settings.cache_type == "async": return AsyncInMemoryCache(expiration_time=settings_service.settings.cache_expire) + elif settings_service.settings.cache_type == "disk": + return AsyncDiskCache( + cache_dir=settings_service.settings.config_dir, + expiration_time=settings_service.settings.cache_expire, + ) diff --git a/src/backend/base/langflow/services/cache/service.py b/src/backend/base/langflow/services/cache/service.py index 3d4131c23..021c33f90 100644 --- a/src/backend/base/langflow/services/cache/service.py +++ b/src/backend/base/langflow/services/cache/service.py @@ -8,9 +8,7 @@ from typing import Generic, Optional from loguru import logger from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType, CacheService, LockType -from langflow.services.cache.utils import CacheMiss - -CACHE_MISS = CacheMiss() +from langflow.services.cache.utils import CACHE_MISS class ThreadingInMemoryCache(CacheService, Generic[LockType]): # type: ignore diff --git a/src/backend/base/langflow/services/cache/utils.py b/src/backend/base/langflow/services/cache/utils.py index a89963f56..c2f3c9611 100644 --- a/src/backend/base/langflow/services/cache/utils.py +++ b/src/backend/base/langflow/services/cache/utils.py @@ -166,3 +166,6 @@ def update_build_status(cache_service, flow_id: str, status: "BuildStatus"): cache_service[flow_id] = cached_flow cached_flow["status"] = status cache_service[flow_id] = cached_flow + + +CACHE_MISS = CacheMiss() diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index 658edd57e..88c2a64b4 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -3,7 +3,7 @@ import json import os from pathlib import Path from shutil import copy2 -from typing import Any, List, Optional, Tuple, Type +from typing import Any, List, Literal, Optional, Tuple, Type import orjson import yaml @@ -79,7 +79,7 @@ class Settings(BaseSettings): """SQLite pragmas to use when connecting to the database.""" # cache configuration - cache_type: str = "async" + cache_type: Literal["async", "redis", "memory", "disk"] = "async" """The cache type can be 'async' or 'redis'.""" cache_expire: int = 3600 """The cache expire in seconds.""" diff --git a/src/backend/base/poetry.lock b/src/backend/base/poetry.lock index e52b0c5b7..8deb46e0b 100644 --- a/src/backend/base/poetry.lock +++ b/src/backend/base/poetry.lock @@ -1208,6 +1208,17 @@ files = [ graph = ["objgraph (>=1.7.2)"] profile = ["gprof2dot (>=2022.7.29)"] +[[package]] +name = "diskcache" +version = "5.6.3" +description = "Disk Cache -- Disk and file backed persistent cache." +optional = false +python-versions = ">=3" +files = [ + {file = "diskcache-5.6.3-py3-none-any.whl", hash = "sha256:5e31b2d5fbad117cc363ebaf6b689474db18a1f6438bc82358b024abd4c2ca19"}, + {file = "diskcache-5.6.3.tar.gz", hash = "sha256:2c3a3fa2743d8535d832ec61c2054a1641f41775aa7c556758a109941e33e4fc"}, +] + [[package]] name = "distlib" version = "0.3.8" @@ -7654,4 +7665,4 @@ local = [] [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.13" -content-hash = "747dad35b9e5b1338a989ea6bfd4ac3465ba34f792639aeabda3c1ca9b40c689" +content-hash = "fe6710d7325bc2cceeaa298d94d6f1157cfe1533c2acbabe3ecdca5594d9e007" diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index d98c673e0..a8de78d9e 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -79,6 +79,7 @@ filelock = "^3.15.4" grandalf = "^0.8.0" crewai = "^0.36.0" spider-client = "^0.0.27" +diskcache = "^5.6.3" [tool.poetry.extras]