feat: Add database cleanup functions for transactions and vertex builds (#4694)

* feat: Add configuration options for maximum transactions and vertex builds retention

* Add functions to clean up old transactions and vertex builds in the database

- Implement `clean_transactions` to delete transactions exceeding the configured limit.
- Implement `clean_vertex_builds` to delete vertex builds exceeding the configured limit.
- Integrate cleanup functions into the service initialization process.

* Add error handling and logging for cleanup tasks in utils.py

- Wrap transaction and vertex build cleanup operations in try-except blocks.
- Log success and error messages for cleanup operations.
- Rollback session on exceptions without re-raising, as these are cleanup tasks.
- Adjust service initialization order to ensure proper setup.

* Reorder setup and cleanup tasks in database initialization process

* fix: Update type hints for settings_service in cleanup functions

* Remove execution options in cleanup functions

* Handle specific exceptions during cleanup tasks in utils.py

* Use `col` for column references in delete statements to improve SQL query clarity.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-11-19 20:16:36 -03:00 committed by GitHub
commit a0acf39f8a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 85 additions and 3 deletions

View file

@ -173,6 +173,10 @@ class Settings(BaseSettings):
"""The maximum file size for the upload in MB."""
deactivate_tracing: bool = False
"""If set to True, tracing will be deactivated."""
max_transactions_to_keep: int = 3000
"""The maximum number of transactions to keep in the database."""
max_vertex_builds_to_keep: int = 3000
"""The maximum number of vertex builds to keep in the database."""
@field_validator("dev")
@classmethod

View file

@ -1,17 +1,28 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from loguru import logger
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy import delete
from sqlalchemy import exc as sqlalchemy_exc
from sqlmodel import col, select
from langflow.services.auth.utils import create_super_user, verify_password
from langflow.services.cache.factory import CacheServiceFactory
from langflow.services.database.models.transactions.model import TransactionTable
from langflow.services.database.models.vertex_builds.model import VertexBuildTable
from langflow.services.database.utils import initialize_database
from langflow.services.schema import ServiceType
from langflow.services.settings.constants import DEFAULT_SUPERUSER, DEFAULT_SUPERUSER_PASSWORD
from .deps import get_db_service, get_service, get_settings_service
if TYPE_CHECKING:
from sqlmodel.ext.asyncio.session import AsyncSession
from langflow.services.settings.manager import SettingsService
async def get_or_create_super_user(session: AsyncSession, username, password, is_default):
from langflow.services.database.models.user.model import User
@ -157,6 +168,70 @@ def initialize_session_service() -> None:
)
async def clean_transactions(settings_service: SettingsService, session: AsyncSession) -> None:
"""Clean up old transactions from the database.
This function deletes transactions that exceed the maximum number to keep (configured in settings).
It orders transactions by timestamp descending and removes the oldest ones beyond the limit.
Args:
settings_service: The settings service containing configuration like max_transactions_to_keep
session: The database session to use for the deletion
Returns:
None
"""
try:
# Delete transactions using bulk delete
delete_stmt = delete(TransactionTable).where(
col(TransactionTable.id).in_(
select(TransactionTable.id)
.order_by(col(TransactionTable.timestamp).desc())
.offset(settings_service.settings.max_transactions_to_keep)
)
)
await session.exec(delete_stmt)
await session.commit()
logger.debug("Successfully cleaned up old transactions")
except (sqlalchemy_exc.SQLAlchemyError, asyncio.TimeoutError) as exc:
logger.error(f"Error cleaning up transactions: {exc!s}")
await session.rollback()
# Don't re-raise since this is a cleanup task
async def clean_vertex_builds(settings_service: SettingsService, session: AsyncSession) -> None:
"""Clean up old vertex builds from the database.
This function deletes vertex builds that exceed the maximum number to keep (configured in settings).
It orders vertex builds by timestamp descending and removes the oldest ones beyond the limit.
Args:
settings_service: The settings service containing configuration like max_vertex_builds_to_keep
session: The database session to use for the deletion
Returns:
None
"""
try:
# Delete vertex builds using bulk delete
delete_stmt = delete(VertexBuildTable).where(
col(VertexBuildTable.id).in_(
select(VertexBuildTable.id)
.order_by(col(VertexBuildTable.timestamp).desc())
.offset(settings_service.settings.max_vertex_builds_to_keep)
)
)
await session.exec(delete_stmt)
await session.commit()
logger.debug("Successfully cleaned up old vertex builds")
except (sqlalchemy_exc.SQLAlchemyError, asyncio.TimeoutError) as exc:
logger.error(f"Error cleaning up vertex builds: {exc!s}")
await session.rollback()
# Don't re-raise since this is a cleanup task
async def initialize_services(*, fix_migration: bool = False) -> None:
"""Initialize all the services needed."""
# Test cache connection
@ -164,10 +239,13 @@ async def initialize_services(*, fix_migration: bool = False) -> None:
# Setup the superuser
await asyncio.to_thread(initialize_database, fix_migration=fix_migration)
async with get_db_service().with_async_session() as session:
await setup_superuser(get_service(ServiceType.SETTINGS_SERVICE), session)
settings_service = get_service(ServiceType.SETTINGS_SERVICE)
await setup_superuser(settings_service, session)
try:
await get_db_service().assign_orphaned_flows_to_superuser()
except Exception as exc:
msg = "Error assigning orphaned flows to the superuser"
logger.exception(msg)
raise RuntimeError(msg) from exc
await clean_transactions(settings_service, session)
await clean_vertex_builds(settings_service, session)