diff --git a/src/backend/langflow/cache/manager.py b/src/backend/langflow/cache/manager.py index 947f5ce21..13b281008 100644 --- a/src/backend/langflow/cache/manager.py +++ b/src/backend/langflow/cache/manager.py @@ -54,7 +54,7 @@ class CacheManager(Subject): def __init__(self): super().__init__() - self.CACHE = {} + self._cache = {} self.current_client_id = None self.current_cache = {} @@ -68,12 +68,12 @@ class CacheManager(Subject): """ previous_client_id = self.current_client_id self.current_client_id = client_id - self.current_cache = self.CACHE.setdefault(client_id, {}) + self.current_cache = self._cache.setdefault(client_id, {}) try: yield finally: self.current_client_id = previous_client_id - self.current_cache = self.CACHE.get(self.current_client_id, {}) + self.current_cache = self._cache.get(self.current_client_id, {}) def add(self, name: str, obj: Any, obj_type: str, extension: Optional[str] = None): """ diff --git a/src/backend/langflow/cache/utils.py b/src/backend/langflow/cache/utils.py new file mode 100644 index 000000000..0f1ff5d92 --- /dev/null +++ b/src/backend/langflow/cache/utils.py @@ -0,0 +1,154 @@ +import base64 +import contextlib +import functools +import hashlib +import json +import os +import tempfile +from collections import OrderedDict +from pathlib import Path +from typing import Any, Dict + +import dill # type: ignore + +CACHE: Dict[str, Any] = {} + + +def create_cache_folder(func): + def wrapper(*args, **kwargs): + # Get the destination folder + cache_path = Path(tempfile.gettempdir()) / PREFIX + + # Create the destination folder if it doesn't exist + os.makedirs(cache_path, exist_ok=True) + + return func(*args, **kwargs) + + return wrapper + + +def memoize_dict(maxsize=128): + cache = OrderedDict() + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + hashed = compute_dict_hash(args[0]) + key = (func.__name__, hashed, frozenset(kwargs.items())) + if key not in cache: + result = func(*args, **kwargs) + cache[key] = result + if len(cache) > maxsize: + cache.popitem(last=False) + else: + result = cache[key] + return result + + def clear_cache(): + cache.clear() + + wrapper.clear_cache = clear_cache # type: ignore + wrapper.cache = cache # type: ignore + return wrapper + + return decorator + + +PREFIX = "langflow_cache" + + +@create_cache_folder +def clear_old_cache_files(max_cache_size: int = 3): + cache_dir = Path(tempfile.gettempdir()) / PREFIX + cache_files = list(cache_dir.glob("*.dill")) + + if len(cache_files) > max_cache_size: + cache_files_sorted_by_mtime = sorted( + cache_files, key=lambda x: x.stat().st_mtime, reverse=True + ) + + for cache_file in cache_files_sorted_by_mtime[max_cache_size:]: + with contextlib.suppress(OSError): + os.remove(cache_file) + + +def compute_dict_hash(graph_data): + graph_data = filter_json(graph_data) + + cleaned_graph_json = json.dumps(graph_data, sort_keys=True) + return hashlib.sha256(cleaned_graph_json.encode("utf-8")).hexdigest() + + +def filter_json(json_data): + filtered_data = json_data.copy() + + # Remove 'viewport' and 'chatHistory' keys + if "viewport" in filtered_data: + del filtered_data["viewport"] + if "chatHistory" in filtered_data: + del filtered_data["chatHistory"] + + # Filter nodes + if "nodes" in filtered_data: + for node in filtered_data["nodes"]: + if "position" in node: + del node["position"] + if "positionAbsolute" in node: + del node["positionAbsolute"] + if "selected" in node: + del node["selected"] + if "dragging" in node: + del node["dragging"] + + return filtered_data + + +@create_cache_folder +def save_binary_file(content: str, file_name: str, accepted_types: list[str]) -> str: + """ + Save a binary file to the specified folder. + + Args: + content: The content of the file as a bytes object. + file_name: The name of the file, including its extension. + + Returns: + The path to the saved file. + """ + if not any(file_name.endswith(suffix) for suffix in accepted_types): + raise ValueError(f"File {file_name} is not accepted") + + # Get the destination folder + cache_path = Path(tempfile.gettempdir()) / PREFIX + if not content: + raise ValueError("Please, reload the file in the loader.") + data = content.split(",")[1] + decoded_bytes = base64.b64decode(data) + + # Create the full file path + file_path = os.path.join(cache_path, file_name) + + # Save the binary content to the file + with open(file_path, "wb") as file: + file.write(decoded_bytes) + + return file_path + + +@create_cache_folder +def save_cache(hash_val: str, chat_data, clean_old_cache_files: bool): + cache_path = Path(tempfile.gettempdir()) / PREFIX / f"{hash_val}.dill" + with cache_path.open("wb") as cache_file: + dill.dump(chat_data, cache_file) + + if clean_old_cache_files: + clear_old_cache_files() + + +@create_cache_folder +def load_cache(hash_val): + cache_path = Path(tempfile.gettempdir()) / PREFIX / f"{hash_val}.dill" + if cache_path.exists(): + with cache_path.open("rb") as cache_file: + return dill.load(cache_file) + return None