🐛 fix(manager.py): fix typo in logger.warn to logger.warning
✨ feat(manager.py): add check_schema_health method to DatabaseManager to verify the integrity of the database schema 🐛 fix(utils.py): add error handling when checking schema health in initialize_database method
This commit is contained in:
parent
0aa8a25e3f
commit
4d08d511e9
2 changed files with 53 additions and 6 deletions
|
|
@ -2,6 +2,8 @@ from pathlib import Path
|
|||
from typing import TYPE_CHECKING
|
||||
from langflow.services.base import Service
|
||||
from langflow.services.utils import get_settings_manager
|
||||
from sqlalchemy import inspect
|
||||
import sqlalchemy as sa
|
||||
from sqlmodel import SQLModel, Session, create_engine
|
||||
from langflow.utils.logger import logger
|
||||
from alembic.config import Config
|
||||
|
|
@ -54,6 +56,41 @@ class DatabaseManager(Service):
|
|||
with Session(self.engine) as session:
|
||||
yield session
|
||||
|
||||
def check_schema_health(self) -> bool:
|
||||
inspector = inspect(self.engine)
|
||||
|
||||
model_mapping = {
|
||||
"flow": models.Flow,
|
||||
"user": models.User,
|
||||
"apikey": models.ApiKey,
|
||||
# Add other SQLModel classes here
|
||||
}
|
||||
|
||||
# To account for tables that existed in older versions
|
||||
legacy_tables = ["flowstyle"]
|
||||
|
||||
for table, model in model_mapping.items():
|
||||
expected_columns = list(model.__fields__.keys())
|
||||
|
||||
try:
|
||||
available_columns = [
|
||||
col["name"] for col in inspector.get_columns(table)
|
||||
]
|
||||
except sa.exc.NoSuchTableError:
|
||||
logger.error(f"Missing table: {table}")
|
||||
return False
|
||||
|
||||
for column in expected_columns:
|
||||
if column not in available_columns:
|
||||
logger.error(f"Missing column: {column} in table {table}")
|
||||
return False
|
||||
|
||||
for table in legacy_tables:
|
||||
if table in inspector.get_table_names():
|
||||
logger.warn(f"Legacy table exists: {table}")
|
||||
|
||||
return True
|
||||
|
||||
def run_migrations(self):
|
||||
logger.info(
|
||||
f"Running DB migrations in {self.script_location} on {self.database_url}"
|
||||
|
|
@ -76,9 +113,14 @@ class DatabaseManager(Service):
|
|||
from sqlalchemy import inspect
|
||||
|
||||
inspector = inspect(self.engine)
|
||||
if "flow" not in inspector.get_table_names():
|
||||
logger.error("Something went wrong creating the database and tables.")
|
||||
logger.error("Please check your database settings.")
|
||||
raise RuntimeError("Something went wrong creating the database and tables.")
|
||||
else:
|
||||
logger.debug("Database and tables created successfully")
|
||||
current_tables = ["flow", "user", "apikey"]
|
||||
table_names = inspector.get_table_names()
|
||||
for table in current_tables:
|
||||
if table not in table_names:
|
||||
logger.error("Something went wrong creating the database and tables.")
|
||||
logger.error("Please check your database settings.")
|
||||
raise RuntimeError(
|
||||
"Something went wrong creating the database and tables."
|
||||
)
|
||||
|
||||
logger.debug("Database and tables created successfully")
|
||||
|
|
|
|||
|
|
@ -13,6 +13,11 @@ def initialize_database():
|
|||
from langflow.services import service_manager, ServiceType
|
||||
|
||||
database_manager = service_manager.get(ServiceType.DATABASE_MANAGER)
|
||||
try:
|
||||
database_manager.check_schema_health()
|
||||
except Exception as exc:
|
||||
logger.error(f"Error checking schema health: {exc}")
|
||||
raise RuntimeError("Error checking schema health") from exc
|
||||
try:
|
||||
database_manager.run_migrations()
|
||||
except CommandError as exc:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue