Merge branch 'streaming' into chatUpdate

This commit is contained in:
anovazzi1 2023-05-09 17:12:44 -03:00
commit 702834731d
7 changed files with 603 additions and 549 deletions

View file

@ -7,7 +7,7 @@ from langflow.api.schemas import ChatResponse
# https://github.com/hwchase17/chat-langchain/blob/master/callback.py
class StreamingLLMCallbackHandler(AsyncCallbackHandler):
class AsyncStreamingLLMCallbackHandler(AsyncCallbackHandler):
"""Callback handler for streaming LLM responses."""
def __init__(self, websocket):
@ -16,3 +16,17 @@ class StreamingLLMCallbackHandler(AsyncCallbackHandler):
async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
resp = ChatResponse(message=token, type="stream", intermediate_steps="")
await self.websocket.send_json(resp.dict())
class StreamingLLMCallbackHandler(BaseCallbackHandler):
"""Callback handler for streaming LLM responses."""
def __init__(self, websocket):
self.websocket = websocket
def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
resp = ChatResponse(message=token, type="stream", intermediate_steps="")
loop = asyncio.get_event_loop()
coroutine = self.websocket.send_json(resp.dict())
asyncio.run_coroutine_threadsafe(coroutine, loop)

View file

@ -2,6 +2,7 @@ from fastapi import APIRouter, WebSocket
from langflow.api.chat_manager import ChatManager
from langflow.utils.logger import logger
from fastapi import status, WebSocketDisconnect, WebSocketException
router = APIRouter()
chat_manager = ChatManager()
@ -10,5 +11,11 @@ chat_manager = ChatManager()
@router.websocket("/chat/{client_id}")
async def websocket_endpoint(client_id: str, websocket: WebSocket):
"""Websocket endpoint for chat."""
await chat_manager.handle_websocket(client_id, websocket)
try:
await chat_manager.handle_websocket(client_id, websocket)
except WebSocketException as exc:
logger.error(exc)
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))
except WebSocketDisconnect as exc:
logger.error(exc)
await websocket.close(code=status.WS_1000_NORMAL_CLOSURE, reason=str(exc))

View file

