📦 chore(cache): add cache module with cache manager, factory, base cache, and in-memory cache implementations

 feat(utils.py): add cache utility functions for memoization and file saving
🐛 fix(utils.py): fix cache folder creation to use the correct cache directory
🔧 chore(utils.py): refactor code to improve readability and remove unused imports
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-05 22:12:27 -03:00
commit d5ad152250
6 changed files with 584 additions and 0 deletions

View file

@@ -0,0 +1,11 @@
from . import factory, manager
from langflow.services.cache.manager import cache_manager
from langflow.services.cache.flow import InMemoryCache
__all__ = [
"cache_manager",
"factory",
"manager",
"InMemoryCache",
]

View file

@@ -0,0 +1,84 @@
import abc
class BaseCache(abc.ABC):
    """
    Abstract base class for a cache.

    Defines a dict-like contract: get/set/delete plus the corresponding
    dunder methods, and a clear() operation. Concrete subclasses (e.g. an
    in-memory LRU cache) must implement every abstract method below.
    """
    @abc.abstractmethod
    def get(self, key):
        """
        Retrieve an item from the cache.
        Args:
            key: The key of the item to retrieve.
        Returns:
            The value associated with the key, or None if the key is not found.
        """
    @abc.abstractmethod
    def set(self, key, value) -> None:
        """
        Add an item to the cache.
        Args:
            key: The key of the item.
            value: The value to cache.
        """
    @abc.abstractmethod
    def delete(self, key) -> None:
        """
        Remove an item from the cache.
        Args:
            key: The key of the item to remove.
        """
    @abc.abstractmethod
    def clear(self) -> None:
        """
        Clear all items from the cache.
        """
    @abc.abstractmethod
    def __contains__(self, key) -> bool:
        """
        Check if the key is in the cache.
        Args:
            key: The key of the item to check.
        Returns:
            True if the key is in the cache, False otherwise.
        """
    @abc.abstractmethod
    def __getitem__(self, key):
        """
        Retrieve an item from the cache using the square bracket notation.
        Args:
            key: The key of the item to retrieve.
        """
    @abc.abstractmethod
    def __setitem__(self, key, value) -> None:
        """
        Add an item to the cache using the square bracket notation.
        Args:
            key: The key of the item.
            value: The value to cache.
        """
    @abc.abstractmethod
    def __delitem__(self, key) -> None:
        """
        Remove an item from the cache using the square bracket notation.
        Args:
            key: The key of the item to remove.
        """

View file

@@ -0,0 +1,11 @@
from langflow.services.cache.manager import CacheManager
from langflow.services.factory import ServiceFactory
class CacheManagerFactory(ServiceFactory):
    """Service factory that builds CacheManager instances."""

    def __init__(self):
        # Register CacheManager as the service type this factory produces.
        super().__init__(CacheManager)

    def create(self, settings_service):
        """Build a fresh CacheManager; settings_service is currently unused."""
        return CacheManager()

View file

