Merge branch 'chat_and_cache' into 186-endpoints-for-node-validation-and-debugging

This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-04-26 01:28:43 -03:00 committed by GitHub
commit 5dbca85512
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
55 changed files with 1736 additions and 606 deletions

View file

@ -1,52 +0,0 @@
# `python-base` sets up all our shared environment variables
FROM python:3.10-slim
# python
ENV PYTHONUNBUFFERED=1 \
# prevents python creating .pyc files
PYTHONDONTWRITEBYTECODE=1 \
\
# pip
PIP_NO_CACHE_DIR=off \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
\
# poetry
# https://python-poetry.org/docs/configuration/#using-environment-variables
POETRY_VERSION=1.4.0 \
# make poetry install to this location
POETRY_HOME="/opt/poetry" \
# make poetry create the virtual environment in the project's root
# it gets named `.venv`
POETRY_VIRTUALENVS_IN_PROJECT=true \
# do not ask any interactive question
POETRY_NO_INTERACTION=1 \
\
# paths
# this is where our requirements + virtual environment will live
PYSETUP_PATH="/opt/pysetup" \
VENV_PATH="/opt/pysetup/.venv"
# prepend poetry and venv to path
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
RUN apt-get update \
&& apt-get install --no-install-recommends -y \
# deps for installing poetry
curl \
# deps for building python deps
build-essential libpq-dev
# install poetry - respects $POETRY_VERSION & $POETRY_HOME
RUN curl -sSL https://install.python-poetry.org | python3 -
# copy project requirement files here to ensure they will be cached.
WORKDIR /app
COPY poetry.lock pyproject.toml ./
COPY langflow/ ./langflow
# poetry install
RUN poetry install --without dev
# build wheel
RUN poetry build -f wheel

View file

@ -1,6 +0,0 @@
#! /bin/bash
docker build -t logspace/backend_build -f build.Dockerfile .
VERSION=$(toml get --toml-path pyproject.toml tool.poetry.version)
docker build --build-arg VERSION=$VERSION -t ibiscp/langflow:$VERSION .
docker push ibiscp/langflow:$VERSION

View file

@ -1 +1,4 @@
from langflow.interface.loading import load_flow_from_json # noqa
from langflow.interface.loading import load_flow_from_json
from langflow.cache import cache_manager
__all__ = ["load_flow_from_json", "cache_manager"]

View file

@ -3,6 +3,10 @@ from pydantic import BaseModel, validator
from langflow.graph.utils import extract_input_variables_from_prompt
class CacheResponse(BaseModel):
data: dict
class Code(BaseModel):
code: str

View file

@ -0,0 +1,16 @@
from typing import Any
from langchain.callbacks.base import AsyncCallbackHandler
from langflow.api.schemas import ChatResponse
# https://github.com/hwchase17/chat-langchain/blob/master/callback.py
class StreamingLLMCallbackHandler(AsyncCallbackHandler):
"""Callback handler for streaming LLM responses."""
def __init__(self, websocket):
self.websocket = websocket
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
resp = ChatResponse(message=token, type="stream", intermediate_steps="")
await self.websocket.send_json(resp.dict())

View file

@ -0,0 +1,18 @@
from fastapi import APIRouter, WebSocket
from langflow.api.chat_manager import ChatManager
from langflow.utils.logger import logger
router = APIRouter()
chat_manager = ChatManager()
@router.websocket("/chat/{client_id}")
async def websocket_endpoint(client_id: str, websocket: WebSocket):
"""Websocket endpoint for chat."""
try:
await chat_manager.handle_websocket(client_id, websocket)
except Exception as e:
# Log stack trace
logger.exception(e)
raise e

View file

