From bdbc46f01ddf06e7c3ad350cad8b02ef02c5633c Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 11 Jun 2023 18:06:57 -0300 Subject: [PATCH 01/52] =?UTF-8?q?=F0=9F=90=9B=20fix(chat.py):=20add=20exce?= =?UTF-8?q?ption=20handling=20to=20post=5Fbuild=20endpoint=20=E2=9C=A8=20f?= =?UTF-8?q?eat(cache/=5F=5Finit=5F=5F.py):=20add=20InMemoryCache=20to=20ex?= =?UTF-8?q?ported=20modules=20=F0=9F=94=96=20refactor(endpoints.py):=20rem?= =?UTF-8?q?ove=20unused=20import=20statement=20The=20post=5Fbuild=20endpoi?= =?UTF-8?q?nt=20in=20chat.py=20now=20has=20exception=20handling=20to=20cat?= =?UTF-8?q?ch=20any=20errors=20that=20may=20occur=20during=20the=20build?= =?UTF-8?q?=20process.=20InMemoryCache=20is=20now=20exported=20as=20part?= =?UTF-8?q?=20of=20the=20cache=20module.=20The=20endpoints.py=20file=20has?= =?UTF-8?q?=20been=20refactored=20to=20remove=20an=20unused=20import=20sta?= =?UTF-8?q?tement.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/api/v1/chat.py | 12 ++++++++++++ src/backend/langflow/api/v1/endpoints.py | 1 - src/backend/langflow/cache/__init__.py | 8 +++++++- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 7df4c65ed..9619d5f30 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -1,5 +1,6 @@ from fastapi import ( APIRouter, + HTTPException, WebSocket, WebSocketDisconnect, WebSocketException, @@ -24,3 +25,14 @@ async def websocket_endpoint(client_id: str, websocket: WebSocket): except WebSocketDisconnect as exc: logger.error(exc) await websocket.close(code=status.WS_1000_NORMAL_CLOSURE, reason=str(exc)) + + +@router.post("/build/{client_id}") +async def post_build(client_id: str, graph_data: dict): + """Build langchain object from data_graph.""" + try: + if chat_manager.build(client_id, graph_data.get("data")): + return {"message": "Build successful"} + except Exception as exc: + logger.exception(exc) + raise HTTPException(status_code=500, detail=str(exc)) from exc diff --git a/src/backend/langflow/api/v1/endpoints.py b/src/backend/langflow/api/v1/endpoints.py index 0427eb291..bef1c4d1e 100644 --- a/src/backend/langflow/api/v1/endpoints.py +++ b/src/backend/langflow/api/v1/endpoints.py @@ -1,5 +1,4 @@ import logging -from importlib.metadata import version from fastapi import APIRouter, HTTPException diff --git a/src/backend/langflow/cache/__init__.py b/src/backend/langflow/cache/__init__.py index 583d5ac6d..723aa9e18 100644 --- a/src/backend/langflow/cache/__init__.py +++ b/src/backend/langflow/cache/__init__.py @@ -1 +1,7 @@ -from langflow.cache.manager import cache_manager # noqa +from langflow.cache.manager import cache_manager +from langflow.cache.flow import InMemoryCache + +__all__ = [ + "cache_manager", + "InMemoryCache", +] From a27e5d58ddd86d06c5457a401d7cbe58c2b61a21 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 11 Jun 2023 18:07:17 -0300 Subject: [PATCH 02/52] =?UTF-8?q?=F0=9F=94=A8=20refactor(base.py):=20refac?= =?UTF-8?q?tor=20BaseCache=20class=20to=20use=20abstract=20methods=20This?= =?UTF-8?q?=20commit=20refactors=20the=20BaseCache=20class=20to=20use=20ab?= =?UTF-8?q?stract=20methods=20instead=20of=20concrete=20methods.=20This=20?= =?UTF-8?q?makes=20the=20class=20more=20flexible=20and=20allows=20for=20di?= =?UTF-8?q?fferent=20implementations=20of=20the=20cache.=20The=20abstract?= =?UTF-8?q?=20methods=20include=20get,=20set,=20delete,=20clear,=20=5F=5Fc?= =?UTF-8?q?ontains=5F=5F,=20=5F=5Fgetitem=5F=5F,=20=5F=5Fsetitem=5F=5F,=20?= =?UTF-8?q?and=20=5F=5Fdelitem=5F=5F.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/cache/base.py | 223 +++++++++++------------------ 1 file changed, 82 insertions(+), 141 deletions(-) diff --git a/src/backend/langflow/cache/base.py b/src/backend/langflow/cache/base.py index 0f1ff5d92..96639774b 100644 --- a/src/backend/langflow/cache/base.py +++ b/src/backend/langflow/cache/base.py @@ -1,154 +1,95 @@ -import base64 -import contextlib -import functools -import hashlib -import json -import os -import tempfile -from collections import OrderedDict -from pathlib import Path -from typing import Any, Dict - -import dill # type: ignore - -CACHE: Dict[str, Any] = {} +import abc -def create_cache_folder(func): - def wrapper(*args, **kwargs): - # Get the destination folder - cache_path = Path(tempfile.gettempdir()) / PREFIX - - # Create the destination folder if it doesn't exist - os.makedirs(cache_path, exist_ok=True) - - return func(*args, **kwargs) - - return wrapper - - -def memoize_dict(maxsize=128): - cache = OrderedDict() - - def decorator(func): - @functools.wraps(func) - def wrapper(*args, **kwargs): - hashed = compute_dict_hash(args[0]) - key = (func.__name__, hashed, frozenset(kwargs.items())) - if key not in cache: - result = func(*args, **kwargs) - cache[key] = result - if len(cache) > maxsize: - cache.popitem(last=False) - else: - result = cache[key] - return result - - def clear_cache(): - cache.clear() - - wrapper.clear_cache = clear_cache # type: ignore - wrapper.cache = cache # type: ignore - return wrapper - - return decorator - - -PREFIX = "langflow_cache" - - -@create_cache_folder -def clear_old_cache_files(max_cache_size: int = 3): - cache_dir = Path(tempfile.gettempdir()) / PREFIX - cache_files = list(cache_dir.glob("*.dill")) - - if len(cache_files) > max_cache_size: - cache_files_sorted_by_mtime = sorted( - cache_files, key=lambda x: x.stat().st_mtime, reverse=True - ) - - for cache_file in cache_files_sorted_by_mtime[max_cache_size:]: - with contextlib.suppress(OSError): - os.remove(cache_file) - - -def compute_dict_hash(graph_data): - graph_data = filter_json(graph_data) - - cleaned_graph_json = json.dumps(graph_data, sort_keys=True) - return hashlib.sha256(cleaned_graph_json.encode("utf-8")).hexdigest() - - -def filter_json(json_data): - filtered_data = json_data.copy() - - # Remove 'viewport' and 'chatHistory' keys - if "viewport" in filtered_data: - del filtered_data["viewport"] - if "chatHistory" in filtered_data: - del filtered_data["chatHistory"] - - # Filter nodes - if "nodes" in filtered_data: - for node in filtered_data["nodes"]: - if "position" in node: - del node["position"] - if "positionAbsolute" in node: - del node["positionAbsolute"] - if "selected" in node: - del node["selected"] - if "dragging" in node: - del node["dragging"] - - return filtered_data - - -@create_cache_folder -def save_binary_file(content: str, file_name: str, accepted_types: list[str]) -> str: +class BaseCache(abc.ABC): """ - Save a binary file to the specified folder. - - Args: - content: The content of the file as a bytes object. - file_name: The name of the file, including its extension. - - Returns: - The path to the saved file. + Abstract base class for a cache. """ - if not any(file_name.endswith(suffix) for suffix in accepted_types): - raise ValueError(f"File {file_name} is not accepted") - # Get the destination folder - cache_path = Path(tempfile.gettempdir()) / PREFIX - if not content: - raise ValueError("Please, reload the file in the loader.") - data = content.split(",")[1] - decoded_bytes = base64.b64decode(data) + @abc.abstractmethod + def get(self, key): + """ + Retrieve an item from the cache. - # Create the full file path - file_path = os.path.join(cache_path, file_name) + Args: + key: The key of the item to retrieve. - # Save the binary content to the file - with open(file_path, "wb") as file: - file.write(decoded_bytes) + Returns: + The value associated with the key, or None if the key is not found. + """ + pass - return file_path + @abc.abstractmethod + def set(self, key, value): + """ + Add an item to the cache. + Args: + key: The key of the item. + value: The value to cache. + """ + pass -@create_cache_folder -def save_cache(hash_val: str, chat_data, clean_old_cache_files: bool): - cache_path = Path(tempfile.gettempdir()) / PREFIX / f"{hash_val}.dill" - with cache_path.open("wb") as cache_file: - dill.dump(chat_data, cache_file) + @abc.abstractmethod + def delete(self, key): + """ + Remove an item from the cache. - if clean_old_cache_files: - clear_old_cache_files() + Args: + key: The key of the item to remove. + """ + pass + @abc.abstractmethod + def clear(self): + """ + Clear all items from the cache. + """ + pass -@create_cache_folder -def load_cache(hash_val): - cache_path = Path(tempfile.gettempdir()) / PREFIX / f"{hash_val}.dill" - if cache_path.exists(): - with cache_path.open("rb") as cache_file: - return dill.load(cache_file) - return None + @abc.abstractmethod + def __contains__(self, key): + """ + Check if the key is in the cache. + + Args: + key: The key of the item to check. + + Returns: + True if the key is in the cache, False otherwise. + """ + pass + + @abc.abstractmethod + def __getitem__(self, key): + """ + Retrieve an item from the cache using the square bracket notation. + + Args: + key: The key of the item to retrieve. + + Returns: + The value associated with the key, or None if the key is not found. + """ + pass + + @abc.abstractmethod + def __setitem__(self, key, value): + """ + Add an item to the cache using the square bracket notation. + + Args: + key: The key of the item. + value: The value to cache. + """ + pass + + @abc.abstractmethod + def __delitem__(self, key): + """ + Remove an item from the cache using the square bracket notation. + + Args: + key: The key of the item to remove. + """ + pass From 02163cf7759d28f701e942e2de83288d1f42475e Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 11 Jun 2023 18:07:28 -0300 Subject: [PATCH 03/52] =?UTF-8?q?=E2=9C=A8=20feat(flow.py):=20add=20InMemo?= =?UTF-8?q?ryCache=20class=20to=20implement=20an=20in-memory=20cache=20wit?= =?UTF-8?q?h=20LRU=20eviction=20policy=20This=20commit=20adds=20a=20new=20?= =?UTF-8?q?class=20InMemoryCache=20to=20implement=20an=20in-memory=20cache?= =?UTF-8?q?=20with=20a=20Least=20Recently=20Used=20(LRU)=20eviction=20poli?= =?UTF-8?q?cy.=20The=20cache=20supports=20setting=20a=20maximum=20size=20a?= =?UTF-8?q?nd=20expiration=20time=20for=20cached=20items.=20The=20cache=20?= =?UTF-8?q?is=20thread-safe=20using=20a=20threading=20Lock.=20The=20InMemo?= =?UTF-8?q?ryCache=20class=20inherits=20from=20the=20BaseCache=20class=20a?= =?UTF-8?q?nd=20implements=20the=20get,=20set,=20get=5For=5Fset,=20delete,?= =?UTF-8?q?=20clear,=20=5F=5Fcontains=5F=5F,=20=5F=5Fgetitem=5F=5F,=20=5F?= =?UTF-8?q?=5Fsetitem=5F=5F,=20=5F=5Fdelitem=5F=5F,=20=5F=5Flen=5F=5F,=20a?= =?UTF-8?q?nd=20=5F=5Frepr=5F=5F=20methods.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/cache/flow.py | 146 +++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 src/backend/langflow/cache/flow.py diff --git a/src/backend/langflow/cache/flow.py b/src/backend/langflow/cache/flow.py new file mode 100644 index 000000000..6d8fee977 --- /dev/null +++ b/src/backend/langflow/cache/flow.py @@ -0,0 +1,146 @@ +import threading +import time +from collections import OrderedDict + +from langflow.cache.base import BaseCache + + +class InMemoryCache(BaseCache): + """ + A simple in-memory cache using an OrderedDict. + + This cache supports setting a maximum size and expiration time for cached items. + When the cache is full, it uses a Least Recently Used (LRU) eviction policy. + Thread-safe using a threading Lock. + + Attributes: + max_size (int, optional): Maximum number of items to store in the cache. + expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour. + + Example: + + cache = InMemoryCache(max_size=3, expiration_time=5) + + # setting cache values + cache.set("a", 1) + cache.set("b", 2) + cache["c"] = 3 + + # getting cache values + a = cache.get("a") + b = cache["b"] + """ + + def __init__(self, max_size=None, expiration_time=60 * 60): + """ + Initialize a new InMemoryCache instance. + + Args: + max_size (int, optional): Maximum number of items to store in the cache. + expiration_time (int, optional): Time in seconds after which a cached item expires. Default is 1 hour. + """ + self._cache = OrderedDict() + self._lock = threading.Lock() + self.max_size = max_size + self.expiration_time = expiration_time + + def get(self, key): + """ + Retrieve an item from the cache. + + Args: + key: The key of the item to retrieve. + + Returns: + The value associated with the key, or None if the key is not found or the item has expired. + """ + with self._lock: + if key in self._cache: + item = self._cache.pop(key) + if ( + self.expiration_time is None + or time.time() - item["time"] < self.expiration_time + ): + # Move the key to the end to make it recently used + self._cache[key] = item + return item["value"] + else: + self.delete(key) + return None + + def set(self, key, value): + """ + Add an item to the cache. + + If the cache is full, the least recently used item is evicted. + + Args: + key: The key of the item. + value: The value to cache. + """ + with self._lock: + if key in self._cache: + # Remove existing key before re-inserting to update order + self.delete(key) + elif self.max_size and len(self._cache) >= self.max_size: + # Remove least recently used item + self._cache.popitem(last=False) + self._cache[key] = {"value": value, "time": time.time()} + + def get_or_set(self, key, value): + """ + Retrieve an item from the cache. If the item does not exist, set it with the provided value. + + Args: + key: The key of the item. + value: The value to cache if the item doesn't exist. + + Returns: + The cached value associated with the key. + """ + with self._lock: + if key in self._cache: + return self.get(key) + self.set(key, value) + return value + + def delete(self, key): + """ + Remove an item from the cache. + + Args: + key: The key of the item to remove. + """ + # with self._lock: + self._cache.pop(key, None) + + def clear(self): + """ + Clear all items from the cache. + """ + with self._lock: + self._cache.clear() + + def __contains__(self, key): + """Check if the key is in the cache.""" + return key in self._cache + + def __getitem__(self, key): + """Retrieve an item from the cache using the square bracket notation.""" + return self.get(key) + + def __setitem__(self, key, value): + """Add an item to the cache using the square bracket notation.""" + self.set(key, value) + + def __delitem__(self, key): + """Remove an item from the cache using the square bracket notation.""" + self.delete(key) + + def __len__(self): + """Return the number of items in the cache.""" + return len(self._cache) + + def __repr__(self): + """Return a string representation of the InMemoryCache instance.""" + return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})" From dacc90d9016d4df56f7e4a639841691bb6e84b27 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 11 Jun 2023 18:07:45 -0300 Subject: [PATCH 04/52] =?UTF-8?q?=F0=9F=90=9B=20fix(cache/manager.py):=20c?= =?UTF-8?q?hange=20CACHE=20variable=20to=20=5Fcache=20to=20follow=20naming?= =?UTF-8?q?=20conventions=20=E2=9C=A8=20feat(cache/utils.py):=20add=20supp?= =?UTF-8?q?ort=20for=20clearing=20old=20cache=20files=20and=20memoization?= =?UTF-8?q?=20of=20dictionary=20functions=20The=20CACHE=20variable=20in=20?= =?UTF-8?q?CacheManager=20class=20has=20been=20renamed=20to=20=5Fcache=20t?= =?UTF-8?q?o=20follow=20naming=20conventions.=20The=20utils.py=20module=20?= =?UTF-8?q?now=20supports=20clearing=20old=20cache=20files=20and=20memoiza?= =?UTF-8?q?tion=20of=20dictionary=20functions.=20The=20clear=5Fold=5Fcache?= =?UTF-8?q?=5Ffiles=20function=20removes=20old=20cache=20files=20if=20the?= =?UTF-8?q?=20number=20of=20cache=20files=20exceeds=20the=20max=5Fcache=5F?= =?UTF-8?q?size.=20The=20memoize=5Fdict=20decorator=20memoizes=20dictionar?= =?UTF-8?q?y=20functions=20and=20clears=20the=20cache=20when=20the=20clear?= =?UTF-8?q?=5Fcache=20method=20is=20called.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/cache/manager.py | 6 +- src/backend/langflow/cache/utils.py | 154 ++++++++++++++++++++++++++ 2 files changed, 157 insertions(+), 3 deletions(-) create mode 100644 src/backend/langflow/cache/utils.py diff --git a/src/backend/langflow/cache/manager.py b/src/backend/langflow/cache/manager.py index 947f5ce21..13b281008 100644 --- a/src/backend/langflow/cache/manager.py +++ b/src/backend/langflow/cache/manager.py @@ -54,7 +54,7 @@ class CacheManager(Subject): def __init__(self): super().__init__() - self.CACHE = {} + self._cache = {} self.current_client_id = None self.current_cache = {} @@ -68,12 +68,12 @@ class CacheManager(Subject): """ previous_client_id = self.current_client_id self.current_client_id = client_id - self.current_cache = self.CACHE.setdefault(client_id, {}) + self.current_cache = self._cache.setdefault(client_id, {}) try: yield finally: self.current_client_id = previous_client_id - self.current_cache = self.CACHE.get(self.current_client_id, {}) + self.current_cache = self._cache.get(self.current_client_id, {}) def add(self, name: str, obj: Any, obj_type: str, extension: Optional[str] = None): """ diff --git a/src/backend/langflow/cache/utils.py b/src/backend/langflow/cache/utils.py new file mode 100644 index 000000000..0f1ff5d92 --- /dev/null +++ b/src/backend/langflow/cache/utils.py @@ -0,0 +1,154 @@ +import base64 +import contextlib +import functools +import hashlib +import json +import os +import tempfile +from collections import OrderedDict +from pathlib import Path +from typing import Any, Dict + +import dill # type: ignore + +CACHE: Dict[str, Any] = {} + + +def create_cache_folder(func): + def wrapper(*args, **kwargs): + # Get the destination folder + cache_path = Path(tempfile.gettempdir()) / PREFIX + + # Create the destination folder if it doesn't exist + os.makedirs(cache_path, exist_ok=True) + + return func(*args, **kwargs) + + return wrapper + + +def memoize_dict(maxsize=128): + cache = OrderedDict() + + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + hashed = compute_dict_hash(args[0]) + key = (func.__name__, hashed, frozenset(kwargs.items())) + if key not in cache: + result = func(*args, **kwargs) + cache[key] = result + if len(cache) > maxsize: + cache.popitem(last=False) + else: + result = cache[key] + return result + + def clear_cache(): + cache.clear() + + wrapper.clear_cache = clear_cache # type: ignore + wrapper.cache = cache # type: ignore + return wrapper + + return decorator + + +PREFIX = "langflow_cache" + + +@create_cache_folder +def clear_old_cache_files(max_cache_size: int = 3): + cache_dir = Path(tempfile.gettempdir()) / PREFIX + cache_files = list(cache_dir.glob("*.dill")) + + if len(cache_files) > max_cache_size: + cache_files_sorted_by_mtime = sorted( + cache_files, key=lambda x: x.stat().st_mtime, reverse=True + ) + + for cache_file in cache_files_sorted_by_mtime[max_cache_size:]: + with contextlib.suppress(OSError): + os.remove(cache_file) + + +def compute_dict_hash(graph_data): + graph_data = filter_json(graph_data) + + cleaned_graph_json = json.dumps(graph_data, sort_keys=True) + return hashlib.sha256(cleaned_graph_json.encode("utf-8")).hexdigest() + + +def filter_json(json_data): + filtered_data = json_data.copy() + + # Remove 'viewport' and 'chatHistory' keys + if "viewport" in filtered_data: + del filtered_data["viewport"] + if "chatHistory" in filtered_data: + del filtered_data["chatHistory"] + + # Filter nodes + if "nodes" in filtered_data: + for node in filtered_data["nodes"]: + if "position" in node: + del node["position"] + if "positionAbsolute" in node: + del node["positionAbsolute"] + if "selected" in node: + del node["selected"] + if "dragging" in node: + del node["dragging"] + + return filtered_data + + +@create_cache_folder +def save_binary_file(content: str, file_name: str, accepted_types: list[str]) -> str: + """ + Save a binary file to the specified folder. + + Args: + content: The content of the file as a bytes object. + file_name: The name of the file, including its extension. + + Returns: + The path to the saved file. + """ + if not any(file_name.endswith(suffix) for suffix in accepted_types): + raise ValueError(f"File {file_name} is not accepted") + + # Get the destination folder + cache_path = Path(tempfile.gettempdir()) / PREFIX + if not content: + raise ValueError("Please, reload the file in the loader.") + data = content.split(",")[1] + decoded_bytes = base64.b64decode(data) + + # Create the full file path + file_path = os.path.join(cache_path, file_name) + + # Save the binary content to the file + with open(file_path, "wb") as file: + file.write(decoded_bytes) + + return file_path + + +@create_cache_folder +def save_cache(hash_val: str, chat_data, clean_old_cache_files: bool): + cache_path = Path(tempfile.gettempdir()) / PREFIX / f"{hash_val}.dill" + with cache_path.open("wb") as cache_file: + dill.dump(chat_data, cache_file) + + if clean_old_cache_files: + clear_old_cache_files() + + +@create_cache_folder +def load_cache(hash_val): + cache_path = Path(tempfile.gettempdir()) / PREFIX / f"{hash_val}.dill" + if cache_path.exists(): + with cache_path.open("rb") as cache_file: + return dill.load(cache_file) + return None From ccf9477b7f64d50da489350a263c49c2c87f9512 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 11 Jun 2023 18:08:01 -0300 Subject: [PATCH 05/52] =?UTF-8?q?=F0=9F=90=9B=20fix(manager.py):=20add=20c?= =?UTF-8?q?heck=20for=20langchain=20object=20in=20process=5Fmessage=20to?= =?UTF-8?q?=20avoid=20errors=20=E2=9C=A8=20feat(manager.py):=20add=20build?= =?UTF-8?q?=20method=20to=20build=20langchain=20object=20and=20store=20it?= =?UTF-8?q?=20in=20an=20in-memory=20cache=20The=20`process=5Fmessage`=20me?= =?UTF-8?q?thod=20now=20checks=20if=20the=20langchain=20object=20has=20bee?= =?UTF-8?q?n=20built=20and=20stored=20in=20the=20in-memory=20cache=20befor?= =?UTF-8?q?e=20processing=20the=20message.=20If=20the=20object=20is=20not?= =?UTF-8?q?=20found,=20the=20connection=20is=20closed=20with=20an=20error?= =?UTF-8?q?=20message.=20A=20new=20`build`=20method=20has=20been=20added?= =?UTF-8?q?=20to=20build=20the=20langchain=20object=20and=20store=20it=20i?= =?UTF-8?q?n=20an=20in-memory=20cache.=20This=20method=20is=20called=20bef?= =?UTF-8?q?ore=20processing=20any=20messages.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/chat/manager.py | 51 ++++++++++++++++++++++------ src/backend/langflow/chat/utils.py | 4 +-- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/src/backend/langflow/chat/manager.py b/src/backend/langflow/chat/manager.py index d24057b68..4169ecc92 100644 --- a/src/backend/langflow/chat/manager.py +++ b/src/backend/langflow/chat/manager.py @@ -10,7 +10,10 @@ from langflow.utils.logger import logger import asyncio import json -from typing import Dict, List +from typing import Any, Dict, List + +from langflow.cache.flow import InMemoryCache +from langflow.graph import Graph class ChatHistory(Subject): @@ -46,6 +49,7 @@ class ChatManager: self.chat_history = ChatHistory() self.cache_manager = cache_manager self.cache_manager.attach(self.update) + self.in_memory_cache = InMemoryCache() def on_chat_history_update(self): """Send the last chat message to the client.""" @@ -99,24 +103,30 @@ class ChatManager: websocket = self.active_connections[client_id] await websocket.send_json(message.dict()) - async def process_message(self, client_id: str, payload: Dict): + async def close_connection(self, client_id: str, code: status, reason: str): + if websocket := self.active_connections[client_id]: + await websocket.close(code=code, reason=reason) + self.disconnect(client_id) + + async def process_message( + self, client_id: str, payload: Dict, langchain_object: Any + ): # Process the graph data and chat message chat_message = payload.pop("message", "") chat_message = ChatMessage(message=chat_message) self.chat_history.add_message(client_id, chat_message) - graph_data = payload + # graph_data = payload start_resp = ChatResponse(message=None, type="start", intermediate_steps="") await self.send_json(client_id, start_resp) - is_first_message = len(self.chat_history.get_history(client_id=client_id)) <= 1 + # is_first_message = len(self.chat_history.get_history(client_id=client_id)) <= 1 # Generate result and thought try: logger.debug("Generating result and thought") result, intermediate_steps = await process_graph( - graph_data=graph_data, - is_first_message=is_first_message, + langchain_object=langchain_object, chat_message=chat_message, websocket=self.active_connections[client_id], ) @@ -149,6 +159,17 @@ class ChatManager: await self.send_json(client_id, response) self.chat_history.add_message(client_id, response) + def build(self, client_id: str, graph_data: Dict) -> bool: + """ + Build the langchain object and set the streaming options, + then store it in the in-memory cache. + """ + logger.debug("Building langchain object") + graph = Graph.from_payload(graph_data) + langchain_object = graph.build() + self.in_memory_cache.set(client_id, langchain_object) + return client_id in self.in_memory_cache + async def handle_websocket(self, client_id: str, websocket: WebSocket): await self.connect(client_id, websocket) @@ -169,16 +190,24 @@ class ChatManager: continue with self.cache_manager.set_client_id(client_id): - await self.process_message(client_id, payload) + if client_id not in self.in_memory_cache: + self.close_connection( + client_id=client_id, + code=status.WS_1011_INTERNAL_ERROR, + reason="Please, build the flow before sending messages", + ) + else: + langchain_object = self.in_memory_cache.get(client_id) + await self.process_message(client_id, payload, langchain_object) except Exception as e: # Handle any exceptions that might occur logger.exception(e) - # send a message to the client - await self.active_connections[client_id].close( - code=status.WS_1011_INTERNAL_ERROR, reason=str(e)[:120] + self.close_connection( + client_id=client_id, + code=status.WS_1011_INTERNAL_ERROR, + reason=str(e)[:120], ) - self.disconnect(client_id) finally: try: connection = self.active_connections.get(client_id) diff --git a/src/backend/langflow/chat/utils.py b/src/backend/langflow/chat/utils.py index 410a442be..9473cc133 100644 --- a/src/backend/langflow/chat/utils.py +++ b/src/backend/langflow/chat/utils.py @@ -12,12 +12,10 @@ from typing import Dict async def process_graph( - graph_data: Dict, - is_first_message: bool, + langchain_object, chat_message: ChatMessage, websocket: WebSocket, ): - langchain_object = load_or_build_langchain_object(graph_data, is_first_message) langchain_object = try_setting_streaming_options(langchain_object, websocket) logger.debug("Loaded langchain object") From f2e4a93df5394c38af4173d0aca9866e409e4cc5 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 11 Jun 2023 18:08:20 -0300 Subject: [PATCH 06/52] =?UTF-8?q?=F0=9F=90=9B=20fix(base.py):=20remove=20u?= =?UTF-8?q?nused=20import=20of=20logger=20=F0=9F=8E=A8=20refactor(run.py):?= =?UTF-8?q?=20update=20import=20statement=20for=20compute=5Fdict=5Fhash,?= =?UTF-8?q?=20load=5Fcache,=20and=20memoize=5Fdict=20The=20import=20of=20l?= =?UTF-8?q?ogger=20in=20base.py=20was=20not=20being=20used,=20so=20it=20wa?= =?UTF-8?q?s=20removed=20to=20improve=20code=20readability.=20In=20run.py,?= =?UTF-8?q?=20the=20import=20statement=20for=20compute=5Fdict=5Fhash,=20lo?= =?UTF-8?q?ad=5Fcache,=20and=20memoize=5Fdict=20was=20updated=20to=20refle?= =?UTF-8?q?ct=20the=20new=20location=20of=20these=20functions=20in=20the?= =?UTF-8?q?=20cache.utils=20module.=20This=20improves=20the=20organization?= =?UTF-8?q?=20of=20the=20code=20and=20makes=20it=20easier=20to=20find=20th?= =?UTF-8?q?e=20relevant=20functions.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/graph/graph/base.py | 1 - src/backend/langflow/interface/run.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 3653a0ec5..1e9d1f10f 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -10,7 +10,6 @@ from langflow.graph.vertex.types import ( ) from langflow.interface.tools.constants import FILE_TOOLS from langflow.utils import payload -from langflow.utils.logger import logger class Graph: diff --git a/src/backend/langflow/interface/run.py b/src/backend/langflow/interface/run.py index 89f71fd8b..6fe4c089b 100644 --- a/src/backend/langflow/interface/run.py +++ b/src/backend/langflow/interface/run.py @@ -1,4 +1,4 @@ -from langflow.cache.base import compute_dict_hash, load_cache, memoize_dict +from langflow.cache.utils import compute_dict_hash, load_cache, memoize_dict from langflow.graph import Graph from langflow.utils.logger import logger From 395af241012cc0565a7131987894dd025846468f Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 11 Jun 2023 18:08:35 -0300 Subject: [PATCH 07/52] =?UTF-8?q?=F0=9F=9A=80=20chore(vite.config.ts):=20a?= =?UTF-8?q?dd=20build=20route=20to=20apiRoutes=20array=20The=20build=20rou?= =?UTF-8?q?te=20was=20added=20to=20the=20apiRoutes=20array=20to=20allow=20?= =?UTF-8?q?the=20frontend=20to=20access=20the=20build=20files.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/frontend/vite.config.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/frontend/vite.config.ts b/src/frontend/vite.config.ts index b1e7bdd66..564343d19 100644 --- a/src/frontend/vite.config.ts +++ b/src/frontend/vite.config.ts @@ -8,6 +8,7 @@ const apiRoutes = [ "^/chat/*", "/version", "/health", + "^/build/*", ]; // Use environment variable to determine the target. From 2c0a9aee95fd8ff9be1add69fd806d44f621cdc0 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 11 Jun 2023 18:08:52 -0300 Subject: [PATCH 08/52] =?UTF-8?q?=F0=9F=9A=B8=20chore(chatTrigger):=20add?= =?UTF-8?q?=20error=20message=20when=20chat=20is=20triggered=20before=20fl?= =?UTF-8?q?ow=20is=20built=20=E2=9C=A8=20feat(chatComponent):=20add=20Buil?= =?UTF-8?q?dTrigger=20component=20to=20check=20if=20flow=20is=20built=20be?= =?UTF-8?q?fore=20showing=20ChatTrigger=20component=20The=20ChatTrigger=20?= =?UTF-8?q?component=20now=20checks=20if=20the=20flow=20is=20built=20befor?= =?UTF-8?q?e=20allowing=20the=20user=20to=20open=20the=20chat.=20If=20the?= =?UTF-8?q?=20flow=20is=20not=20built,=20an=20error=20message=20is=20displ?= =?UTF-8?q?ayed=20instead.=20The=20BuildTrigger=20component=20is=20added?= =?UTF-8?q?=20to=20check=20if=20the=20flow=20is=20built=20before=20showing?= =?UTF-8?q?=20the=20ChatTrigger=20component.=20This=20improves=20the=20use?= =?UTF-8?q?r=20experience=20by=20preventing=20the=20user=20from=20opening?= =?UTF-8?q?=20the=20chat=20before=20the=20flow=20is=20built.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chatComponent/chatTrigger/index.tsx | 25 +++++++++++++------ .../src/components/chatComponent/index.tsx | 22 +++++++++++++++- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/src/frontend/src/components/chatComponent/chatTrigger/index.tsx b/src/frontend/src/components/chatComponent/chatTrigger/index.tsx index daa49f34f..1651a91bc 100644 --- a/src/frontend/src/components/chatComponent/chatTrigger/index.tsx +++ b/src/frontend/src/components/chatComponent/chatTrigger/index.tsx @@ -3,13 +3,26 @@ import { Bars3CenterLeftIcon, ChatBubbleBottomCenterTextIcon, } from "@heroicons/react/24/outline"; +import { MessagesSquare } from "lucide-react"; import { nodeColors } from "../../../utils"; -import { PopUpContext } from "../../../contexts/popUpContext"; +import { alertContext } from "../../../contexts/alertContext"; import { useContext } from "react"; import ChatModal from "../../../modals/chatModal"; -export default function ChatTrigger({ open, setOpen }) { - const { openPopUp } = useContext(PopUpContext); +export default function ChatTrigger({ open, setOpen, isBuilt }) { + const { setErrorData } = useContext(alertContext); + + function handleClick() { + if (isBuilt) { + setOpen(true); + } else { + setErrorData({ + title: "Flow not built", + list: ["Please build the flow before chatting"], + }); + } + } + return (
{ - setOpen(true); - }} + onClick={handleClick} > +
+ +
+ ); +} diff --git a/src/frontend/src/components/ui/loading.tsx b/src/frontend/src/components/ui/loading.tsx new file mode 100644 index 000000000..a32c046a4 --- /dev/null +++ b/src/frontend/src/components/ui/loading.tsx @@ -0,0 +1,39 @@ +import * as React from "react"; +import { SVGProps } from "react"; + +export const Loading = (props: SVGProps) => ( + + + + + + +); +export default Loading; diff --git a/src/frontend/src/controllers/API/index.ts b/src/frontend/src/controllers/API/index.ts index ffa9eef72..53480ae61 100644 --- a/src/frontend/src/controllers/API/index.ts +++ b/src/frontend/src/controllers/API/index.ts @@ -39,3 +39,7 @@ export async function getExamples(): Promise { return await Promise.all(contentsPromises); } + +export async function postBuild(flow: FlowType) { + return await axios.post(`/build/${flow.id}`, flow); +} From 9ebd02e331052549135ab7698bb6a70b342e89c0 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 12 Jun 2023 07:49:35 -0300 Subject: [PATCH 10/52] =?UTF-8?q?=F0=9F=9A=80=20feat(base.py):=20add=20top?= =?UTF-8?q?ological=5Fsort=20method=20to=20perform=20a=20topological=20sor?= =?UTF-8?q?t=20of=20the=20vertices=20in=20the=20graph=20=F0=9F=9A=80=20fea?= =?UTF-8?q?t(base.py):=20add=20generator=5Fbuild=20method=20to=20build=20e?= =?UTF-8?q?ach=20node=20in=20the=20graph=20and=20yield=20it=20The=20topolo?= =?UTF-8?q?gical=5Fsort=20method=20performs=20a=20topological=20sort=20of?= =?UTF-8?q?=20the=20vertices=20in=20the=20graph,=20returning=20a=20list=20?= =?UTF-8?q?of=20vertices=20in=20topological=20order.=20The=20generator=5Fb?= =?UTF-8?q?uild=20method=20builds=20each=20node=20in=20the=20graph=20and?= =?UTF-8?q?=20yields=20it.=20These=20methods=20are=20useful=20for=20buildi?= =?UTF-8?q?ng=20the=20graph=20in=20a=20specific=20order,=20which=20is=20im?= =?UTF-8?q?portant=20for=20certain=20algorithms=20that=20rely=20on=20the?= =?UTF-8?q?=20order=20of=20the=20nodes.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/graph/graph/base.py | 42 ++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 1e9d1f10f..dba948693 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -107,6 +107,48 @@ class Graph: raise ValueError("No root node found") return root_node.build() + def topological_sort(self) -> List[Vertex]: + """ + Performs a topological sort of the vertices in the graph. + + Returns: + List[Vertex]: A list of vertices in topological order. + + Raises: + ValueError: If the graph contains a cycle. + """ + # States: 0 = unvisited, 1 = visiting, 2 = visited + state = {node: 0 for node in self.nodes} + sorted_vertices = [] + + def dfs(node): + if state[node] == 1: + # We have a cycle + raise ValueError( + "Graph contains a cycle, cannot perform topological sort" + ) + if state[node] == 0: + state[node] = 1 + for edge in node.edges: + if edge.source == node: + dfs(edge.target) + state[node] = 2 + sorted_vertices.append(node) + + # Visit each node + for node in self.nodes: + if state[node] == 0: + dfs(node) + + return list(reversed(sorted_vertices)) + + def generator_build(self) -> List[Vertex]: + """Builds each + node in the graph and yields it.""" + sorted_vertices = self.topological_sort() + for node in sorted_vertices: + yield node.build() + def get_node_neighbors(self, node: Vertex) -> Dict[Vertex, int]: """Returns the neighbors of a node.""" neighbors: Dict[Vertex, int] = {} From 76a49c866354ee8354e7d10637095eef0d795421 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 12 Jun 2023 07:49:49 -0300 Subject: [PATCH 11/52] =?UTF-8?q?=F0=9F=90=9B=20fix(manager.py):=20change?= =?UTF-8?q?=20status=20parameter=20type=20from=20status=20to=20int=20in=20?= =?UTF-8?q?close=5Fconnection=20method=20=F0=9F=94=A5=20refactor(manager.p?= =?UTF-8?q?y):=20remove=20unused=20build=20method=20and=20rename=20set=5Fc?= =?UTF-8?q?ache=20to=20set=5Fclient=5Fcache=20=F0=9F=9A=80=20chore(manager?= =?UTF-8?q?.py):=20add=20async=20keyword=20to=20close=5Fconnection=20metho?= =?UTF-8?q?d=20The=20close=5Fconnection=20method=20now=20accepts=20an=20in?= =?UTF-8?q?teger=20as=20the=20status=20parameter=20instead=20of=20a=20stat?= =?UTF-8?q?us=20enum.=20The=20build=20method=20was=20removed=20as=20it=20w?= =?UTF-8?q?as=20not=20being=20used.=20The=20set=5Fcache=20method=20was=20r?= =?UTF-8?q?enamed=20to=20set=5Fclient=5Fcache=20to=20improve=20semantics.?= =?UTF-8?q?=20The=20close=5Fconnection=20method=20was=20updated=20to=20be?= =?UTF-8?q?=20an=20async=20method.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/chat/manager.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/backend/langflow/chat/manager.py b/src/backend/langflow/chat/manager.py index 4169ecc92..614588968 100644 --- a/src/backend/langflow/chat/manager.py +++ b/src/backend/langflow/chat/manager.py @@ -103,7 +103,7 @@ class ChatManager: websocket = self.active_connections[client_id] await websocket.send_json(message.dict()) - async def close_connection(self, client_id: str, code: status, reason: str): + async def close_connection(self, client_id: str, code: int, reason: str): if websocket := self.active_connections[client_id]: await websocket.close(code=code, reason=reason) self.disconnect(client_id) @@ -159,14 +159,11 @@ class ChatManager: await self.send_json(client_id, response) self.chat_history.add_message(client_id, response) - def build(self, client_id: str, graph_data: Dict) -> bool: + def set_cache(self, client_id: str, langchain_object: Any) -> bool: """ - Build the langchain object and set the streaming options, - then store it in the in-memory cache. + Set the cache for a client. """ - logger.debug("Building langchain object") - graph = Graph.from_payload(graph_data) - langchain_object = graph.build() + self.in_memory_cache.set(client_id, langchain_object) return client_id in self.in_memory_cache @@ -191,7 +188,7 @@ class ChatManager: with self.cache_manager.set_client_id(client_id): if client_id not in self.in_memory_cache: - self.close_connection( + await self.close_connection( client_id=client_id, code=status.WS_1011_INTERNAL_ERROR, reason="Please, build the flow before sending messages", @@ -210,10 +207,11 @@ class ChatManager: ) finally: try: - connection = self.active_connections.get(client_id) - if connection: - await connection.close(code=1000, reason="Client disconnected") - self.disconnect(client_id) + await self.close_connection( + client_id=client_id, + code=status.WS_1000_NORMAL_CLOSURE, + reason="Client disconnected", + ) except Exception as e: logger.exception(e) self.disconnect(client_id) From 273f452f7b40cceacdef27fd0cf16a0f875c2bf9 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 12 Jun 2023 07:50:00 -0300 Subject: [PATCH 12/52] =?UTF-8?q?=F0=9F=94=A7=20chore(chat.py):=20add=20SS?= =?UTF-8?q?E=20support=20to=20post=5Fbuild=20endpoint=20=E2=9C=A8=20feat(c?= =?UTF-8?q?hat.py):=20add=20SSE=20support=20to=20post=5Fbuild=20endpoint?= =?UTF-8?q?=20to=20stream=20build=20progress=20to=20the=20client=20The=20p?= =?UTF-8?q?ost=5Fbuild=20endpoint=20now=20returns=20a=20StreamingResponse?= =?UTF-8?q?=20object=20that=20streams=20Server-Sent=20Events=20(SSE)=20to?= =?UTF-8?q?=20the=20client.=20This=20allows=20the=20client=20to=20receive?= =?UTF-8?q?=20build=20progress=20updates=20in=20real-time.=20The=20event?= =?UTF-8?q?=5Fstream=20function=20is=20responsible=20for=20generating=20th?= =?UTF-8?q?e=20SSE=20events=20and=20is=20called=20by=20the=20StreamingResp?= =?UTF-8?q?onse=20object.=20The=20SSE=20events=20contain=20information=20a?= =?UTF-8?q?bout=20the=20build=20progress,=20including=20whether=20the=20bu?= =?UTF-8?q?ild=20was=20successful=20or=20not,=20the=20parameters=20used=20?= =?UTF-8?q?to=20build=20each=20node,=20and=20the=20node=20ID.=20The=20chat?= =?UTF-8?q?=5Fmanager.set=5Fcache=20method=20is=20called=20to=20cache=20th?= =?UTF-8?q?e=20built=20graph=20object.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/api/v1/chat.py | 39 +++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 9619d5f30..2c085c187 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -1,3 +1,4 @@ +import json from fastapi import ( APIRouter, HTTPException, @@ -6,8 +7,10 @@ from fastapi import ( WebSocketException, status, ) +from fastapi.responses import StreamingResponse from langflow.chat.manager import ChatManager +from langflow.graph.graph.base import Graph from langflow.utils.logger import logger router = APIRouter() @@ -27,12 +30,34 @@ async def websocket_endpoint(client_id: str, websocket: WebSocket): await websocket.close(code=status.WS_1000_NORMAL_CLOSURE, reason=str(exc)) -@router.post("/build/{client_id}") +@router.post("/build/{client_id}", response_class=StreamingResponse) async def post_build(client_id: str, graph_data: dict): """Build langchain object from data_graph.""" - try: - if chat_manager.build(client_id, graph_data.get("data")): - return {"message": "Build successful"} - except Exception as exc: - logger.exception(exc) - raise HTTPException(status_code=500, detail=str(exc)) from exc + + async def event_stream(graph_data): + try: + graph_data = graph_data.get("data") + if not graph_data: + raise HTTPException(status_code=400, detail="No data provided") + + logger.debug("Building langchain object") + graph = Graph.from_payload(graph_data) + for node in graph.generator_build(): + logger.debug(f"Building node {node.name}") + response = json.dumps( + { + "valid": True, + "params": str(node._built_object_repr()), + "id": node.id, + } + ) + yield f"data: {response}\n\n" # SSE format + + chat_manager.set_cache(client_id, graph.build()) + + except Exception as exc: + logger.exception(exc) + error_response = json.dumps({"error": str(exc)}) + yield f"data: {error_response}\n\n" # SSE format + + return StreamingResponse(event_stream(graph_data), media_type="text/event-stream") From 036818e9b9681786abd7ed1ec7b43b8d44945568 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 12 Jun 2023 09:30:20 -0300 Subject: [PATCH 13/52] =?UTF-8?q?=F0=9F=94=A7=20refactor(chat.py):=20renam?= =?UTF-8?q?e=20post=5Fbuild=20to=20stream=5Fbuild=20to=20improve=20semanti?= =?UTF-8?q?cs=20=F0=9F=90=9B=20fix(chat.py):=20fix=20generator=5Fbuild=20m?= =?UTF-8?q?ethod=20to=20yield=20node=5Frepr=20and=20node=5Fid=20=E2=9C=A8?= =?UTF-8?q?=20feat(chat.py):=20add=20valid=20and=20id=20fields=20to=20erro?= =?UTF-8?q?r=20response=20to=20improve=20error=20handling=20=F0=9F=94=A7?= =?UTF-8?q?=20refactor(manager.py):=20change=20logger.exception=20to=20log?= =?UTF-8?q?ger.error=20to=20log=20exceptions=20=F0=9F=94=A7=20refactor(gra?= =?UTF-8?q?ph/base.py):=20add=20logging=20to=20generator=5Fbuild=20method?= =?UTF-8?q?=20to=20improve=20debugging=20=F0=9F=94=A7=20refactor(vertex/ba?= =?UTF-8?q?se.py):=20rename=20cache.base=20to=20cache.utils=20to=20improve?= =?UTF-8?q?=20semantics=20The=20post=5Fbuild=20method=20in=20chat.py=20was?= =?UTF-8?q?=20renamed=20to=20stream=5Fbuild=20to=20better=20reflect=20its?= =?UTF-8?q?=20functionality.=20The=20generator=5Fbuild=20method=20in=20gra?= =?UTF-8?q?ph/base.py=20was=20fixed=20to=20yield=20node=5Frepr=20and=20nod?= =?UTF-8?q?e=5Fid=20instead=20of=20node.=5Fbuilt=5Fobject=5Frepr()=20and?= =?UTF-8?q?=20node.id.=20The=20error=20response=20in=20chat.py=20now=20inc?= =?UTF-8?q?ludes=20valid=20and=20id=20fields=20to=20improve=20error=20hand?= =?UTF-8?q?ling.=20logger.exception=20in=20manager.py=20was=20changed=20to?= =?UTF-8?q?=20logger.error=20to=20log=20exceptions.=20The=20generator=5Fbu?= =?UTF-8?q?ild=20method=20in=20graph/base.py=20now=20logs=20the=20sorted?= =?UTF-8?q?=20vertices=20to=20improve=20debugging.=20The=20cache.base=20mo?= =?UTF-8?q?dule=20in=20vertex/base.py=20was=20renamed=20to=20cache.utils?= =?UTF-8?q?=20to=20better=20reflect=20its=20functionality.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/backend/langflow/api/v1/chat.py | 16 ++++++++++------ src/backend/langflow/chat/manager.py | 6 +++--- src/backend/langflow/graph/graph/base.py | 5 ++++- src/backend/langflow/graph/vertex/base.py | 2 +- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 2c085c187..1dc75b7bc 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -31,7 +31,7 @@ async def websocket_endpoint(client_id: str, websocket: WebSocket): @router.post("/build/{client_id}", response_class=StreamingResponse) -async def post_build(client_id: str, graph_data: dict): +async def stream_build(client_id: str, graph_data: dict): """Build langchain object from data_graph.""" async def event_stream(graph_data): @@ -42,13 +42,15 @@ async def post_build(client_id: str, graph_data: dict): logger.debug("Building langchain object") graph = Graph.from_payload(graph_data) - for node in graph.generator_build(): - logger.debug(f"Building node {node.name}") + for node_repr, node_id in graph.generator_build(): + logger.debug( + f"Building node {node_repr[:50]}{'...' if len(node_repr) > 50 else ''}" + ) response = json.dumps( { "valid": True, - "params": str(node._built_object_repr()), - "id": node.id, + "params": node_repr, + "id": node_id, } ) yield f"data: {response}\n\n" # SSE format @@ -57,7 +59,9 @@ async def post_build(client_id: str, graph_data: dict): except Exception as exc: logger.exception(exc) - error_response = json.dumps({"error": str(exc)}) + error_response = json.dumps( + {"valid": False, "params": str(exc), "id": node_id} + ) yield f"data: {error_response}\n\n" # SSE format return StreamingResponse(event_stream(graph_data), media_type="text/event-stream") diff --git a/src/backend/langflow/chat/manager.py b/src/backend/langflow/chat/manager.py index 614588968..8bd776538 100644 --- a/src/backend/langflow/chat/manager.py +++ b/src/backend/langflow/chat/manager.py @@ -199,8 +199,8 @@ class ChatManager: except Exception as e: # Handle any exceptions that might occur - logger.exception(e) - self.close_connection( + logger.error(e) + await self.close_connection( client_id=client_id, code=status.WS_1011_INTERNAL_ERROR, reason=str(e)[:120], @@ -213,5 +213,5 @@ class ChatManager: reason="Client disconnected", ) except Exception as e: - logger.exception(e) + logger.error(e) self.disconnect(client_id) diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index dba948693..d81613fff 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -10,6 +10,7 @@ from langflow.graph.vertex.types import ( ) from langflow.interface.tools.constants import FILE_TOOLS from langflow.utils import payload +from langflow.utils.logger import logger class Graph: @@ -146,8 +147,10 @@ class Graph: """Builds each node in the graph and yields it.""" sorted_vertices = self.topological_sort() + logger.info("Sorted vertices: %s", sorted_vertices) for node in sorted_vertices: - yield node.build() + node.build() + yield node._built_object_repr(), node.id def get_node_neighbors(self, node: Vertex) -> Dict[Vertex, int]: """Returns the neighbors of a node.""" diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index bb6ff34dc..2900ba538 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -1,4 +1,4 @@ -from langflow.cache import base as cache_utils +from langflow.cache import utils as cache_utils from langflow.graph.vertex.constants import DIRECT_TYPES from langflow.interface import loading from langflow.interface.listing import ALL_TYPES_DICT From 42d5a501bb872b8818ad3b26ba49d6999bb9caa4 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 12 Jun 2023 09:30:47 -0300 Subject: [PATCH 14/52] =?UTF-8?q?=F0=9F=9A=80=20feat(App.tsx):=20add=20SSE?= =?UTF-8?q?Provider=20to=20App=20component=20=F0=9F=8E=89=20feat(SSEContex?= =?UTF-8?q?t.tsx):=20add=20SSEContext=20and=20SSEProvider=20components=20t?= =?UTF-8?q?o=20handle=20server-sent=20events=20The=20SSEProvider=20compone?= =?UTF-8?q?nt=20was=20added=20to=20the=20App=20component=20to=20provide=20?= =?UTF-8?q?the=20SSEContext=20to=20the=20TabsManagerComponent.=20The=20SSE?= =?UTF-8?q?Context=20component=20was=20also=20added=20to=20handle=20server?= =?UTF-8?q?-sent=20events.=20The=20SSEContext=20component=20provides=20a?= =?UTF-8?q?=20way=20to=20update=20the=20sseData=20state=20and=20to=20acces?= =?UTF-8?q?s=20the=20sseData=20state=20from=20any=20component=20that=20use?= =?UTF-8?q?s=20the=20useSSE=20hook.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/frontend/src/App.tsx | 5 +++- src/frontend/src/contexts/SSEContext.tsx | 35 ++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) create mode 100644 src/frontend/src/contexts/SSEContext.tsx diff --git a/src/frontend/src/App.tsx b/src/frontend/src/App.tsx index f68c3ea73..04dbcdf04 100644 --- a/src/frontend/src/App.tsx +++ b/src/frontend/src/App.tsx @@ -14,6 +14,7 @@ import TabsManagerComponent from "./pages/FlowPage/components/tabsManagerCompone import { ErrorBoundary } from "react-error-boundary"; import CrashErrorComponent from "./components/CrashErrorComponent"; import { TabsContext } from "./contexts/tabsContext"; +import { SSEProvider } from "./contexts/SSEContext"; export default function App() { let { setCurrent, setShowSideBar, setIsStackedOpen } = @@ -128,7 +129,9 @@ export default function App() {
{/* Primary column */}
- + + +
diff --git a/src/frontend/src/contexts/SSEContext.tsx b/src/frontend/src/contexts/SSEContext.tsx new file mode 100644 index 000000000..8d90b26e2 --- /dev/null +++ b/src/frontend/src/contexts/SSEContext.tsx @@ -0,0 +1,35 @@ +import { + createContext, + useContext, + useState, + useEffect, + useCallback, +} from "react"; + +const initialValue = { + updateSSEData: ({}) => {}, + sseData: {}, +}; + +const SSEContext = createContext(initialValue); + +export function useSSE() { + return useContext(SSEContext); +} + +export function SSEProvider({ children }) { + const [sseData, setSSEData] = useState({}); + + const updateSSEData = useCallback((newData: any) => { + setSSEData((prevData) => ({ + ...prevData, + ...newData, + })); + }, []); + + return ( + + {children} + + ); +} From eaae1dda2716fde920b8a616fd6e89106a3e3316 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 12 Jun 2023 09:31:01 -0300 Subject: [PATCH 15/52] =?UTF-8?q?=E2=9C=A8=20feat(chatComponent):=20add=20?= =?UTF-8?q?SSE=20context=20to=20update=20data=20in=20real-time=20The=20han?= =?UTF-8?q?dleBuild=20function=20has=20been=20refactored=20to=20process=20?= =?UTF-8?q?data=20in=20chunks=20instead=20of=20waiting=20for=20the=20entir?= =?UTF-8?q?e=20response=20to=20be=20received.=20This=20improves=20the=20pe?= =?UTF-8?q?rformance=20of=20the=20function=20and=20allows=20for=20real-tim?= =?UTF-8?q?e=20updates.=20The=20SSE=20context=20has=20been=20added=20to=20?= =?UTF-8?q?update=20the=20data=20in=20real-time=20as=20it=20is=20received.?= =?UTF-8?q?=20=F0=9F=94=A8=20refactor(chatComponent):=20refactor=20handleB?= =?UTF-8?q?uild=20function=20to=20process=20data=20in=20chunks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../chatComponent/buildTrigger/index.tsx | 74 +++++++++++++------ 1 file changed, 53 insertions(+), 21 deletions(-) diff --git a/src/frontend/src/components/chatComponent/buildTrigger/index.tsx b/src/frontend/src/components/chatComponent/buildTrigger/index.tsx index 4cfe74980..6ce343280 100644 --- a/src/frontend/src/components/chatComponent/buildTrigger/index.tsx +++ b/src/frontend/src/components/chatComponent/buildTrigger/index.tsx @@ -8,6 +8,8 @@ import ChatModal from "../../../modals/chatModal"; import { FlowType } from "../../../types/flow"; import { postBuild } from "../../../controllers/API"; import Loading from "../../../components/ui/loading"; +import { useSSE } from "../../../contexts/SSEContext"; +import axios from "axios"; export default function BuildTrigger({ open, @@ -22,36 +24,66 @@ export default function BuildTrigger({ }) { const [isBuilding, setIsBuilding] = useState(false); - function handleBuild(flow: FlowType) { - const minimumLoadingTime = 500; // in milliseconds - const startTime = Date.now(); + const { updateSSEData } = useSSE(); + function handleBuild(flow) { setIsBuilding(true); - postBuild(flow) - .then((res) => { - console.log(res); - setIsBuilt(true); - }) + // State to keep track of validity status of all chunks + let allChunksValid = true; + + const apiUrl = `/build/${flow.id}`; + + // Post data to the server + axios({ + method: "post", + url: apiUrl, + data: { data: flow }, + headers: { "Content-Type": "application/json" }, + onDownloadProgress: (progressEvent) => { + const { currentTarget } = progressEvent.event; + const { responseText } = currentTarget; + // responseText is a string with \n\n delimiters + + // Get only the new data since the last read + // by splitting the string and getting the one before the last \n\n + + const chunks = responseText.split("\n\n"); + + // Process each chunk + chunks.forEach((chunk: string) => { + if (chunk !== "") { + let valid = processChunk(chunk); + console.log("Valid: ", valid); + allChunksValid = allChunksValid && valid; + } + }); + }, + }) .catch((err) => { - console.log(err); - setIsBuilt(false); + console.error("Error:", err); }) .finally(() => { - const endTime = Date.now(); - const elapsedTime = endTime - startTime; - - if (elapsedTime < minimumLoadingTime) { - const remainingTime = minimumLoadingTime - elapsedTime; - setTimeout(() => { - setIsBuilding(false); - }, remainingTime); - } else { - setIsBuilding(false); - } + // Set isBuilt to the value of allChunksValid + setIsBuilt(allChunksValid); + setIsBuilding(false); }); } + function processChunk(chunk: string) { + // Process each chunk of data here + // Parse the chunk and update the context + let parsedData = { valid: false, id: null }; + try { + parsedData = JSON.parse(chunk.slice(6)); // Remove the "data: " part + updateSSEData({ [parsedData.id]: parsedData }); + } catch (err) { + console.log("Chunk is not valid JSON: ", chunk); + console.log("Error parsing chunk: ", err); + } + return parsedData.valid; + } + return ( Date: Mon, 12 Jun 2023 09:31:43 -0300 Subject: [PATCH 16/52] feat (GenericNode): replace validation with SSE validation --- .../src/CustomNodes/GenericNode/index.tsx | 51 ++++++------------- 1 file changed, 15 insertions(+), 36 deletions(-) diff --git a/src/frontend/src/CustomNodes/GenericNode/index.tsx b/src/frontend/src/CustomNodes/GenericNode/index.tsx index 86eaf299d..864cf6dab 100644 --- a/src/frontend/src/CustomNodes/GenericNode/index.tsx +++ b/src/frontend/src/CustomNodes/GenericNode/index.tsx @@ -33,6 +33,8 @@ import { NodeToolbar } from "reactflow"; import NodeToolbarComponent from "../../pages/FlowPage/components/nodeToolbarComponent"; import ShadTooltip from "../../components/ShadTooltipComponent"; +import { useSSE } from "../../contexts/SSEContext"; + export default function GenericNode({ data, selected, @@ -48,45 +50,22 @@ export default function GenericNode({ const Icon = nodeIcons[data.type] || nodeIcons[types[data.type]]; const [validationStatus, setValidationStatus] = useState(null); // State for outline color - const [isValid, setIsValid] = useState(false); - const { save } = useContext(TabsContext); - const { reactFlowInstance } = useContext(typesContext); - const [params, setParams] = useState([]); + const { sseData } = useSSE(); + // useEffect(() => { + // if (reactFlowInstance) { + // setParams(Object.values(reactFlowInstance.toObject())); + // } + // }, [save]); + + // New useEffect to watch for changes in sseData and update validation status useEffect(() => { - if (reactFlowInstance) { - setParams(Object.values(reactFlowInstance.toObject())); + const relevantData = sseData[data.id]; + if (relevantData) { + // Extract validation information from relevantData and update the validationStatus state + setValidationStatus(relevantData); } - }, [save]); - - const validateNode = useCallback( - debounce(async () => { - try { - const response = await fetch(`/validate/node/${data.id}`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify(reactFlowInstance.toObject()), - }); - - if (response.status === 200) { - let jsonResponse = await response.json(); - let jsonResponseParsed = await JSON.parse(jsonResponse); - setValidationStatus(jsonResponseParsed); - } - } catch (error) { - // console.error("Error validating node:", error); - setValidationStatus("error"); - } - }, 1000), // Adjust the debounce delay (500ms) as needed - [reactFlowInstance, data.id] - ); - useEffect(() => { - if (params.length > 0) { - validateNode(); - } - }, [params, validateNode]); + }, [sseData, data.id]); if (!Icon) { if (showError.current) { From 25f22ae88cb352a79bc938f71dcd2e12c616fcbb Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 12 Jun 2023 09:33:03 -0300 Subject: [PATCH 17/52] =?UTF-8?q?=F0=9F=94=87=20chore(frontend):=20comment?= =?UTF-8?q?=20out=20console.log=20statements=20The=20console.log=20stateme?= =?UTF-8?q?nts=20were=20commented=20out=20in=20the=20tabsContext.tsx,=20Ed?= =?UTF-8?q?itNodeModal/index.tsx,=20NodeToolbarComponent/index.tsx,=20and?= =?UTF-8?q?=20FlowPage/index.tsx=20files=20to=20remove=20unnecessary=20log?= =?UTF-8?q?ging=20in=20the=20console.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/frontend/src/contexts/tabsContext.tsx | 8 ++++---- src/frontend/src/modals/EditNodeModal/index.tsx | 2 +- .../FlowPage/components/nodeToolbarComponent/index.tsx | 4 ++-- src/frontend/src/pages/FlowPage/index.tsx | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/frontend/src/contexts/tabsContext.tsx b/src/frontend/src/contexts/tabsContext.tsx index 9ef27b203..800027ede 100644 --- a/src/frontend/src/contexts/tabsContext.tsx +++ b/src/frontend/src/contexts/tabsContext.tsx @@ -69,12 +69,12 @@ export function TabsProvider({ children }: { children: ReactNode }) { Saveflows.forEach((flow) => { if (flow.data && flow.data?.nodes) flow.data?.nodes.forEach((node) => { - console.log(node.data.type); + // console.log(node.data.type); //looking for file fields to prevent saving the content and breaking the flow for exceeding the the data limite for local storage Object.keys(node.data.node.template).forEach((key) => { - console.log(node.data.node.template[key].type); + // console.log(node.data.node.template[key].type); if (node.data.node.template[key].type === "file") { - console.log(node.data.node.template[key]); + // console.log(node.data.node.template[key]); node.data.node.template[key].content = null; node.data.node.template[key].value = ""; } @@ -139,7 +139,7 @@ export function TabsProvider({ children }: { children: ReactNode }) { useEffect(() => { //save tabs locally - console.log(id); + // console.log(id); save(); }, [flows, id, tabIndex, newNodeId]); diff --git a/src/frontend/src/modals/EditNodeModal/index.tsx b/src/frontend/src/modals/EditNodeModal/index.tsx index f6f4d9111..4c9422afb 100644 --- a/src/frontend/src/modals/EditNodeModal/index.tsx +++ b/src/frontend/src/modals/EditNodeModal/index.tsx @@ -87,7 +87,7 @@ export default function EditNodeModal({ data }: { data: NodeDataType }) { setNodeValue(!nodeValue); } - console.log(data.node.template); + // console.log(data.node.template); return ( diff --git a/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx b/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx index 068bdf27f..4bceafffd 100644 --- a/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx +++ b/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx @@ -61,7 +61,7 @@ const NodeToolbarComponent = (props) => { )} onClick={(event) => { event.preventDefault(); - console.log(reactFlowInstance.getNode(props.data.id)); + // console.log(reactFlowInstance.getNode(props.data.id)); paste( { nodes: [reactFlowInstance.getNode(props.data.id)], @@ -94,7 +94,7 @@ const NodeToolbarComponent = (props) => { )} - {/* + {/*