Refactor cache update function
This commit is contained in:
parent
77ff531112
commit
875430592d
1 changed files with 8 additions and 59 deletions
67
src/backend/langflow/services/cache/utils.py
vendored
67
src/backend/langflow/services/cache/utils.py
vendored
|
|
@ -1,24 +1,23 @@
|
|||
import base64
|
||||
import contextlib
|
||||
import functools
|
||||
import hashlib
|
||||
import os
|
||||
import tempfile
|
||||
from collections import OrderedDict
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, Dict
|
||||
from platformdirs import user_cache_dir
|
||||
|
||||
from fastapi import UploadFile
|
||||
from langflow.api.v1.schemas import BuildStatus
|
||||
from langflow.services.database.models.base import orjson_dumps
|
||||
from platformdirs import user_cache_dir
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
from langflow.api.v1.schemas import BuildStatus
|
||||
|
||||
CACHE: Dict[str, Any] = {}
|
||||
|
||||
CACHE_DIR = user_cache_dir("langflow", "langflow")
|
||||
|
||||
PREFIX = "langflow_cache"
|
||||
|
||||
|
||||
def create_cache_folder(func):
|
||||
def wrapper(*args, **kwargs):
|
||||
|
|
@ -33,50 +32,6 @@ def create_cache_folder(func):
|
|||
return wrapper
|
||||
|
||||
|
||||
def memoize_dict(maxsize=128):
|
||||
cache = OrderedDict()
|
||||
hash_to_key = {} # Mapping from hash to cache key
|
||||
|
||||
def decorator(func):
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
hashed = compute_dict_hash(args[0])
|
||||
key = (func.__name__, hashed, frozenset(kwargs.items()))
|
||||
if key not in cache:
|
||||
result = func(*args, **kwargs)
|
||||
cache[key] = result
|
||||
hash_to_key[hashed] = key # Store the mapping
|
||||
if len(cache) > maxsize:
|
||||
oldest_key = next(iter(cache))
|
||||
oldest_hash = oldest_key[1]
|
||||
del cache[oldest_key]
|
||||
del hash_to_key[oldest_hash]
|
||||
else:
|
||||
result = cache[key]
|
||||
|
||||
wrapper.session_id = hashed # Store hash in the wrapper
|
||||
return result
|
||||
|
||||
def clear_cache():
|
||||
cache.clear()
|
||||
hash_to_key.clear()
|
||||
|
||||
def get_result_by_session_id(session_id):
|
||||
key = hash_to_key.get(session_id)
|
||||
return cache.get(key) if key is not None else None
|
||||
|
||||
wrapper.clear_cache = clear_cache # type: ignore
|
||||
wrapper.get_result_by_session_id = get_result_by_session_id # type: ignore
|
||||
wrapper.hash = None
|
||||
wrapper.cache = cache # type: ignore
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
PREFIX = "langflow_cache"
|
||||
|
||||
|
||||
@create_cache_folder
|
||||
def clear_old_cache_files(max_cache_size: int = 3):
|
||||
cache_dir = Path(tempfile.gettempdir()) / PREFIX
|
||||
|
|
@ -90,14 +45,6 @@ def clear_old_cache_files(max_cache_size: int = 3):
|
|||
os.remove(cache_file)
|
||||
|
||||
|
||||
def compute_dict_hash(graph_data):
|
||||
graph_data = filter_json(graph_data)
|
||||
|
||||
cleaned_graph_json = orjson_dumps(graph_data, sort_keys=True)
|
||||
|
||||
return hashlib.sha256(cleaned_graph_json.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def filter_json(json_data):
|
||||
filtered_data = json_data.copy()
|
||||
|
||||
|
|
@ -203,9 +150,11 @@ def save_uploaded_file(file: UploadFile, folder_name):
|
|||
return file_path
|
||||
|
||||
|
||||
def update_build_status(cache_service, flow_id: str, status: BuildStatus):
|
||||
def update_build_status(cache_service, flow_id: str, status: "BuildStatus"):
|
||||
cached_flow = cache_service[flow_id]
|
||||
if cached_flow is None:
|
||||
raise ValueError(f"Flow {flow_id} not found in cache")
|
||||
cached_flow["status"] = status
|
||||
cache_service[flow_id] = cached_flow
|
||||
cached_flow["status"] = status
|
||||
cache_service[flow_id] = cached_flow
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue