fix: Implement primary key renaming in Alembic migration for message, transaction, and vertex_build tables (#7686)

feat: Add constraint existence check for primary keys in migration scripts

- Implemented a function to check for existing constraints before creating temporary tables in the upgrade and downgrade functions.
- Updated primary key names dynamically to avoid conflicts during migrations for the 'vertex_build', 'transaction', and 'message' tables.
- Enhanced the robustness of the migration process by ensuring unique constraint names across different database states.
This commit is contained in:
Gabriel Luiz Freitas Almeida 2025-04-17 12:34:43 -03:00 committed by GitHub
commit 89201e459f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -29,6 +29,29 @@ NAMING_CONVENTION = {
"pk": "pk_%(table_name)s",
}
def constraint_exists(constraint_name: str, conn) -> bool:
"""Check if a constraint with the given name already exists in the database.
Args:
constraint_name: The name of the constraint to check
conn: SQLAlchemy connection
Returns:
bool: True if the constraint exists, False otherwise
"""
inspector = Inspector.from_engine(conn)
# Get all table names
tables = inspector.get_table_names()
# Check each table for the constraint
for table in tables:
for constraint in inspector.get_pk_constraint(table).get("name"), *[c.get("name") for c in inspector.get_foreign_keys(table)]:
if constraint == constraint_name:
return True
return False
def upgrade() -> None:
conn = op.get_bind()
@ -39,6 +62,12 @@ def upgrade() -> None:
if migration.table_exists("vertex_build", conn):
# Create a temporary table without the constraint
temp_table_name = "temp_vertex_build"
pk_name = "pk_vertex_build"
# Check if PK constraint already exists
if constraint_exists(pk_name, conn):
# Use a different PK name if it already exists
pk_name = f"pk_temp_vertex_build"
# Create temp table with same schema but no FK constraint
op.create_table(
@ -51,7 +80,7 @@ def upgrade() -> None:
sa.Column("build_id", sqlmodel.sql.sqltypes.types.Uuid(), nullable=False),
sa.Column("flow_id", sqlmodel.sql.sqltypes.types.Uuid(), nullable=False),
sa.Column("valid", sa.BOOLEAN(), nullable=False),
sa.PrimaryKeyConstraint("build_id", name="pk_temp_vertex_build"),
sa.PrimaryKeyConstraint("build_id", name=pk_name),
)
# Copy data - use a window function to ensure build_id uniqueness across SQLite, PostgreSQL and MySQL
@ -76,6 +105,12 @@ def upgrade() -> None:
if migration.table_exists("transaction", conn):
# Create a temporary table without the constraint
temp_table_name = "temp_transaction"
pk_name = "pk_transaction"
# Check if PK constraint already exists
if constraint_exists(pk_name, conn):
# Use a different PK name if it already exists
pk_name = f"pk_temp_transaction"
# Create temp table with same schema but no FK constraint
op.create_table(
@ -89,7 +124,7 @@ def upgrade() -> None:
sa.Column("id", sqlmodel.sql.sqltypes.types.Uuid(), nullable=False),
sa.Column("flow_id", sqlmodel.sql.sqltypes.types.Uuid(), nullable=False),
sa.Column("error", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.PrimaryKeyConstraint("id", name="pk_temp_transaction"),
sa.PrimaryKeyConstraint("id", name=pk_name),
)
# Copy data - explicitly list columns and filter out rows where id is NULL
@ -108,6 +143,12 @@ def upgrade() -> None:
if migration.table_exists("message", conn):
# Create a temporary table without the constraint
temp_table_name = "temp_message"
pk_name = "pk_message"
# Check if PK constraint already exists
if constraint_exists(pk_name, conn):
# Use a different PK name if it already exists
pk_name = f"pk_temp_message"
# Create temp table with same schema but no FK constraint
op.create_table(
@ -125,7 +166,7 @@ def upgrade() -> None:
sa.Column("properties", sa.JSON(), nullable=True),
sa.Column("category", sa.Text(), nullable=True),
sa.Column("content_blocks", sa.JSON(), nullable=True),
sa.PrimaryKeyConstraint("id", name="pk_temp_message"),
sa.PrimaryKeyConstraint("id", name=pk_name),
)
# Copy data - explicitly list columns and filter out rows where id is NULL
@ -150,10 +191,17 @@ def downgrade() -> None:
if migration.table_exists("vertex_build", conn):
# Create a temporary table with the constraint
temp_table_name = "temp_vertex_build"
pk_name = "pk_vertex_build"
fk_name = "fk_vertex_build_flow_id_flow"
# Check if constraints already exist
if constraint_exists(pk_name, conn):
pk_name = f"pk_temp_vertex_build"
if constraint_exists(fk_name, conn):
fk_name = f"fk_vertex_build_flow_id_flow_{revision[:8]}"
# Create temp table with same schema including FK constraint
# Note: Original 'id' column was nullable=True here, which might be inconsistent with the model.
# Keeping it as nullable=True to match the previous state of this downgrade function.
op.create_table(
temp_table_name,
sa.Column("timestamp", sa.DateTime(), nullable=False),
@ -167,9 +215,9 @@ def downgrade() -> None:
sa.ForeignKeyConstraint(
["flow_id"],
["flow.id"],
name="fk_vertex_build_flow_id_flow",
name=fk_name,
),
sa.PrimaryKeyConstraint("build_id", name="pk_vertex_build"),
sa.PrimaryKeyConstraint("build_id", name=pk_name),
)
# Copy data - use a window function to ensure build_id uniqueness.
@ -195,6 +243,15 @@ def downgrade() -> None:
if migration.table_exists("transaction", conn):
# Create a temporary table with the constraint
temp_table_name = "temp_transaction"
pk_name = "pk_transaction"
fk_name = "fk_transaction_flow_id_flow"
# Check if constraints already exist
if constraint_exists(pk_name, conn):
pk_name = f"pk_temp_transaction"
if constraint_exists(fk_name, conn):
fk_name = f"fk_transaction_flow_id_flow_{revision[:8]}"
# Create temp table with same schema including FK constraint
op.create_table(
@ -211,9 +268,9 @@ def downgrade() -> None:
sa.ForeignKeyConstraint(
["flow_id"],
["flow.id"],
name="fk_transaction_flow_id_flow",
name=fk_name,
),
sa.PrimaryKeyConstraint("id", name="pk_transaction"),
sa.PrimaryKeyConstraint("id", name=pk_name),
)
# Copy data - explicitly list columns and filter out rows where id is NULL
@ -232,6 +289,15 @@ def downgrade() -> None:
if migration.table_exists("message", conn):
# Create a temporary table with the constraint
temp_table_name = "temp_message"
pk_name = "pk_message"
fk_name = "fk_message_flow_id_flow"
# Check if constraints already exist
if constraint_exists(pk_name, conn):
pk_name = f"pk_temp_message"
if constraint_exists(fk_name, conn):
fk_name = f"fk_message_flow_id_flow_{revision[:8]}"
# Create temp table with same schema including FK constraint
op.create_table(
@ -252,9 +318,9 @@ def downgrade() -> None:
sa.ForeignKeyConstraint(
["flow_id"],
["flow.id"],
name="fk_message_flow_id_flow",
name=fk_name,
),
sa.PrimaryKeyConstraint("id", name="pk_message"),
sa.PrimaryKeyConstraint("id", name=pk_name),
)
# Copy data - explicitly list columns and filter out rows where id is NULL