🔧 chore(utils.py): remove unused imports and classes

🔥 refactor(utils.py): remove unused Memoize class and its methods
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-09-05 15:34:45 -03:00
commit d13f0e77df

View file

@ -6,12 +6,12 @@ import os
import tempfile
from collections import OrderedDict
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict
from typing import TYPE_CHECKING, Any, Dict
from appdirs import user_cache_dir
from langflow.services.database.models.base import orjson_dumps
if TYPE_CHECKING:
from langflow.services.cache.base import BaseCacheManager
pass
CACHE: Dict[str, Any] = {}
@ -195,44 +195,3 @@ def save_uploaded_file(file, folder_name):
new_file.write(chunk)
return file_path
class Memoize:
def __init__(
self,
get_cache_manager: Callable[[], "BaseCacheManager"],
):
self.get_cache_manager = get_cache_manager
self.hash_func = compute_dict_hash
def clear_cache(self, session_id):
cache_manager = self.get_cache_manager()
cache_manager.delete(session_id)
def get_result_by_session_id(self, session_id):
cache_manager = self.get_cache_manager()
return cache_manager.get(session_id)
def update_cache(self, session_id, value):
cache_manager = self.get_cache_manager()
cache_manager.set(session_id, value)
def __call__(self, func: Callable[..., Any]):
@functools.wraps(func)
def wrapper(*args, **kwargs):
cache_manager = self.get_cache_manager()
session_id = self.hash_func(args[0])
result = cache_manager.get(session_id)
if result is None:
result = func(*args, **kwargs)
cache_manager.set(session_id, result)
wrapper.session_id = session_id
return result
wrapper.clear_cache = self.clear_cache
wrapper.update_cache = self.update_cache
wrapper.get_result_by_session_id = self.get_result_by_session_id
return wrapper