feat: set sqlite pragma by env variable (#3136)

* set sqlite pragma by env variable

* configurable

* [autofix.ci] apply automated fixes

* fix circular dep

* [autofix.ci] apply automated fixes

---------

Co-authored-by: Nicolò Boschi <boschi1997@gmail.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
ming 2024-08-01 08:48:00 -04:00 committed by GitHub
commit 1dd840c526
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 42 additions and 14 deletions

View file

@ -1,7 +1,7 @@
import time
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
import sqlalchemy as sa
from alembic import command, util
@ -47,12 +47,17 @@ class DatabaseService(Service):
def _create_engine(self) -> "Engine":
"""Create the engine for the database."""
settings_service = get_settings_service()
if settings_service.settings.database_url and settings_service.settings.database_url.startswith("sqlite"):
if self.settings_service.settings.database_url and self.settings_service.settings.database_url.startswith(
"sqlite"
):
connect_args = {"check_same_thread": False}
else:
connect_args = {}
try:
# register the event listener for sqlite as part of this class.
# Using decorator will make the method not able to use self
event.listen(Engine, "connect", self.on_connection)
return create_engine(
self.database_url,
connect_args=connect_args,
@ -69,19 +74,25 @@ class DatabaseService(Service):
return self._create_engine()
raise RuntimeError("Error creating database engine") from exc
@event.listens_for(Engine, "connect")
def on_connection(dbapi_connection, connection_record):
def on_connection(self, dbapi_connection, connection_record):
from sqlite3 import Connection as sqliteConnection
if isinstance(dbapi_connection, sqliteConnection):
logger.info("sqlite connect listener, setting pragmas")
cursor = dbapi_connection.cursor()
try:
cursor.execute("PRAGMA synchronous = NORMAL")
cursor.execute("PRAGMA journal_mode = WAL")
cursor.close()
except OperationalError as oe:
logger.warning("Failed to set PRAGMA: ", {oe})
pragmas: Optional[dict] = self.settings_service.settings.sqlite_pragmas
pragmas_list = []
for key, val in pragmas.items() or {}:
pragmas_list.append(f"PRAGMA {key} = {val}")
logger.info(f"sqlite connection, setting pragmas: {str(pragmas_list)}")
if pragmas_list:
cursor = dbapi_connection.cursor()
try:
for pragma in pragmas_list:
try:
cursor.execute(pragma)
except OperationalError as oe:
logger.error(f"Failed to set PRAGMA {pragma}: ", {oe})
finally:
cursor.close()
def __enter__(self):
self._session = Session(self.engine)

View file

@ -71,7 +71,14 @@ class Settings(BaseSettings):
pool_size: int = 10
"""The number of connections to keep open in the connection pool. If not provided, the default is 10."""
max_overflow: int = 20
"""The number of connections to allow that can be opened beyond the pool size. If not provided, the default is 10."""
"""The number of connections to allow that can be opened beyond the pool size.
If not provided, the default is 20."""
# sqlite configuration
sqlite_pragmas: Optional[dict] = {"synchronous": "NORMAL", "journal_mode": "WAL"}
"""SQLite pragmas to use when connecting to the database."""
# cache configuration
cache_type: str = "async"
"""The cache type can be 'async' or 'redis'."""
cache_expire: int = 3600

View file

@ -402,3 +402,13 @@ def test_migrate_transactions_no_duckdb(client: TestClient):
migrate_transactions_from_monitor_service_to_database(session)
new_trans = get_transactions_by_flow_id(session, UUID(flow_id))
assert 0 == len(new_trans)
def test_sqlite_pragmas():
db_service = get_db_service()
with db_service as session:
from sqlalchemy import text
assert "wal" == session.execute(text("PRAGMA journal_mode;")).fetchone()[0]
assert 1 == session.execute(text("PRAGMA synchronous;")).fetchone()[0]