diff --git a/src/backend/langflow/alembic/versions/4814b6f4abfd_add_flow_table.py b/src/backend/langflow/alembic/versions/4814b6f4abfd_add_flow_table.py new file mode 100644 index 000000000..0b2f32657 --- /dev/null +++ b/src/backend/langflow/alembic/versions/4814b6f4abfd_add_flow_table.py @@ -0,0 +1,65 @@ +"""Add Flow table + +Revision ID: 4814b6f4abfd +Revises: +Create Date: 2023-08-05 17:47:42.879824 + +""" + +import contextlib +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +import sqlmodel + + +# revision identifiers, used by Alembic. +revision: str = "4814b6f4abfd" +down_revision: Union[str, None] = None +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + + # This suppress is used to not break the migration if the table already exists. + with contextlib.suppress(sa.exc.OperationalError): + op.create_table( + "flow", + sa.Column("data", sa.JSON(), nullable=True), + sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False), + sa.Column("description", sqlmodel.sql.sqltypes.AutoString(), nullable=True), + sa.Column("id", sqlmodel.sql.sqltypes.GUID(), nullable=False), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("id"), + ) + op.create_index( + op.f("ix_flow_description"), "flow", ["description"], unique=False + ) + op.create_index(op.f("ix_flow_name"), "flow", ["name"], unique=False) + with contextlib.suppress(sa.exc.OperationalError): + op.create_table( + "flowstyle", + sa.Column("color", sqlmodel.sql.sqltypes.AutoString(), nullable=False), + sa.Column("emoji", sqlmodel.sql.sqltypes.AutoString(), nullable=False), + sa.Column("flow_id", sqlmodel.sql.sqltypes.GUID(), nullable=True), + sa.Column("id", sqlmodel.sql.sqltypes.GUID(), nullable=False), + sa.ForeignKeyConstraint( + ["flow_id"], + ["flow.id"], + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("id"), + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table("flowstyle") + op.drop_index(op.f("ix_flow_name"), table_name="flow") + op.drop_index(op.f("ix_flow_description"), table_name="flow") + op.drop_table("flow") + # ### end Alembic commands ### diff --git a/src/backend/langflow/database/base.py b/src/backend/langflow/database/base.py index 546c341c1..518b95d15 100644 --- a/src/backend/langflow/database/base.py +++ b/src/backend/langflow/database/base.py @@ -1,8 +1,11 @@ from contextlib import contextmanager import os - +from pathlib import Path +from langflow.database import models # noqa from sqlmodel import SQLModel, Session, create_engine from langflow.utils.logger import logger +from alembic.config import Config +from alembic import command class Engine: @@ -60,10 +63,67 @@ def create_db_and_tables(): logger.debug("Database and tables created successfully") +class DatabaseManager: + def __init__(self, database_url: str): + self.database_url = database_url + # This file is in langflow.database.base.py + # the ini is in langflow + self.script_location = Path(__file__).parent.parent / "alembic" + self.alembic_cfg_path = Path(__file__).parent.parent / "alembic.ini" + self.engine = create_engine(database_url) + + def __enter__(self): + self._session = Session(self.engine) + return self._session + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is not None: # If an exception has been raised + logger.error( + f"Session rollback because of exception: {exc_type.__name__} {exc_value}" + ) + self._session.rollback() + else: + self._session.commit() + self._session.close() + + def get_session(self): + with Session(self.engine) as session: + yield session + + def run_migrations(self): + logger.info( + f"Running DB migrations in {self.script_location} on {self.database_url}" + ) + alembic_cfg = Config() + alembic_cfg.set_main_option("script_location", str(self.script_location)) + alembic_cfg.set_main_option("sqlalchemy.url", self.database_url) + command.upgrade(alembic_cfg, "head") + + def create_db_and_tables(self): + logger.debug("Creating database and tables") + try: + SQLModel.metadata.create_all(self.engine) + except Exception as exc: + logger.error(f"Error creating database and tables: {exc}") + raise RuntimeError("Error creating database and tables") from exc + + # Now check if the table "flow" exists, if not, something went wrong + # and we need to create the tables again. + from sqlalchemy import inspect + + inspector = inspect(self.engine) + if "flow" not in inspector.get_table_names(): + logger.error("Something went wrong creating the database and tables.") + logger.error("Please check your database settings.") + raise RuntimeError("Something went wrong creating the database and tables.") + else: + logger.debug("Database and tables created successfully") + + @contextmanager -def session_getter(): +def session_getter(db_manager: DatabaseManager): try: - session = Session(Engine.get()) + session = Session(DatabaseManager.engine) yield session except Exception as e: print("Session rollback because of exception:", e) @@ -71,8 +131,3 @@ def session_getter(): raise finally: session.close() - - -def get_session(): - with session_getter() as session: - yield session diff --git a/src/backend/langflow/database/models/__init__.py b/src/backend/langflow/database/models/__init__.py index e69de29bb..da47bc5fe 100644 --- a/src/backend/langflow/database/models/__init__.py +++ b/src/backend/langflow/database/models/__init__.py @@ -0,0 +1,4 @@ +from .flow import Flow + + +__all__ = ["Flow"]