feat: add retry on database connection (#5772)

* Add retry to database connection

* [autofix.ci] apply automated fixes

* fix comments

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Jordan Frazier 2025-01-21 12:41:17 -08:00 committed by GitHub
commit 471b8e2f6f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 31 additions and 4 deletions

View file

@ -22,6 +22,12 @@ LANGFLOW_SAVE_DB_IN_CONFIG_DIR=
# SQLite example:
LANGFLOW_DATABASE_URL=sqlite:///./langflow.db
# Database connection retry
# Values: true, false
# If true, the database will retry to connect to the database if it fails
# Example: LANGFLOW_DATABASE_CONNECTION_RETRY=true
LANGFLOW_DATABASE_CONNECTION_RETRY=false
# Cache type
LANGFLOW_LANGCHAIN_CACHE=SQLiteCache

View file

@ -21,6 +21,7 @@ from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlmodel import SQLModel, select, text
from sqlmodel.ext.asyncio.session import AsyncSession
from tenacity import retry, stop_after_attempt, wait_fixed
from langflow.initial_setup.constants import STARTER_FOLDER_NAME
from langflow.services.base import Service
@ -54,7 +55,10 @@ class DatabaseService(Service):
# register the event listener for sqlite as part of this class.
# Using decorator will make the method not able to use self
event.listen(Engine, "connect", self.on_connection)
self.engine = self._create_engine()
if self.settings_service.settings.database_connection_retry:
self.engine = self._create_engine_with_retry()
else:
self.engine = self._create_engine()
alembic_log_file = self.settings_service.settings.alembic_log_file
# Check if the provided path is absolute, cross-platform.
@ -72,7 +76,10 @@ class DatabaseService(Service):
def reload_engine(self) -> None:
self._sanitize_database_url()
self.engine = self._create_engine()
if self.settings_service.settings.database_connection_retry:
self.engine = self._create_engine_with_retry()
else:
self.engine = self._create_engine()
def _sanitize_database_url(self):
if self.database_url.startswith("postgres://"):
@ -101,6 +108,11 @@ class DatabaseService(Service):
**kwargs,
)
@retry(wait=wait_fixed(2), stop=stop_after_attempt(10))
def _create_engine_with_retry(self) -> AsyncEngine:
"""Create the engine for the database with retry logic."""
return self._create_engine()
def _get_connect_args(self):
if self.settings_service.settings.database_url and self.settings_service.settings.database_url.startswith(
"sqlite"
@ -141,7 +153,7 @@ class DatabaseService(Service):
if session.is_active:
await session.commit()
except Exception:
logger.error("An error occurred during the session scope.")
logger.error("An error occurred during the session scope")
await session.rollback()
raise
@ -415,6 +427,10 @@ class DatabaseService(Service):
logger.debug("Database and tables created successfully")
@retry(wait=wait_fixed(2), stop=stop_after_attempt(10))
async def create_db_and_tables_with_retry(self) -> None:
await self.create_db_and_tables()
async def create_db_and_tables(self) -> None:
async with self.with_session() as session, session.bind.connect() as conn:
await conn.run_sync(self._create_db_and_tables)

View file

@ -64,7 +64,10 @@ async def initialize_database(*, fix_migration: bool = False) -> None:
database_service: DatabaseService = get_db_service()
try:
await database_service.create_db_and_tables()
if database_service.settings_service.settings.database_connection_retry:
await database_service.create_db_and_tables_with_retry()
else:
await database_service.create_db_and_tables()
except Exception as exc:
# if the exception involves tables already existing
# we can ignore it

View file

@ -71,6 +71,8 @@ class Settings(BaseSettings):
"""If True, Langflow will run in development mode."""
database_url: str | None = None
"""Database URL for Langflow. If not provided, Langflow will use a SQLite database."""
database_connection_retry: bool = False
"""If True, Langflow will retry to connect to the database if it fails."""
pool_size: int = 10
"""The number of connections to keep open in the connection pool. If not provided, the default is 10."""
max_overflow: int = 20