feat: implement pagination for transaction log queries (#5281)

* Add pagination support for transaction logs.

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

* Replace 'TransactionData' with 'data' variable.

* [autofix.ci] apply automated fixes

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
dhlidongming 2024-12-17 05:04:30 +08:00 committed by GitHub
commit 1ec63800b2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 105 additions and 47 deletions

View file

@ -12,7 +12,7 @@ import orjson
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.encoders import jsonable_encoder
from fastapi.responses import StreamingResponse
from fastapi_pagination import Page, Params, add_pagination
from fastapi_pagination import Page, Params
from fastapi_pagination.ext.sqlalchemy import paginate
from sqlmodel import and_, col, select
from sqlmodel.ext.asyncio.session import AsyncSession
@ -521,6 +521,3 @@ async def read_basic_examples(
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e
add_pagination(router)

View file

@ -2,15 +2,17 @@ from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi_pagination import Page, Params
from fastapi_pagination.ext.sqlmodel import paginate
from sqlalchemy import delete
from sqlmodel import col, select
from langflow.api.utils import DbSession
from langflow.api.utils import DbSession, custom_params
from langflow.schema.message import MessageResponse
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models.message.model import MessageRead, MessageTable, MessageUpdate
from langflow.services.database.models.transactions.crud import get_transactions_by_flow_id
from langflow.services.database.models.transactions.model import TransactionReadResponse
from langflow.services.database.models.transactions.crud import transform_transaction_table
from langflow.services.database.models.transactions.model import TransactionTable
from langflow.services.database.models.vertex_builds.crud import (
delete_vertex_builds_by_flow_id,
get_vertex_builds_by_flow_id,
@ -160,22 +162,14 @@ async def delete_messages_session(
async def get_transactions(
flow_id: Annotated[UUID, Query()],
session: DbSession,
) -> list[TransactionReadResponse]:
params: Annotated[Params | None, Depends(custom_params)],
) -> Page[TransactionTable]:
try:
transactions = await get_transactions_by_flow_id(session, flow_id)
return [
TransactionReadResponse(
transaction_id=t.id,
timestamp=t.timestamp,
vertex_id=t.vertex_id,
target_id=t.target_id,
inputs=t.inputs,
outputs=t.outputs,
status=t.status,
error=t.error,
flow_id=t.flow_id,
)
for t in transactions
]
stmt = (
select(TransactionTable)
.where(TransactionTable.flow_id == flow_id)
.order_by(col(TransactionTable.timestamp))
)
return await paginate(session, stmt, params=params, transformer=transform_transaction_table)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) from e

View file

@ -13,6 +13,7 @@ from fastapi import FastAPI, HTTPException, Request, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi_pagination import add_pagination
from loguru import logger
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from pydantic import PydanticDeprecatedSince20
@ -229,6 +230,7 @@ def create_app():
FastAPIInstrumentor.instrument_app(app)
add_pagination(app)
return app

View file

@ -4,7 +4,11 @@ from sqlalchemy.exc import IntegrityError
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from langflow.services.database.models.transactions.model import TransactionBase, TransactionTable
from langflow.services.database.models.transactions.model import (
TransactionBase,
TransactionReadResponse,
TransactionTable,
)
async def get_transactions_by_flow_id(
@ -31,3 +35,11 @@ async def log_transaction(db: AsyncSession, transaction: TransactionBase) -> Tra
await db.rollback()
raise
return table
def transform_transaction_table(
transaction: list[TransactionTable] | TransactionTable,
) -> list[TransactionReadResponse]:
if isinstance(transaction, list):
return [TransactionReadResponse.model_validate(t, from_attributes=True) for t in transaction]
return TransactionReadResponse.model_validate(transaction, from_attributes=True)

View file

@ -46,5 +46,5 @@ class TransactionTable(TransactionBase, table=True): # type: ignore[call-arg]
class TransactionReadResponse(TransactionBase):
transaction_id: UUID
id: UUID = Field(alias="transaction_id")
flow_id: UUID

View file

@ -324,7 +324,8 @@ async def test_delete_flows_with_transaction_and_build(client: AsyncClient, logg
"GET", "api/v1/monitor/transactions", params={"flow_id": flow_id}, headers=logged_in_headers
)
assert response.status_code == 200
assert response.json() == []
json_response = response.json()
assert json_response["items"] == []
for flow_id in flow_ids:
response = await client.request(
@ -358,11 +359,15 @@ async def test_delete_folder_with_flows_with_transaction_and_build(client: Async
class VertexTuple(NamedTuple):
id: str
params: dict
# Create a transaction for each flow
for flow_id in flow_ids:
await log_transaction(
str(flow_id), source=VertexTuple(id="vid"), target=VertexTuple(id="tid"), status="success"
str(flow_id),
source=VertexTuple(id="vid", params={}),
target=VertexTuple(id="tid", params={}),
status="success",
)
# Create a build for each flow
@ -391,8 +396,9 @@ async def test_delete_folder_with_flows_with_transaction_and_build(client: Async
response = await client.request(
"GET", "api/v1/monitor/transactions", params={"flow_id": flow_id}, headers=logged_in_headers
)
assert response.status_code == 200
assert response.json() == []
assert response.status_code == 200, response.json()
json_response = response.json()
assert json_response["items"] == []
for flow_id in flow_ids:
response = await client.request(

View file

@ -9,11 +9,23 @@ import { UseRequestProcessor } from "../../services/request-processor";
interface TransactionsQueryParams {
id: string;
params?: Record<string, unknown>;
mode?: "union" | "intersection";
mode: "union" | "intersection";
excludedColumns?: string[];
}
interface PaginationType {
page?: number;
size?: number;
total?: number;
pages?: number;
}
interface TransactionsPagination extends PaginationType {
items?: Array<object>;
}
interface TransactionsResponse {
pagination: PaginationType;
rows: Array<object>;
columns: Array<ColDef | ColGroupDef>;
}
@ -25,13 +37,12 @@ export const useGetTransactionsQuery: useQueryFunctionType<
// Function body remains unchanged
const { query } = UseRequestProcessor();
const responseFn = (data: object[]) => {
if (mode) {
const columns = extractColumnsFromRows(data, mode, excludedColumns);
return { rows: data, columns };
} else {
return data;
}
const responseFn = (data: TransactionsPagination) => {
const pagination: PaginationType = { ...data };
const rows = data.items ?? [];
const columns = extractColumnsFromRows(rows, mode, excludedColumns);
return { pagination: pagination, rows: rows, columns };
};
const getTransactionsFn = async () => {
@ -41,16 +52,23 @@ export const useGetTransactionsQuery: useQueryFunctionType<
config["params"] = { ...config["params"], ...params };
}
const result = await api.get<object[]>(`${getURL("TRANSACTIONS")}`, config);
const result = await api.get<TransactionsPagination>(
`${getURL("TRANSACTIONS")}`,
config,
);
return responseFn(result.data);
};
const queryResult = query(["useGetTransactionsQuery"], getTransactionsFn, {
placeholderData: keepPreviousData,
refetchOnWindowFocus: false,
...options,
});
const queryResult = query(
["useGetTransactionsQuery", id, { ...params }],
getTransactionsFn,
{
placeholderData: keepPreviousData,
refetchOnWindowFocus: false,
...options,
},
);
return queryResult;
};

View file

@ -1,10 +1,11 @@
import IconComponent from "@/components/common/genericIconComponent";
import PaginatorComponent from "@/components/common/paginatorComponent";
import TableComponent from "@/components/core/parameterRenderComponent/components/tableComponent";
import { useGetTransactionsQuery } from "@/controllers/API/queries/transactions";
import useFlowsManagerStore from "@/stores/flowsManagerStore";
import { FlowSettingsPropsType } from "@/types/components";
import { ColDef, ColGroupDef } from "ag-grid-community";
import { useEffect, useState } from "react";
import { useCallback, useEffect, useState } from "react";
import BaseModal from "../baseModal";
export default function FlowLogsModal({
@ -13,11 +14,17 @@ export default function FlowLogsModal({
}: FlowSettingsPropsType): JSX.Element {
const currentFlowId = useFlowsManagerStore((state) => state.currentFlowId);
const [pageIndex, setPageIndex] = useState(1);
const [pageSize, setPageSize] = useState(20);
const [columns, setColumns] = useState<Array<ColDef | ColGroupDef>>([]);
const [rows, setRows] = useState<any>([]);
const { data, isLoading, refetch } = useGetTransactionsQuery({
id: currentFlowId,
params: {
page: pageIndex,
size: pageSize,
},
mode: "union",
});
@ -27,8 +34,18 @@ export default function FlowLogsModal({
setColumns(columns.map((col) => ({ ...col, editable: true })));
setRows(rows);
}
if (open) refetch();
}, [data, open, isLoading]);
}, [data]);
useEffect(() => {
if (open) {
refetch();
}
}, [open]);
const handlePageChange = useCallback((newPageIndex, newPageSize) => {
setPageIndex(newPageIndex);
setPageSize(newPageSize);
}, []);
return (
<BaseModal open={open} setOpen={setOpen} size="x-large">
@ -46,12 +63,24 @@ export default function FlowLogsModal({
key={"Executions"}
readOnlyEdit
className="h-max-full h-full w-full"
pagination={rows.length === 0 ? false : true}
pagination={false}
columnDefs={columns}
autoSizeStrategy={{ type: "fitGridWidth" }}
rowData={rows}
headerHeight={rows.length === 0 ? 0 : undefined}
></TableComponent>
{!isLoading && (data?.pagination.total ?? 0) >= 10 && (
<div className="flex justify-end px-3 py-4">
<PaginatorComponent
pageIndex={data?.pagination.page ?? 1}
pageSize={data?.pagination.size ?? 10}
rowsCount={[12, 24, 48, 96]}
totalRowsCount={data?.pagination.total ?? 0}
paginate={handlePageChange}
pages={data?.pagination.pages}
/>
</div>
)}
</BaseModal.Content>
</BaseModal>
);