diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index 073f12c67..5cbed709b 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -173,6 +173,10 @@ class Settings(BaseSettings): """The maximum file size for the upload in MB.""" deactivate_tracing: bool = False """If set to True, tracing will be deactivated.""" + max_transactions_to_keep: int = 3000 + """The maximum number of transactions to keep in the database.""" + max_vertex_builds_to_keep: int = 3000 + """The maximum number of vertex builds to keep in the database.""" @field_validator("dev") @classmethod diff --git a/src/backend/base/langflow/services/utils.py b/src/backend/base/langflow/services/utils.py index 962d128f5..f71dba557 100644 --- a/src/backend/base/langflow/services/utils.py +++ b/src/backend/base/langflow/services/utils.py @@ -1,17 +1,28 @@ +from __future__ import annotations + import asyncio +from typing import TYPE_CHECKING from loguru import logger -from sqlmodel import select -from sqlmodel.ext.asyncio.session import AsyncSession +from sqlalchemy import delete +from sqlalchemy import exc as sqlalchemy_exc +from sqlmodel import col, select from langflow.services.auth.utils import create_super_user, verify_password from langflow.services.cache.factory import CacheServiceFactory +from langflow.services.database.models.transactions.model import TransactionTable +from langflow.services.database.models.vertex_builds.model import VertexBuildTable from langflow.services.database.utils import initialize_database from langflow.services.schema import ServiceType from langflow.services.settings.constants import DEFAULT_SUPERUSER, DEFAULT_SUPERUSER_PASSWORD from .deps import get_db_service, get_service, get_settings_service +if TYPE_CHECKING: + from sqlmodel.ext.asyncio.session import AsyncSession + + from langflow.services.settings.manager import SettingsService + async def get_or_create_super_user(session: AsyncSession, username, password, is_default): from langflow.services.database.models.user.model import User @@ -157,6 +168,70 @@ def initialize_session_service() -> None: ) +async def clean_transactions(settings_service: SettingsService, session: AsyncSession) -> None: + """Clean up old transactions from the database. + + This function deletes transactions that exceed the maximum number to keep (configured in settings). + It orders transactions by timestamp descending and removes the oldest ones beyond the limit. + + Args: + settings_service: The settings service containing configuration like max_transactions_to_keep + session: The database session to use for the deletion + + Returns: + None + """ + try: + # Delete transactions using bulk delete + delete_stmt = delete(TransactionTable).where( + col(TransactionTable.id).in_( + select(TransactionTable.id) + .order_by(col(TransactionTable.timestamp).desc()) + .offset(settings_service.settings.max_transactions_to_keep) + ) + ) + + await session.exec(delete_stmt) + await session.commit() + logger.debug("Successfully cleaned up old transactions") + except (sqlalchemy_exc.SQLAlchemyError, asyncio.TimeoutError) as exc: + logger.error(f"Error cleaning up transactions: {exc!s}") + await session.rollback() + # Don't re-raise since this is a cleanup task + + +async def clean_vertex_builds(settings_service: SettingsService, session: AsyncSession) -> None: + """Clean up old vertex builds from the database. + + This function deletes vertex builds that exceed the maximum number to keep (configured in settings). + It orders vertex builds by timestamp descending and removes the oldest ones beyond the limit. + + Args: + settings_service: The settings service containing configuration like max_vertex_builds_to_keep + session: The database session to use for the deletion + + Returns: + None + """ + try: + # Delete vertex builds using bulk delete + delete_stmt = delete(VertexBuildTable).where( + col(VertexBuildTable.id).in_( + select(VertexBuildTable.id) + .order_by(col(VertexBuildTable.timestamp).desc()) + .offset(settings_service.settings.max_vertex_builds_to_keep) + ) + ) + + await session.exec(delete_stmt) + await session.commit() + logger.debug("Successfully cleaned up old vertex builds") + except (sqlalchemy_exc.SQLAlchemyError, asyncio.TimeoutError) as exc: + logger.error(f"Error cleaning up vertex builds: {exc!s}") + await session.rollback() + # Don't re-raise since this is a cleanup task + + async def initialize_services(*, fix_migration: bool = False) -> None: """Initialize all the services needed.""" # Test cache connection @@ -164,10 +239,13 @@ async def initialize_services(*, fix_migration: bool = False) -> None: # Setup the superuser await asyncio.to_thread(initialize_database, fix_migration=fix_migration) async with get_db_service().with_async_session() as session: - await setup_superuser(get_service(ServiceType.SETTINGS_SERVICE), session) + settings_service = get_service(ServiceType.SETTINGS_SERVICE) + await setup_superuser(settings_service, session) try: await get_db_service().assign_orphaned_flows_to_superuser() except Exception as exc: msg = "Error assigning orphaned flows to the superuser" logger.exception(msg) raise RuntimeError(msg) from exc + await clean_transactions(settings_service, session) + await clean_vertex_builds(settings_service, session)