From 471b8e2f6fcc3f8c4474b1e357ef108ed195571b Mon Sep 17 00:00:00 2001 From: Jordan Frazier <122494242+jordanrfrazier@users.noreply.github.com> Date: Tue, 21 Jan 2025 12:41:17 -0800 Subject: [PATCH] feat: add retry on database connection (#5772) * Add retry to database connection * [autofix.ci] apply automated fixes * fix comments --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .env.example | 6 +++++ .../langflow/services/database/service.py | 22 ++++++++++++++++--- .../base/langflow/services/database/utils.py | 5 ++++- .../base/langflow/services/settings/base.py | 2 ++ 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/.env.example b/.env.example index dac588a24..e94753735 100644 --- a/.env.example +++ b/.env.example @@ -22,6 +22,12 @@ LANGFLOW_SAVE_DB_IN_CONFIG_DIR= # SQLite example: LANGFLOW_DATABASE_URL=sqlite:///./langflow.db +# Database connection retry +# Values: true, false +# If true, Langflow will retry connecting to the database if the connection fails +# Example: LANGFLOW_DATABASE_CONNECTION_RETRY=true +LANGFLOW_DATABASE_CONNECTION_RETRY=false + # Cache type LANGFLOW_LANGCHAIN_CACHE=SQLiteCache diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index d2874a520..72205efe2 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -21,6 +21,7 @@ from sqlalchemy.exc import OperationalError from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine from sqlmodel import SQLModel, select, text from sqlmodel.ext.asyncio.session import AsyncSession +from tenacity import retry, stop_after_attempt, wait_fixed from langflow.initial_setup.constants import STARTER_FOLDER_NAME from langflow.services.base import Service @@ -54,7 +55,10 @@ class DatabaseService(Service): # register the event listener for sqlite as part of this class. 
# Using decorator will make the method not able to use self event.listen(Engine, "connect", self.on_connection) - self.engine = self._create_engine() + if self.settings_service.settings.database_connection_retry: + self.engine = self._create_engine_with_retry() + else: + self.engine = self._create_engine() alembic_log_file = self.settings_service.settings.alembic_log_file # Check if the provided path is absolute, cross-platform. @@ -72,7 +76,10 @@ class DatabaseService(Service): def reload_engine(self) -> None: self._sanitize_database_url() - self.engine = self._create_engine() + if self.settings_service.settings.database_connection_retry: + self.engine = self._create_engine_with_retry() + else: + self.engine = self._create_engine() def _sanitize_database_url(self): if self.database_url.startswith("postgres://"): @@ -101,6 +108,11 @@ class DatabaseService(Service): **kwargs, ) + @retry(wait=wait_fixed(2), stop=stop_after_attempt(10)) + def _create_engine_with_retry(self) -> AsyncEngine: + """Create the engine for the database with retry logic.""" + return self._create_engine() + def _get_connect_args(self): if self.settings_service.settings.database_url and self.settings_service.settings.database_url.startswith( "sqlite" @@ -141,7 +153,7 @@ class DatabaseService(Service): if session.is_active: await session.commit() except Exception: - logger.error("An error occurred during the session scope.") + logger.error("An error occurred during the session scope") await session.rollback() raise @@ -415,6 +427,10 @@ class DatabaseService(Service): logger.debug("Database and tables created successfully") + @retry(wait=wait_fixed(2), stop=stop_after_attempt(10)) + async def create_db_and_tables_with_retry(self) -> None: + await self.create_db_and_tables() + async def create_db_and_tables(self) -> None: async with self.with_session() as session, session.bind.connect() as conn: await conn.run_sync(self._create_db_and_tables) diff --git 
a/src/backend/base/langflow/services/database/utils.py b/src/backend/base/langflow/services/database/utils.py index c64fbab22..9143dd7ff 100644 --- a/src/backend/base/langflow/services/database/utils.py +++ b/src/backend/base/langflow/services/database/utils.py @@ -64,7 +64,10 @@ async def initialize_database(*, fix_migration: bool = False) -> None: database_service: DatabaseService = get_db_service() try: - await database_service.create_db_and_tables() + if database_service.settings_service.settings.database_connection_retry: + await database_service.create_db_and_tables_with_retry() + else: + await database_service.create_db_and_tables() except Exception as exc: # if the exception involves tables already existing # we can ignore it diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index fb96859ce..ec752be77 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -71,6 +71,8 @@ class Settings(BaseSettings): """If True, Langflow will run in development mode.""" database_url: str | None = None """Database URL for Langflow. If not provided, Langflow will use a SQLite database.""" + database_connection_retry: bool = False + """If True, Langflow will retry to connect to the database if it fails.""" pool_size: int = 10 """The number of connections to keep open in the connection pool. If not provided, the default is 10.""" max_overflow: int = 20