@ -0,0 +1,207 @@
import asyncio
from typing import Dict, List
from collections import defaultdict
from fastapi import WebSocket
import json
from langflow.api.schemas import ChatMessage, ChatResponse, FileResponse
from langflow.cache.manager import Subject
from langflow.interface.run import (
get_result_and_steps,
load_or_build_langchain_object,
)
from langflow.interface.utils import pil_to_base64, try_setting_streaming_options
from langflow.utils.logger import logger
from langflow.cache import cache_manager
class ChatHistory(Subject):
def __init__(self):
super().__init__()
self.history: Dict[str, List[ChatMessage]] = defaultdict(list)
def add_message(self, client_id: str, message: ChatMessage):
"""Add a message to the chat history."""
self.history[client_id].append(message)
if not isinstance(message, FileResponse):
self.notify()
def get_history(self, client_id: str, filter=True) -> List[ChatMessage]:
"""Get the chat history for a client."""
if history := self.history.get(client_id, []):
if filter:
return [msg for msg in history if msg.type not in ["start", "stream"]]
return history
else:
return []
class ChatManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.chat_history = ChatHistory()
self.chat_history.attach(self.on_chat_history_update)
self.cache_manager = cache_manager
self.cache_manager.attach(self.update)
def on_chat_history_update(self):
"""Send the last chat message to the client."""
client_id = self.cache_manager.current_client_id
if client_id in self.active_connections:
chat_response = self.chat_history.get_history(client_id, filter=False)[-1]
if chat_response.is_bot:
# Process FileResponse
if isinstance(chat_response, FileResponse):
# If data_type is pandas, convert to csv
if chat_response.data_type == "pandas":
chat_response.data = chat_response.data.to_csv()
elif chat_response.data_type == "image":
# Base64 encode the image
chat_response.data = pil_to_base64(chat_response.data)
# get event loop
loop = asyncio.get_event_loop()
coroutine = self.send_json(client_id, chat_response)
asyncio.run_coroutine_threadsafe(coroutine, loop)
def update(self):
if self.cache_manager.current_client_id in self.active_connections:
self.last_cached_object_dict = self.cache_manager.get_last()
# Add a new ChatResponse with the data
chat_response = FileResponse(
message=None,
type="file",
data=self.last_cached_object_dict["obj"],
data_type=self.last_cached_object_dict["type"],
)
self.chat_history.add_message(
self.cache_manager.current_client_id, chat_response
)
async def connect(self, client_id: str, websocket: WebSocket):
await websocket.accept()
self.active_connections[client_id] = websocket
def disconnect(self, client_id: str):
del self.active_connections[client_id]
async def send_message(self, client_id: str, message: str):
websocket = self.active_connections[client_id]
await websocket.send_text(message)
async def send_json(self, client_id: str, message: ChatMessage):
websocket = self.active_connections[client_id]
await websocket.send_json(message.dict())
async def process_message(self, client_id: str, payload: Dict):
# Process the graph data and chat message
chat_message = payload.pop("message", "")
chat_message = ChatMessage(message=chat_message)
self.chat_history.add_message(client_id, chat_message)
graph_data = payload
start_resp = ChatResponse(message=None, type="start", intermediate_steps="")
self.chat_history.add_message(client_id, start_resp)
is_first_message = len(self.chat_history.get_history(client_id=client_id)) == 0
# Generate result and thought
try:
logger.debug("Generating result and thought")
result, intermediate_steps = await process_graph(
graph_data=graph_data,
is_first_message=is_first_message,
chat_message=chat_message,
websocket=self.active_connections[client_id],
)
except Exception as e:
# Log stack trace
logger.exception(e)
raise e
# Send a response back to the frontend, if needed
intermediate_steps = intermediate_steps or ""
history = self.chat_history.get_history(client_id, filter=False)
file_responses = []
if history:
# Iterate backwards through the history
for msg in reversed(history):
if isinstance(msg, FileResponse):
if msg.data_type == "image":
# Base64 encode the image
msg.data = pil_to_base64(msg.data)
file_responses.append(msg)
if msg.type == "start":
break
response = ChatResponse(
message=result or "",
intermediate_steps=intermediate_steps.strip(),
type="end",
files=file_responses,
)
self.chat_history.add_message(client_id, response)
async def handle_websocket(self, client_id: str, websocket: WebSocket):
await self.connect(client_id, websocket)
try:
chat_history = self.chat_history.get_history(client_id)
# iterate and make BaseModel into dict
chat_history = [chat.dict() for chat in chat_history]
await websocket.send_json(chat_history)
while True:
json_payload = await websocket.receive_json()
try:
payload = json.loads(json_payload)
except TypeError:
payload = json_payload
if "clear_history" in payload:
self.chat_history.history[client_id] = []
continue
with self.cache_manager.set_client_id(client_id):
await self.process_message(client_id, payload)
except Exception as e:
# Handle any exceptions that might occur
logger.exception(e)
# send a message to the client
await self.send_message(client_id, str(e))
raise e
finally:
await self.active_connections[client_id].close(
code=1000, reason="Client disconnected"
)
self.disconnect(client_id)
async def process_graph(
graph_data: Dict,
is_first_message: bool,
chat_message: ChatMessage,
websocket: WebSocket,
):
langchain_object = load_or_build_langchain_object(graph_data, is_first_message)
langchain_object = try_setting_streaming_options(langchain_object, websocket)
logger.debug("Loaded langchain object")
if langchain_object is None:
# Raise user facing error
raise ValueError(
"There was an error loading the langchain_object. Please, check all the nodes and try again."
)
# Generate result and thought
try:
logger.debug("Generating result and thought")
result, intermediate_steps = get_result_and_steps(
langchain_object, chat_message.message or ""
)
logger.debug("Generated result and intermediate_steps")
return result, intermediate_steps
except Exception as e:
# Log stack trace
logger.exception(e)
raise e

