Merge branch 'zustand/io/migration' of github.com:logspace-ai/langflow into zustand/io/migration

This commit is contained in:
cristhianzl 2024-01-31 16:48:56 -03:00
commit 6ef92bf233
11 changed files with 220 additions and 46 deletions

View file

@ -1,20 +1,68 @@
from typing import Optional
from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, Depends, HTTPException, Query
from langflow.services.deps import get_monitor_service
from langflow.services.monitor.schema import VertexBuildModel
from langflow.services.monitor.schema import VertexBuildMapModel
from langflow.services.monitor.service import MonitorService
router = APIRouter(prefix="/monitor", tags=["Monitor"])
# Get vertex_builds data from the monitor service
@router.get("/builds", response_model=list[VertexBuildModel])
@router.get("/builds", response_model=VertexBuildMapModel)
async def get_vertex_builds(
flow_id: Optional[str] = Query(None),
vertex_id: Optional[str] = Query(None),
valid: Optional[bool] = Query(None),
order_by: Optional[str] = Query("timestamp"),
monitor_service: MonitorService = Depends(get_monitor_service),
):
return monitor_service.get_vertex_builds(flow_id=flow_id, vertex_id=vertex_id, valid=valid)
try:
vertex_build_dicts = monitor_service.get_vertex_builds(
flow_id=flow_id, vertex_id=vertex_id, valid=valid, order_by=order_by
)
vertex_build_map = VertexBuildMapModel.from_list_of_dicts(vertex_build_dicts)
return vertex_build_map
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/builds", status_code=204)
async def delete_vertex_builds(
flow_id: Optional[str] = Query(None),
monitor_service: MonitorService = Depends(get_monitor_service),
):
try:
monitor_service.delete_vertex_builds(flow_id=flow_id)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/messages")
async def get_messages(
session_id: Optional[str] = Query(None),
sender_type: Optional[str] = Query(None),
sender_name: Optional[str] = Query(None),
order_by: Optional[str] = Query("timestamp"),
monitor_service: MonitorService = Depends(get_monitor_service),
):
try:
return monitor_service.get_messages(
sender_type=sender_type, sender_name=sender_name, session_id=session_id, order_by=order_by
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/transactions")
async def get_transactions(
source: Optional[str] = Query(None),
target: Optional[str] = Query(None),
status: Optional[str] = Query(None),
order_by: Optional[str] = Query("timestamp"),
monitor_service: MonitorService = Depends(get_monitor_service),
):
try:
return monitor_service.get_transactions(source=source, target=target, status=status, order_by=order_by)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View file

@ -47,13 +47,14 @@ class MessageModel(BaseModel):
class VertexBuildModel(BaseModel):
index: Optional[int] = Field(default=None, alias="index")
index: Optional[int] = Field(default=None, alias="index", exclude=True)
id: Optional[str] = Field(default=None, alias="id")
flow_id: str
valid: bool
params: Any
data: dict
artifacts: dict
timestamp: datetime = Field(default_factory=datetime.now)
class Config:
from_attributes = True
@ -79,3 +80,17 @@ class VertexBuildModel(BaseModel):
if isinstance(v, str):
return json.loads(v)
return v
class VertexBuildMapModel(BaseModel):
vertex_builds: dict[str, list[VertexBuildModel]]
@classmethod
def from_list_of_dicts(cls, vertex_build_dicts):
vertex_build_map = {}
for vertex_build_dict in vertex_build_dicts:
vertex_build = VertexBuildModel(**vertex_build_dict)
if vertex_build.id not in vertex_build_map:
vertex_build_map[vertex_build.id] = []
vertex_build_map[vertex_build.id].append(vertex_build)
return cls(vertex_builds=vertex_build_map)

View file

@ -3,12 +3,11 @@ from pathlib import Path
from typing import TYPE_CHECKING, Optional
import duckdb
from loguru import logger
from platformdirs import user_cache_dir
from langflow.services.base import Service
from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel
from langflow.services.monitor.utils import add_row_to_table, drop_and_create_table_if_schema_mismatch
from loguru import logger
from platformdirs import user_cache_dir
if TYPE_CHECKING:
from langflow.services.settings.manager import SettingsService
@ -59,20 +58,89 @@ class MonitorService(Service):
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def get_vertex_builds(
self, flow_id: Optional[str] = None, vertex_id: Optional[str] = None, valid: Optional[bool] = None
self,
flow_id: Optional[str] = None,
vertex_id: Optional[str] = None,
valid: Optional[bool] = None,
order_by: Optional[str] = "timestamp",
):
query = "SELECT * FROM vertex_builds"
query = "SELECT id, flow_id, valid, params, data, artifacts, timestamp FROM vertex_builds"
conditions = []
if flow_id:
conditions.append(f"flow_id = '{flow_id}'")
if vertex_id:
conditions.append(f"vertex_id = '{vertex_id}'")
conditions.append(f"id = '{vertex_id}'")
if valid is not None: # Check for None because valid is a boolean
conditions.append(f"valid = {valid}")
valid_str = "true" if valid else "false"
conditions.append(f"valid = {valid_str}")
if conditions:
query += " WHERE " + " AND ".join(conditions)
if order_by:
query += f" ORDER BY {order_by}"
with duckdb.connect(str(self.db_path)) as conn:
df = conn.execute(query).df()
return df.to_dict(orient="records")
def delete_vertex_builds(self, flow_id: Optional[str] = None):
query = "DELETE FROM vertex_builds"
if flow_id:
query += f" WHERE flow_id = '{flow_id}'"
with duckdb.connect(str(self.db_path)) as conn:
conn.execute(query)
def get_messages(
self,
sender_type: Optional[str] = None,
sender_name: Optional[str] = None,
session_id: Optional[str] = None,
order_by: Optional[str] = "timestamp",
):
query = "SELECT sender_name, sender_type, session_id, message, artifacts, timestamp FROM messages"
conditions = []
if sender_type:
conditions.append(f"sender_type = '{sender_type}'")
if sender_name:
conditions.append(f"sender_name = '{sender_name}'")
if session_id:
conditions.append(f"session_id = '{session_id}'")
if conditions:
query += " WHERE " + " AND ".join(conditions)
if order_by:
query += f" ORDER BY {order_by}"
with duckdb.connect(str(self.db_path)) as conn:
df = conn.execute(query).df()
return df.to_dict(orient="records")
def get_transactions(
self,
source: Optional[str] = None,
target: Optional[str] = None,
status: Optional[str] = None,
order_by: Optional[str] = "timestamp",
):
query = "SELECT source, target, target_args, status, error, timestamp FROM transactions"
conditions = []
if source:
conditions.append(f"source = '{source}'")
if target:
conditions.append(f"target = '{target}'")
if status:
conditions.append(f"status = '{status}'")
if conditions:
query += " WHERE " + " AND ".join(conditions)
if order_by:
query += f" ORDER BY {order_by}"
with duckdb.connect(str(self.db_path)) as conn:
df = conn.execute(query).df()

View file

@ -1,10 +1,11 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Type
import duckdb
from langflow.services.deps import get_monitor_service
from loguru import logger
from pydantic import BaseModel
from langflow.services.deps import get_monitor_service
if TYPE_CHECKING:
from langflow.api.v1.schemas import ResultDict
@ -137,7 +138,7 @@ async def log_vertex_build(
"params": params,
"data": data.model_dump(),
"artifacts": artifacts or {},
# "timestamp": monitor_service.get_timestamp(),
"timestamp": monitor_service.get_timestamp(),
}
monitor_service.add_row(table_name="vertex_builds", data=row)
except Exception as e:

View file

@ -1,8 +1,10 @@
import { cloneDeep } from "lodash";
import { useEffect, useRef, useState } from "react";
import IconComponent from "../../components/genericIconComponent";
import { deleteFlowPool } from "../../controllers/API";
import useAlertStore from "../../stores/alertStore";
import useFlowStore from "../../stores/flowStore";
import useFlowsManagerStore from "../../stores/flowsManagerStore";
import { sendAllProps } from "../../types/api";
import {
ChatMessageType,
@ -29,6 +31,7 @@ export default function newChatView(): JSX.Element {
CleanFlowPool,
} = useFlowStore();
const { setErrorData } = useAlertStore();
const currentFlowId = useFlowsManagerStore((state) => state.currentFlowId);
const [lockChat, setLockChat] = useState(false);
const messagesRef = useRef<HTMLDivElement | null>(null);
@ -112,7 +115,9 @@ export default function newChatView(): JSX.Element {
}
function clearChat(): void {
setChatHistory([]);
CleanFlowPool();
deleteFlowPool(currentFlowId).then((_) => {
CleanFlowPool();
});
//TODO tell backend to clear chat session
if (lockChat) setLockChat(false);
}

View file

@ -16,6 +16,7 @@ import {
import { UserInputType } from "../../types/components";
import { FlowStyleType, FlowType } from "../../types/flow";
import { StoreComponentResponse } from "../../types/store";
import { FlowPoolType } from "../../types/zustand/flow";
import {
APIClassType,
BuildStatusTypeAPI,
@ -869,3 +870,26 @@ export async function postBuildVertex(
export async function downloadImage({ flowId, fileName }): Promise<any> {
return await api.get(`${BASE_URL_API}files/images/${flowId}/${fileName}`);
}
export async function getFlowPool({
flowId,
nodeId,
}: {
flowId: string;
nodeId?: string;
}): Promise<AxiosResponse<FlowPoolType>> {
const config = {};
config["params"] = { flow_id: flowId };
if (nodeId) {
config["params"] = { nodeId };
}
return await api.get(`${BASE_URL_API}monitor/builds`, config);
}
export async function deleteFlowPool(
flowId: string
): Promise<AxiosResponse<any>> {
const config = {};
config["params"] = { flow_id: flowId };
return await api.delete(`${BASE_URL_API}monitor/builds`, config);
}

View file

@ -156,7 +156,7 @@ export default function CodeAreaModal({
readOnly={readonly}
value={code}
mode="python"
setOptions={{ fontFamily: "monospace"}}
setOptions={{ fontFamily: "monospace" }}
height={height ?? "100%"}
highlightActiveLine={true}
showPrintMargin={false}

View file

@ -55,7 +55,6 @@ export default function Page({
const setReactFlowInstance = useFlowStore(
(state) => state.setReactFlowInstance
);
const nodes = useFlowStore((state) => state.nodes);
const edges = useFlowStore((state) => state.edges);
const onNodesChange = useFlowStore((state) => state.onNodesChange);
@ -70,6 +69,7 @@ export default function Page({
const takeSnapshot = useFlowsManagerStore((state) => state.takeSnapshot);
const paste = useFlowStore((state) => state.paste);
const resetFlow = useFlowStore((state) => state.resetFlow);
const setFlowPool = useFlowStore((state) => state.setFlowPool);
const lastCopiedSelection = useFlowStore(
(state) => state.lastCopiedSelection
);
@ -77,6 +77,11 @@ export default function Page({
(state) => state.setLastCopiedSelection
);
const onConnect = useFlowStore((state) => state.onConnect);
const currentFlowId = useFlowsManagerStore((state) => state.currentFlowId);
const setErrorData = useAlertStore((state) => state.setErrorData);
const [selectionMenuVisible, setSelectionMenuVisible] = useState(false);
const [loading, setLoading] = useState(true);
const edgeUpdateSuccessful = useRef(true);
const position = useRef({ x: 0, y: 0 });
const [lastSelection, setLastSelection] =
@ -152,23 +157,19 @@ export default function Page({
};
}, [lastCopiedSelection, lastSelection, takeSnapshot]);
const [selectionMenuVisible, setSelectionMenuVisible] = useState(false);
const setErrorData = useAlertStore((state) => state.setErrorData);
const edgeUpdateSuccessful = useRef(true);
const currentFlowId = useFlowsManagerStore((state) => state.currentFlowId);
useEffect(() => {
if (reactFlowInstance) {
if (reactFlowInstance && currentFlowId) {
resetFlow({
nodes: flow?.data?.nodes ?? [],
edges: flow?.data?.edges ?? [],
viewport: flow?.data?.viewport ?? { zoom: 1, x: 0, y: 0 },
});
// getFlowPool({flowId: currentFlowId}).then((flowPool) => {
// setFlowPool(flowPool.data)
// setLoading(false)
// })
}
}, [currentFlowId, reactFlowInstance]);
}, [currentFlowId, reactFlowInstance, setLoading, setFlowPool]);
useEffect(() => {
return () => {

View file

@ -110,7 +110,15 @@ export default function NodeToolbarComponent({
break;
case "ungroup":
takeSnapshot();
expandGroupNode(data.id, updateFlowPosition(position, data.node?.flow!), data.node!.template, nodes, edges, setNodes, setEdges);
expandGroupNode(
data.id,
updateFlowPosition(position, data.node?.flow!),
data.node!.template,
nodes,
edges,
setNodes,
setEdges
);
break;
case "override":
setShowOverrideModal(true);

View file

@ -1,4 +1,5 @@
import { AxiosError } from "axios";
import { cloneDeep } from "lodash";
import { Edge, Node, Viewport, XYPosition } from "reactflow";
import { create } from "zustand";
import {
@ -24,7 +25,6 @@ import useAlertStore from "./alertStore";
import { useDarkStore } from "./darkStore";
import useFlowStore from "./flowStore";
import { useTypesStore } from "./typesStore";
import { cloneDeep } from "lodash";
let saveTimeoutId: NodeJS.Timeout | null = null;
@ -330,7 +330,10 @@ const useFlowsManagerStore = create<FlowsManagerStoreType>((set, get) => ({
const currentFlowId = get().currentFlowId;
// push the current graph to the past state
const flowStore = useFlowStore.getState();
const newState = {nodes: cloneDeep(flowStore.nodes), edges: cloneDeep(flowStore.edges)};
const newState = {
nodes: cloneDeep(flowStore.nodes),
edges: cloneDeep(flowStore.edges),
};
const pastLength = past[currentFlowId]?.length ?? 0;
if (
pastLength > 0 &&

View file

@ -653,13 +653,19 @@ export function updateFlowPosition(NewPosition: XYPosition, flow: FlowType) {
x: NewPosition.x - middlePoint.x,
y: NewPosition.y - middlePoint.y,
};
return {...flow, data: {...flow.data!, nodes: flow.data!.nodes.map((node) => ({
...node,
position: {
x: node.position.x + deltaPosition.x,
y: node.position.y + deltaPosition.y,
return {
...flow,
data: {
...flow.data!,
nodes: flow.data!.nodes.map((node) => ({
...node,
position: {
x: node.position.x + deltaPosition.x,
y: node.position.y + deltaPosition.y,
},
})),
},
}))}};
};
}
export function concatFlows(
@ -952,7 +958,7 @@ export function expandGroupNode(
nodes: Node[],
edges: Edge[],
setNodes: (update: Node[] | ((oldState: Node[]) => Node[])) => void,
setEdges: (update: Edge[] | ((oldState: Edge[]) => Edge[])) => void,
setEdges: (update: Edge[] | ((oldState: Edge[]) => Edge[])) => void
) {
const idsMap = updateIds(flow!.data!);
updateProxyIdsOnTemplate(template, idsMap);
@ -1037,14 +1043,9 @@ export function expandGroupNode(
}
});
const filteredNodes = [
...nodes.filter((n) => n.id !== id),
...gNodes,
];
const filteredNodes = [...nodes.filter((n) => n.id !== id), ...gNodes];
const filteredEdges = [
...edges.filter(
(e) => e.target !== id && e.source !== id
),
...edges.filter((e) => e.target !== id && e.source !== id),
...gEdges,
...updatedEdges,
];
@ -1080,7 +1081,7 @@ export function createFlowComponent(
edges: [],
nodes: [
{
data: {...nodeData, node: {...nodeData.node, official: false}},
data: { ...nodeData, node: { ...nodeData.node, official: false } },
id: nodeData.id,
position: { x: 0, y: 0 },
type: "genericNode",