diff --git a/src/backend/base/langflow/services/monitor/utils.py b/src/backend/base/langflow/services/monitor/utils.py index b3daf6a47..97bb8e6e3 100644 --- a/src/backend/base/langflow/services/monitor/utils.py +++ b/src/backend/base/langflow/services/monitor/utils.py @@ -1,6 +1,7 @@ from typing import TYPE_CHECKING, Any, Dict, Optional, Type, Union import duckdb +import threading from loguru import logger from pydantic import BaseModel @@ -13,6 +14,9 @@ if TYPE_CHECKING: INDEX_KEY = "index" +# Lock to prevent multiple threads from creating the same table at the same time +drop_create_table_lock = threading.Lock() + def get_table_schema_as_dict(conn: duckdb.DuckDBPyConnection, table_name: str) -> dict: result = conn.execute(f"PRAGMA table_info('{table_name}')").fetchall() @@ -48,30 +52,31 @@ def model_to_sql_column_definitions(model: Type[BaseModel]) -> dict: def drop_and_create_table_if_schema_mismatch(db_path: str, table_name: str, model: Type[BaseModel]): - with duckdb.connect(db_path) as conn: - # Get the current schema from the database - try: - current_schema = get_table_schema_as_dict(conn, table_name) - except duckdb.CatalogException: - current_schema = {} - # Get the desired schema from the model - desired_schema = model_to_sql_column_definitions(model) + with drop_create_table_lock: + with duckdb.connect(db_path) as conn: + # Get the current schema from the database + try: + current_schema = get_table_schema_as_dict(conn, table_name) + except duckdb.CatalogException: + current_schema = {} + # Get the desired schema from the model + desired_schema = model_to_sql_column_definitions(model) - # Compare the current and desired schemas + # Compare the current and desired schemas - if current_schema != desired_schema: - # If they don't match, drop the existing table and create a new one - conn.execute(f"DROP TABLE IF EXISTS {table_name}") - if INDEX_KEY in desired_schema.keys(): - # Create a sequence for the id column - try: - conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;") - except duckdb.CatalogException: - pass - desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')" - columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items()) - create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})" - conn.execute(create_table_sql) + if current_schema != desired_schema: + # If they don't match, drop the existing table and create a new one + conn.execute(f"DROP TABLE IF EXISTS {table_name}") + if INDEX_KEY in desired_schema.keys(): + # Create a sequence for the id column + try: + conn.execute(f"CREATE SEQUENCE seq_{table_name} START 1;") + except duckdb.CatalogException: + pass + desired_schema[INDEX_KEY] = f"INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq_{table_name}')" + columns_sql = ", ".join(f"{name} {data_type}" for name, data_type in desired_schema.items()) + create_table_sql = f"CREATE TABLE {table_name} ({columns_sql})" + conn.execute(create_table_sql) def add_row_to_table(