@@ -0,0 +1,146 @@
import threading
import time
from collections import OrderedDict
from langflow.services.cache.base import BaseCache
class InMemoryCache(BaseCache):
    """
    A simple in-memory cache using an OrderedDict.

    This cache supports setting a maximum size and expiration time for cached items.
    When the cache is full, it uses a Least Recently Used (LRU) eviction policy.
    Thread-safe using a reentrant lock (threading.RLock), so public methods can
    safely call each other while the lock is held.

    Attributes:
        max_size (int, optional): Maximum number of items to store in the cache.
        expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.

    Example:
        cache = InMemoryCache(max_size=3, expiration_time=5)

        # setting cache values
        cache.set("a", 1)
        cache.set("b", 2)
        cache["c"] = 3

        # getting cache values
        a = cache.get("a")
        b = cache["b"]
    """

    def __init__(self, max_size=None, expiration_time=60 * 60):
        """
        Initialize a new InMemoryCache instance.

        Args:
            max_size (int, optional): Maximum number of items to store in the cache.
            expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour.
        """
        self._cache = OrderedDict()
        # RLock, not Lock: get()/set() call delete(), and get_or_set() calls
        # get()/set(), while already holding the lock. With a non-reentrant
        # Lock those nested acquisitions would deadlock (which is why delete()
        # previously had its locking commented out).
        self._lock = threading.RLock()
        self.max_size = max_size
        self.expiration_time = expiration_time

    def get(self, key):
        """
        Retrieve an item from the cache.

        Args:
            key: The key of the item to retrieve.

        Returns:
            The value associated with the key, or None if the key is not found
            or the item has expired.
        """
        with self._lock:
            if key in self._cache:
                item = self._cache.pop(key)
                if (
                    self.expiration_time is None
                    or time.time() - item["time"] < self.expiration_time
                ):
                    # Move the key to the end to make it recently used
                    self._cache[key] = item
                    return item["value"]
                # Expired: make sure any stale entry is gone.
                self.delete(key)
            return None

    def set(self, key, value):
        """
        Add an item to the cache.

        If the cache is full, the least recently used item is evicted.

        Args:
            key: The key of the item.
            value: The value to cache.
        """
        with self._lock:
            if key in self._cache:
                # Remove existing key before re-inserting to update order
                self.delete(key)
            elif self.max_size and len(self._cache) >= self.max_size:
                # Remove least recently used item
                self._cache.popitem(last=False)
            self._cache[key] = {"value": value, "time": time.time()}

    def get_or_set(self, key, value):
        """
        Retrieve an item from the cache. If the item does not exist, set it with the provided value.

        Args:
            key: The key of the item.
            value: The value to cache if the item doesn't exist.

        Returns:
            The cached value associated with the key.
        """
        # Safe only because the lock is reentrant: get()/set() re-acquire it.
        with self._lock:
            if key in self._cache:
                return self.get(key)
            self.set(key, value)
            return value

    def delete(self, key):
        """
        Remove an item from the cache.

        Args:
            key: The key of the item to remove.
        """
        with self._lock:
            self._cache.pop(key, None)

    def clear(self):
        """
        Clear all items from the cache.
        """
        with self._lock:
            self._cache.clear()

    def __contains__(self, key):
        """Check if the key is in the cache (ignores expiration)."""
        with self._lock:
            return key in self._cache

    def __getitem__(self, key):
        """Retrieve an item from the cache using the square bracket notation."""
        return self.get(key)

    def __setitem__(self, key, value):
        """Add an item to the cache using the square bracket notation."""
        self.set(key, value)

    def __delitem__(self, key):
        """Remove an item from the cache using the square bracket notation."""
        self.delete(key)

    def __len__(self):
        """Return the number of items in the cache."""
        with self._lock:
            return len(self._cache)

    def __repr__(self):
        """Return a string representation of the InMemoryCache instance."""
        return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})"

View file

@@ -0,0 +1,153 @@
from contextlib import contextmanager
from typing import Any, Awaitable, Callable, List, Optional
from langflow.services.base import Service
import pandas as pd
from PIL import Image
class Subject:
    """Subject side of the (synchronous) observer pattern."""

    def __init__(self):
        # Zero-argument callbacks invoked, in attach order, by notify().
        self.observers: List[Callable[[], None]] = []

    def attach(self, observer: Callable[[], None]):
        """Register *observer* to be called on notify()."""
        self.observers.append(observer)

    def detach(self, observer: Callable[[], None]):
        """Unregister a previously attached observer."""
        self.observers.remove(observer)

    def notify(self):
        """Call every attached observer, skipping None entries."""
        for callback in self.observers:
            if callback is not None:
                callback()
class AsyncSubject:
    """Subject side of the observer pattern for awaitable observers."""

    def __init__(self):
        # Coroutine functions awaited, one at a time, by notify().
        self.observers: List[Callable[[], Awaitable]] = []

    def attach(self, observer: Callable[[], Awaitable]):
        """Register *observer* to be awaited on notify()."""
        self.observers.append(observer)

    def detach(self, observer: Callable[[], Awaitable]):
        """Unregister a previously attached observer."""
        self.observers.remove(observer)

    async def notify(self):
        """Await every attached observer sequentially, skipping None entries."""
        for callback in self.observers:
            if callback is not None:
                await callback()
