From d5ad1522500a2095d1647a88a43cfee9bf0c0e36 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 5 Aug 2023 22:12:27 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=A6=20chore(cache):=20add=20cache=20mo?= =?UTF-8?q?dule=20with=20cache=20manager,=20factory,=20base=20cache,=20and?= =?UTF-8?q?=20in-memory=20cache=20implementations?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ✨ feat(utils.py): add cache utility functions for memoization and file saving 🐛 fix(utils.py): fix cache folder creation to use the correct cache directory 🔧 chore(utils.py): refactor code to improve readability and remove unused imports --- .../langflow/services/cache/__init__.py | 11 ++ src/backend/langflow/services/cache/base.py | 84 ++++++++ .../langflow/services/cache/factory.py | 11 ++ src/backend/langflow/services/cache/flow.py | 146 ++++++++++++++ .../langflow/services/cache/manager.py | 153 +++++++++++++++ src/backend/langflow/services/cache/utils.py | 179 ++++++++++++++++++ 6 files changed, 584 insertions(+) create mode 100644 src/backend/langflow/services/cache/__init__.py create mode 100644 src/backend/langflow/services/cache/base.py create mode 100644 src/backend/langflow/services/cache/factory.py create mode 100644 src/backend/langflow/services/cache/flow.py create mode 100644 src/backend/langflow/services/cache/manager.py create mode 100644 src/backend/langflow/services/cache/utils.py diff --git a/src/backend/langflow/services/cache/__init__.py b/src/backend/langflow/services/cache/__init__.py new file mode 100644 index 000000000..79e143807 --- /dev/null +++ b/src/backend/langflow/services/cache/__init__.py @@ -0,0 +1,11 @@ +from . import factory, manager +from langflow.services.cache.manager import cache_manager +from langflow.services.cache.flow import InMemoryCache + + +__all__ = [ + "cache_manager", + "factory", + "manager", + "InMemoryCache", +] diff --git a/src/backend/langflow/services/cache/base.py b/src/backend/langflow/services/cache/base.py new file mode 100644 index 000000000..88cb3a1da --- /dev/null +++ b/src/backend/langflow/services/cache/base.py @@ -0,0 +1,84 @@ +import abc + + +class BaseCache(abc.ABC): + """ + Abstract base class for a cache. + """ + + @abc.abstractmethod + def get(self, key): + """ + Retrieve an item from the cache. + + Args: + key: The key of the item to retrieve. + + Returns: + The value associated with the key, or None if the key is not found. + """ + + @abc.abstractmethod + def set(self, key, value): + """ + Add an item to the cache. + + Args: + key: The key of the item. + value: The value to cache. + """ + + @abc.abstractmethod + def delete(self, key): + """ + Remove an item from the cache. + + Args: + key: The key of the item to remove. + """ + + @abc.abstractmethod + def clear(self): + """ + Clear all items from the cache. + """ + + @abc.abstractmethod + def __contains__(self, key): + """ + Check if the key is in the cache. + + Args: + key: The key of the item to check. + + Returns: + True if the key is in the cache, False otherwise. + """ + + @abc.abstractmethod + def __getitem__(self, key): + """ + Retrieve an item from the cache using the square bracket notation. + + Args: + key: The key of the item to retrieve. + """ + + @abc.abstractmethod + def __setitem__(self, key, value): + """ + Add an item to the cache using the square bracket notation. + + Args: + key: The key of the item. + value: The value to cache. + """ + + @abc.abstractmethod + def __delitem__(self, key): + """ + Remove an item from the cache using the square bracket notation. + + Args: + key: The key of the item to remove. + """ diff --git a/src/backend/langflow/services/cache/factory.py b/src/backend/langflow/services/cache/factory.py new file mode 100644 index 000000000..77f8d58d1 --- /dev/null +++ b/src/backend/langflow/services/cache/factory.py @@ -0,0 +1,11 @@ +from langflow.services.cache.manager import CacheManager +from langflow.services.factory import ServiceFactory + + +class CacheManagerFactory(ServiceFactory): + def __init__(self): + super().__init__(CacheManager) + + def create(self, settings_service): + # Here you would have logic to create and configure a CacheManager + return CacheManager() diff --git a/src/backend/langflow/services/cache/flow.py b/src/backend/langflow/services/cache/flow.py new file mode 100644 index 000000000..0c10c51e1 --- /dev/null +++ b/src/backend/langflow/services/cache/flow.py @@ -0,0 +1,146 @@ +import threading +import time +from collections import OrderedDict + +from langflow.services.cache.base import BaseCache + + +class InMemoryCache(BaseCache): + """ + A simple in-memory cache using an OrderedDict. + + This cache supports setting a maximum size and expiration time for cached items. + When the cache is full, it uses a Least Recently Used (LRU) eviction policy. + Thread-safe using a threading Lock. + + Attributes: + max_size (int, optional): Maximum number of items to store in the cache. + expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour. + + Example: + + cache = InMemoryCache(max_size=3, expiration_time=5) + + # setting cache values + cache.set("a", 1) + cache.set("b", 2) + cache["c"] = 3 + + # getting cache values + a = cache.get("a") + b = cache["b"] + """ + + def __init__(self, max_size=None, expiration_time=60 * 60): + """ + Initialize a new InMemoryCache instance. + + Args: + max_size (int, optional): Maximum number of items to store in the cache. + expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour. + """ + self._cache = OrderedDict() + self._lock = threading.Lock() + self.max_size = max_size + self.expiration_time = expiration_time + + def get(self, key): + """ + Retrieve an item from the cache. + + Args: + key: The key of the item to retrieve. + + Returns: + The value associated with the key, or None if the key is not found or the item has expired. + """ + with self._lock: + if key in self._cache: + item = self._cache.pop(key) + if ( + self.expiration_time is None + or time.time() - item["time"] < self.expiration_time + ): + # Move the key to the end to make it recently used + self._cache[key] = item + return item["value"] + else: + self.delete(key) + return None + + def set(self, key, value): + """ + Add an item to the cache. + + If the cache is full, the least recently used item is evicted. + + Args: + key: The key of the item. + value: The value to cache. + """ + with self._lock: + if key in self._cache: + # Remove existing key before re-inserting to update order + self.delete(key) + elif self.max_size and len(self._cache) >= self.max_size: + # Remove least recently used item + self._cache.popitem(last=False) + self._cache[key] = {"value": value, "time": time.time()} + + def get_or_set(self, key, value): + """ + Retrieve an item from the cache. If the item does not exist, set it with the provided value. + + Args: + key: The key of the item. + value: The value to cache if the item doesn't exist. + + Returns: + The cached value associated with the key. + """ + with self._lock: + if key in self._cache: + return self.get(key) + self.set(key, value) + return value + + def delete(self, key): + """ + Remove an item from the cache. + + Args: + key: The key of the item to remove. + """ + # with self._lock: + self._cache.pop(key, None) + + def clear(self): + """ + Clear all items from the cache. + """ + with self._lock: + self._cache.clear() + + def __contains__(self, key): + """Check if the key is in the cache.""" + return key in self._cache + + def __getitem__(self, key): + """Retrieve an item from the cache using the square bracket notation.""" + return self.get(key) + + def __setitem__(self, key, value): + """Add an item to the cache using the square bracket notation.""" + self.set(key, value) + + def __delitem__(self, key): + """Remove an item from the cache using the square bracket notation.""" + self.delete(key) + + def __len__(self): + """Return the number of items in the cache.""" + return len(self._cache) + + def __repr__(self): + """Return a string representation of the InMemoryCache instance.""" + return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})" diff --git a/src/backend/langflow/services/cache/manager.py b/src/backend/langflow/services/cache/manager.py new file mode 100644 index 000000000..ce9a338ef --- /dev/null +++ b/src/backend/langflow/services/cache/manager.py @@ -0,0 +1,153 @@ +from contextlib import contextmanager +from typing import Any, Awaitable, Callable, List, Optional +from langflow.services.base import Service + +import pandas as pd +from PIL import Image + + +class Subject: + """Base class for implementing the observer pattern.""" + + def __init__(self): + self.observers: List[Callable[[], None]] = [] + + def attach(self, observer: Callable[[], None]): + """Attach an observer to the subject.""" + self.observers.append(observer) + + def detach(self, observer: Callable[[], None]): + """Detach an observer from the subject.""" + self.observers.remove(observer) + + def notify(self): + """Notify all observers about an event.""" + for observer in self.observers: + if observer is None: + continue + observer() + + +class AsyncSubject: + """Base class for implementing the async observer pattern.""" + + def __init__(self): + self.observers: List[Callable[[], Awaitable]] = [] + + def attach(self, observer: Callable[[], Awaitable]): + """Attach an observer to the subject.""" + self.observers.append(observer) + + def detach(self, observer: Callable[[], Awaitable]): + """Detach an observer from the subject.""" + self.observers.remove(observer) + + async def notify(self): + """Notify all observers about an event.""" + for observer in self.observers: + if observer is None: + continue + await observer() + + +class CacheManager(Subject, Service): + """Manages cache for different clients and notifies observers on changes.""" + + name = "cache_manager" + + def __init__(self): + super().__init__() + self._cache = {} + self.current_client_id = None + self.current_cache = {} + + @contextmanager + def set_client_id(self, client_id: str): + """ + Context manager to set the current client_id and associated cache. + + Args: + client_id (str): The client identifier. + """ + previous_client_id = self.current_client_id + self.current_client_id = client_id + self.current_cache = self._cache.setdefault(client_id, {}) + try: + yield + finally: + self.current_client_id = previous_client_id + self.current_cache = self._cache.get(self.current_client_id, {}) + + def add(self, name: str, obj: Any, obj_type: str, extension: Optional[str] = None): + """ + Add an object to the current client's cache. + + Args: + name (str): The cache key. + obj (Any): The object to cache. + obj_type (str): The type of the object. + """ + object_extensions = { + "image": "png", + "pandas": "csv", + } + if obj_type in object_extensions: + _extension = object_extensions[obj_type] + else: + _extension = type(obj).__name__.lower() + self.current_cache[name] = { + "obj": obj, + "type": obj_type, + "extension": extension or _extension, + } + self.notify() + + def add_pandas(self, name: str, obj: Any): + """ + Add a pandas DataFrame or Series to the current client's cache. + + Args: + name (str): The cache key. + obj (Any): The pandas DataFrame or Series object. + """ + if isinstance(obj, (pd.DataFrame, pd.Series)): + self.add(name, obj.to_csv(), "pandas", extension="csv") + else: + raise ValueError("Object is not a pandas DataFrame or Series") + + def add_image(self, name: str, obj: Any, extension: str = "png"): + """ + Add a PIL Image to the current client's cache. + + Args: + name (str): The cache key. + obj (Any): The PIL Image object. + """ + if isinstance(obj, Image.Image): + self.add(name, obj, "image", extension=extension) + else: + raise ValueError("Object is not a PIL Image") + + def get(self, name: str): + """ + Get an object from the current client's cache. + + Args: + name (str): The cache key. + + Returns: + The cached object associated with the given cache key. + """ + return self.current_cache[name] + + def get_last(self): + """ + Get the last added item in the current client's cache. + + Returns: + The last added item in the cache. + """ + return list(self.current_cache.values())[-1] + + +cache_manager = CacheManager() diff --git a/src/backend/langflow/services/cache/utils.py b/src/backend/langflow/services/cache/utils.py new file mode 100644 index 000000000..3deabe9f4 --- /dev/null +++ b/src/backend/langflow/services/cache/utils.py @@ -0,0 +1,179 @@ +import base64 +import contextlib +import functools +import hashlib +import json +import os +import tempfile +from collections import OrderedDict +from pathlib import Path +from typing import Any, Dict +from appdirs import user_cache_dir + +CACHE: Dict[str, Any] = {} + +CACHE_DIR = user_cache_dir("langflow", "langflow") + + +def create_cache_folder(func): + def wrapper(*args, **kwargs): + # Get the destination folder + cache_path = Path(CACHE_DIR) / PREFIX + + # Create the destination folder if it doesn't exist + os.makedirs(cache_path, exist_ok=True) + + return func(*args, **kwargs) + + return wrapper + + +def memoize_dict(maxsize=128): + cache = OrderedDict() + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + hashed = compute_dict_hash(args[0]) + key = (func.__name__, hashed, frozenset(kwargs.items())) + if key not in cache: + result = func(*args, **kwargs) + cache[key] = result + if len(cache) > maxsize: + cache.popitem(last=False) + else: + result = cache[key] + return result + + def clear_cache(): + cache.clear() + + wrapper.clear_cache = clear_cache # type: ignore + wrapper.cache = cache # type: ignore + return wrapper + + return decorator + + +PREFIX = "langflow_cache" + + +@create_cache_folder +def clear_old_cache_files(max_cache_size: int = 3): + cache_dir = Path(tempfile.gettempdir()) / PREFIX + cache_files = list(cache_dir.glob("*.dill")) + + if len(cache_files) > max_cache_size: + cache_files_sorted_by_mtime = sorted( + cache_files, key=lambda x: x.stat().st_mtime, reverse=True + ) + + for cache_file in cache_files_sorted_by_mtime[max_cache_size:]: + with contextlib.suppress(OSError): + os.remove(cache_file) + + +def compute_dict_hash(graph_data): + graph_data = filter_json(graph_data) + + cleaned_graph_json = json.dumps(graph_data, sort_keys=True) + return hashlib.sha256(cleaned_graph_json.encode("utf-8")).hexdigest() + + +def filter_json(json_data): + filtered_data = json_data.copy() + + # Remove 'viewport' and 'chatHistory' keys + if "viewport" in filtered_data: + del filtered_data["viewport"] + if "chatHistory" in filtered_data: + del filtered_data["chatHistory"] + + # Filter nodes + if "nodes" in filtered_data: + for node in filtered_data["nodes"]: + if "position" in node: + del node["position"] + if "positionAbsolute" in node: + del node["positionAbsolute"] + if "selected" in node: + del node["selected"] + if "dragging" in node: + del node["dragging"] + + return filtered_data + + +@create_cache_folder +def save_binary_file(content: str, file_name: str, accepted_types: list[str]) -> str: + """ + Save a binary file to the specified folder. + + Args: + content: The content of the file as a bytes object. + file_name: The name of the file, including its extension. + + Returns: + The path to the saved file. + """ + if not any(file_name.endswith(suffix) for suffix in accepted_types): + raise ValueError(f"File {file_name} is not accepted") + + # Get the destination folder + cache_path = Path(CACHE_DIR) / PREFIX + if not content: + raise ValueError("Please, reload the file in the loader.") + data = content.split(",")[1] + decoded_bytes = base64.b64decode(data) + + # Create the full file path + file_path = os.path.join(cache_path, file_name) + + # Save the binary content to the file + with open(file_path, "wb") as file: + file.write(decoded_bytes) + + return file_path + + +@create_cache_folder +def save_uploaded_file(file, folder_name): + """ + Save an uploaded file to the specified folder with a hash of its content as the file name. + + Args: + file: The uploaded file object. + folder_name: The name of the folder to save the file in. + + Returns: + The path to the saved file. + """ + cache_path = Path(CACHE_DIR) + folder_path = cache_path / folder_name + + # Create the folder if it doesn't exist + if not folder_path.exists(): + folder_path.mkdir() + + # Create a hash of the file content + sha256_hash = hashlib.sha256() + # Reset the file cursor to the beginning of the file + file.seek(0) + # Iterate over the uploaded file in small chunks to conserve memory + while chunk := file.read(8192): # Read 8KB at a time (adjust as needed) + sha256_hash.update(chunk) + + # Use the hex digest of the hash as the file name + hex_dig = sha256_hash.hexdigest() + file_name = hex_dig + + # Reset the file cursor to the beginning of the file + file.seek(0) + + # Save the file with the hash as its name + file_path = folder_path / file_name + with open(file_path, "wb") as new_file: + while chunk := file.read(8192): + new_file.write(chunk) + + return file_path