Merge remote-tracking branch 'origin/zustand/io/migration' into ioView
This commit is contained in:
commit
ee28d18a13
10 changed files with 236 additions and 22 deletions
|
|
@ -223,7 +223,7 @@ def build_and_cache_graph(
|
|||
flow: Flow = session.get(Flow, flow_id)
|
||||
if not flow or not flow.data:
|
||||
raise ValueError("Invalid flow ID")
|
||||
other_graph = Graph.from_payload(flow.data)
|
||||
other_graph = Graph.from_payload(flow.data, flow_id)
|
||||
if graph is None:
|
||||
graph = other_graph
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -10,12 +10,14 @@ from fastapi import (
|
|||
WebSocketException,
|
||||
status,
|
||||
)
|
||||
from fastapi.responses import StreamingResponse
|
||||
from loguru import logger
|
||||
from sqlmodel import Session
|
||||
|
||||
from langflow.api.utils import build_and_cache_graph, format_elapsed_time
|
||||
from langflow.api.v1.schemas import (
|
||||
ResultData,
|
||||
StreamData,
|
||||
VertexBuildResponse,
|
||||
VerticesOrderResponse,
|
||||
)
|
||||
|
|
@ -204,3 +206,51 @@ async def build_vertex(
|
|||
logger.error(f"Error building vertex: {exc}")
|
||||
logger.exception(exc)
|
||||
raise HTTPException(status_code=500, detail=str(exc)) from exc
|
||||
|
||||
|
||||
# Now onto an endpoint that is an SSE endpoint
|
||||
# it will receive a component_id and a flow_id
|
||||
#
|
||||
@router.get("/build/{flow_id}/{vertex_id}/stream", response_class=StreamingResponse)
|
||||
async def build_vertex_stream(
|
||||
flow_id: str,
|
||||
vertex_id: str,
|
||||
chat_service: "ChatService" = Depends(get_chat_service),
|
||||
):
|
||||
"""Build a vertex instead of the entire graph."""
|
||||
try:
|
||||
|
||||
async def stream_vertex():
|
||||
try:
|
||||
cache = chat_service.get_cache(flow_id)
|
||||
if not cache:
|
||||
# If there's no cache
|
||||
raise ValueError(f"No cache found for {flow_id}.")
|
||||
else:
|
||||
graph = cache.get("result")
|
||||
|
||||
vertex = graph.get_vertex(vertex_id)
|
||||
if not vertex.pinned or not vertex._built:
|
||||
stream_data = StreamData(
|
||||
event="message",
|
||||
data={"message": "Building vertex"},
|
||||
)
|
||||
yield str(stream_data)
|
||||
|
||||
async for chunk in vertex.stream():
|
||||
stream_data = StreamData(
|
||||
event="message",
|
||||
data={"chunk": chunk},
|
||||
)
|
||||
yield str(stream_data)
|
||||
else:
|
||||
raise ValueError(f"No result found for vertex {vertex_id}")
|
||||
|
||||
except Exception as exc:
|
||||
yield str(StreamData(event="error", data={"error": str(exc)}))
|
||||
|
||||
yield str(StreamData(event="close", data={"message": "Stream closed"}))
|
||||
|
||||
return StreamingResponse(stream_vertex(), media_type="text/event-stream")
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=500, detail="Error building vertex") from exc
|
||||
|
|
|
|||
|
|
@ -1,10 +1,11 @@
|
|||
from typing import TYPE_CHECKING, Any, List, Optional
|
||||
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from langflow.graph.edge.utils import build_clean_params
|
||||
from langflow.services.deps import get_monitor_service
|
||||
from langflow.services.monitor.utils import log_message
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from langflow.graph.vertex.base import Vertex
|
||||
|
|
@ -135,11 +136,11 @@ class ContractEdge(Edge):
|
|||
|
||||
log_transaction(self, source, target, "success")
|
||||
# If the target vertex is a power component we log messages
|
||||
if (
|
||||
target.vertex_type == "ChatOutput"
|
||||
and isinstance(target.params.get("message"), str)
|
||||
or isinstance(target.params.get("message"), dict)
|
||||
if target.vertex_type == "ChatOutput" and (
|
||||
isinstance(target.params.get("message"), str) or isinstance(target.params.get("message"), dict)
|
||||
):
|
||||
if target.params.get("message") == "":
|
||||
return self.result
|
||||
await log_message(
|
||||
sender=target.params.get("sender", ""),
|
||||
sender_name=target.params.get("sender_name", ""),
|
||||
|
|
@ -168,3 +169,4 @@ def log_transaction(edge: ContractEdge, source: "Vertex", target: "Vertex", stat
|
|||
monitor_service.add_row(table_name="transactions", data=data)
|
||||
except Exception as e:
|
||||
logger.error(f"Error logging transaction: {e}")
|
||||
logger.error(f"Error logging transaction: {e}")
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ class Graph:
|
|||
self,
|
||||
nodes: List[Dict],
|
||||
edges: List[Dict[str, str]],
|
||||
flow_id: Optional[str] = None,
|
||||
) -> None:
|
||||
self.inputs = []
|
||||
self.outputs = []
|
||||
|
|
@ -34,6 +35,7 @@ class Graph:
|
|||
self.raw_graph_data = {"nodes": nodes, "edges": edges}
|
||||
self._runs = 0
|
||||
self._updates = 0
|
||||
self.flow_id = flow_id
|
||||
|
||||
self.top_level_vertices = []
|
||||
for vertex in self._vertices:
|
||||
|
|
@ -113,7 +115,7 @@ class Graph:
|
|||
return predecessor_map, successor_map
|
||||
|
||||
@classmethod
|
||||
def from_payload(cls, payload: Dict) -> "Graph":
|
||||
def from_payload(cls, payload: Dict, flow_id: str) -> "Graph":
|
||||
"""
|
||||
Creates a graph from a payload.
|
||||
|
||||
|
|
@ -128,7 +130,7 @@ class Graph:
|
|||
try:
|
||||
vertices = payload["nodes"]
|
||||
edges = payload["edges"]
|
||||
return cls(vertices, edges)
|
||||
return cls(vertices, edges, flow_id)
|
||||
except KeyError as exc:
|
||||
logger.exception(exc)
|
||||
raise ValueError(
|
||||
|
|
|
|||
|
|
@ -1,14 +1,16 @@
|
|||
import ast
|
||||
import json
|
||||
from typing import Callable, Dict, List, Optional, Union
|
||||
from typing import AsyncIterator, Callable, Dict, Iterator, List, Optional, Union
|
||||
|
||||
import yaml
|
||||
from langchain_core.messages import AIMessage
|
||||
from loguru import logger
|
||||
|
||||
from langflow.graph.utils import UnbuiltObject, flatten_list
|
||||
from langflow.graph.vertex.base import StatefulVertex, StatelessVertex
|
||||
from langflow.interface.utils import extract_input_variables_from_prompt
|
||||
from langflow.schema import Record
|
||||
from langflow.services.monitor.utils import log_message
|
||||
from langflow.utils.schemas import ChatOutputResponse
|
||||
|
||||
|
||||
|
|
@ -314,6 +316,20 @@ class ChatVertex(StatelessVertex):
|
|||
super().__init__(data, graph=graph, base_type="custom_components", is_task=True)
|
||||
self.steps = [self._build, self._run]
|
||||
|
||||
def build_stream_url(self):
|
||||
return f"/api/v1/build/{self.graph.flow_id}/{self.id}/stream"
|
||||
|
||||
async def _build(self, user_id=None):
|
||||
"""
|
||||
Initiate the build process.
|
||||
"""
|
||||
logger.debug(f"Building {self.vertex_type}")
|
||||
await self._build_each_node_in_params_dict(user_id)
|
||||
await self._get_and_instantiate_class(user_id)
|
||||
self._validate_built_object()
|
||||
|
||||
self._built = True
|
||||
|
||||
def _built_object_repr(self):
|
||||
if self.task_id and self.is_task:
|
||||
if task := self.get_task():
|
||||
|
|
@ -332,6 +348,8 @@ class ChatVertex(StatelessVertex):
|
|||
artifacts = None
|
||||
sender = self.params.get("sender", None)
|
||||
sender_name = self.params.get("sender_name", None)
|
||||
message = self.params.get("message", None)
|
||||
stream_url = None
|
||||
if isinstance(self._built_object, AIMessage):
|
||||
artifacts = ChatOutputResponse.from_message(
|
||||
self._built_object,
|
||||
|
|
@ -345,8 +363,13 @@ class ChatVertex(StatelessVertex):
|
|||
message = dict_to_codeblock(self._built_object)
|
||||
elif isinstance(self._built_object, Record):
|
||||
message = self._built_object.text
|
||||
elif isinstance(message, (AsyncIterator, Iterator)):
|
||||
stream_url = self.build_stream_url()
|
||||
message = ""
|
||||
elif not isinstance(self._built_object, str):
|
||||
message = str(self._built_object)
|
||||
# if the message is a generator or iterator
|
||||
# it means that it is a stream of messages
|
||||
else:
|
||||
message = self._built_object
|
||||
|
||||
|
|
@ -354,14 +377,56 @@ class ChatVertex(StatelessVertex):
|
|||
message=message,
|
||||
sender=sender,
|
||||
sender_name=sender_name,
|
||||
stream_url=stream_url,
|
||||
)
|
||||
if artifacts:
|
||||
self.artifacts = artifacts.model_dump()
|
||||
if isinstance(self._built_object, (AsyncIterator, Iterator)):
|
||||
if self.params["as_record"]:
|
||||
self._built_object = Record(text=message, data=self.artifacts)
|
||||
else:
|
||||
self._built_object = message
|
||||
self._built_result = self._built_object
|
||||
|
||||
else:
|
||||
await super()._run(*args, **kwargs)
|
||||
|
||||
async def stream(self):
|
||||
iterator = self.params.get("message", None)
|
||||
if not isinstance(iterator, (AsyncIterator, Iterator)):
|
||||
raise ValueError("The message must be an iterator or an async iterator.")
|
||||
is_async = isinstance(iterator, AsyncIterator)
|
||||
complete_message = ""
|
||||
if is_async:
|
||||
async for message in iterator:
|
||||
message = message.content if hasattr(message, "content") else message
|
||||
message = message.text if hasattr(message, "text") else message
|
||||
yield message
|
||||
complete_message += message
|
||||
else:
|
||||
for message in iterator:
|
||||
message = message.content if hasattr(message, "content") else message
|
||||
message = message.text if hasattr(message, "text") else message
|
||||
yield message
|
||||
complete_message += message
|
||||
self._built_object = Record(text=complete_message, data=self.artifacts)
|
||||
self._built_result = complete_message
|
||||
# Update artifacts with the message
|
||||
# and remove the stream_url
|
||||
self.artifacts = ChatOutputResponse(
|
||||
message=complete_message,
|
||||
sender=self.params.get("sender", ""),
|
||||
sender_name=self.params.get("sender_name", ""),
|
||||
).model_dump()
|
||||
|
||||
await log_message(
|
||||
sender=self.params.get("sender", ""),
|
||||
sender_name=self.params.get("sender_name", ""),
|
||||
message=complete_message,
|
||||
session_id=self.params.get("session_id", ""),
|
||||
artifacts=self.artifacts,
|
||||
)
|
||||
|
||||
|
||||
class RoutingVertex(StatelessVertex):
|
||||
def __init__(self, data: Dict, graph):
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ class ChatOutputResponse(BaseModel):
|
|||
message: Union[str, List[Union[str, Dict]]]
|
||||
sender: Optional[str] = "Machine"
|
||||
sender_name: Optional[str] = "AI"
|
||||
stream_url: Optional[str] = None
|
||||
|
||||
@classmethod
|
||||
def from_message(
|
||||
|
|
|
|||
|
|
@ -174,6 +174,8 @@ export default function GenericNode({
|
|||
return "green-status";
|
||||
} else if (!isValid && buildStatus === BuildStatus.INACTIVE) {
|
||||
return "gray-status";
|
||||
} else if (!validationStatus && buildStatus === BuildStatus.BUILT) {
|
||||
return "green-status";
|
||||
} else if (!isValid && buildStatus === BuildStatus.BUILT) {
|
||||
return "red-status";
|
||||
} else if (!validationStatus) {
|
||||
|
|
@ -202,7 +204,6 @@ export default function GenericNode({
|
|||
let isInvalid = validationStatus && !validationStatus.valid;
|
||||
if (buildStatus === BuildStatus.INACTIVE && isInvalid) {
|
||||
// INACTIVE should have its own class
|
||||
// different from BUILT and TO_BUILD
|
||||
return "inactive-status";
|
||||
}
|
||||
if (buildStatus === BuildStatus.BUILT && isInvalid) {
|
||||
|
|
|
|||
|
|
@ -17,12 +17,65 @@ export default function ChatMessage({
|
|||
chat,
|
||||
lockChat,
|
||||
lastMessage,
|
||||
updateChat,
|
||||
}: chatMessagePropsType): JSX.Element {
|
||||
const convert = new Convert({ newline: true });
|
||||
const [hidden, setHidden] = useState(true);
|
||||
const template = chat.template;
|
||||
const [promptOpen, setPromptOpen] = useState(false);
|
||||
const chat_message = chat.message.toString();
|
||||
const [streamUrl, setStreamUrl] = useState(chat.stream_url);
|
||||
const [chatMessage, setChatMessage] = useState(chat.message.toString());
|
||||
const [isStreaming, setIsStreaming] = useState(false);
|
||||
|
||||
// The idea now is that chat.stream_url MAY be a URL if we should stream the output of the chat
|
||||
// probably the message is empty when we have a stream_url
|
||||
// what we need is to update the chat_message with the SSE data
|
||||
const streamChunks = (url: string) => {
|
||||
setIsStreaming(true); // Streaming starts
|
||||
return new Promise<boolean>((resolve, reject) => {
|
||||
const eventSource = new EventSource(url);
|
||||
eventSource.onmessage = (event) => {
|
||||
let parsedData = JSON.parse(event.data);
|
||||
if (parsedData.chunk) {
|
||||
setChatMessage((prev) => prev + parsedData.chunk);
|
||||
}
|
||||
};
|
||||
eventSource.onerror = (event) => {
|
||||
reject(new Error("Streaming failed"));
|
||||
setIsStreaming(false);
|
||||
eventSource.close();
|
||||
};
|
||||
eventSource.addEventListener("close", (event) => {
|
||||
setStreamUrl(null); // Update state to reflect the stream is closed
|
||||
resolve(true);
|
||||
setIsStreaming(false);
|
||||
eventSource.close();
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
useEffect(() => {
|
||||
if (streamUrl && chat.message === "") {
|
||||
streamChunks(streamUrl)
|
||||
.then(() => {
|
||||
if (updateChat) {
|
||||
updateChat(chat, chatMessage, streamUrl);
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
console.error(error);
|
||||
});
|
||||
}
|
||||
}, [streamUrl]);
|
||||
|
||||
useEffect(() => {
|
||||
// This effect is specifically for calling updateChat after streaming ends
|
||||
if (!isStreaming && streamUrl === null) {
|
||||
if (updateChat) {
|
||||
updateChat(chat, chatMessage, streamUrl);
|
||||
}
|
||||
}
|
||||
}, [isStreaming]);
|
||||
|
||||
useEffect(() => {
|
||||
const element = document.getElementById("last-chat-message");
|
||||
|
|
@ -103,7 +156,7 @@ export default function ChatMessage({
|
|||
remarkPlugins={[remarkGfm, remarkMath]}
|
||||
rehypePlugins={[rehypeMathjax]}
|
||||
className="markdown prose min-w-full text-primary word-break-break-word
|
||||
dark:prose-invert"
|
||||
dark:prose-invert"
|
||||
components={{
|
||||
pre({ node, ...props }) {
|
||||
return <>{props.children}</>;
|
||||
|
|
@ -161,10 +214,10 @@ export default function ChatMessage({
|
|||
},
|
||||
}}
|
||||
>
|
||||
{chat_message}
|
||||
{chatMessage}
|
||||
</Markdown>
|
||||
),
|
||||
[chat.message, chat_message]
|
||||
[chat.message, chatMessage]
|
||||
)}
|
||||
</div>
|
||||
{chat.files && (
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import { cloneDeep } from "lodash";
|
||||
import _, { cloneDeep } from "lodash";
|
||||
import { useEffect, useRef, useState } from "react";
|
||||
import IconComponent from "../../components/genericIconComponent";
|
||||
import { deleteFlowPool } from "../../controllers/API";
|
||||
|
|
@ -19,7 +19,6 @@ import ChatMessage from "./chatMessage";
|
|||
|
||||
export default function NewChatView(): JSX.Element {
|
||||
const [chatValue, setChatValue] = useState("");
|
||||
const [chatHistory, setChatHistory] = useState<ChatMessageType[]>([]);
|
||||
const {
|
||||
flowPool,
|
||||
outputs,
|
||||
|
|
@ -36,6 +35,7 @@ export default function NewChatView(): JSX.Element {
|
|||
const [lockChat, setLockChat] = useState(false);
|
||||
const messagesRef = useRef<HTMLDivElement | null>(null);
|
||||
const isBuilding = useFlowStore((state) => state.isBuilding);
|
||||
const [chatHistory, setChatHistory] = useState<ChatMessageType[]>([]);
|
||||
|
||||
const inputTypes = inputs.map((obj) => obj.type);
|
||||
const inputIds = inputs.map((obj) => obj.id);
|
||||
|
|
@ -67,20 +67,30 @@ export default function NewChatView(): JSX.Element {
|
|||
});
|
||||
const chatMessages: ChatMessageType[] = chatOutputResponses
|
||||
.sort((a, b) => Date.parse(a.timestamp) - Date.parse(b.timestamp))
|
||||
.filter((output) => !!output.data.artifacts?.message)
|
||||
.map((output) => {
|
||||
//
|
||||
.filter((output) => output.data.artifacts?.message !== null)
|
||||
.map((output, index) => {
|
||||
try {
|
||||
const { sender, message, sender_name } = output.data
|
||||
const { sender, message, sender_name, stream_url } = output.data
|
||||
.artifacts as ChatOutputType;
|
||||
|
||||
const componentId = output.id + index;
|
||||
|
||||
const is_ai = sender === "Machine" || sender === null;
|
||||
return { isSend: !is_ai, message: message, sender_name };
|
||||
return {
|
||||
isSend: !is_ai,
|
||||
message: message,
|
||||
sender_name,
|
||||
id: componentId,
|
||||
stream_url: stream_url,
|
||||
};
|
||||
} catch (e) {
|
||||
console.error(e);
|
||||
return {
|
||||
isSend: false,
|
||||
message: "Error parsing message",
|
||||
sender_name: "Error",
|
||||
id: output.id + index,
|
||||
};
|
||||
}
|
||||
});
|
||||
|
|
@ -148,6 +158,32 @@ export default function NewChatView(): JSX.Element {
|
|||
if (lockChat) setLockChat(false);
|
||||
}
|
||||
|
||||
function updateChat(
|
||||
chat: ChatMessageType,
|
||||
message: string,
|
||||
stream_url: string | null
|
||||
) {
|
||||
if (message === "") return;
|
||||
console.log(`updateChat: ${message}`);
|
||||
console.log("chatHistory:", chatHistory);
|
||||
chat.message = message;
|
||||
chat.stream_url = stream_url;
|
||||
// chat is one of the chatHistory
|
||||
setChatHistory((oldChatHistory) => {
|
||||
const index = oldChatHistory.findIndex((ch) => ch.id === chat.id);
|
||||
|
||||
if (index === -1) return oldChatHistory;
|
||||
let newChatHistory = _.cloneDeep(oldChatHistory);
|
||||
newChatHistory = [
|
||||
...newChatHistory.slice(0, index),
|
||||
chat,
|
||||
...newChatHistory.slice(index + 1),
|
||||
];
|
||||
console.log("newChatHistory:", newChatHistory);
|
||||
return newChatHistory;
|
||||
});
|
||||
}
|
||||
|
||||
return (
|
||||
<div className="eraser-column-arrangement">
|
||||
<div className="eraser-size">
|
||||
|
|
@ -172,7 +208,8 @@ export default function NewChatView(): JSX.Element {
|
|||
lockChat={lockChat}
|
||||
chat={chat}
|
||||
lastMessage={chatHistory.length - 1 === index ? true : false}
|
||||
key={index}
|
||||
key={`${chat.id}-${index}`}
|
||||
updateChat={updateChat}
|
||||
/>
|
||||
))
|
||||
) : (
|
||||
|
|
|
|||
|
|
@ -9,6 +9,8 @@ export type ChatMessageType = {
|
|||
files?: Array<{ data: string; type: string; data_type: string }>;
|
||||
prompt?: string;
|
||||
chatKey?: string;
|
||||
id?: string;
|
||||
stream_url?: string | null;
|
||||
sender_name?: string;
|
||||
};
|
||||
|
||||
|
|
@ -16,6 +18,7 @@ export type ChatOutputType = {
|
|||
message: string;
|
||||
sender: string;
|
||||
sender_name: string;
|
||||
stream_url?: string;
|
||||
};
|
||||
|
||||
export type chatInputType = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue