📦 chore(database): add database related files and models

📦 chore(database): add database manager and factory

📦 chore(database): add flow and flow style models
This commit is contained in:
Gabriel Luiz Freitas Almeida 2023-08-05 22:14:22 -03:00
commit d51aa7ecb2
8 changed files with 306 additions and 0 deletions

View file

@ -0,0 +1,151 @@
from contextlib import contextmanager
import os
from pathlib import Path
from langflow.services.base import Service
from sqlmodel import SQLModel, Session, create_engine
from langflow.utils.logger import logger
from alembic.config import Config
from alembic import command
class Engine:
_instance = None
@classmethod
def get(cls):
logger.debug("Getting database engine")
if cls._instance is None:
cls.create()
return cls._instance
@classmethod
def create(cls):
logger.debug("Creating database engine")
from langflow.settings import settings
if langflow_database_url := os.getenv("LANGFLOW_DATABASE_URL"):
settings.DATABASE_URL = langflow_database_url
logger.debug("Using LANGFLOW_DATABASE_URL")
if settings.DATABASE_URL and settings.DATABASE_URL.startswith("sqlite"):
connect_args = {"check_same_thread": False}
else:
connect_args = {}
if not settings.DATABASE_URL:
raise RuntimeError("No database_url provided")
cls._instance = create_engine(settings.DATABASE_URL, connect_args=connect_args)
@classmethod
def update(cls):
logger.debug("Updating database engine")
cls._instance = None
cls.create()
def create_db_and_tables():
logger.debug("Creating database and tables")
try:
SQLModel.metadata.create_all(Engine.get())
except Exception as exc:
logger.error(f"Error creating database and tables: {exc}")
raise RuntimeError("Error creating database and tables") from exc
# Now check if the table Flow exists, if not, something went wrong
# and we need to create the tables again.
from sqlalchemy import inspect
inspector = inspect(Engine.get())
if "flow" not in inspector.get_table_names():
logger.error("Something went wrong creating the database and tables.")
logger.error("Please check your database settings.")
raise RuntimeError("Something went wrong creating the database and tables.")
else:
logger.debug("Database and tables created successfully")
class DatabaseManager(Service):
name = "database_manager"
def __init__(self, database_url: str):
self.database_url = database_url
# This file is in langflow.services.database.base.py
# the ini is in langflow
langflow_dir = Path(__file__).parent.parent.parent
self.script_location = langflow_dir / "alembic"
self.alembic_cfg_path = langflow_dir / "alembic.ini"
self.engine = create_engine(database_url)
def __enter__(self):
self._session = Session(self.engine)
return self._session
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None: # If an exception has been raised
logger.error(
f"Session rollback because of exception: {exc_type.__name__} {exc_value}"
)
self._session.rollback()
else:
self._session.commit()
self._session.close()
def get_session(self):
with Session(self.engine) as session:
yield session
def run_migrations(self):
logger.info(
f"Running DB migrations in {self.script_location} on {self.database_url}"
)
alembic_cfg = Config()
alembic_cfg.set_main_option("script_location", str(self.script_location))
alembic_cfg.set_main_option("sqlalchemy.url", self.database_url)
command.upgrade(alembic_cfg, "head")
def create_db_and_tables(self):
logger.debug("Creating database and tables")
try:
SQLModel.metadata.create_all(self.engine)
except Exception as exc:
logger.error(f"Error creating database and tables: {exc}")
raise RuntimeError("Error creating database and tables") from exc
# Now check if the table "flow" exists, if not, something went wrong
# and we need to create the tables again.
from sqlalchemy import inspect
inspector = inspect(self.engine)
if "flow" not in inspector.get_table_names():
logger.error("Something went wrong creating the database and tables.")
logger.error("Please check your database settings.")
raise RuntimeError("Something went wrong creating the database and tables.")
else:
logger.debug("Database and tables created successfully")
@contextmanager
def session_getter(db_manager: DatabaseManager):
try:
session = Session(DatabaseManager.engine)
yield session
except Exception as e:
print("Session rollback because of exception:", e)
session.rollback()
raise
finally:
session.close()
def get_session():
with Session(Engine.get()) as session:
yield session
def initialize_database():
logger.debug("Initializing database")
from langflow.services import service_manager, ServiceType
database_manager = service_manager.get(ServiceType.DATABASE_MANAGER)
database_manager.run_migrations()
database_manager.create_db_and_tables()
logger.debug("Database initialized")

View file

