fixing ThreadingInMemoryCache usage (#2604)

* ThreadingInMemoryCache usage is broken. This commit addresses those issues along with missing documentation about the caching options.

* make lint & make unit_tests fixes

* removing unnecessary changes from unclear test results in  last run

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
himan-k 2024-07-10 04:52:37 -07:00 committed by GitHub
commit 7414a01235
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 138 additions and 15 deletions

View file

@ -59,8 +59,8 @@ LANGFLOW_OPEN_BROWSER=
# Example: LANGFLOW_REMOVE_API_KEYS=false
LANGFLOW_REMOVE_API_KEYS=
# Whether to use RedisCache or InMemoryCache
# Values: memory, redis
# Whether to use RedisCache or ThreadingInMemoryCache or AsyncInMemoryCache
# Values: async, memory, redis
# Example: LANGFLOW_CACHE_TYPE=memory
# If you want to use redis then the following environment variables must be set:
# LANGFLOW_REDIS_HOST (default: localhost)

1
.gitignore vendored
View file

@ -272,3 +272,4 @@ prof/*
src/frontend/temp
*-shm
*-wal
.history

2
.vscode/launch.json vendored
View file

@ -1,6 +1,7 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Debug Backend",
"type": "debugpy",
@ -38,6 +39,7 @@
"--env-file",
"${workspaceFolder}/.env"
],
// "python": "/path/to/your/python_env/python", // Replace with the path to your Python executable
"jinja": true,
"justMyCode": false
},

View file

@ -42,9 +42,13 @@
# 📝 Content
- [](#)
- [📝 Content](#-content)
- [📦 Get Started](#-get-started)
- [🎨 Create Flows](#-create-flows)
- [Deploy](#deploy)
- [DataStax Langflow](#datastax-langflow)
- [Deploy Langflow on Hugging Face Spaces](#deploy-langflow-on-hugging-face-spaces)
- [Deploy Langflow on Google Cloud Platform](#deploy-langflow-on-google-cloud-platform)
- [Deploy on Railway](#deploy-on-railway)
- [Deploy on Render](#deploy-on-render)
@ -64,6 +68,13 @@ You can install Langflow with pip:
# Make sure you have >=Python 3.10 installed on your system.
python -m pip install langflow -U
```
Or
If you would like to install from your cloned repo, you can build and install Langflow's frontend and backend with:
```shell
make install_frontend && make build_frontend && make install_backend
```
Then, run Langflow with:

View file

@ -200,7 +200,7 @@ async def get_next_runnable_vertices(
list: A list of IDs of the next runnable vertices.
"""
async with chat_service._cache_locks[flow_id] as lock:
async with chat_service._async_cache_locks[flow_id] as lock:
graph.remove_from_predecessors(vertex_id)
direct_successors_ready = [v for v in vertex.successors_ids if graph.is_vertex_runnable(v)]
if not direct_successors_ready:

View file

@ -188,7 +188,7 @@ async def build_vertex(
vertex = graph.get_vertex(vertex_id)
try:
lock = chat_service._cache_locks[flow_id_str]
lock = chat_service._async_cache_locks[flow_id_str]
(
result_dict,
params,

View file

@ -951,7 +951,7 @@ class Graph:
self.set_run_id(run_id)
self.set_run_name()
await self.initialize_run()
lock = chat_service._cache_locks[self.run_id]
lock = chat_service._async_cache_locks[self.run_id]
while to_process:
current_batch = list(to_process) # Copy current deque items to a list
to_process.clear() # Clear the deque for new items

View file

@ -1,10 +1,10 @@
import asyncio
import json
from typing import TYPE_CHECKING
from loguru import logger
from loguru import logger
from langflow.custom.utils import abuild_custom_components, build_custom_components
from langflow.services.cache.base import AsyncBaseCacheService
if TYPE_CHECKING:
from langflow.services.cache.base import CacheService
@ -63,9 +63,44 @@ async def get_and_cache_all_types_dict(
force_refresh: bool = False,
lock: asyncio.Lock | None = None,
):
all_types_dict = await cache_service.get(key="all_types_dict", lock=lock)
async def get_from_cache(key):
"""
Retrieves a value from the cache based on the given key.
Args:
key: The key to retrieve the value from the cache.
Returns:
The value associated with the given key in the cache.
Raises:
None.
"""
if isinstance(cache_service, AsyncBaseCacheService):
return await cache_service.get(key=key, lock=lock)
else:
return cache_service.get(key=key, lock=lock)
async def set_in_cache(key, value):
"""
Sets the given key-value pair in the cache.
Parameters:
- key: The key to set in the cache.
- value: The value to associate with the key in the cache.
Returns:
None
"""
if isinstance(cache_service, AsyncBaseCacheService):
await cache_service.set(key=key, value=value, lock=lock)
else:
cache_service.set(key=key, value=value, lock=lock)
all_types_dict = await get_from_cache("all_types_dict")
if not all_types_dict or force_refresh:
logger.debug("Building langchain types dict")
all_types_dict = await aget_all_types_dict(settings_service.settings.components_path)
await cache_service.set(key="all_types_dict", value=all_types_dict, lock=lock)
await set_in_cache("all_types_dict", all_types_dict)
return all_types_dict

View file

@ -1,39 +1,113 @@
import asyncio
from collections import defaultdict
from threading import RLock
from typing import Any, Optional
from langflow.services.base import Service
from langflow.services.cache.base import AsyncBaseCacheService
from langflow.services.deps import get_cache_service
class ChatService(Service):
"""
Service class for managing chat-related operations.
"""
name = "chat_service"
def __init__(self):
self._cache_locks = defaultdict(asyncio.Lock)
self._async_cache_locks = defaultdict(asyncio.Lock)
self._sync_cache_locks = defaultdict(RLock)
self.cache_service = get_cache_service()
def _get_lock(self, key: str):
"""
Retrieves the lock associated with the given key.
Args:
key (str): The key to retrieve the lock for.
Returns:
threading.Lock or asyncio.Lock: The lock associated with the given key.
"""
if isinstance(self.cache_service, AsyncBaseCacheService):
return self._async_cache_locks[key]
else:
return self._sync_cache_locks[key]
async def _perform_cache_operation(
self, operation: str, key: str, data: Any = None, lock: Optional[asyncio.Lock] = None
):
"""
Perform a cache operation based on the given operation type.
Args:
operation (str): The type of cache operation to perform. Possible values are "upsert", "get", or "delete".
key (str): The key associated with the cache operation.
data (Any, optional): The data to be stored in the cache. Only applicable for "upsert" operation. Defaults to None.
lock (Optional[asyncio.Lock], optional): The lock to be used for the cache operation. Defaults to None.
Returns:
Any: The result of the cache operation. Only applicable for "get" operation.
Raises:
None
"""
lock = lock or self._get_lock(key)
if isinstance(self.cache_service, AsyncBaseCacheService):
if operation == "upsert":
await self.cache_service.upsert(str(key), data, lock=lock)
elif operation == "get":
return await self.cache_service.get(key, lock=lock)
elif operation == "delete":
await self.cache_service.delete(key, lock=lock)
else:
if operation == "upsert":
self.cache_service.upsert(str(key), data, lock=lock)
elif operation == "get":
return self.cache_service.get(key, lock=lock)
elif operation == "delete":
self.cache_service.delete(key, lock=lock)
async def set_cache(self, key: str, data: Any, lock: Optional[asyncio.Lock] = None) -> bool:
"""
Set the cache for a client.
Args:
key (str): The cache key.
data (Any): The data to be cached.
lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None.
Returns:
bool: True if the cache was set successfully, False otherwise.
"""
# client_id is the flow id but that already exists in the cache
# so we need to change it to something else
result_dict = {
"result": data,
"type": type(data),
}
await self.cache_service.upsert(str(key), result_dict, lock=lock or self._cache_locks[key])
await self._perform_cache_operation("upsert", key, result_dict, lock)
return key in self.cache_service
async def get_cache(self, key: str, lock: Optional[asyncio.Lock] = None) -> Any:
"""
Get the cache for a client.
Args:
key (str): The cache key.
lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None.
Returns:
Any: The cached data.
"""
return await self.cache_service.get(key, lock=lock or self._cache_locks[key])
return await self._perform_cache_operation("get", key, lock=lock or self._get_lock(key))
async def clear_cache(self, key: str, lock: Optional[asyncio.Lock] = None):
"""
Clear the cache for a client.
Args:
key (str): The cache key.
lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None.
"""
await self.cache_service.delete(key, lock=lock or self._cache_locks[key])
await self._perform_cache_operation("delete", key, lock=lock or self._get_lock(key))