From 7414a01235115b8a05c69fd389c907f26d102227 Mon Sep 17 00:00:00 2001 From: himan-k Date: Wed, 10 Jul 2024 04:52:37 -0700 Subject: [PATCH] fixing ThreadingInMemoryCache usage (#2604) * ThreadingInMemoryCache usage is broken. This commit addresses those issues along with missing documentation about the caching options. * make lint & make unit_tests fixes * removing unnecessary changes from unclear test results in last run * [autofix.ci] apply automated fixes --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .env.example | 4 +- .gitignore | 1 + .vscode/launch.json | 2 + README.md | 11 +++ src/backend/base/langflow/api/utils.py | 2 +- src/backend/base/langflow/api/v1/chat.py | 2 +- src/backend/base/langflow/graph/graph/base.py | 2 +- src/backend/base/langflow/interface/types.py | 43 +++++++++- .../base/langflow/services/chat/service.py | 86 +++++++++++++++++-- 9 files changed, 138 insertions(+), 15 deletions(-) diff --git a/.env.example b/.env.example index 6ce9411f6..dac588a24 100644 --- a/.env.example +++ b/.env.example @@ -59,8 +59,8 @@ LANGFLOW_OPEN_BROWSER= # Example: LANGFLOW_REMOVE_API_KEYS=false LANGFLOW_REMOVE_API_KEYS= -# Whether to use RedisCache or InMemoryCache -# Values: memory, redis +# Whether to use RedisCache or ThreadingInMemoryCache or AsyncInMemoryCache +# Values: async, memory, redis # Example: LANGFLOW_CACHE_TYPE=memory # If you want to use redis then the following environment variables must be set: # LANGFLOW_REDIS_HOST (default: localhost) diff --git a/.gitignore b/.gitignore index e5ecbb32a..f3fb37a39 100644 --- a/.gitignore +++ b/.gitignore @@ -272,3 +272,4 @@ prof/* src/frontend/temp *-shm *-wal +.history diff --git a/.vscode/launch.json b/.vscode/launch.json index eaaf23f33..5929f0eff 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,6 +1,7 @@ { "version": "0.2.0", "configurations": [ + { "name": "Debug Backend", "type": "debugpy", @@ -38,6 +39,7 @@ "--env-file", "${workspaceFolder}/.env" ], + // "python": "/path/to/your/python_env/python", // Replace with the path to your Python executable "jinja": true, "justMyCode": false }, diff --git a/README.md b/README.md index df26f3cab..93218443a 100644 --- a/README.md +++ b/README.md @@ -42,9 +42,13 @@ # 📝 Content +- [](#) +- [📝 Content](#-content) - [📦 Get Started](#-get-started) - [🎨 Create Flows](#-create-flows) - [Deploy](#deploy) + - [DataStax Langflow](#datastax-langflow) + - [Deploy Langflow on Hugging Face Spaces](#deploy-langflow-on-hugging-face-spaces) - [Deploy Langflow on Google Cloud Platform](#deploy-langflow-on-google-cloud-platform) - [Deploy on Railway](#deploy-on-railway) - [Deploy on Render](#deploy-on-render) @@ -64,6 +68,13 @@ You can install Langflow with pip: # Make sure you have >=Python 3.10 installed on your system. python -m pip install langflow -U ``` +Or + +If you would like to install from your cloned repo, you can build and install Langflow's frontend and backend with: + +```shell +make install_frontend && make build_frontend && make install_backend +``` Then, run Langflow with: diff --git a/src/backend/base/langflow/api/utils.py b/src/backend/base/langflow/api/utils.py index c4414fc23..148c2130c 100644 --- a/src/backend/base/langflow/api/utils.py +++ b/src/backend/base/langflow/api/utils.py @@ -200,7 +200,7 @@ async def get_next_runnable_vertices( list: A list of IDs of the next runnable vertices. """ - async with chat_service._cache_locks[flow_id] as lock: + async with chat_service._async_cache_locks[flow_id] as lock: graph.remove_from_predecessors(vertex_id) direct_successors_ready = [v for v in vertex.successors_ids if graph.is_vertex_runnable(v)] if not direct_successors_ready: diff --git a/src/backend/base/langflow/api/v1/chat.py b/src/backend/base/langflow/api/v1/chat.py index 61155c43a..8c925e71f 100644 --- a/src/backend/base/langflow/api/v1/chat.py +++ b/src/backend/base/langflow/api/v1/chat.py @@ -188,7 +188,7 @@ async def build_vertex( vertex = graph.get_vertex(vertex_id) try: - lock = chat_service._cache_locks[flow_id_str] + lock = chat_service._async_cache_locks[flow_id_str] ( result_dict, params, diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 6b81e2e78..62ecfdeee 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -951,7 +951,7 @@ class Graph: self.set_run_id(run_id) self.set_run_name() await self.initialize_run() - lock = chat_service._cache_locks[self.run_id] + lock = chat_service._async_cache_locks[self.run_id] while to_process: current_batch = list(to_process) # Copy current deque items to a list to_process.clear() # Clear the deque for new items diff --git a/src/backend/base/langflow/interface/types.py b/src/backend/base/langflow/interface/types.py index 4680e1d35..d067cebca 100644 --- a/src/backend/base/langflow/interface/types.py +++ b/src/backend/base/langflow/interface/types.py @@ -1,10 +1,10 @@ import asyncio import json - from typing import TYPE_CHECKING -from loguru import logger +from loguru import logger from langflow.custom.utils import abuild_custom_components, build_custom_components +from langflow.services.cache.base import AsyncBaseCacheService if TYPE_CHECKING: from langflow.services.cache.base import CacheService @@ -63,9 +63,44 @@ async def get_and_cache_all_types_dict( force_refresh: bool = False, lock: asyncio.Lock | None = None, ): - all_types_dict = await cache_service.get(key="all_types_dict", lock=lock) + async def get_from_cache(key): + """ + Retrieves a value from the cache based on the given key. + + Args: + key: The key to retrieve the value from the cache. + + Returns: + The value associated with the given key in the cache. + + Raises: + None. + """ + if isinstance(cache_service, AsyncBaseCacheService): + return await cache_service.get(key=key, lock=lock) + else: + return cache_service.get(key=key, lock=lock) + + async def set_in_cache(key, value): + """ + Sets the given key-value pair in the cache. + + Parameters: + - key: The key to set in the cache. + - value: The value to associate with the key in the cache. + + Returns: + None + """ + if isinstance(cache_service, AsyncBaseCacheService): + await cache_service.set(key=key, value=value, lock=lock) + else: + cache_service.set(key=key, value=value, lock=lock) + + all_types_dict = await get_from_cache("all_types_dict") if not all_types_dict or force_refresh: logger.debug("Building langchain types dict") all_types_dict = await aget_all_types_dict(settings_service.settings.components_path) - await cache_service.set(key="all_types_dict", value=all_types_dict, lock=lock) + await set_in_cache("all_types_dict", all_types_dict) + return all_types_dict diff --git a/src/backend/base/langflow/services/chat/service.py b/src/backend/base/langflow/services/chat/service.py index bb913e6e9..abf57e4e3 100644 --- a/src/backend/base/langflow/services/chat/service.py +++ b/src/backend/base/langflow/services/chat/service.py @@ -1,39 +1,113 @@ import asyncio from collections import defaultdict +from threading import RLock from typing import Any, Optional from langflow.services.base import Service +from langflow.services.cache.base import AsyncBaseCacheService from langflow.services.deps import get_cache_service class ChatService(Service): + """ + Service class for managing chat-related operations. + """ + name = "chat_service" def __init__(self): - self._cache_locks = defaultdict(asyncio.Lock) + self._async_cache_locks = defaultdict(asyncio.Lock) + self._sync_cache_locks = defaultdict(RLock) self.cache_service = get_cache_service() + def _get_lock(self, key: str): + """ + Retrieves the lock associated with the given key. + + Args: + key (str): The key to retrieve the lock for. + + Returns: + threading.Lock or asyncio.Lock: The lock associated with the given key. + """ + if isinstance(self.cache_service, AsyncBaseCacheService): + return self._async_cache_locks[key] + else: + return self._sync_cache_locks[key] + + async def _perform_cache_operation( + self, operation: str, key: str, data: Any = None, lock: Optional[asyncio.Lock] = None + ): + """ + Perform a cache operation based on the given operation type. + + Args: + operation (str): The type of cache operation to perform. Possible values are "upsert", "get", or "delete". + key (str): The key associated with the cache operation. + data (Any, optional): The data to be stored in the cache. Only applicable for "upsert" operation. Defaults to None. + lock (Optional[asyncio.Lock], optional): The lock to be used for the cache operation. Defaults to None. + + Returns: + Any: The result of the cache operation. Only applicable for "get" operation. + + Raises: + None + + """ + lock = lock or self._get_lock(key) + if isinstance(self.cache_service, AsyncBaseCacheService): + if operation == "upsert": + await self.cache_service.upsert(str(key), data, lock=lock) + elif operation == "get": + return await self.cache_service.get(key, lock=lock) + elif operation == "delete": + await self.cache_service.delete(key, lock=lock) + else: + if operation == "upsert": + self.cache_service.upsert(str(key), data, lock=lock) + elif operation == "get": + return self.cache_service.get(key, lock=lock) + elif operation == "delete": + self.cache_service.delete(key, lock=lock) + async def set_cache(self, key: str, data: Any, lock: Optional[asyncio.Lock] = None) -> bool: """ Set the cache for a client. + + Args: + key (str): The cache key. + data (Any): The data to be cached. + lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None. + + Returns: + bool: True if the cache was set successfully, False otherwise. """ - # client_id is the flow id but that already exists in the cache - # so we need to change it to something else result_dict = { "result": data, "type": type(data), } - await self.cache_service.upsert(str(key), result_dict, lock=lock or self._cache_locks[key]) + await self._perform_cache_operation("upsert", key, result_dict, lock) return key in self.cache_service async def get_cache(self, key: str, lock: Optional[asyncio.Lock] = None) -> Any: """ Get the cache for a client. + + Args: + key (str): The cache key. + lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None. + + Returns: + Any: The cached data. """ - return await self.cache_service.get(key, lock=lock or self._cache_locks[key]) + return await self._perform_cache_operation("get", key, lock=lock or self._get_lock(key)) async def clear_cache(self, key: str, lock: Optional[asyncio.Lock] = None): """ Clear the cache for a client. + + Args: + key (str): The cache key. + lock (Optional[asyncio.Lock], optional): The lock to use for the cache operation. Defaults to None. """ - await self.cache_service.delete(key, lock=lock or self._cache_locks[key]) + await self._perform_cache_operation("delete", key, lock=lock or self._get_lock(key))