📦 chore(manager.py): add DatabaseManager class to handle database operations

📦 chore(utils.py): add initialize_database function and session_getter context manager to handle database initialization and session management
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-07 12:00:27 -03:00
commit 8436c66aa7
2 changed files with 32 additions and 25 deletions

View file

@ -1,4 +1,3 @@
from contextlib import contextmanager
from pathlib import Path
from langflow.services.base import Service
from sqlmodel import SQLModel, Session, create_engine
@ -12,7 +11,7 @@ class DatabaseManager(Service):
def __init__(self, database_url: str):
self.database_url = database_url
# This file is in langflow.services.database.base.py
# This file is in langflow.services.database.manager.py
# the ini is in langflow
langflow_dir = Path(__file__).parent.parent.parent
self.script_location = langflow_dir / "alembic"
@ -65,26 +64,3 @@ class DatabaseManager(Service):
raise RuntimeError("Something went wrong creating the database and tables.")
else:
logger.debug("Database and tables created successfully")
@contextmanager
def session_getter(db_manager: DatabaseManager):
try:
session = Session(db_manager.engine)
yield session
except Exception as e:
print("Session rollback because of exception:", e)
session.rollback()
raise
finally:
session.close()
def initialize_database():
logger.debug("Initializing database")
from langflow.services import service_manager, ServiceType
database_manager = service_manager.get(ServiceType.DATABASE_MANAGER)
database_manager.run_migrations()
database_manager.create_db_and_tables()
logger.debug("Database initialized")

View file

@ -0,0 +1,31 @@
from typing import TYPE_CHECKING
from langflow.utils.logger import logger
from contextlib import contextmanager
from sqlmodel import Session
if TYPE_CHECKING:
from langflow.services.database.manager import DatabaseManager
def initialize_database():
logger.debug("Initializing database")
from langflow.services import service_manager, ServiceType
database_manager = service_manager.get(ServiceType.DATABASE_MANAGER)
database_manager.run_migrations()
database_manager.create_db_and_tables()
logger.debug("Database initialized")
@contextmanager
def session_getter(db_manager: "DatabaseManager"):
try:
session = Session(db_manager.engine)
yield session
except Exception as e:
print("Session rollback because of exception:", e)
session.rollback()
raise
finally:
session.close()