class CacheManager(Subject, Service):
    """Manages cache for different clients and notifies observers on changes."""

    name = "cache_manager"

    def __init__(self):
        super().__init__()
        # client_id -> {cache key -> entry dict}
        self._cache = {}
        self.current_client_id = None
        self.current_cache = {}

    @contextmanager
    def set_client_id(self, client_id: str):
        """
        Context manager that makes *client_id*'s cache the current one.

        Args:
            client_id (str): The client identifier.
        """
        saved_client_id = self.current_client_id
        self.current_client_id = client_id
        self.current_cache = self._cache.setdefault(client_id, {})
        try:
            yield
        finally:
            # Restore whichever client (possibly None) was active before.
            self.current_client_id = saved_client_id
            self.current_cache = self._cache.get(self.current_client_id, {})

    def add(self, name: str, obj: Any, obj_type: str, extension: Optional[str] = None):
        """
        Add an object to the current client's cache and notify observers.

        Args:
            name (str): The cache key.
            obj (Any): The object to cache.
            obj_type (str): The type of the object.
            extension (str, optional): Explicit file extension; when falsy,
                a default is derived from obj_type (or the object's class name).
        """
        default_extensions = {"image": "png", "pandas": "csv"}
        fallback_extension = default_extensions.get(
            obj_type, type(obj).__name__.lower()
        )
        self.current_cache[name] = {
            "obj": obj,
            "type": obj_type,
            "extension": extension or fallback_extension,
        }
        self.notify()

    def add_pandas(self, name: str, obj: Any):
        """
        Add a pandas DataFrame or Series (as CSV text) to the current client's cache.

        Args:
            name (str): The cache key.
            obj (Any): The pandas DataFrame or Series object.
        """
        if not isinstance(obj, (pd.DataFrame, pd.Series)):
            raise ValueError("Object is not a pandas DataFrame or Series")
        self.add(name, obj.to_csv(), "pandas", extension="csv")

    def add_image(self, name: str, obj: Any, extension: str = "png"):
        """
        Add a PIL Image to the current client's cache.

        Args:
            name (str): The cache key.
            obj (Any): The PIL Image object.
            extension (str): File extension to record for the image.
        """
        if not isinstance(obj, Image.Image):
            raise ValueError("Object is not a PIL Image")
        self.add(name, obj, "image", extension=extension)

    def get(self, name: str):
        """
        Get an object from the current client's cache.

        Args:
            name (str): The cache key.

        Returns:
            The cached entry for the given key (raises KeyError when absent).
        """
        return self.current_cache[name]

    def get_last(self):
        """
        Get the last added item in the current client's cache.

        Returns:
            The most recently inserted entry (raises IndexError when empty).
        """
        return list(self.current_cache.values())[-1]


cache_manager = CacheManager()

View file

@@ -0,0 +1,179 @@
import base64
import contextlib
import functools
import hashlib
import json
import os
import tempfile
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict
from appdirs import user_cache_dir
CACHE: Dict[str, Any] = {}
CACHE_DIR = user_cache_dir("langflow", "langflow")
def create_cache_folder(func):
    """
    Decorator ensuring the cache folder (CACHE_DIR / PREFIX) exists before
    the wrapped function runs.

    Args:
        func: The function to wrap.

    Returns:
        The wrapped function with its original metadata preserved.
    """

    # functools.wraps keeps func's __name__/__doc__ on the wrapper; without
    # it every decorated function reported itself as "wrapper".
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Get the destination folder (PREFIX is resolved at call time).
        cache_path = Path(CACHE_DIR) / PREFIX
        # Create the destination folder (and parents) if it doesn't exist
        cache_path.mkdir(parents=True, exist_ok=True)
        return func(*args, **kwargs)

    return wrapper
def memoize_dict(maxsize=128):
    """
    Decorator factory memoizing functions whose first positional argument is a dict.

    The cache key combines the function name, a content hash of the first
    argument (via compute_dict_hash), and the keyword arguments. When more
    than ``maxsize`` entries accumulate, the oldest one is evicted.
    The wrapper exposes ``clear_cache()`` and the underlying ``cache`` dict.
    """
    entries = OrderedDict()

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            digest = compute_dict_hash(args[0])
            key = (func.__name__, digest, frozenset(kwargs.items()))
            try:
                return entries[key]
            except KeyError:
                value = func(*args, **kwargs)
                entries[key] = value
                if len(entries) > maxsize:
                    # Evict the oldest entry (insertion order).
                    entries.popitem(last=False)
                return value

        def clear_cache():
            entries.clear()

        wrapper.clear_cache = clear_cache  # type: ignore
        wrapper.cache = entries  # type: ignore
        return wrapper

    return decorator
PREFIX = "langflow_cache"
@create_cache_folder
def clear_old_cache_files(max_cache_size: int = 3):
    """
    Delete the oldest ``*.dill`` cache files, keeping at most ``max_cache_size``.

    Files are sorted by modification time (newest first); everything beyond
    the first ``max_cache_size`` entries is removed. Removal errors are
    suppressed — this is best-effort cleanup.

    NOTE(review): the decorator ensures ``CACHE_DIR / PREFIX`` exists, but this
    function scans ``tempfile.gettempdir() / PREFIX`` instead — confirm which
    directory the .dill files are actually written to.

    Args:
        max_cache_size: Number of most-recent cache files to keep.
    """
    cache_dir = Path(tempfile.gettempdir()) / PREFIX
    cache_files = list(cache_dir.glob("*.dill"))
    if len(cache_files) > max_cache_size:
        cache_files_sorted_by_mtime = sorted(
            cache_files, key=lambda x: x.stat().st_mtime, reverse=True
        )
        for cache_file in cache_files_sorted_by_mtime[max_cache_size:]:
            # Best-effort: ignore files already removed or locked.
            with contextlib.suppress(OSError):
                os.remove(cache_file)