View file

@ -0,0 +1,40 @@
from typing import Any, Union
from pydantic import BaseModel, validator
class ChatMessage(BaseModel):
"""Chat message schema."""
is_bot: bool = False
message: Union[str, None] = None
type: str = "human"
class ChatResponse(ChatMessage):
"""Chat response schema."""
intermediate_steps: str
type: str
is_bot: bool = True
files: list = []
@validator("type")
def validate_message_type(cls, v):
if v not in ["start", "stream", "end", "error", "info", "file"]:
raise ValueError("type must be start, stream, end, error, info, or file")
return v
class FileResponse(ChatMessage):
"""File response schema."""
data: Any
data_type: str
type: str = "file"
is_bot: bool = True
@validator("data_type")
def validate_data_type(cls, v):
if v not in ["image", "csv"]:
raise ValueError("data_type must be image or csv")
return v

View file

@ -0,0 +1 @@
from langflow.cache.manager import cache_manager # noqa

View file

@ -2,13 +2,18 @@ import base64
import contextlib
import functools
import hashlib
import json
import os
import tempfile
from collections import OrderedDict
from pathlib import Path
from typing import Any
from PIL import Image
import dill
import pandas as pd # type: ignore
import dill # type: ignore
CACHE = {}
def create_cache_folder(func):

127
src/backend/langflow/cache/manager.py vendored Normal file
View file

