langflow/src/backend/langflow/cache/base.py
Gabriel Almeida 28ec8338f0 🐛 fix(endpoints.py): import save_uploaded_file function from cache.base module
🐛 fix(graph/base.py): change file_path assignment to use the new file_path key in value dict
🐛 fix(template/field/base.py): change content key to file_path key in result dict
 feat(endpoints.py): add endpoint to upload file
 feat(cache/base.py): add function to save uploaded file to cache
The save_uploaded_file function was not being imported in the endpoints.py module, causing a NameError when trying to use it. The file_path variable in the Node class was being assigned the value of the content key in the value dict, but it should be assigned the value of the file_path key instead. The content key in the TemplateFieldCreator class was changed to file_path to better reflect its purpose. An endpoint to upload files was added to the endpoints.py module, which uses the save_uploaded_file function from the cache.base module to save the uploaded file to the cache.
2023-05-30 23:40:32 -03:00

167 lines
4.6 KiB
Python

import base64
import contextlib
import functools
import hashlib
import json
import os
import tempfile
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict
import dill # type: ignore
CACHE: Dict[str, Any] = {}
def create_cache_folder(func):
def wrapper(*args, **kwargs):
# Get the destination folder
cache_path = Path(tempfile.gettempdir()) / PREFIX
# Create the destination folder if it doesn't exist
os.makedirs(cache_path, exist_ok=True)
return func(*args, **kwargs)
return wrapper
def memoize_dict(maxsize=128):
cache = OrderedDict()
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
hashed = compute_dict_hash(args[0])
key = (func.__name__, hashed, frozenset(kwargs.items()))
if key not in cache:
result = func(*args, **kwargs)
cache[key] = result
if len(cache) > maxsize:
cache.popitem(last=False)
else:
result = cache[key]
return result
def clear_cache():
cache.clear()
wrapper.clear_cache = clear_cache # type: ignore
wrapper.cache = cache # type: ignore
return wrapper
return decorator
PREFIX = "langflow_cache"
@create_cache_folder
def clear_old_cache_files(max_cache_size: int = 3):
cache_dir = Path(tempfile.gettempdir()) / PREFIX
cache_files = list(cache_dir.glob("*.dill"))
if len(cache_files) > max_cache_size:
cache_files_sorted_by_mtime = sorted(
cache_files, key=lambda x: x.stat().st_mtime, reverse=True
)
for cache_file in cache_files_sorted_by_mtime[max_cache_size:]:
with contextlib.suppress(OSError):
os.remove(cache_file)
def compute_dict_hash(graph_data):
graph_data = filter_json(graph_data)
cleaned_graph_json = json.dumps(graph_data, sort_keys=True)
return hashlib.sha256(cleaned_graph_json.encode("utf-8")).hexdigest()
def filter_json(json_data):
filtered_data = json_data.copy()
# Remove 'viewport' and 'chatHistory' keys
if "viewport" in filtered_data:
del filtered_data["viewport"]
if "chatHistory" in filtered_data:
del filtered_data["chatHistory"]
# Filter nodes
if "nodes" in filtered_data:
for node in filtered_data["nodes"]:
if "position" in node:
del node["position"]
if "positionAbsolute" in node:
del node["positionAbsolute"]
if "selected" in node:
del node["selected"]
if "dragging" in node:
del node["dragging"]
return filtered_data
@create_cache_folder
def save_binary_file(content: str, file_name: str, accepted_types: list[str]) -> str:
"""
Save a binary file to the specified folder.
Args:
content: The content of the file as a bytes object.
file_name: The name of the file, including its extension.
Returns:
The path to the saved file.
"""
if not any(file_name.endswith(suffix) for suffix in accepted_types):
raise ValueError(f"File {file_name} is not accepted")
# Get the destination folder
cache_path = Path(tempfile.gettempdir()) / PREFIX
if not content:
raise ValueError("Please, reload the file in the loader.")
data = content.split(",")[1]
decoded_bytes = base64.b64decode(data)
# Create the full file path
file_path = os.path.join(cache_path, file_name)
# Save the binary content to the file
with open(file_path, "wb") as file:
file.write(decoded_bytes)
return file_path
@create_cache_folder
def save_uploaded_file(file, file_name):
cache_path = Path(tempfile.gettempdir()) / PREFIX
file_path = cache_path / file_name
with open(file_path, "wb") as new_file:
# Iterate over the uploaded file in small chunks to conserve memory
while chunk := file.read(8192): # Read 8KB at a time (adjust as needed)
new_file.write(chunk)
return file_path
@create_cache_folder
def save_cache(hash_val: str, chat_data, clean_old_cache_files: bool):
cache_path = Path(tempfile.gettempdir()) / PREFIX / f"{hash_val}.dill"
with cache_path.open("wb") as cache_file:
dill.dump(chat_data, cache_file)
if clean_old_cache_files:
clear_old_cache_files()
@create_cache_folder
def load_cache(hash_val):
cache_path = Path(tempfile.gettempdir()) / PREFIX / f"{hash_val}.dill"
if cache_path.exists():
with cache_path.open("rb") as cache_file:
return dill.load(cache_file)
return None