Update node and edge IDs, refactor node ID generation, and fix async bug in build_vertex_stream function (#1546)

* Update node and edge IDs in PageComponent and reactflowUtils

* Merge remote-tracking branch 'origin/zustand/io/migration' into fixGroup

* Refactor node ID generation and update node IDs in selection

* Update flowStore.ts and reactflowUtils.ts

* Fix async bug in build_vertex_stream function

* Add check for missing id in vertex data

* Fix exception message for missing vertex id

* Update code: Added VertexLayerElementType type and modified updateIds function
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-03-20 20:06:25 -03:00 committed by GitHub
commit a6625bbad5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 152 additions and 84 deletions

View file

@ -202,7 +202,7 @@ async def build_vertex_stream(
async def stream_vertex():
try:
if not session_id:
cache = chat_service.get_cache(flow_id)
cache = await chat_service.get_cache(flow_id)
if not cache:
# If there's no cache
raise ValueError(f"No cache found for {flow_id}.")
@ -252,7 +252,7 @@ async def build_vertex_stream(
raise ValueError(f"No result found for vertex {vertex_id}")
except Exception as exc:
logger.error(f"Error building vertex: {exc}")
logger.exception(f"Error building vertex: {exc}")
yield str(StreamData(event="error", data={"error": str(exc)}))
finally:
logger.debug("Closing stream")

View file

@ -377,7 +377,10 @@ class Graph:
# Remove vertices that are not in the other graph
for vertex_id in removed_vertex_ids:
self.remove_vertex(vertex_id)
try:
self.remove_vertex(vertex_id)
except ValueError:
pass
# The order here matters because adding the vertex is required
# if any of them have edges that point to any of the new vertices
@ -741,8 +744,11 @@ class Graph:
vertex_data = vertex["data"]
vertex_type: str = vertex_data["type"] # type: ignore
vertex_base_type: str = vertex_data["node"]["template"]["_type"] # type: ignore
if "id" not in vertex_data:
raise ValueError(f"Vertex data for {vertex_data['display_name']} does not contain an id")
VertexClass = self._get_vertex_class(vertex_type, vertex_base_type, vertex_data["id"])
vertex_instance = VertexClass(vertex, graph=self)
vertex_instance.set_top_level(self.top_level_vertices)
vertices.append(vertex_instance)

View file

@ -1,4 +1,4 @@
import _ from "lodash";
import _, { cloneDeep, set } from "lodash";
import { MouseEvent, useCallback, useEffect, useRef, useState } from "react";
import ReactFlow, {
Background,
@ -32,6 +32,7 @@ import {
isValidConnection,
reconnectEdges,
scapeJSONParse,
updateIds,
validateSelection,
} from "../../../../utils/reactflowUtils";
import { getRandomName, isWrappedWithClass } from "../../../../utils/utils";
@ -104,35 +105,40 @@ export default function Page({
) {
event.preventDefault();
takeSnapshot();
if (validateSelection(lastSelection!, edges).length === 0) {
if (
validateSelection(lastSelection!, edges).length === 0
) {
const clonedNodes = cloneDeep(nodes)
const clonedEdges = cloneDeep(edges)
const clonedSelection = cloneDeep(lastSelection)
updateIds({ nodes: clonedNodes, edges: clonedEdges }, clonedSelection!)
const { newFlow, removedEdges } = generateFlow(
lastSelection!,
nodes,
edges,
clonedSelection!,
clonedNodes,
clonedEdges,
getRandomName()
);
const newGroupNode = generateNodeFromFlow(newFlow, getNodeId);
const newEdges = reconnectEdges(newGroupNode, removedEdges);
setNodes((oldNodes) => [
...oldNodes.filter(
(oldNodes) =>
!lastSelection?.nodes.some(
(selectionNode) => selectionNode.id === oldNodes.id
)
),
const newGroupNode = generateNodeFromFlow(
newFlow,
getNodeId
);
const newEdges = reconnectEdges(
newGroupNode,
]);
setEdges((oldEdges) => [
...oldEdges.filter(
(oldEdge) =>
!lastSelection!.nodes.some(
(selectionNode) =>
selectionNode.id === oldEdge.target ||
selectionNode.id === oldEdge.source
)
),
...newEdges,
]);
removedEdges
);
setNodes([...clonedNodes.filter(
(oldNodes) =>
!clonedSelection?.nodes.some(
(selectionNode) =>
selectionNode.id === oldNodes.id
)), newGroupNode])
setEdges([...clonedEdges.filter(
(oldEdge) =>
!clonedSelection!.nodes.some(
(selectionNode) =>
selectionNode.id === oldEdge.target ||
selectionNode.id === oldEdge.source
)), ...newEdges])
} else {
setErrorData({
title: INVALID_SELECTION_ERROR_ALERT,
@ -431,7 +437,7 @@ export default function Page({
<div className="h-full w-full">
<div className="h-full w-full" ref={reactFlowWrapper}>
{Object.keys(templates).length > 0 &&
Object.keys(types).length > 0 ? (
Object.keys(types).length > 0 ? (
<div id="react-flow-id" className="h-full w-full">
<ReactFlow
nodes={nodes}
@ -480,10 +486,14 @@ export default function Page({
if (
validateSelection(lastSelection!, edges).length === 0
) {
const clonedNodes = cloneDeep(nodes)
const clonedEdges = cloneDeep(edges)
const clonedSelection = cloneDeep(lastSelection)
updateIds({ nodes: clonedNodes, edges: clonedEdges }, clonedSelection!)
const { newFlow, removedEdges } = generateFlow(
lastSelection!,
nodes,
edges,
clonedSelection!,
clonedNodes,
clonedEdges,
getRandomName()
);
const newGroupNode = generateNodeFromFlow(
@ -494,27 +504,19 @@ export default function Page({
newGroupNode,
removedEdges
);
setNodes((oldNodes) => [
...oldNodes.filter(
(oldNodes) =>
!lastSelection?.nodes.some(
(selectionNode) =>
selectionNode.id === oldNodes.id
)
),
newGroupNode,
]);
setEdges((oldEdges) => [
...oldEdges.filter(
(oldEdge) =>
!lastSelection!.nodes.some(
(selectionNode) =>
selectionNode.id === oldEdge.target ||
selectionNode.id === oldEdge.source
)
),
...newEdges,
]);
setNodes([...clonedNodes.filter(
(oldNodes) =>
!clonedSelection?.nodes.some(
(selectionNode) =>
selectionNode.id === oldNodes.id
)), newGroupNode])
setEdges([...clonedEdges.filter(
(oldEdge) =>
!clonedSelection!.nodes.some(
(selectionNode) =>
selectionNode.id === oldEdge.target ||
selectionNode.id === oldEdge.source
)), ...newEdges])
} else {
setErrorData({
title: INVALID_SELECTION_ERROR_ALERT,

View file

@ -1,4 +1,4 @@
import { cloneDeep } from "lodash";
import { cloneDeep, zip } from "lodash";
import {
Edge,
EdgeChange,
@ -26,6 +26,7 @@ import {
ChatOutputType,
FlowPoolObjectType,
FlowStoreType,
VertexLayerElementType,
chatInputType,
} from "../types/zustand/flow";
import { buildVertices } from "../utils/buildUtils";
@ -36,6 +37,9 @@ import {
getNodeId,
scapeJSONParse,
scapedJSONStringfy,
updateEdgesIds,
updateIds,
updateProxyIdsOnTemplate,
validateNodes,
} from "../utils/reactflowUtils";
import { getInputsAndOutputs } from "../utils/storeUtils";
@ -257,6 +261,14 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
let newId = getNodeId(node.data.type);
idsMap[node.id] = newId;
if (node.data.node!.flow) {
let newFlow = node.data.node!.flow;
const idsMap = updateIds(newFlow.data!);
updateProxyIdsOnTemplate(node.data.node!.template, idsMap);
let flowEdges = selection.edges;
updateEdgesIds(flowEdges, idsMap);
}
// Create a new node object
const newNode: NodeType = {
id: newId,
@ -459,9 +471,18 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
// verticesLayers is a list of list of vertices ids, where each list is a layer of vertices
// we want to add a new layer (next_vertices_ids) to the list of layers (verticesLayers)
// and the values of next_vertices_ids to the list of vertices ids (verticesIds)
// const nextVertices will be the zip of vertexBuildData.next_vertices_ids and
// vertexBuildData.top_level_vertices
// the VertexLayerElementType as {id: next_vertices_id, layer: top_level_vertex}
const nextVertices: VertexLayerElementType[] = zip(
vertexBuildData.next_vertices_ids,
vertexBuildData.top_level_vertices
).map(([id, reference]) => ({ id: id!, reference }));
const newLayers = [
...get().verticesBuild!.verticesLayers,
vertexBuildData.next_vertices_ids,
nextVertices,
];
const newIds = [
...get().verticesBuild!.verticesIds,
@ -508,12 +529,18 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
get().setIsBuilding(false);
},
onBuildUpdate: handleBuildUpdate,
onBuildError: (title, list, idList) => {
onBuildError: (title: string, list: string[], elementList) => {
const idList = elementList
.map((element) => element.id)
.filter(Boolean) as string[];
useFlowStore.getState().updateBuildStatus(idList, BuildStatus.BUILT);
setErrorData({ list, title });
get().setIsBuilding(false);
},
onBuildStart: (idList) => {
onBuildStart: (elementList) => {
const idList = elementList
.map((element) => element.reference)
.filter(Boolean) as string[];
useFlowStore.getState().updateBuildStatus(idList, BuildStatus.BUILDING);
},
validateNodes: validateSubgraph,
@ -531,7 +558,7 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
updateVerticesBuild: (
vertices: {
verticesIds: string[];
verticesLayers: string[][];
verticesLayers: VertexLayerElementType[][];
runId: string;
} | null
) => {
@ -562,6 +589,7 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
},
updateBuildStatus: (nodeIdList: string[], status: BuildStatus) => {
const newFlowBuildStatus = { ...get().flowBuildStatus };
console.log("newFlowBuildStatus", newFlowBuildStatus);
nodeIdList.forEach((id) => {
newFlowBuildStatus[id] = {
status,

View file

@ -35,6 +35,11 @@ export type FlowPoolObjectType = {
buildId: string;
};
export type VertexLayerElementType = {
id: string;
reference?: string;
};
export type FlowPoolType = {
[key: string]: Array<FlowPoolObjectType>;
};
@ -103,7 +108,7 @@ export type FlowStoreType = {
updateVerticesBuild: (
vertices: {
verticesIds: string[];
verticesLayers: string[][];
verticesLayers: VertexLayerElementType[][];
runId: string;
} | null
) => void;
@ -111,7 +116,7 @@ export type FlowStoreType = {
removeFromVerticesBuild: (vertices: string[]) => void;
verticesBuild: {
verticesIds: string[];
verticesLayers: string[][];
verticesLayers: VertexLayerElementType[][];
runId: string;
} | null;
updateBuildStatus: (nodeId: string[], status: BuildStatus) => void;

View file

@ -4,6 +4,7 @@ import { getVerticesOrder, postBuildVertex } from "../controllers/API";
import useAlertStore from "../stores/alertStore";
import useFlowStore from "../stores/flowStore";
import { VertexBuildTypeAPI } from "../types/api";
import { VertexLayerElementType } from "../types/zustand/flow";
type BuildVerticesParams = {
flowId: string; // Assuming FlowType is the type for your flow
@ -17,8 +18,8 @@ type BuildVerticesParams = {
buildId: string
) => void; // Replace any with the actual type if it's not any
onBuildComplete?: (allNodesValid: boolean) => void;
onBuildError?: (title, list, idList: string[]) => void;
onBuildStart?: (idList: string[]) => void;
onBuildError?: (title, list, idList: VertexLayerElementType[]) => void;
onBuildStart?: (idList: VertexLayerElementType[]) => void;
validateNodes?: (nodes: string[]) => void;
};
@ -49,7 +50,7 @@ export async function updateVerticesOrder(
startNodeId?: string | null,
stopNodeId?: string | null
): Promise<{
verticesLayers: string[][];
verticesLayers: VertexLayerElementType[][];
verticesIds: string[];
runId: string;
}> {
@ -67,7 +68,14 @@ export async function updateVerticesOrder(
useFlowStore.getState().setIsBuilding(false);
throw new Error("Invalid nodes");
}
let verticesLayers: Array<Array<string>> = [orderResponse.data.ids];
// orderResponse.data.ids,
// for each id we need to build the VertexLayerElementType object as
// {id: id, reference: id}
let verticesLayers: Array<Array<VertexLayerElementType>> =
orderResponse.data.ids.map((id: string) => {
return [{ id: id, reference: id }];
});
const runId = orderResponse.data.run_id;
// if (nodeId) {
// for (let i = 0; i < verticesOrder.length; i += 1) {
@ -161,17 +169,17 @@ export async function buildVertices({
if (onBuildStart) onBuildStart(currentLayer);
// Build each vertex in the current layer
await Promise.all(
currentLayer.map(async (vertexId) => {
currentLayer.map(async (element) => {
// Check if id is in the list of inactive nodes
if (
!useFlowStore
.getState()
.verticesBuild?.verticesIds.includes(vertexId) &&
.verticesBuild?.verticesIds.includes(element.id) &&
onBuildUpdate
) {
// If it is, skip building and set the state to inactive
onBuildUpdate(
getInactiveVertexData(vertexId),
getInactiveVertexData(element.id),
BuildStatus.INACTIVE,
runId
);
@ -182,7 +190,7 @@ export async function buildVertices({
// Build the vertex
await buildVertex({
flowId,
id: vertexId,
id: element.id,
input_value,
onBuildUpdate: (data: VertexBuildTypeAPI, status: BuildStatus) => {
if (onBuildUpdate) onBuildUpdate(data, status, runId);
@ -227,7 +235,7 @@ async function buildVertex({
id: string;
input_value: string;
onBuildUpdate?: (data: any, status: BuildStatus) => void;
onBuildError?: (title, list, idList: string[]) => void;
onBuildError?: (title, list, idList: VertexLayerElementType[]) => void;
verticesIds: string[];
buildResults: boolean[];
stopBuild: () => void;
@ -241,7 +249,7 @@ async function buildVertex({
onBuildError!(
"Error Building Component",
[buildData.params],
verticesIds
verticesIds.map((id) => ({ id }))
);
stopBuild();
}
@ -252,7 +260,7 @@ async function buildVertex({
onBuildError!(
"Error Building Component",
[(error as AxiosError<any>).response?.data?.detail ?? "Unknown Error"],
verticesIds
verticesIds.map((id) => ({ id }))
);
stopBuild();
}

View file

@ -204,23 +204,33 @@ export const processDataFromFlow = (flow: FlowType, refreshIds = true) => {
return data;
};
export function updateIds(newFlow: ReactFlowJsonObject) {
export function updateIds(
{ edges, nodes }: { edges: Edge[]; nodes: Node[] },
selection?: { edges: Edge[]; nodes: Node[] }
) {
let idsMap = {};
if (newFlow.nodes)
newFlow.nodes.forEach((node: NodeType) => {
const selectionIds = selection?.nodes.map((n) => n.id);
if (nodes) {
nodes.forEach((node: NodeType) => {
// Generate a unique node ID
let newId = getNodeId(
node.data.node?.flow ? "GroupNode" : node.data.type
);
let newId = getNodeId(node.data.type);
if (selection && !selectionIds?.includes(node.id)) {
newId = node.id;
}
idsMap[node.id] = newId;
node.id = newId;
node.data.id = newId;
// Add the new node to the list of nodes in state
});
if (newFlow.edges)
newFlow.edges.forEach((edge: Edge) => {
selection?.nodes.forEach((sNode: NodeType) => {
let newId = idsMap[sNode.id];
sNode.id = newId;
sNode.data.id = newId;
});
}
const concatedEdges = [...edges, ...(selection?.edges ?? [])];
if (concatedEdges)
concatedEdges.forEach((edge: Edge) => {
edge.source = idsMap[edge.source];
edge.target = idsMap[edge.target];
const sourceHandleObject: sourceHandleType = scapeJSONParse(
@ -955,7 +965,7 @@ export function connectedInputNodesOnHandle(
return connectedNodes;
}
function updateProxyIdsOnTemplate(
export function updateProxyIdsOnTemplate(
template: APITemplateType,
idsMap: { [key: string]: string }
) {
@ -966,12 +976,16 @@ function updateProxyIdsOnTemplate(
});
}
function updateEdgesIds(edges: Edge[], idsMap: { [key: string]: string }) {
export function updateEdgesIds(
edges: Edge[],
idsMap: { [key: string]: string }
) {
edges.forEach((edge) => {
let targetHandle: targetHandleType = edge.data.targetHandle;
if (targetHandle.proxy && idsMap[targetHandle.proxy!.id]) {
targetHandle.proxy!.id = idsMap[targetHandle.proxy!.id];
}
console.log("edge", edge);
edge.data.targetHandle = targetHandle;
edge.targetHandle = scapedJSONStringfy(targetHandle);
});

View file

@ -705,3 +705,8 @@ export function sortFields(a, b, fieldOrder) {
// You might want to sort them alphabetically or in another specific manner
return a.localeCompare(b);
}
export function freezeObject(obj: any) {
if(!obj) return obj;
return(JSON.parse(JSON.stringify(obj)));
}