@ -0,0 +1,127 @@
from contextlib import contextmanager
from typing import Any, Awaitable, Callable, List
from PIL import Image
import pandas as pd
class Subject:
"""Base class for implementing the observer pattern."""
def __init__(self):
self.observers: List[Callable[[], None]] = []
def attach(self, observer: Callable[[], None]):
"""Attach an observer to the subject."""
self.observers.append(observer)
def detach(self, observer: Callable[[], None]):
"""Detach an observer from the subject."""
self.observers.remove(observer)
def notify(self):
"""Notify all observers about an event."""
for observer in self.observers:
if observer is None:
continue
observer()
class AsyncSubject:
"""Base class for implementing the async observer pattern."""
def __init__(self):
self.observers: List[Callable[[], Awaitable]] = []
def attach(self, observer: Callable[[], Awaitable]):
"""Attach an observer to the subject."""
self.observers.append(observer)
def detach(self, observer: Callable[[], Awaitable]):
"""Detach an observer from the subject."""
self.observers.remove(observer)
async def notify(self):
"""Notify all observers about an event."""
for observer in self.observers:
if observer is None:
continue
await observer()
class CacheManager(Subject):
"""Manages cache for different clients and notifies observers on changes."""
def __init__(self):
super().__init__()
self.CACHE = {}
self.current_client_id = None
self.current_cache = {}
@contextmanager
def set_client_id(self, client_id: str):
"""
Context manager to set the current client_id and associated cache.
Args:
client_id (str): The client identifier.
"""
previous_client_id = self.current_client_id
self.current_client_id = client_id
self.current_cache = self.CACHE.setdefault(client_id, {})
try:
yield
finally:
self.current_client_id = previous_client_id
self.current_cache = self.CACHE.get(self.current_client_id, {})
def add_pandas(self, name: str, obj: Any):
"""
Add a pandas DataFrame or Series to the current client's cache.
Args:
name (str): The cache key.
obj (Any): The pandas DataFrame or Series object.
"""
if isinstance(obj, (pd.DataFrame, pd.Series)):
self.current_cache[name] = {"obj": obj, "type": "pandas"}
self.notify()
else:
raise ValueError("Object is not a pandas DataFrame or Series")
def add_image(self, name: str, obj: Any):
"""
Add a PIL Image to the current client's cache.
Args:
name (str): The cache key.
obj (Any): The PIL Image object.
"""
if isinstance(obj, Image.Image):
self.current_cache[name] = {"obj": obj, "type": "image"}
self.notify()
else:
raise ValueError("Object is not a PIL Image")
def get(self, name: str):
"""
Get an object from the current client's cache.
Args:
name (str): The cache key.
Returns:
The cached object associated with the given cache key.
"""
return self.current_cache[name]
def get_last(self):
"""
Get the last added item in the current client's cache.
Returns:
The last added item in the cache.
"""
return list(self.current_cache.values())[-1]
cache_manager = CacheManager()

View file