@ -0,0 +1,15 @@
from typing import TYPE_CHECKING
from langflow.services.database.base import DatabaseManager
from langflow.services.factory import ServiceFactory
if TYPE_CHECKING:
from langflow.services.settings.manager import SettingsManager
class DatabaseManagerFactory(ServiceFactory):
def __init__(self):
super().__init__(DatabaseManager)
def create(self, settings_service: "SettingsManager"):
# Here you would have logic to create and configure a DatabaseManager
return DatabaseManager(settings_service.settings.DATABASE_URL)

View file

@ -0,0 +1,4 @@
from .flow import Flow
__all__ = ["Flow"]

View file

@ -0,0 +1,14 @@
from sqlmodel import SQLModel
import orjson
def orjson_dumps(v, *, default):
# orjson.dumps returns bytes, to match standard json.dumps we need to decode
return orjson.dumps(v, default=default).decode()
class SQLModelSerializable(SQLModel):
class Config:
orm_mode = True
json_loads = orjson.loads
json_dumps = orjson_dumps

View file

@ -0,0 +1,29 @@
from langflow.services.database.models.base import SQLModelSerializable, SQLModel
from sqlmodel import Field
from typing import Optional
from datetime import datetime
import uuid
class Component(SQLModelSerializable, table=True):
id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
frontend_node_id: uuid.UUID = Field(index=True)
name: str = Field(index=True)
description: Optional[str] = Field(default=None)
python_code: Optional[str] = Field(default=None)
return_type: Optional[str] = Field(default=None)
is_disabled: bool = Field(default=False)
is_read_only: bool = Field(default=False)
create_at: datetime = Field(default_factory=datetime.utcnow)
update_at: datetime = Field(default_factory=datetime.utcnow)
class ComponentModel(SQLModel):
id: uuid.UUID = Field(default_factory=uuid.uuid4)
frontend_node_id: uuid.UUID = Field(default=uuid.uuid4())
name: str = Field(default="")
description: Optional[str] = None
python_code: Optional[str] = None
return_type: Optional[str] = None
is_disabled: bool = False
is_read_only: bool = False

View file

@ -0,0 +1,60 @@
# Path: src/backend/langflow/database/models/flow.py
from langflow.services.database.models.base import SQLModelSerializable
from pydantic import validator
from sqlmodel import Field, Relationship, JSON, Column
from uuid import UUID, uuid4
from typing import Dict, Optional
# if TYPE_CHECKING:
from langflow.services.database.models.flow_style import FlowStyle, FlowStyleRead
class FlowBase(SQLModelSerializable):
name: str = Field(index=True)
description: Optional[str] = Field(index=True)
data: Optional[Dict] = Field(default=None)
@validator("data")
def validate_json(v):
# dict_keys(['description', 'name', 'id', 'data'])
if not v:
return v
if not isinstance(v, dict):
raise ValueError("Flow must be a valid JSON")
# data must contain nodes and edges
if "nodes" not in v.keys():
raise ValueError("Flow must have nodes")
if "edges" not in v.keys():
raise ValueError("Flow must have edges")
return v
class Flow(FlowBase, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True, unique=True)
data: Optional[Dict] = Field(default=None, sa_column=Column(JSON))
style: Optional["FlowStyle"] = Relationship(
back_populates="flow",
# use "uselist=False" to make it a one-to-one relationship
sa_relationship_kwargs={"uselist": False},
)
class FlowCreate(FlowBase):
pass
class FlowRead(FlowBase):
id: UUID
class FlowReadWithStyle(FlowRead):
style: Optional["FlowStyleRead"] = None
class FlowUpdate(SQLModelSerializable):
name: Optional[str] = None
description: Optional[str] = None
data: Optional[Dict] = None

View file

@ -0,0 +1,33 @@
# Path: src/backend/langflow/database/models/flowstyle.py
from langflow.services.database.models.base import SQLModelSerializable
from sqlmodel import Field, Relationship
from uuid import UUID, uuid4
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from langflow.services.database.models.flow import Flow
class FlowStyleBase(SQLModelSerializable):
color: str
emoji: str
flow_id: UUID = Field(default=None, foreign_key="flow.id")
class FlowStyle(FlowStyleBase, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True, unique=True)
flow: "Flow" = Relationship(back_populates="style")
class FlowStyleUpdate(SQLModelSerializable):
color: Optional[str] = None
emoji: Optional[str] = None
class FlowStyleCreate(FlowStyleBase):
pass
class FlowStyleRead(FlowStyleBase):
id: UUID