From 1dd840c526c4bf61ef284cd3b6f6745db4af4773 Mon Sep 17 00:00:00 2001 From: ming Date: Thu, 1 Aug 2024 08:48:00 -0400 Subject: [PATCH] feat: set sqlite pragma by env variable (#3136) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * set sqlite pragma by env variable * configurable * [autofix.ci] apply automated fixes * fix circular dep * [autofix.ci] apply automated fixes --------- Co-authored-by: Nicolò Boschi Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../langflow/services/database/service.py | 37 ++++++++++++------- .../base/langflow/services/settings/base.py | 9 ++++- src/backend/tests/unit/test_database.py | 10 +++++ 3 files changed, 42 insertions(+), 14 deletions(-) diff --git a/src/backend/base/langflow/services/database/service.py b/src/backend/base/langflow/services/database/service.py index 22de47dac..5dd6fe055 100644 --- a/src/backend/base/langflow/services/database/service.py +++ b/src/backend/base/langflow/services/database/service.py @@ -1,7 +1,7 @@ import time from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional import sqlalchemy as sa from alembic import command, util @@ -47,12 +47,17 @@ class DatabaseService(Service): def _create_engine(self) -> "Engine": """Create the engine for the database.""" - settings_service = get_settings_service() - if settings_service.settings.database_url and settings_service.settings.database_url.startswith("sqlite"): + if self.settings_service.settings.database_url and self.settings_service.settings.database_url.startswith( + "sqlite" + ): connect_args = {"check_same_thread": False} else: connect_args = {} try: + # register the event listener for sqlite as part of this class. + # Using decorator will make the method not able to use self + event.listen(Engine, "connect", self.on_connection) + return create_engine( self.database_url, connect_args=connect_args, @@ -69,19 +74,25 @@ class DatabaseService(Service): return self._create_engine() raise RuntimeError("Error creating database engine") from exc - @event.listens_for(Engine, "connect") - def on_connection(dbapi_connection, connection_record): + def on_connection(self, dbapi_connection, connection_record): from sqlite3 import Connection as sqliteConnection if isinstance(dbapi_connection, sqliteConnection): - logger.info("sqlite connect listener, setting pragmas") - cursor = dbapi_connection.cursor() - try: - cursor.execute("PRAGMA synchronous = NORMAL") - cursor.execute("PRAGMA journal_mode = WAL") - cursor.close() - except OperationalError as oe: - logger.warning("Failed to set PRAGMA: ", {oe}) + pragmas: Optional[dict] = self.settings_service.settings.sqlite_pragmas + pragmas_list = [] + for key, val in pragmas.items() or {}: + pragmas_list.append(f"PRAGMA {key} = {val}") + logger.info(f"sqlite connection, setting pragmas: {str(pragmas_list)}") + if pragmas_list: + cursor = dbapi_connection.cursor() + try: + for pragma in pragmas_list: + try: + cursor.execute(pragma) + except OperationalError as oe: + logger.error(f"Failed to set PRAGMA {pragma}: ", {oe}) + finally: + cursor.close() def __enter__(self): self._session = Session(self.engine) diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index 4575069b7..b028294bd 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -71,7 +71,14 @@ class Settings(BaseSettings): pool_size: int = 10 """The number of connections to keep open in the connection pool. If not provided, the default is 10.""" max_overflow: int = 20 - """The number of connections to allow that can be opened beyond the pool size. If not provided, the default is 10.""" + """The number of connections to allow that can be opened beyond the pool size. + If not provided, the default is 20.""" + + # sqlite configuration + sqlite_pragmas: Optional[dict] = {"synchronous": "NORMAL", "journal_mode": "WAL"} + """SQLite pragmas to use when connecting to the database.""" + + # cache configuration cache_type: str = "async" """The cache type can be 'async' or 'redis'.""" cache_expire: int = 3600 diff --git a/src/backend/tests/unit/test_database.py b/src/backend/tests/unit/test_database.py index ccce97c00..b97a4fd71 100644 --- a/src/backend/tests/unit/test_database.py +++ b/src/backend/tests/unit/test_database.py @@ -402,3 +402,13 @@ def test_migrate_transactions_no_duckdb(client: TestClient): migrate_transactions_from_monitor_service_to_database(session) new_trans = get_transactions_by_flow_id(session, UUID(flow_id)) assert 0 == len(new_trans) + + +def test_sqlite_pragmas(): + db_service = get_db_service() + + with db_service as session: + from sqlalchemy import text + + assert "wal" == session.execute(text("PRAGMA journal_mode;")).fetchone()[0] + assert 1 == session.execute(text("PRAGMA synchronous;")).fetchone()[0]