@ -4,16 +4,18 @@
# - Build each inner agent first, then build the outer agent
import contextlib
import inspect
import types
import warnings
from copy import deepcopy
from typing import Any, Dict, List, Optional
from langflow.cache import utils as cache_utils
from langflow.cache import base as cache_utils
from langflow.graph.constants import DIRECT_TYPES
from langflow.interface import loading
from langflow.interface.listing import ALL_TYPES_DICT
from langflow.utils.logger import logger
from langflow.utils.util import sync_to_async
class Node:
@ -158,13 +160,20 @@ class Node:
continue
result = value.build()
# If the key is "func", then we need to use the run method
if key == "func" and not isinstance(result, types.FunctionType):
# func can be PythonFunction(code='\ndef upper_case(text: str) -> str:\n return text.upper()\n')
# so we need to check if there is an attribute called run
if hasattr(result, "run"):
result = result.run # type: ignore
elif hasattr(result, "get_function"):
result = result.get_function() # type: ignore
if key == "func":
if not isinstance(result, types.FunctionType):
# func can be PythonFunction(code='\ndef upper_case(text: str) -> str:\n return text.upper()\n')
# so we need to check if there is an attribute called run
if hasattr(result, "run"):
result = result.run # type: ignore
elif hasattr(result, "get_function"):
result = result.get_function() # type: ignore
elif inspect.iscoroutinefunction(result):
self.params["coroutine"] = result
else:
# turn result which is a function into a coroutine
# so that it can be awaited
self.params["coroutine"] = sync_to_async(result)
self.params[key] = result
elif isinstance(value, list) and all(

View file

@ -20,6 +20,7 @@ from langchain.llms.loading import load_llm_from_config
from langflow.interface.agents.custom import CUSTOM_AGENTS
from langflow.interface.importing.utils import import_by_type
from langflow.interface.run import fix_memory_inputs
from langflow.interface.toolkits.base import toolkits_creator
from langflow.interface.types import get_type_list
from langflow.interface.utils import load_file_into_dict
@ -106,7 +107,19 @@ def load_flow_from_json(path: str, build=True):
# Nodes, edges and root node
edges = data_graph["edges"]
graph = Graph(nodes, edges)
return graph.build() if build else graph
if build:
langchain_object = graph.build()
if hasattr(langchain_object, "verbose"):
langchain_object.verbose = True
if hasattr(langchain_object, "return_intermediate_steps"):
# https://github.com/hwchase17/langchain/issues/2068
# Deactivating until we have a frontend solution
# to display intermediate steps
langchain_object.return_intermediate_steps = False
fix_memory_inputs(langchain_object)
return langchain_object
return graph
def replace_zero_shot_prompt_with_prompt_template(nodes):

View file

@ -3,7 +3,7 @@ import io
from typing import Any, Dict
from chromadb.errors import NotEnoughElementsException
from langflow.cache.utils import compute_dict_hash, load_cache, memoize_dict
from langflow.cache.base import compute_dict_hash, load_cache, memoize_dict
from langflow.graph.graph import Graph
from langflow.interface import loading
from langflow.utils.logger import logger
@ -32,7 +32,7 @@ def load_or_build_langchain_object(data_graph, is_first_message=False):
return build_langchain_object_with_caching(data_graph)
@memoize_dict(maxsize=1)
@memoize_dict(maxsize=10)
def build_langchain_object_with_caching(data_graph):
"""
Build langchain object from data_graph.
@ -87,7 +87,7 @@ def process_graph(data_graph: Dict[str, Any]):
# Generate result and thought
logger.debug("Generating result and thought")
result, thought = get_result_and_thought_using_graph(langchain_object, message)
result, thought = get_result_and_steps(langchain_object, message)
logger.debug("Generated result and thought")
# Save langchain_object to cache
@ -118,7 +118,7 @@ def process_graph_cached(data_graph: Dict[str, Any]):
# Generate result and thought
logger.debug("Generating result and thought")
result, thought = get_result_and_thought_using_graph(langchain_object, message)
result, thought = get_result_and_steps(langchain_object, message)
logger.debug("Generated result and thought")
return {"result": str(result), "thought": thought.strip()}
@ -184,7 +184,7 @@ def fix_memory_inputs(langchain_object):
update_memory_keys(langchain_object, possible_new_mem_key)
def get_result_and_thought_using_graph(langchain_object, message: str):
def get_result_and_steps(langchain_object, message: str):
"""Get result and thought from extracted json"""
try:
if hasattr(langchain_object, "verbose"):
@ -240,6 +240,61 @@ def get_result_and_thought_using_graph(langchain_object, message: str):
return result, thought
def async_get_result_and_steps(langchain_object, message: str):
"""Get result and thought from extracted json"""
try:
if hasattr(langchain_object, "verbose"):
langchain_object.verbose = True
chat_input = None
memory_key = ""
if hasattr(langchain_object, "memory") and langchain_object.memory is not None:
memory_key = langchain_object.memory.memory_key
if hasattr(langchain_object, "input_keys"):
for key in langchain_object.input_keys:
if key not in [memory_key, "chat_history"]:
chat_input = {key: message}
else:
chat_input = message # type: ignore
if hasattr(langchain_object, "return_intermediate_steps"):
# https://github.com/hwchase17/langchain/issues/2068
# Deactivating until we have a frontend solution
# to display intermediate steps
langchain_object.return_intermediate_steps = False
fix_memory_inputs(langchain_object)
with io.StringIO() as output_buffer, contextlib.redirect_stdout(output_buffer):
try:
# if hasattr(langchain_object, "acall"):
# output = await langchain_object.acall(chat_input)
# else:
output = langchain_object(chat_input)
except ValueError as exc:
# make the error message more informative
logger.debug(f"Error: {str(exc)}")
output = langchain_object.run(chat_input)
intermediate_steps = (
output.get("intermediate_steps", []) if isinstance(output, dict) else []
)
result = (
output.get(langchain_object.output_keys[0])
if isinstance(output, dict)
else output
)
if intermediate_steps:
thought = format_intermediate_steps(intermediate_steps)
else:
thought = output_buffer.getvalue()
except Exception as exc:
raise ValueError(f"Error: {str(exc)}") from exc
return result, thought
def get_result_and_thought(extracted_json: Dict[str, Any], message: str):
"""Get result and thought from extracted json"""
try:

View file

@ -1,5 +1,12 @@
import base64
from io import BytesIO
import json
import os
from PIL.Image import Image
from langchain.callbacks.base import AsyncCallbackManager
from langchain.chat_models import AzureChatOpenAI, ChatOpenAI
from langchain.llms import AzureOpenAI, OpenAI
from langflow.api.callback import StreamingLLMCallbackHandler
import yaml
@ -20,3 +27,30 @@ def load_file_into_dict(file_path: str) -> dict:
raise ValueError("Unsupported file type. Please provide a JSON or YAML file.")
return data
def pil_to_base64(image: Image) -> str:
buffered = BytesIO()
image.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue())
return img_str.decode("utf-8")
def try_setting_streaming_options(langchain_object, websocket):
# If the LLM type is OpenAI or ChatOpenAI,
# set streaming to True
# First we need to find the LLM
llm = None
if hasattr(langchain_object, "llm"):
llm = langchain_object.llm
elif hasattr(langchain_object, "llm_chain") and hasattr(
langchain_object.llm_chain, "llm"
):
llm = langchain_object.llm_chain.llm
if isinstance(llm, (OpenAI, ChatOpenAI, AzureOpenAI, AzureChatOpenAI)):
llm.streaming = bool(hasattr(llm, "streaming"))
stream_handler = StreamingLLMCallbackHandler(websocket)
stream_manager = AsyncCallbackManager([stream_handler])
llm.callback_manager = stream_manager
return langchain_object

View file

@ -3,6 +3,7 @@ from fastapi.middleware.cors import CORSMiddleware
from langflow.api.endpoints import router as endpoints_router
from langflow.api.validate import router as validate_router
from langflow.api.chat import router as chat_router
def create_app():
@ -23,6 +24,7 @@ def create_app():
app.include_router(endpoints_router)
app.include_router(validate_router)
app.include_router(chat_router)
return app

View file

@ -1,3 +1,5 @@
import asyncio
from functools import partial, wraps
import importlib
import inspect
import re
@ -301,3 +303,15 @@ def update_verbose(d: dict, new_value: bool) -> dict:
elif k == "verbose":
d[k] = new_value
return d
def sync_to_async(func):
"""
Decorator to convert a sync function to an async function.
"""
@wraps(func)
async def async_wrapper(*args, **kwargs):
return func(*args, **kwargs)
return async_wrapper

View file

@ -155,7 +155,7 @@ def create_function(code, function_name):
exec_globals[function_name] = locals()[function_name]
# Return a function that imports necessary modules and calls the target function
def wrapped_function(*args, **kwargs):
async def wrapped_function(*args, **kwargs):
for module_name, module in exec_globals.items():
if isinstance(module, type(importlib)):
globals()[module_name] = module

View file

@ -1,8 +0,0 @@
#! /bin/bash
poetry remove langchain
docker build -t logspace/backend_build -f build.Dockerfile .
VERSION=$(toml get --toml-path pyproject.toml tool.poetry.version)
docker build --build-arg VERSION=$VERSION -t ibiscp/langflow:$VERSION .
docker run -p 5003:80 -d ibiscp/langflow:$VERSION
poetry add --editable ../../../langchain