From 1ec63800b27cc94c981f73d6b87f491bd0d07e25 Mon Sep 17 00:00:00 2001 From: dhlidongming Date: Tue, 17 Dec 2024 05:04:30 +0800 Subject: [PATCH] feat: implement pagination for transaction log queries (#5281) * Add pagination support for transaction logs. * [autofix.ci] apply automated fixes * [autofix.ci] apply automated fixes (attempt 2/3) * Replace 'TransactionData' with 'data' variable. * [autofix.ci] apply automated fixes --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- src/backend/base/langflow/api/v1/flows.py | 5 +- src/backend/base/langflow/api/v1/monitor.py | 32 ++++++------- src/backend/base/langflow/main.py | 2 + .../database/models/transactions/crud.py | 14 +++++- .../database/models/transactions/model.py | 2 +- src/backend/tests/unit/test_database.py | 14 ++++-- .../transactions/use-get-transactions.ts | 46 +++++++++++++------ .../src/modals/flowLogsModal/index.tsx | 37 +++++++++++++-- 8 files changed, 105 insertions(+), 47 deletions(-) diff --git a/src/backend/base/langflow/api/v1/flows.py b/src/backend/base/langflow/api/v1/flows.py index 2988c416a..f8ce9a79f 100644 --- a/src/backend/base/langflow/api/v1/flows.py +++ b/src/backend/base/langflow/api/v1/flows.py @@ -12,7 +12,7 @@ import orjson from fastapi import APIRouter, Depends, File, HTTPException, UploadFile from fastapi.encoders import jsonable_encoder from fastapi.responses import StreamingResponse -from fastapi_pagination import Page, Params, add_pagination +from fastapi_pagination import Page, Params from fastapi_pagination.ext.sqlalchemy import paginate from sqlmodel import and_, col, select from sqlmodel.ext.asyncio.session import AsyncSession @@ -521,6 +521,3 @@ async def read_basic_examples( except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e - - -add_pagination(router) diff --git a/src/backend/base/langflow/api/v1/monitor.py b/src/backend/base/langflow/api/v1/monitor.py index 7a26368aa..4459441fd 100644 --- a/src/backend/base/langflow/api/v1/monitor.py +++ b/src/backend/base/langflow/api/v1/monitor.py @@ -2,15 +2,17 @@ from typing import Annotated from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi_pagination import Page, Params +from fastapi_pagination.ext.sqlmodel import paginate from sqlalchemy import delete from sqlmodel import col, select -from langflow.api.utils import DbSession +from langflow.api.utils import DbSession, custom_params from langflow.schema.message import MessageResponse from langflow.services.auth.utils import get_current_active_user from langflow.services.database.models.message.model import MessageRead, MessageTable, MessageUpdate -from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id -from langflow.services.database.models.transactions.model import TransactionReadResponse +from langflow.services.database.models.transactions.crud import transform_transaction_table +from langflow.services.database.models.transactions.model import TransactionTable from langflow.services.database.models.vertex_builds.crud import ( delete_vertex_builds_by_flow_id, get_vertex_builds_by_flow_id, @@ -160,22 +162,14 @@ async def delete_messages_session( async def get_transactions( flow_id: Annotated[UUID, Query()], session: DbSession, -) -> list[TransactionReadResponse]: + params: Annotated[Params | None, Depends(custom_params)], +) -> Page[TransactionTable]: try: - transactions = await get_transactions_by_flow_id(session, flow_id) - return [ - TransactionReadResponse( - transaction_id=t.id, - timestamp=t.timestamp, - vertex_id=t.vertex_id, - target_id=t.target_id, - inputs=t.inputs, - outputs=t.outputs, - status=t.status, - error=t.error, - flow_id=t.flow_id, - ) - for t in transactions - ] + stmt = ( + select(TransactionTable) + .where(TransactionTable.flow_id == flow_id) + .order_by(col(TransactionTable.timestamp)) + ) + return await paginate(session, stmt, params=params, transformer=transform_transaction_table) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index 678726b5c..c696d3f7e 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -13,6 +13,7 @@ from fastapi import FastAPI, HTTPException, Request, Response, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles +from fastapi_pagination import add_pagination from loguru import logger from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from pydantic import PydanticDeprecatedSince20 @@ -229,6 +230,7 @@ def create_app(): FastAPIInstrumentor.instrument_app(app) + add_pagination(app) return app diff --git a/src/backend/base/langflow/services/database/models/transactions/crud.py b/src/backend/base/langflow/services/database/models/transactions/crud.py index 3b590d89b..9370702cf 100644 --- a/src/backend/base/langflow/services/database/models/transactions/crud.py +++ b/src/backend/base/langflow/services/database/models/transactions/crud.py @@ -4,7 +4,11 @@ from sqlalchemy.exc import IntegrityError from sqlmodel import col, select from sqlmodel.ext.asyncio.session import AsyncSession -from langflow.services.database.models.transactions.model import TransactionBase, TransactionTable +from langflow.services.database.models.transactions.model import ( + TransactionBase, + TransactionReadResponse, + TransactionTable, +) async def get_transactions_by_flow_id( @@ -31,3 +35,11 @@ async def log_transaction(db: AsyncSession, transaction: TransactionBase) -> Tra await db.rollback() raise return table + + +def transform_transaction_table( + transaction: list[TransactionTable] | TransactionTable, +) -> list[TransactionReadResponse]: + if isinstance(transaction, list): + return [TransactionReadResponse.model_validate(t, from_attributes=True) for t in transaction] + return TransactionReadResponse.model_validate(transaction, from_attributes=True) diff --git a/src/backend/base/langflow/services/database/models/transactions/model.py b/src/backend/base/langflow/services/database/models/transactions/model.py index 972284948..30478f15c 100644 --- a/src/backend/base/langflow/services/database/models/transactions/model.py +++ b/src/backend/base/langflow/services/database/models/transactions/model.py @@ -46,5 +46,5 @@ class TransactionTable(TransactionBase, table=True): # type: ignore[call-arg] class TransactionReadResponse(TransactionBase): - transaction_id: UUID + id: UUID = Field(alias="transaction_id") flow_id: UUID diff --git a/src/backend/tests/unit/test_database.py b/src/backend/tests/unit/test_database.py index b8e708396..7dcc0847c 100644 --- a/src/backend/tests/unit/test_database.py +++ b/src/backend/tests/unit/test_database.py @@ -324,7 +324,8 @@ async def test_delete_flows_with_transaction_and_build(client: AsyncClient, logg "GET", "api/v1/monitor/transactions", params={"flow_id": flow_id}, headers=logged_in_headers ) assert response.status_code == 200 - assert response.json() == [] + json_response = response.json() + assert json_response["items"] == [] for flow_id in flow_ids: response = await client.request( @@ -358,11 +359,15 @@ async def test_delete_folder_with_flows_with_transaction_and_build(client: Async class VertexTuple(NamedTuple): id: str + params: dict # Create a transaction for each flow for flow_id in flow_ids: await log_transaction( - str(flow_id), source=VertexTuple(id="vid"), target=VertexTuple(id="tid"), status="success" + str(flow_id), + source=VertexTuple(id="vid", params={}), + target=VertexTuple(id="tid", params={}), + status="success", ) # Create a build for each flow @@ -391,8 +396,9 @@ async def test_delete_folder_with_flows_with_transaction_and_build(client: Async response = await client.request( "GET", "api/v1/monitor/transactions", params={"flow_id": flow_id}, headers=logged_in_headers ) - assert response.status_code == 200 - assert response.json() == [] + assert response.status_code == 200, response.json() + json_response = response.json() + assert json_response["items"] == [] for flow_id in flow_ids: response = await client.request( diff --git a/src/frontend/src/controllers/API/queries/transactions/use-get-transactions.ts b/src/frontend/src/controllers/API/queries/transactions/use-get-transactions.ts index 357cb971c..dbb2860a5 100644 --- a/src/frontend/src/controllers/API/queries/transactions/use-get-transactions.ts +++ b/src/frontend/src/controllers/API/queries/transactions/use-get-transactions.ts @@ -9,11 +9,23 @@ import { UseRequestProcessor } from "../../services/request-processor"; interface TransactionsQueryParams { id: string; params?: Record; - mode?: "union" | "intersection"; + mode: "union" | "intersection"; excludedColumns?: string[]; } +interface PaginationType { + page?: number; + size?: number; + total?: number; + pages?: number; +} + +interface TransactionsPagination extends PaginationType { + items?: Array; +} + interface TransactionsResponse { + pagination: PaginationType; rows: Array; columns: Array; } @@ -25,13 +37,12 @@ export const useGetTransactionsQuery: useQueryFunctionType< // Function body remains unchanged const { query } = UseRequestProcessor(); - const responseFn = (data: object[]) => { - if (mode) { - const columns = extractColumnsFromRows(data, mode, excludedColumns); - return { rows: data, columns }; - } else { - return data; - } + const responseFn = (data: TransactionsPagination) => { + const pagination: PaginationType = { ...data }; + + const rows = data.items ?? []; + const columns = extractColumnsFromRows(rows, mode, excludedColumns); + return { pagination: pagination, rows: rows, columns }; }; const getTransactionsFn = async () => { @@ -41,16 +52,23 @@ export const useGetTransactionsQuery: useQueryFunctionType< config["params"] = { ...config["params"], ...params }; } - const result = await api.get(`${getURL("TRANSACTIONS")}`, config); + const result = await api.get( + `${getURL("TRANSACTIONS")}`, + config, + ); return responseFn(result.data); }; - const queryResult = query(["useGetTransactionsQuery"], getTransactionsFn, { - placeholderData: keepPreviousData, - refetchOnWindowFocus: false, - ...options, - }); + const queryResult = query( + ["useGetTransactionsQuery", id, { ...params }], + getTransactionsFn, + { + placeholderData: keepPreviousData, + refetchOnWindowFocus: false, + ...options, + }, + ); return queryResult; }; diff --git a/src/frontend/src/modals/flowLogsModal/index.tsx b/src/frontend/src/modals/flowLogsModal/index.tsx index 7b54200c7..0ec7f84f2 100644 --- a/src/frontend/src/modals/flowLogsModal/index.tsx +++ b/src/frontend/src/modals/flowLogsModal/index.tsx @@ -1,10 +1,11 @@ import IconComponent from "@/components/common/genericIconComponent"; +import PaginatorComponent from "@/components/common/paginatorComponent"; import TableComponent from "@/components/core/parameterRenderComponent/components/tableComponent"; import { useGetTransactionsQuery } from "@/controllers/API/queries/transactions"; import useFlowsManagerStore from "@/stores/flowsManagerStore"; import { FlowSettingsPropsType } from "@/types/components"; import { ColDef, ColGroupDef } from "ag-grid-community"; -import { useEffect, useState } from "react"; +import { useCallback, useEffect, useState } from "react"; import BaseModal from "../baseModal"; export default function FlowLogsModal({ @@ -13,11 +14,17 @@ export default function FlowLogsModal({ }: FlowSettingsPropsType): JSX.Element { const currentFlowId = useFlowsManagerStore((state) => state.currentFlowId); + const [pageIndex, setPageIndex] = useState(1); + const [pageSize, setPageSize] = useState(20); const [columns, setColumns] = useState>([]); const [rows, setRows] = useState([]); const { data, isLoading, refetch } = useGetTransactionsQuery({ id: currentFlowId, + params: { + page: pageIndex, + size: pageSize, + }, mode: "union", }); @@ -27,8 +34,18 @@ export default function FlowLogsModal({ setColumns(columns.map((col) => ({ ...col, editable: true }))); setRows(rows); } - if (open) refetch(); - }, [data, open, isLoading]); + }, [data]); + + useEffect(() => { + if (open) { + refetch(); + } + }, [open]); + + const handlePageChange = useCallback((newPageIndex, newPageSize) => { + setPageIndex(newPageIndex); + setPageSize(newPageSize); + }, []); return ( @@ -46,12 +63,24 @@ export default function FlowLogsModal({ key={"Executions"} readOnlyEdit className="h-max-full h-full w-full" - pagination={rows.length === 0 ? false : true} + pagination={false} columnDefs={columns} autoSizeStrategy={{ type: "fitGridWidth" }} rowData={rows} headerHeight={rows.length === 0 ? 0 : undefined} > + {!isLoading && (data?.pagination.total ?? 0) >= 10 && ( +
+ +
+ )}
);