@ -106,11 +106,11 @@ class ChatManager:
# Process the graph data and chat message
chat_message = payload.pop("message", "")
chat_message = ChatMessage(message=chat_message)
self.chat_history.add_message(client_id, chat_message)
await self.send_json(client_id, chat_message)
graph_data = payload
start_resp = ChatResponse(message=None, type="start", intermediate_steps="")
self.chat_history.add_message(client_id, start_resp)
await self.send_json(client_id, start_resp)
is_first_message = len(self.chat_history.get_history(client_id=client_id)) == 0
# Generate result and thought
@ -144,12 +144,12 @@ class ChatManager:
break
response = ChatResponse(
message="",
message=result,
intermediate_steps=intermediate_steps.strip(),
type="end",
files=file_responses,
)
self.chat_history.add_message(client_id, response)
await self.send_json(client_id, response)
async def handle_websocket(self, client_id: str, websocket: WebSocket):
await self.connect(client_id, websocket)
@ -172,15 +172,24 @@ class ChatManager:
with self.cache_manager.set_client_id(client_id):
await self.process_message(client_id, payload)
# After the message is sent, wait for message built
final_message = await websocket.receive_json()
# If the message is a string, it is a chat message
chat_response = ChatResponse.parse_obj(final_message)
self.chat_history.add_message(client_id, chat_response)
except Exception as e:
# Handle any exceptions that might occur
logger.exception(e)
# send a message to the client
await self.active_connections[client_id].close(code=1000, reason=str(e))
raise e
finally:
# await self.active_connections[client_id].close(
# code=1000, reason="Client disconnected"
# )
try:
await self.active_connections[client_id].close(
code=1000, reason="Client disconnected"
)
except Exception as e:
logger.exception(e)
self.disconnect(client_id)
@ -203,9 +212,8 @@ async def process_graph(
# Generate result and thought
try:
logger.debug("Generating result and thought")
stream_handler = StreamingLLMCallbackHandler(websocket)
result, intermediate_steps = await get_result_and_steps(
langchain_object, chat_message.message or "", callbacks=[stream_handler]
langchain_object, chat_message.message or "", websocket=websocket
)
logger.debug("Generated result and intermediate_steps")
return result, intermediate_steps

View file

@ -74,7 +74,12 @@ def instantiate_class(node_type: str, base_type: str, params: Dict) -> Any:
return loaded_toolkit
elif base_type == "embeddings":
# ? Why remove model from params?
params.pop("model")
try:
params.pop("model")
except KeyError:
pass
# remove all params that are not in class_object.__fields__
try:
return class_object(**params)

View file

@ -2,7 +2,8 @@ import contextlib
import io
from typing import Any, Dict
from chromadb.errors import NotEnoughElementsException # type: ignore
from chromadb.errors import NotEnoughElementsException
from langflow.api.callback import AsyncStreamingLLMCallbackHandler, StreamingLLMCallbackHandler # type: ignore
from langflow.cache.base import compute_dict_hash, load_cache, memoize_dict
from langflow.graph.graph import Graph
@ -185,11 +186,9 @@ def fix_memory_inputs(langchain_object):
update_memory_keys(langchain_object, possible_new_mem_key)
async def get_result_and_steps(langchain_object, message: str, callbacks=None):
async def get_result_and_steps(langchain_object, message: str, **kwargs):
"""Get result and thought from extracted json"""
if callbacks is None:
callbacks = []
try:
if hasattr(langchain_object, "verbose"):
langchain_object.verbose = True
@ -215,11 +214,15 @@ async def get_result_and_steps(langchain_object, message: str, callbacks=None):
with io.StringIO() as output_buffer, contextlib.redirect_stdout(output_buffer):
try:
output = await langchain_object.acall(chat_input, callbacks=callbacks)
except ValueError as exc:
async_callbacks = [AsyncStreamingLLMCallbackHandler(**kwargs)]
output = await langchain_object.acall(
chat_input, callbacks=async_callbacks
)
except Exception as exc:
# make the error message more informative
logger.debug(f"Error: {str(exc)}")
output = langchain_object.run(chat_input, callbacks=callbacks)
sync_callbacks = [StreamingLLMCallbackHandler(**kwargs)]
output = langchain_object(chat_input, callbacks=sync_callbacks)
intermediate_steps = (
output.get("intermediate_steps", []) if isinstance(output, dict) else []

View file

@ -14,366 +14,382 @@ import ChatInput from "./chatInput";
const _ = require("lodash");
export default function ChatModal({
flow,
open,
setOpen,
flow,
open,
setOpen,
}: {
open: boolean;
setOpen: Function;
flow: FlowType;
open: boolean;
setOpen: Function;
flow: FlowType;
}) {
const [chatValue, setChatValue] = useState("");
const [chatHistory, setChatHistory] = useState<ChatMessageType[]>([]);
const { reactFlowInstance } = useContext(typesContext);
const { setErrorData, setNoticeData } = useContext(alertContext);
const ws = useRef<WebSocket | null>(null);
const [lockChat, setLockChat] = useState(false);
const isOpen = useRef(open);
const [chatValue, setChatValue] = useState("");
const [chatHistory, setChatHistory] = useState<ChatMessageType[]>([]);
const { reactFlowInstance } = useContext(typesContext);
const { setErrorData, setNoticeData } = useContext(alertContext);
const ws = useRef<WebSocket | null>(null);
const [lockChat, setLockChat] = useState(false);
const isOpen = useRef(open);
useEffect(() => {
isOpen.current = open;
}, [open]);
var isStream = false;
useEffect(() => {
isOpen.current = open;
}, [open]);
var isStream = false;
const addChatHistory = (
message: string,
isSend: boolean,
thought?: string,
files?: Array<any>
) => {
setChatHistory((old) => {
let newChat = _.cloneDeep(old);
if (files) {
newChat.push({ message, isSend, files, thought });
} else if (thought) {
newChat.push({ message, isSend, thought });
} else {
newChat.push({ message, isSend });
}
return newChat;
});
};
const addChatHistory = (
message: string,
isSend: boolean,
thought?: string,
files?: Array<any>
) => {
setChatHistory((old) => {
let newChat = _.cloneDeep(old);
if (files) {
newChat.push({ message, isSend, files, thought });
} else if (thought) {
newChat.push({ message, isSend, thought });
} else {
newChat.push({ message, isSend });
}
return newChat;
});
};
//add proper type signature for function
//add proper type signature for function
function updateLastMessage({str,thought}:{str?: string, thought?: string}) {
setChatHistory((old) => {
let newChat = [...old];
if (str) {
newChat[newChat.length - 1].message =
newChat[newChat.length - 1].message + str;
}
if(thought){
newChat[newChat.length - 1].thought = thought
}
return newChat;
});
}
function updateLastMessage({
str,
thought,
end = false,
}: {
str?: string;
thought?: string;
// end param default is false
end?: boolean;
}) {
setChatHistory((old) => {
let newChat = [...old];
if (str) {
if (end && !newChat[newChat.length - 1].message) {
newChat[newChat.length - 1].message = str;
}
newChat[newChat.length - 1].message =
newChat[newChat.length - 1].message + str;
}
if (thought) {
newChat[newChat.length - 1].thought = thought;
}
return newChat;
});
}
function handleOnClose(event: CloseEvent) {
if (isOpen.current) {
setLockChat(false);
setTimeout(() => {
connectWS();
}, 1000);
}
}
function handleOnClose(event: CloseEvent) {
if (isOpen.current) {
setLockChat(false);
setTimeout(() => {
connectWS();
}, 1000);
}
}
function handleWsMessage(data: any) {
if (Array.isArray(data)) {
//set chat history
setChatHistory((_) => {
let newChatHistory: ChatMessageType[] = [];
data.forEach(
(chatItem: {
intermediate_steps?: "string";
is_bot: boolean;
message: string;
type: string;
files?: Array<any>;
}) => {
if (chatItem.message) {
newChatHistory.push(
chatItem.files
? {
isSend: !chatItem.is_bot,
message: chatItem.message,
thought: chatItem.intermediate_steps,
files: chatItem.files,
}
: {
isSend: !chatItem.is_bot,
message: chatItem.message,
thought: chatItem.intermediate_steps,
}
);
}
}
);
return newChatHistory;
});
}
if (data.type === "start") {
console.log("start");
addChatHistory("", false);
isStream = true;
}
if (data.type === "end") {
if(data.intermediate_steps){
updateLastMessage({thought:data.intermediate_steps});
}
setLockChat(false);
isStream = false;
}
if (data.type === "file") {
console.log(data);
}
if (data.type === "stream" && isStream) {
updateLastMessage({str:data.message});
}
}
function handleWsMessage(data: any) {
if (Array.isArray(data)) {
//set chat history
setChatHistory((_) => {
let newChatHistory: ChatMessageType[] = [];
data.forEach(
(chatItem: {
intermediate_steps?: "string";
is_bot: boolean;
message: string;
type: string;
files?: Array<any>;
}) => {
if (chatItem.message) {
newChatHistory.push(
chatItem.files
? {
isSend: !chatItem.is_bot,
message: chatItem.message,
thought: chatItem.intermediate_steps,
files: chatItem.files,
}
: {
isSend: !chatItem.is_bot,
message: chatItem.message,
thought: chatItem.intermediate_steps,
}
);
}
}
);
return newChatHistory;
});
}
if (data.type === "start") {
console.log("start");
addChatHistory("", false);
isStream = true;
}
if (data.type === "end") {
if (data.intermediate_steps) {
updateLastMessage({
str: data.message,
thought: data.intermediate_steps,
end: true,
});
}
setLockChat(false);
isStream = false;
}
if (data.type === "file") {
console.log(data);
}
if (data.type === "stream" && isStream) {
updateLastMessage({ str: data.message });
}
}
function connectWS() {
try {
const urlWs =
process.env.NODE_ENV === "development"
? `ws://localhost:7860/chat/${flow.id}`
: `${window.location.protocol === "https:" ? "wss" : "ws"}://${
window.location.host
}/chat/${flow.id}`;
function connectWS() {
try {
const urlWs =
process.env.NODE_ENV === "development"
? `ws://localhost:7860/chat/${flow.id}`
: `${window.location.protocol === "https:" ? "wss" : "ws"}://${
window.location.host
}/chat/${flow.id}`;
const newWs = new WebSocket(urlWs);
newWs.onopen = () => {
console.log("WebSocket connection established!");
};
console.log(flow.id);
newWs.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log("Received data:", data);
handleWsMessage(data);
//get chat history
};
newWs.onclose = (event) => {
handleOnClose(event);
};
newWs.onerror = (ev) => {
console.log(ev, "error");
setErrorData({
title: "There was an error on web connection, please: ",
list: [
"Refresh the page",
"Use a new flow tab",
"Check if the backend is up",
],
});
};
ws.current = newWs;
} catch {
setErrorData({
title: "There was an error on web connection, please: ",
list: [
"Refresh the page",
"Use a new flow tab",
"Check if the backend is up",
],
});
}
}
const newWs = new WebSocket(urlWs);
newWs.onopen = () => {
console.log("WebSocket connection established!");
};
console.log(flow.id);
newWs.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log("Received data:", data);
handleWsMessage(data);
//get chat history
};
newWs.onclose = (event) => {
handleOnClose(event);
};
newWs.onerror = (ev) => {
console.log(ev, "error");
setErrorData({
title: "There was an error on web connection, please: ",
list: [
"Refresh the page",
"Use a new flow tab",
"Check if the backend is up",
],
});
};
ws.current = newWs;
} catch {
setErrorData({
title: "There was an error on web connection, please: ",
list: [
"Refresh the page",
"Use a new flow tab",
"Check if the backend is up",
],
});
}
}
useEffect(() => {
connectWS();
return () => {
console.log("unmount");
console.log(ws);
if (ws) {
ws.current.close();
}
};
}, []);
useEffect(() => {
connectWS();
return () => {
console.log("unmount");
console.log(ws);
if (ws) {
ws.current.close();
}
};
}, []);
async function sendAll(data: sendAllProps) {
try {
if (ws) {
ws.current.send(JSON.stringify(data));
}
} catch (error) {
setErrorData({
title: "There was an erro sending the message",
list: [error.message],
});
setChatValue(data.message);
connectWS();
}
}
async function sendAll(data: sendAllProps) {
try {
if (ws) {
ws.current.send(JSON.stringify(data));
}
} catch (error) {
setErrorData({
title: "There was an erro sending the message",
list: [error.message],
});
setChatValue(data.message);
connectWS();
}
}
useEffect(() => {
if (ref.current) ref.current.scrollIntoView({ behavior: "smooth" });
}, [chatHistory]);
useEffect(() => {
if (ref.current) ref.current.scrollIntoView({ behavior: "smooth" });
}, [chatHistory]);
function validateNode(n: NodeType): Array<string> {
if (!n.data?.node?.template || !Object.keys(n.data.node.template)) {
setNoticeData({
title:
"We've noticed a potential issue with a node in the flow. Please review it and, if necessary, submit a bug report with your exported flow file. Thank you for your help!",
});
return [];
}
function validateNode(n: NodeType): Array<string> {
if (!n.data?.node?.template || !Object.keys(n.data.node.template)) {
setNoticeData({
title:
"We've noticed a potential issue with a node in the flow. Please review it and, if necessary, submit a bug report with your exported flow file. Thank you for your help!",
});
return [];
}
const {
type,
node: { template },
} = n.data;
const {
type,
node: { template },
} = n.data;
return Object.keys(template).reduce(
(errors: Array<string>, t) =>
errors.concat(
template[t].required &&
template[t].show &&
(!template[t].value || template[t].value === "") &&
!reactFlowInstance
.getEdges()
.some(
(e) =>
e.targetHandle.split("|")[1] === t &&
e.targetHandle.split("|")[2] === n.id
)
? [
`${type} is missing ${
template.display_name
? template.display_name
: toNormalCase(template[t].name)
}.`,
]
: []
),
[] as string[]
);
}
return Object.keys(template).reduce(
(errors: Array<string>, t) =>
errors.concat(
template[t].required &&
template[t].show &&
(!template[t].value || template[t].value === "") &&
!reactFlowInstance
.getEdges()
.some(
(e) =>
e.targetHandle.split("|")[1] === t &&
e.targetHandle.split("|")[2] === n.id
)
? [
`${type} is missing ${
template.display_name
? template.display_name
: toNormalCase(template[t].name)
}.`,
]
: []
),
[] as string[]
);
}
function validateNodes() {
return reactFlowInstance
.getNodes()
.flatMap((n: NodeType) => validateNode(n));
}
function validateNodes() {
return reactFlowInstance
.getNodes()
.flatMap((n: NodeType) => validateNode(n));
}
const ref = useRef(null);
const ref = useRef(null);
function sendMessage() {
if (chatValue !== "") {
let nodeValidationErrors = validateNodes();
if (nodeValidationErrors.length === 0) {
setLockChat(true);
let message = chatValue;
setChatValue("");
addChatHistory(message, true);
sendAll({
...reactFlowInstance.toObject(),
message,
chatHistory,
name: flow.name,
description: flow.description,
});
} else {
setErrorData({
title: "Oops! Looks like you missed some required information:",
list: nodeValidationErrors,
});
}
} else {
setErrorData({
title: "Error sending message",
list: ["The message cannot be empty."],
});
}
}
function clearChat() {
setChatHistory([]);
ws.current.send(JSON.stringify({ clear_history: true }));
}
function sendMessage() {
if (chatValue !== "") {
let nodeValidationErrors = validateNodes();
if (nodeValidationErrors.length === 0) {
setLockChat(true);
let message = chatValue;
setChatValue("");
addChatHistory(message, true);
sendAll({
...reactFlowInstance.toObject(),
message,
chatHistory,
name: flow.name,
description: flow.description,
});
} else {
setErrorData({
title: "Oops! Looks like you missed some required information:",
list: nodeValidationErrors,
});
}
} else {
setErrorData({
title: "Error sending message",
list: ["The message cannot be empty."],
});
}
}
function clearChat() {
setChatHistory([]);
ws.current.send(JSON.stringify({ clear_history: true }));
}
function setModalOpen(x: boolean) {
setOpen(x);
}
return (
<Transition.Root show={open} appear={open} as={Fragment}>
<Dialog
as="div"
className="relative z-10"
onClose={setModalOpen}
initialFocus={ref}
>
<Transition.Child
as={Fragment}
enter="ease-out duration-300"
enterFrom="opacity-0"
enterTo="opacity-100"
leave="ease-in duration-200"
leaveFrom="opacity-100"
leaveTo="opacity-0"
>
<div className="fixed inset-0 bg-black backdrop-blur-sm dark:bg-gray-600 dark:bg-opacity-80 bg-opacity-80 transition-opacity" />
</Transition.Child>
function setModalOpen(x: boolean) {
setOpen(x);
}
return (
<Transition.Root show={open} appear={open} as={Fragment}>
<Dialog
as="div"
className="relative z-10"
onClose={setModalOpen}
initialFocus={ref}
>
<Transition.Child
as={Fragment}
enter="ease-out duration-300"
enterFrom="opacity-0"
enterTo="opacity-100"
leave="ease-in duration-200"
leaveFrom="opacity-100"
leaveTo="opacity-0"
>
<div className="fixed inset-0 bg-black backdrop-blur-sm dark:bg-gray-600 dark:bg-opacity-80 bg-opacity-80 transition-opacity" />
</Transition.Child>
<div className="fixed inset-0 z-10 overflow-y-auto">
<div className="flex h-full items-end justify-center p-4 text-center sm:items-center sm:p-0">
<Transition.Child
as={Fragment}
enter="ease-out duration-300"
enterFrom="opacity-0 translate-y-4 sm:translate-y-0 sm:scale-95"
enterTo="opacity-100 translate-y-0 sm:scale-100"
leave="ease-in duration-200"
leaveFrom="opacity-100 translate-y-0 sm:scale-100"
leaveTo="opacity-0 translate-y-4 sm:translate-y-0 sm:scale-95"
>
<Dialog.Panel className=" drop-shadow-2xl relative flex flex-col justify-between transform h-[95%] overflow-hidden rounded-lg bg-white dark:bg-gray-800 text-left shadow-xl transition-all sm:my-8 w-[690px]">
<div className="relative w-full">
<button
onClick={() => clearChat()}
className="absolute top-2 right-2 hover:text-red-500"
>
<FaEraser className="w-4 h-4" />
</button>
</div>
<div className="w-full h-full bg-white dark:bg-gray-800 border-t dark:border-t-gray-600 flex-col flex items-center overflow-scroll scrollbar-hide">
{chatHistory.length > 0 ? (
chatHistory.map((c, i) => <ChatMessage chat={c} key={i} />)
) : (
<div className="flex flex-col h-full text-center justify-center w-full items-center align-middle ">
<span>
👋{" "}
<span className="text-gray-600 text-lg">
LangFlow Chat
</span>
</span>
<br />
<div className="bg-gray-100 rounded-md w-2/4 px-6 py-8 border border-gray-200">
<span className="text-base text-gray-500">
Start a conversation and click the agents thoughts{" "}
<span>
<ChatBubbleOvalLeftEllipsisIcon className="w-6 h-6 inline animate-bounce " />
</span>{" "}
to inspect the chaining process.
</span>
</div>
</div>
)}
<div ref={ref}></div>
</div>
<div className="w-full bg-white dark:bg-gray-800 border-t dark:border-t-gray-600 flex-col flex items-center justify-between p-3">
<div className="relative w-full mt-1 rounded-md shadow-sm">
<ChatInput
chatValue={chatValue}
lockChat={lockChat}
sendMessage={sendMessage}
setChatValue={setChatValue}
/>
</div>
</div>
</Dialog.Panel>
</Transition.Child>
</div>
</div>
</Dialog>
</Transition.Root>
);
<div className="fixed inset-0 z-10 overflow-y-auto">
<div className="flex h-full items-end justify-center p-4 text-center sm:items-center sm:p-0">
<Transition.Child
as={Fragment}
enter="ease-out duration-300"
enterFrom="opacity-0 translate-y-4 sm:translate-y-0 sm:scale-95"
enterTo="opacity-100 translate-y-0 sm:scale-100"
leave="ease-in duration-200"
leaveFrom="opacity-100 translate-y-0 sm:scale-100"
leaveTo="opacity-0 translate-y-4 sm:translate-y-0 sm:scale-95"
>
<Dialog.Panel className=" drop-shadow-2xl relative flex flex-col justify-between transform h-[95%] overflow-hidden rounded-lg bg-white dark:bg-gray-800 text-left shadow-xl transition-all sm:my-8 w-[690px]">
<div className="relative w-full">
<button
onClick={() => clearChat()}
className="absolute top-2 right-2 hover:text-red-500"
>
<FaEraser className="w-4 h-4" />
</button>
</div>
<div className="w-full h-full bg-white dark:bg-gray-800 border-t dark:border-t-gray-600 flex-col flex items-center overflow-scroll scrollbar-hide">
{chatHistory.length > 0 ? (
chatHistory.map((c, i) => <ChatMessage chat={c} key={i} />)
) : (
<div className="flex flex-col h-full text-center justify-center w-full items-center align-middle ">
<span>
👋{" "}
<span className="text-gray-600 text-lg">
LangFlow Chat
</span>
</span>
<br />
<div className="bg-gray-100 rounded-md w-2/4 px-6 py-8 border border-gray-200">
<span className="text-base text-gray-500">
Start a conversation and click the agents thoughts{" "}
<span>
<ChatBubbleOvalLeftEllipsisIcon className="w-6 h-6 inline animate-bounce " />
</span>{" "}
to inspect the chaining process.
</span>
</div>
</div>
)}
<div ref={ref}></div>
</div>
<div className="w-full bg-white dark:bg-gray-800 border-t dark:border-t-gray-600 flex-col flex items-center justify-between p-3">
<div className="relative w-full mt-1 rounded-md shadow-sm">
<ChatInput
chatValue={chatValue}
lockChat={lockChat}
sendMessage={sendMessage}
setChatValue={setChatValue}
/>
</div>
</div>
</Dialog.Panel>
</Transition.Child>
</div>
</div>
</Dialog>
</Transition.Root>
);
}