refactor: Enhance migration script with column existence check and improved error handling (#4680)

* Enhance migration script with column existence check and improved error handling

* Refactor migration script to use `column_exists` with explicit parameters
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-11-18 17:21:58 -03:00 committed by GitHub
commit 160409d19c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -12,6 +12,9 @@ from typing import Sequence, Union
import sqlalchemy as sa
import sqlmodel
from alembic import op
from sqlalchemy.engine.reflection import Inspector
from langflow.utils import migration
# revision identifiers, used by Alembic.
revision: str = "1ef9c4f3765d"
@ -21,20 +24,31 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
conn = op.get_bind()
# ### commands auto generated by Alembic - please adjust! ###
try:
with op.batch_alter_table("apikey", schema=None) as batch_op:
batch_op.alter_column("name", existing_type=sqlmodel.sql.sqltypes.AutoString(), nullable=True)
except Exception:
pass
inspector = Inspector.from_engine(conn) # type: ignore
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("apikey", schema=None) as batch_op:
if migration.column_exists(table_name="apikey", column_name="name", conn=conn):
api_key_columns = inspector.get_columns("apikey")
name_column = next((column for column in api_key_columns if column["name"] == "name"), None)
if name_column is not None and isinstance(name_column["type"], sa.VARCHAR) and not name_column["nullable"]:
batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=True)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
try:
with op.batch_alter_table("apikey", schema=None) as batch_op:
batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=False)
except Exception:
pass
conn = op.get_bind()
inspector = Inspector.from_engine(conn) # type: ignore
with op.batch_alter_table("apikey", schema=None) as batch_op:
if migration.column_exists(table_name="apikey", column_name="name", conn=conn):
api_key_columns = inspector.get_columns("apikey")
name_column = next((column for column in api_key_columns if column["name"] == "name"), None)
if name_column is not None and isinstance(name_column["type"], sa.VARCHAR) and name_column["nullable"]:
batch_op.alter_column("name", existing_type=sa.VARCHAR(), nullable=False)
# ### end Alembic commands ###