prevent race condiction on drop_and_create_table_if_schema_mismatch

This commit is contained in:
ming luo 2024-06-22 23:26:01 -04:00 committed by Gabriel Luiz Freitas Almeida
commit b1e7a8b288

View file

@ -1,6 +1,7 @@
from typing import TYPE_CHECKING, Any, Dict, Optional, Type, Union
import duckdb
import threading
from loguru import logger
from pydantic import BaseModel
@ -13,6 +14,9 @@ if TYPE_CHECKING:
INDEX_KEY = "index"
# Lock to prevent multiple threads from creating the same table at the same time
drop_create_table_lock = threading.Lock()
def get_table_schema_as_dict(conn: duckdb.DuckDBPyConnection, table_name: str) -> dict:
result = conn.execute(f"PRAGMA table_info('{table_name}')").fetchall()
@ -48,30 +52,31 @@ def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict:
def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: Type[BaseModel]):
with duckdb.connect(db_path) as conn:
# Get the current schema from the database
try:
current_schema = get_table_schema_as_dict(conn, table_name)
except duckdb.CatalogException:
current_schema = {}
# Get the desired schema from the model
desired_schema = model_to_sql_column_definitions(model)
with drop_create_table_lock:
with duckdb.connect(db_path) as conn:
# Get the current schema from the database
try:
current_schema = get_table_schema_as_dict(conn, table_name)
except duckdb.CatalogException:
current_schema = {}
# Get the desired schema from the model
desired_schema = model_to_sql_column_definitions(model)
# Compare the current and desired schemas
# Compare the current and desired schemas
if current_schema != desired_schema:
# If they don't match, drop the existing table and create a new one
conn.execute(f"DROP TABLE IF EXISTS {table_name}")
if INDEX_KEY in desired_schema.keys():
# Create a sequence for the id column
try:
conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;")
except duckdb.CatalogException:
pass
desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')"
columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items())
create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})"
conn.execute(create_table_sql)
if current_schema != desired_schema:
# If they don't match, drop the existing table and create a new one
conn.execute(f"DROP TABLE IF EXISTS {table_name}")
if INDEX_KEY in desired_schema.keys():
# Create a sequence for the id column
try:
conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;")
except duckdb.CatalogException:
pass
desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')"
columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items())
create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})"
conn.execute(create_table_sql)
def add_row_to_table(