diff --git a/src/backend/langflow/api/v1/monitor.py b/src/backend/langflow/api/v1/monitor.py index 23f4ee9df..3c4749c0d 100644 --- a/src/backend/langflow/api/v1/monitor.py +++ b/src/backend/langflow/api/v1/monitor.py @@ -1,20 +1,68 @@ from typing import Optional -from fastapi import APIRouter, Depends, Query - +from fastapi import APIRouter, Depends, HTTPException, Query from langflow.services.deps import get_monitor_service -from langflow.services.monitor.schema import VertexBuildModel +from langflow.services.monitor.schema import VertexBuildMapModel from langflow.services.monitor.service import MonitorService router = APIRouter(prefix="/monitor", tags=["Monitor"]) # Get vertex_builds data from the monitor service -@router.get("/builds", response_model=list[VertexBuildModel]) +@router.get("/builds", response_model=VertexBuildMapModel) async def get_vertex_builds( flow_id: Optional[str] = Query(None), vertex_id: Optional[str] = Query(None), valid: Optional[bool] = Query(None), + order_by: Optional[str] = Query("timestamp"), monitor_service: MonitorService = Depends(get_monitor_service), ): - return monitor_service.get_vertex_builds(flow_id=flow_id, vertex_id=vertex_id, valid=valid) + try: + vertex_build_dicts = monitor_service.get_vertex_builds( + flow_id=flow_id, vertex_id=vertex_id, valid=valid, order_by=order_by + ) + vertex_build_map = VertexBuildMapModel.from_list_of_dicts(vertex_build_dicts) + return vertex_build_map + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.delete("/builds", status_code=204) +async def delete_vertex_builds( + flow_id: Optional[str] = Query(None), + monitor_service: MonitorService = Depends(get_monitor_service), +): + try: + monitor_service.delete_vertex_builds(flow_id=flow_id) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/messages") +async def get_messages( + session_id: Optional[str] = Query(None), + sender_type: Optional[str] = Query(None), + sender_name: Optional[str] = Query(None), + order_by: Optional[str] = Query("timestamp"), + monitor_service: MonitorService = Depends(get_monitor_service), +): + try: + return monitor_service.get_messages( + sender_type=sender_type, sender_name=sender_name, session_id=session_id, order_by=order_by + ) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/transactions") +async def get_transactions( + source: Optional[str] = Query(None), + target: Optional[str] = Query(None), + status: Optional[str] = Query(None), + order_by: Optional[str] = Query("timestamp"), + monitor_service: MonitorService = Depends(get_monitor_service), +): + try: + return monitor_service.get_transactions(source=source, target=target, status=status, order_by=order_by) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/src/backend/langflow/services/monitor/schema.py b/src/backend/langflow/services/monitor/schema.py index bd48ebfa6..e9c380a10 100644 --- a/src/backend/langflow/services/monitor/schema.py +++ b/src/backend/langflow/services/monitor/schema.py @@ -47,13 +47,14 @@ class MessageModel(BaseModel): class VertexBuildModel(BaseModel): - index: Optional[int] = Field(default=None, alias="index") + index: Optional[int] = Field(default=None, alias="index", exclude=True) id: Optional[str] = Field(default=None, alias="id") flow_id: str valid: bool params: Any data: dict artifacts: dict + timestamp: datetime = Field(default_factory=datetime.now) class Config: from_attributes = True @@ -79,3 +80,17 @@ class VertexBuildModel(BaseModel): if isinstance(v, str): return json.loads(v) return v + + +class VertexBuildMapModel(BaseModel): + vertex_builds: dict[str, list[VertexBuildModel]] + + @classmethod + def from_list_of_dicts(cls, vertex_build_dicts): + vertex_build_map = {} + for vertex_build_dict in vertex_build_dicts: + vertex_build = VertexBuildModel(**vertex_build_dict) + if vertex_build.id not in vertex_build_map: + vertex_build_map[vertex_build.id] = [] + vertex_build_map[vertex_build.id].append(vertex_build) + return cls(vertex_builds=vertex_build_map) diff --git a/src/backend/langflow/services/monitor/service.py b/src/backend/langflow/services/monitor/service.py index 9923e5f51..efa9e0ec1 100644 --- a/src/backend/langflow/services/monitor/service.py +++ b/src/backend/langflow/services/monitor/service.py @@ -3,12 +3,11 @@ from pathlib import Path from typing import TYPE_CHECKING, Optional import duckdb -from loguru import logger -from platformdirs import user_cache_dir - from langflow.services.base import Service from langflow.services.monitor.schema import MessageModel, TransactionModel, VertexBuildModel from langflow.services.monitor.utils import add_row_to_table, drop_and_create_table_if_schema_mismatch +from loguru import logger +from platformdirs import user_cache_dir if TYPE_CHECKING: from langflow.services.settings.manager import SettingsService @@ -59,20 +58,89 @@ class MonitorService(Service): return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def get_vertex_builds( - self, flow_id: Optional[str] = None, vertex_id: Optional[str] = None, valid: Optional[bool] = None + self, + flow_id: Optional[str] = None, + vertex_id: Optional[str] = None, + valid: Optional[bool] = None, + order_by: Optional[str] = "timestamp", ): - query = "SELECT * FROM vertex_builds" + query = "SELECT id, flow_id, valid, params, data, artifacts, timestamp FROM vertex_builds" conditions = [] if flow_id: conditions.append(f"flow_id = '{flow_id}'") if vertex_id: - conditions.append(f"vertex_id = '{vertex_id}'") + conditions.append(f"id = '{vertex_id}'") if valid is not None: # Check for None because valid is a boolean - conditions.append(f"valid = {valid}") + valid_str = "true" if valid else "false" + conditions.append(f"valid = {valid_str}") if conditions: query += " WHERE " + " AND ".join(conditions) + if order_by: + query += f" ORDER BY {order_by}" + + with duckdb.connect(str(self.db_path)) as conn: + df = conn.execute(query).df() + + return df.to_dict(orient="records") + + def delete_vertex_builds(self, flow_id: Optional[str] = None): + query = "DELETE FROM vertex_builds" + if flow_id: + query += f" WHERE flow_id = '{flow_id}'" + + with duckdb.connect(str(self.db_path)) as conn: + conn.execute(query) + + def get_messages( + self, + sender_type: Optional[str] = None, + sender_name: Optional[str] = None, + session_id: Optional[str] = None, + order_by: Optional[str] = "timestamp", + ): + query = "SELECT sender_name, sender_type, session_id, message, artifacts, timestamp FROM messages" + conditions = [] + if sender_type: + conditions.append(f"sender_type = '{sender_type}'") + if sender_name: + conditions.append(f"sender_name = '{sender_name}'") + if session_id: + conditions.append(f"session_id = '{session_id}'") + + if conditions: + query += " WHERE " + " AND ".join(conditions) + + if order_by: + query += f" ORDER BY {order_by}" + + with duckdb.connect(str(self.db_path)) as conn: + df = conn.execute(query).df() + + return df.to_dict(orient="records") + + def get_transactions( + self, + source: Optional[str] = None, + target: Optional[str] = None, + status: Optional[str] = None, + order_by: Optional[str] = "timestamp", + ): + query = "SELECT source, target, target_args, status, error, timestamp FROM transactions" + conditions = [] + if source: + conditions.append(f"source = '{source}'") + if target: + conditions.append(f"target = '{target}'") + if status: + conditions.append(f"status = '{status}'") + + if conditions: + query += " WHERE " + " AND ".join(conditions) + + if order_by: + query += f" ORDER BY {order_by}" with duckdb.connect(str(self.db_path)) as conn: df = conn.execute(query).df() diff --git a/src/backend/langflow/services/monitor/utils.py b/src/backend/langflow/services/monitor/utils.py index 0b5a3d259..f7c4261ac 100644 --- a/src/backend/langflow/services/monitor/utils.py +++ b/src/backend/langflow/services/monitor/utils.py @@ -1,10 +1,11 @@ from typing import TYPE_CHECKING, Any, Dict, Optional, Type import duckdb -from langflow.services.deps import get_monitor_service from loguru import logger from pydantic import BaseModel +from langflow.services.deps import get_monitor_service + if TYPE_CHECKING: from langflow.api.v1.schemas import ResultDict @@ -137,7 +138,7 @@ async def log_vertex_build( "params": params, "data": data.model_dump(), "artifacts": artifacts or {}, - # "timestamp": monitor_service.get_timestamp(), + "timestamp": monitor_service.get_timestamp(), } monitor_service.add_row(table_name="vertex_builds", data=row) except Exception as e: diff --git a/src/frontend/src/components/newChatView/index.tsx b/src/frontend/src/components/newChatView/index.tsx index 56e0ae38e..477394c6b 100644 --- a/src/frontend/src/components/newChatView/index.tsx +++ b/src/frontend/src/components/newChatView/index.tsx @@ -1,8 +1,10 @@ import { cloneDeep } from "lodash"; import { useEffect, useRef, useState } from "react"; import IconComponent from "../../components/genericIconComponent"; +import { deleteFlowPool } from "../../controllers/API"; import useAlertStore from "../../stores/alertStore"; import useFlowStore from "../../stores/flowStore"; +import useFlowsManagerStore from "../../stores/flowsManagerStore"; import { sendAllProps } from "../../types/api"; import { ChatMessageType, @@ -29,6 +31,7 @@ export default function newChatView(): JSX.Element { CleanFlowPool, } = useFlowStore(); const { setErrorData } = useAlertStore(); + const currentFlowId = useFlowsManagerStore((state) => state.currentFlowId); const [lockChat, setLockChat] = useState(false); const messagesRef = useRef(null); @@ -112,7 +115,9 @@ export default function newChatView(): JSX.Element { } function clearChat(): void { setChatHistory([]); - CleanFlowPool(); + deleteFlowPool(currentFlowId).then((_) => { + CleanFlowPool(); + }); //TODO tell backend to clear chat session if (lockChat) setLockChat(false); } diff --git a/src/frontend/src/controllers/API/index.ts b/src/frontend/src/controllers/API/index.ts index dc6feefd7..73dd0f2e4 100644 --- a/src/frontend/src/controllers/API/index.ts +++ b/src/frontend/src/controllers/API/index.ts @@ -16,6 +16,7 @@ import { import { UserInputType } from "../../types/components"; import { FlowStyleType, FlowType } from "../../types/flow"; import { StoreComponentResponse } from "../../types/store"; +import { FlowPoolType } from "../../types/zustand/flow"; import { APIClassType, BuildStatusTypeAPI, @@ -869,3 +870,26 @@ export async function postBuildVertex( export async function downloadImage({ flowId, fileName }): Promise { return await api.get(`${BASE_URL_API}files/images/${flowId}/${fileName}`); } + +export async function getFlowPool({ + flowId, + nodeId, +}: { + flowId: string; + nodeId?: string; +}): Promise> { + const config = {}; + config["params"] = { flow_id: flowId }; + if (nodeId) { + config["params"] = { nodeId }; + } + return await api.get(`${BASE_URL_API}monitor/builds`, config); +} + +export async function deleteFlowPool( + flowId: string +): Promise> { + const config = {}; + config["params"] = { flow_id: flowId }; + return await api.delete(`${BASE_URL_API}monitor/builds`, config); +} diff --git a/src/frontend/src/modals/codeAreaModal/index.tsx b/src/frontend/src/modals/codeAreaModal/index.tsx index 959685500..1c737417d 100644 --- a/src/frontend/src/modals/codeAreaModal/index.tsx +++ b/src/frontend/src/modals/codeAreaModal/index.tsx @@ -156,7 +156,7 @@ export default function CodeAreaModal({ readOnly={readonly} value={code} mode="python" - setOptions={{ fontFamily: "monospace"}} + setOptions={{ fontFamily: "monospace" }} height={height ?? "100%"} highlightActiveLine={true} showPrintMargin={false} diff --git a/src/frontend/src/pages/FlowPage/components/PageComponent/index.tsx b/src/frontend/src/pages/FlowPage/components/PageComponent/index.tsx index 9e127e688..f25d688a8 100644 --- a/src/frontend/src/pages/FlowPage/components/PageComponent/index.tsx +++ b/src/frontend/src/pages/FlowPage/components/PageComponent/index.tsx @@ -55,7 +55,6 @@ export default function Page({ const setReactFlowInstance = useFlowStore( (state) => state.setReactFlowInstance ); - const nodes = useFlowStore((state) => state.nodes); const edges = useFlowStore((state) => state.edges); const onNodesChange = useFlowStore((state) => state.onNodesChange); @@ -70,6 +69,7 @@ export default function Page({ const takeSnapshot = useFlowsManagerStore((state) => state.takeSnapshot); const paste = useFlowStore((state) => state.paste); const resetFlow = useFlowStore((state) => state.resetFlow); + const setFlowPool = useFlowStore((state) => state.setFlowPool); const lastCopiedSelection = useFlowStore( (state) => state.lastCopiedSelection ); @@ -77,6 +77,11 @@ export default function Page({ (state) => state.setLastCopiedSelection ); const onConnect = useFlowStore((state) => state.onConnect); + const currentFlowId = useFlowsManagerStore((state) => state.currentFlowId); + const setErrorData = useAlertStore((state) => state.setErrorData); + const [selectionMenuVisible, setSelectionMenuVisible] = useState(false); + const [loading, setLoading] = useState(true); + const edgeUpdateSuccessful = useRef(true); const position = useRef({ x: 0, y: 0 }); const [lastSelection, setLastSelection] = @@ -152,23 +157,19 @@ export default function Page({ }; }, [lastCopiedSelection, lastSelection, takeSnapshot]); - const [selectionMenuVisible, setSelectionMenuVisible] = useState(false); - - const setErrorData = useAlertStore((state) => state.setErrorData); - - const edgeUpdateSuccessful = useRef(true); - - const currentFlowId = useFlowsManagerStore((state) => state.currentFlowId); - useEffect(() => { - if (reactFlowInstance) { + if (reactFlowInstance && currentFlowId) { resetFlow({ nodes: flow?.data?.nodes ?? [], edges: flow?.data?.edges ?? [], viewport: flow?.data?.viewport ?? { zoom: 1, x: 0, y: 0 }, }); + // getFlowPool({flowId: currentFlowId}).then((flowPool) => { + // setFlowPool(flowPool.data) + // setLoading(false) + // }) } - }, [currentFlowId, reactFlowInstance]); + }, [currentFlowId, reactFlowInstance, setLoading, setFlowPool]); useEffect(() => { return () => { diff --git a/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx b/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx index ec07193b8..13f6c3360 100644 --- a/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx +++ b/src/frontend/src/pages/FlowPage/components/nodeToolbarComponent/index.tsx @@ -110,7 +110,15 @@ export default function NodeToolbarComponent({ break; case "ungroup": takeSnapshot(); - expandGroupNode(data.id, updateFlowPosition(position, data.node?.flow!), data.node!.template, nodes, edges, setNodes, setEdges); + expandGroupNode( + data.id, + updateFlowPosition(position, data.node?.flow!), + data.node!.template, + nodes, + edges, + setNodes, + setEdges + ); break; case "override": setShowOverrideModal(true); diff --git a/src/frontend/src/stores/flowsManagerStore.ts b/src/frontend/src/stores/flowsManagerStore.ts index 90d818ef9..4d6440d1f 100644 --- a/src/frontend/src/stores/flowsManagerStore.ts +++ b/src/frontend/src/stores/flowsManagerStore.ts @@ -1,4 +1,5 @@ import { AxiosError } from "axios"; +import { cloneDeep } from "lodash"; import { Edge, Node, Viewport, XYPosition } from "reactflow"; import { create } from "zustand"; import { @@ -24,7 +25,6 @@ import useAlertStore from "./alertStore"; import { useDarkStore } from "./darkStore"; import useFlowStore from "./flowStore"; import { useTypesStore } from "./typesStore"; -import { cloneDeep } from "lodash"; let saveTimeoutId: NodeJS.Timeout | null = null; @@ -330,7 +330,10 @@ const useFlowsManagerStore = create((set, get) => ({ const currentFlowId = get().currentFlowId; // push the current graph to the past state const flowStore = useFlowStore.getState(); - const newState = {nodes: cloneDeep(flowStore.nodes), edges: cloneDeep(flowStore.edges)}; + const newState = { + nodes: cloneDeep(flowStore.nodes), + edges: cloneDeep(flowStore.edges), + }; const pastLength = past[currentFlowId]?.length ?? 0; if ( pastLength > 0 && diff --git a/src/frontend/src/utils/reactflowUtils.ts b/src/frontend/src/utils/reactflowUtils.ts index b25d25a8b..9dfcdb5ee 100644 --- a/src/frontend/src/utils/reactflowUtils.ts +++ b/src/frontend/src/utils/reactflowUtils.ts @@ -653,13 +653,19 @@ export function updateFlowPosition(NewPosition: XYPosition, flow: FlowType) { x: NewPosition.x - middlePoint.x, y: NewPosition.y - middlePoint.y, }; - return {...flow, data: {...flow.data!, nodes: flow.data!.nodes.map((node) => ({ - ...node, - position: { - x: node.position.x + deltaPosition.x, - y: node.position.y + deltaPosition.y, + return { + ...flow, + data: { + ...flow.data!, + nodes: flow.data!.nodes.map((node) => ({ + ...node, + position: { + x: node.position.x + deltaPosition.x, + y: node.position.y + deltaPosition.y, + }, + })), }, - }))}}; + }; } export function concatFlows( @@ -952,7 +958,7 @@ export function expandGroupNode( nodes: Node[], edges: Edge[], setNodes: (update: Node[] | ((oldState: Node[]) => Node[])) => void, - setEdges: (update: Edge[] | ((oldState: Edge[]) => Edge[])) => void, + setEdges: (update: Edge[] | ((oldState: Edge[]) => Edge[])) => void ) { const idsMap = updateIds(flow!.data!); updateProxyIdsOnTemplate(template, idsMap); @@ -1037,14 +1043,9 @@ export function expandGroupNode( } }); - const filteredNodes = [ - ...nodes.filter((n) => n.id !== id), - ...gNodes, - ]; + const filteredNodes = [...nodes.filter((n) => n.id !== id), ...gNodes]; const filteredEdges = [ - ...edges.filter( - (e) => e.target !== id && e.source !== id - ), + ...edges.filter((e) => e.target !== id && e.source !== id), ...gEdges, ...updatedEdges, ]; @@ -1080,7 +1081,7 @@ export function createFlowComponent( edges: [], nodes: [ { - data: {...nodeData, node: {...nodeData.node, official: false}}, + data: { ...nodeData, node: { ...nodeData.node, official: false } }, id: nodeData.id, position: { x: 0, y: 0 }, type: "genericNode",