def compute_dict_hash(graph_data):
    """Return the SHA-256 hex digest of graph_data after stripping volatile keys."""
    canonical_json = json.dumps(filter_json(graph_data), sort_keys=True)
    return hashlib.sha256(canonical_json.encode("utf-8")).hexdigest()
def filter_json(json_data):
    """
    Return a copy of graph JSON with layout/UI-only keys removed.

    Strips the top-level "viewport" and "chatHistory" keys, and removes
    "position", "positionAbsolute", "selected" and "dragging" from every
    node, so that purely visual state does not affect the content hash.

    Args:
        json_data: Dict describing the graph; may contain a "nodes" list of dicts.

    Returns:
        A new dict. The input dict and its node dicts are left unmodified
        (the previous shallow-copy + del approach mutated the caller's nodes).
    """
    ui_node_keys = {"position", "positionAbsolute", "selected", "dragging"}
    # Drop top-level volatile keys without touching the input.
    filtered_data = {
        key: value
        for key, value in json_data.items()
        if key not in ("viewport", "chatHistory")
    }
    if "nodes" in filtered_data:
        # Rebuild each node dict instead of deleting in place, so the
        # caller's node objects are not mutated.
        filtered_data["nodes"] = [
            {key: value for key, value in node.items() if key not in ui_node_keys}
            for node in filtered_data["nodes"]
        ]
    return filtered_data
@create_cache_folder
def save_binary_file(content: str, file_name: str, accepted_types: list[str]) -> str:
    """
    Save base64-encoded content to the cache folder.

    Args:
        content: A data-URL style string of the form "<header>,<base64 payload>".
        file_name: The name of the file, including its extension.
        accepted_types: File-name suffixes that are allowed to be saved.

    Returns:
        The path to the saved file (as a string).

    Raises:
        ValueError: If the file type is not accepted, the content is empty,
            or the content is not in the expected "<header>,<data>" form.
    """
    if not any(file_name.endswith(suffix) for suffix in accepted_types):
        raise ValueError(f"File {file_name} is not accepted")

    # Get the destination folder (created by the decorator).
    cache_path = Path(CACHE_DIR) / PREFIX

    if not content:
        raise ValueError("Please, reload the file in the loader.")
    # content is expected to look like "data:<mime>;base64,<payload>".
    header, sep, data = content.partition(",")
    if not sep:
        # Previously this raised a bare IndexError from split(",")[1].
        raise ValueError("Please, reload the file in the loader.")
    decoded_bytes = base64.b64decode(data)

    # Create the full file path and write the binary content.
    file_path = cache_path / file_name
    file_path.write_bytes(decoded_bytes)

    return str(file_path)
@create_cache_folder
def save_uploaded_file(file, folder_name):
    """
    Save an uploaded file to the specified folder with a hash of its content as the file name.

    Args:
        file: The uploaded file object (file-like, supporting read() and seek()).
        folder_name: The name of the folder (under the cache dir) to save the file in.

    Returns:
        The path to the saved file (a pathlib.Path).
    """
    cache_path = Path(CACHE_DIR)
    folder_path = cache_path / folder_name
    # parents=True covers a missing cache root; exist_ok=True avoids both a
    # TOCTOU race between exists() and mkdir() and a crash on concurrent
    # creation (the previous bare mkdir() failed in either case).
    folder_path.mkdir(parents=True, exist_ok=True)

    # First pass: hash the content in small chunks to conserve memory.
    sha256_hash = hashlib.sha256()
    file.seek(0)
    while chunk := file.read(8192):  # Read 8KB at a time (adjust as needed)
        sha256_hash.update(chunk)

    # Use the hex digest of the hash as the (content-addressed) file name.
    file_name = sha256_hash.hexdigest()

    # Second pass: copy the content to its destination.
    file.seek(0)
    file_path = folder_path / file_name
    with open(file_path, "wb") as new_file:
        while chunk := file.read(8192):
            new_file.write(chunk)

    return file_path