feat: Improve caching logic and add disk caching option (#3246)

* feat: add diskcache package version 5.6.3 to poetry.lock and pyproject.toml for improved caching functionality

* refactor: simplify CacheMiss import in cache service files for better clarity and maintainability

* feat: Add AsyncDiskCache class for disk-based caching

* feat: Add disk caching option in CacheServiceFactory with AsyncDiskCache

* feat: Restrict cache_type to specific literals: async, redis, memory, disk for enhanced type safety and clarity

* feat: Change get_requester_result to async await for proper async handling in Vertex class

* fix: Update outputs dictionary in ResultData class to use key-value pairs for better readability and maintainability

* fix: Improve caching logic in Graph class by ensuring vertex builds properly handle exceptions and cache updates more reliably

* feat: Add teardown method to AsyncDiskCache for clearing cache directory during cleanup process

* fix: Correct variable name in Graph class to ensure proper handling of vertex results in caching logic

* feat: Clear AsyncDiskCache on initialization to align behavior with in-memory cache until frontend handling is implemented
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-08-09 09:42:21 -03:00 committed by GitHub
commit 56ecb18ef5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 157 additions and 32 deletions

View file

@ -1126,41 +1126,51 @@ class Graph:
self.run_manager.add_to_vertices_being_run(vertex_id)
try:
params = ""
if vertex.frozen:
should_build = False
if not vertex.frozen:
should_build = True
else:
# Check the cache for the vertex
if get_cache is not None:
cached_result = await get_cache(key=vertex.id)
else:
cached_result = None
if isinstance(cached_result, CacheMiss):
await vertex.build(
user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files
)
if set_cache is not None:
await set_cache(key=vertex.id, data=vertex)
if cached_result and not isinstance(cached_result, CacheMiss):
cached_vertex = cached_result["result"]
# Now update the vertex with the cached vertex
vertex._built = cached_vertex._built
vertex.result = cached_vertex.result
vertex.results = cached_vertex.results
vertex.artifacts = cached_vertex.artifacts
vertex._built_object = cached_vertex._built_object
vertex._custom_component = cached_vertex._custom_component
if vertex.result is not None:
vertex.result.used_frozen_result = True
should_build = True
else:
await vertex.build(
user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files
)
if set_cache is not None:
await set_cache(key=vertex.id, data=vertex)
else:
try:
cached_vertex_dict = cached_result["result"]
# Now update the vertex with the cached vertex
vertex._built = cached_vertex_dict["_built"]
vertex.artifacts = cached_vertex_dict["artifacts"]
vertex._built_object = cached_vertex_dict["_built_object"]
vertex._built_result = cached_vertex_dict["_built_result"]
vertex._data = cached_vertex_dict["_data"]
vertex.results = cached_vertex_dict["results"]
try:
vertex._finalize_build()
if vertex.result is not None:
vertex.result.used_frozen_result = True
except Exception:
should_build = True
except KeyError:
should_build = True
if should_build:
await vertex.build(
user_id=user_id, inputs=inputs_dict, fallback_to_env_vars=fallback_to_env_vars, files=files
)
if set_cache is not None:
await set_cache(key=vertex.id, data=vertex)
vertex_dict = {
"_built": vertex._built,
"results": vertex.results,
"artifacts": vertex.artifacts,
"_built_object": vertex._built_object,
"_built_result": vertex._built_result,
"_data": vertex._data,
}
await set_cache(key=vertex.id, data=vertex_dict)
if vertex.result is not None:
params = f"{vertex._built_object_repr()}{params}"

View file

@ -43,7 +43,7 @@ class ResultData(BaseModel):
stream_url = StreamURL(location=message["stream_url"])
values["outputs"].update({key: OutputValue(message=stream_url, type=message["type"])})
elif "type" in message:
values["outputs"].update({OutputValue(message=message, type=message["type"])})
values["outputs"].update({key: OutputValue(message=message, type=message["type"])})
return values

View file

@ -753,7 +753,7 @@ class Vertex:
return
if self.frozen and self._built:
return self.get_requester_result(requester)
return await self.get_requester_result(requester)
elif self._built and requester is not None:
# This means that the vertex has already been built
# and we are just getting the result for the requester

View file

@ -0,0 +1,96 @@
import asyncio
import pickle
import time
from typing import Generic, Optional
from diskcache import Cache
from loguru import logger
from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType
from langflow.services.cache.utils import CACHE_MISS
class AsyncDiskCache(AsyncBaseCacheService, Generic[AsyncLockType]):  # type: ignore
    """Disk-backed asynchronous cache built on top of ``diskcache.Cache``.

    Non-str/bytes values are pickled before storage and each entry records its
    insertion time so reads can enforce ``expiration_time`` manually. All
    blocking ``diskcache`` operations are offloaded with ``asyncio.to_thread``
    and guarded by a single ``asyncio.Lock`` unless the caller supplies one.
    """

    def __init__(self, cache_dir, max_size=None, expiration_time=3600):
        """Create the cache.

        Args:
            cache_dir: Directory used by ``diskcache`` to persist entries.
            max_size: Optional soft cap on the number of entries; when reached,
                ``cache.cull`` is invoked before inserting.
            expiration_time: Per-entry time-to-live in seconds (default 1 hour).
        """
        self.cache = Cache(cache_dir)
        # Let's clear the cache for now to maintain a similar
        # behavior as the in-memory cache
        # Later we should implement endpoints for the frontend to grab
        # output logs from the cache
        if len(self.cache) > 0:
            self.cache.clear()
        self.lock = asyncio.Lock()
        self.max_size = max_size
        self.expiration_time = expiration_time

    async def get(self, key, lock: Optional[asyncio.Lock] = None):
        """Return the cached value for *key*, or ``CACHE_MISS`` if absent/expired."""
        if not lock:
            async with self.lock:
                return await self._get(key)
        else:
            return await self._get(key)

    async def _get(self, key):
        # Unlocked worker; callers are responsible for holding a lock.
        item = await asyncio.to_thread(self.cache.get, key, default=None)
        if item:
            if time.time() - item["time"] < self.expiration_time:
                await asyncio.to_thread(self.cache.touch, key)  # Refresh the expiry time
                return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"]
            else:
                # Log first, then drop the expired entry.
                logger.info(f"Cache item for key '{key}' has expired and will be deleted.")
                await self._delete(key)
        return CACHE_MISS

    async def set(self, key, value, lock: Optional[asyncio.Lock] = None):
        """Store *value* under *key*, pickling non-str/bytes values."""
        if not lock:
            async with self.lock:
                await self._set(key, value)
        else:
            await self._set(key, value)

    async def _set(self, key, value):
        # Unlocked worker; callers are responsible for holding a lock.
        if self.max_size and len(self.cache) >= self.max_size:
            await asyncio.to_thread(self.cache.cull)
        item = {"value": pickle.dumps(value) if not isinstance(value, (str, bytes)) else value, "time": time.time()}
        await asyncio.to_thread(self.cache.set, key, item)

    async def delete(self, key, lock: Optional[asyncio.Lock] = None):
        """Remove *key* from the cache (no-op if absent)."""
        if not lock:
            async with self.lock:
                await self._delete(key)
        else:
            await self._delete(key)

    async def _delete(self, key):
        # Unlocked worker; callers are responsible for holding a lock.
        await asyncio.to_thread(self.cache.delete, key)

    async def clear(self, lock: Optional[asyncio.Lock] = None):
        """Remove every entry from the cache."""
        if not lock:
            async with self.lock:
                await self._clear()
        else:
            await self._clear()

    async def _clear(self):
        # Unlocked worker; callers are responsible for holding a lock.
        await asyncio.to_thread(self.cache.clear)

    async def upsert(self, key, value, lock: Optional[asyncio.Lock] = None):
        """Insert *value*, merging into an existing dict value when both are dicts."""
        if not lock:
            async with self.lock:
                await self._upsert(key, value)
        else:
            await self._upsert(key, value)

    async def _upsert(self, key, value):
        # BUGFIX: must use the unlocked _get/_set here. The public get()/set()
        # re-acquire self.lock, and asyncio.Lock is not reentrant, so calling
        # them while upsert() already holds the lock deadlocks.
        existing_value = await self._get(key)
        if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
            existing_value.update(value)
            value = existing_value
        await self._set(key, value)

    def __contains__(self, key):
        # diskcache membership is a cheap synchronous lookup; the previous
        # asyncio.run(...) wrapper raised RuntimeError when invoked from a
        # running event loop and added no value.
        return key in self.cache

    async def teardown(self):
        # Clean up the cache directory
        self.cache.clear(retry=True)

View file

@ -1,5 +1,6 @@
from typing import TYPE_CHECKING
from langflow.services.cache.disk import AsyncDiskCache
from langflow.services.cache.service import AsyncInMemoryCache, CacheService, RedisCache, ThreadingInMemoryCache
from langflow.services.factory import ServiceFactory
from langflow.utils.logger import logger
@ -36,3 +37,8 @@ class CacheServiceFactory(ServiceFactory):
return ThreadingInMemoryCache(expiration_time=settings_service.settings.cache_expire)
elif settings_service.settings.cache_type == "async":
return AsyncInMemoryCache(expiration_time=settings_service.settings.cache_expire)
elif settings_service.settings.cache_type == "disk":
return AsyncDiskCache(
cache_dir=settings_service.settings.config_dir,
expiration_time=settings_service.settings.cache_expire,
)

View file

@ -8,9 +8,7 @@ from typing import Generic, Optional
from loguru import logger
from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType, CacheService, LockType
from langflow.services.cache.utils import CacheMiss
CACHE_MISS = CacheMiss()
from langflow.services.cache.utils import CACHE_MISS
class ThreadingInMemoryCache(CacheService, Generic[LockType]): # type: ignore

View file

@ -166,3 +166,6 @@ def update_build_status(cache_service, flow_id: str, status: "BuildStatus"):
cache_service[flow_id] = cached_flow
cached_flow["status"] = status
cache_service[flow_id] = cached_flow
CACHE_MISS = CacheMiss()

View file

@ -3,7 +3,7 @@ import json
import os
from pathlib import Path
from shutil import copy2
from typing import Any, List, Optional, Tuple, Type
from typing import Any, List, Literal, Optional, Tuple, Type
import orjson
import yaml
@ -79,7 +79,7 @@ class Settings(BaseSettings):
"""SQLite pragmas to use when connecting to the database."""
# cache configuration
cache_type: str = "async"
cache_type: Literal["async", "redis", "memory", "disk"] = "async"
"""The cache type can be 'async', 'redis', 'memory', or 'disk'."""
cache_expire: int = 3600
"""The cache expire in seconds."""

View file

@ -1208,6 +1208,17 @@ files = [
graph = ["objgraph (>=1.7.2)"]
profile = ["gprof2dot (>=2022.7.29)"]
[[package]]
name = "diskcache"
version = "5.6.3"
description = "Disk Cache -- Disk and file backed persistent cache."
optional = false
python-versions = ">=3"
files = [
{file = "diskcache-5.6.3-py3-none-any.whl", hash = "sha256:5e31b2d5fbad117cc363ebaf6b689474db18a1f6438bc82358b024abd4c2ca19"},
{file = "diskcache-5.6.3.tar.gz", hash = "sha256:2c3a3fa2743d8535d832ec61c2054a1641f41775aa7c556758a109941e33e4fc"},
]
[[package]]
name = "distlib"
version = "0.3.8"
@ -7654,4 +7665,4 @@ local = []
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<3.13"
content-hash = "747dad35b9e5b1338a989ea6bfd4ac3465ba34f792639aeabda3c1ca9b40c689"
content-hash = "fe6710d7325bc2cceeaa298d94d6f1157cfe1533c2acbabe3ecdca5594d9e007"

View file

@ -79,6 +79,7 @@ filelock = "^3.15.4"
grandalf = "^0.8.0"
crewai = "^0.36.0"
spider-client = "^0.0.27"
diskcache = "^5.6.3"
[tool.poetry.extras]