diff --git a/src/backend/langflow/services/database/__init__.py b/src/backend/langflow/services/database/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/backend/langflow/services/database/base.py b/src/backend/langflow/services/database/base.py new file mode 100644 index 000000000..fffb39096 --- /dev/null +++ b/src/backend/langflow/services/database/base.py @@ -0,0 +1,151 @@ +from contextlib import contextmanager +import os +from pathlib import Path +from langflow.services.base import Service +from sqlmodel import SQLModel, Session, create_engine +from langflow.utils.logger import logger +from alembic.config import Config +from alembic import command + + +class Engine: + _instance = None + + @classmethod + def get(cls): + logger.debug("Getting database engine") + if cls._instance is None: + cls.create() + return cls._instance + + @classmethod + def create(cls): + logger.debug("Creating database engine") + from langflow.settings import settings + + if langflow_database_url := os.getenv("LANGFLOW_DATABASE_URL"): + settings.DATABASE_URL = langflow_database_url + logger.debug("Using LANGFLOW_DATABASE_URL") + + if settings.DATABASE_URL and settings.DATABASE_URL.startswith("sqlite"): + connect_args = {"check_same_thread": False} + else: + connect_args = {} + if not settings.DATABASE_URL: + raise RuntimeError("No database_url provided") + cls._instance = create_engine(settings.DATABASE_URL, connect_args=connect_args) + + @classmethod + def update(cls): + logger.debug("Updating database engine") + cls._instance = None + cls.create() + + +def create_db_and_tables(): + logger.debug("Creating database and tables") + try: + SQLModel.metadata.create_all(Engine.get()) + except Exception as exc: + logger.error(f"Error creating database and tables: {exc}") + raise RuntimeError("Error creating database and tables") from exc + # Now check if the table Flow exists, if not, something went wrong + # and we need to create the tables again. + from sqlalchemy import inspect + + inspector = inspect(Engine.get()) + if "flow" not in inspector.get_table_names(): + logger.error("Something went wrong creating the database and tables.") + logger.error("Please check your database settings.") + + raise RuntimeError("Something went wrong creating the database and tables.") + else: + logger.debug("Database and tables created successfully") + + +class DatabaseManager(Service): + name = "database_manager" + + def __init__(self, database_url: str): + self.database_url = database_url + # This file is in langflow.services.database.base.py + # the ini is in langflow + langflow_dir = Path(__file__).parent.parent.parent + self.script_location = langflow_dir / "alembic" + self.alembic_cfg_path = langflow_dir / "alembic.ini" + self.engine = create_engine(database_url) + + def __enter__(self): + self._session = Session(self.engine) + return self._session + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is not None: # If an exception has been raised + logger.error( + f"Session rollback because of exception: {exc_type.__name__} {exc_value}" + ) + self._session.rollback() + else: + self._session.commit() + self._session.close() + + def get_session(self): + with Session(self.engine) as session: + yield session + + def run_migrations(self): + logger.info( + f"Running DB migrations in {self.script_location} on {self.database_url}" + ) + alembic_cfg = Config() + alembic_cfg.set_main_option("script_location", str(self.script_location)) + alembic_cfg.set_main_option("sqlalchemy.url", self.database_url) + command.upgrade(alembic_cfg, "head") + + def create_db_and_tables(self): + logger.debug("Creating database and tables") + try: + SQLModel.metadata.create_all(self.engine) + except Exception as exc: + logger.error(f"Error creating database and tables: {exc}") + raise RuntimeError("Error creating database and tables") from exc + + # Now check if the table "flow" exists, if not, something went wrong + # and we need to create the tables again. + from sqlalchemy import inspect + + inspector = inspect(self.engine) + if "flow" not in inspector.get_table_names(): + logger.error("Something went wrong creating the database and tables.") + logger.error("Please check your database settings.") + raise RuntimeError("Something went wrong creating the database and tables.") + else: + logger.debug("Database and tables created successfully") + + +@contextmanager +def session_getter(db_manager: DatabaseManager): + try: + session = Session(DatabaseManager.engine) + yield session + except Exception as e: + print("Session rollback because of exception:", e) + session.rollback() + raise + finally: + session.close() + + +def get_session(): + with Session(Engine.get()) as session: + yield session + + +def initialize_database(): + logger.debug("Initializing database") + from langflow.services import service_manager, ServiceType + + database_manager = service_manager.get(ServiceType.DATABASE_MANAGER) + database_manager.run_migrations() + database_manager.create_db_and_tables() + logger.debug("Database initialized") diff --git a/src/backend/langflow/services/database/factory.py b/src/backend/langflow/services/database/factory.py new file mode 100644 index 000000000..187a29fdd --- /dev/null +++ b/src/backend/langflow/services/database/factory.py @@ -0,0 +1,15 @@ +from typing import TYPE_CHECKING +from langflow.services.database.base import DatabaseManager +from langflow.services.factory import ServiceFactory + +if TYPE_CHECKING: + from langflow.services.settings.manager import SettingsManager + + +class DatabaseManagerFactory(ServiceFactory): + def __init__(self): + super().__init__(DatabaseManager) + + def create(self, settings_service: "SettingsManager"): + # Here you would have logic to create and configure a DatabaseManager + return DatabaseManager(settings_service.settings.DATABASE_URL) diff --git a/src/backend/langflow/services/database/models/__init__.py b/src/backend/langflow/services/database/models/__init__.py new file mode 100644 index 000000000..da47bc5fe --- /dev/null +++ b/src/backend/langflow/services/database/models/__init__.py @@ -0,0 +1,4 @@ +from .flow import Flow + + +__all__ = ["Flow"] diff --git a/src/backend/langflow/services/database/models/base.py b/src/backend/langflow/services/database/models/base.py new file mode 100644 index 000000000..e20895b93 --- /dev/null +++ b/src/backend/langflow/services/database/models/base.py @@ -0,0 +1,14 @@ +from sqlmodel import SQLModel +import orjson + + +def orjson_dumps(v, *, default): + # orjson.dumps returns bytes, to match standard json.dumps we need to decode + return orjson.dumps(v, default=default).decode() + + +class SQLModelSerializable(SQLModel): + class Config: + orm_mode = True + json_loads = orjson.loads + json_dumps = orjson_dumps diff --git a/src/backend/langflow/services/database/models/component.py b/src/backend/langflow/services/database/models/component.py new file mode 100644 index 000000000..5c4e6c13a --- /dev/null +++ b/src/backend/langflow/services/database/models/component.py @@ -0,0 +1,29 @@ +from langflow.services.database.models.base import SQLModelSerializable, SQLModel +from sqlmodel import Field +from typing import Optional +from datetime import datetime +import uuid + + +class Component(SQLModelSerializable, table=True): + id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) + frontend_node_id: uuid.UUID = Field(index=True) + name: str = Field(index=True) + description: Optional[str] = Field(default=None) + python_code: Optional[str] = Field(default=None) + return_type: Optional[str] = Field(default=None) + is_disabled: bool = Field(default=False) + is_read_only: bool = Field(default=False) + create_at: datetime = Field(default_factory=datetime.utcnow) + update_at: datetime = Field(default_factory=datetime.utcnow) + + +class ComponentModel(SQLModel): + id: uuid.UUID = Field(default_factory=uuid.uuid4) + frontend_node_id: uuid.UUID = Field(default=uuid.uuid4()) + name: str = Field(default="") + description: Optional[str] = None + python_code: Optional[str] = None + return_type: Optional[str] = None + is_disabled: bool = False + is_read_only: bool = False diff --git a/src/backend/langflow/services/database/models/flow.py b/src/backend/langflow/services/database/models/flow.py new file mode 100644 index 000000000..2b6c6879c --- /dev/null +++ b/src/backend/langflow/services/database/models/flow.py @@ -0,0 +1,60 @@ +# Path: src/backend/langflow/database/models/flow.py + +from langflow.services.database.models.base import SQLModelSerializable +from pydantic import validator +from sqlmodel import Field, Relationship, JSON, Column +from uuid import UUID, uuid4 +from typing import Dict, Optional + +# if TYPE_CHECKING: +from langflow.services.database.models.flow_style import FlowStyle, FlowStyleRead + + +class FlowBase(SQLModelSerializable): + name: str = Field(index=True) + description: Optional[str] = Field(index=True) + data: Optional[Dict] = Field(default=None) + + @validator("data") + def validate_json(v): + # dict_keys(['description', 'name', 'id', 'data']) + if not v: + return v + if not isinstance(v, dict): + raise ValueError("Flow must be a valid JSON") + + # data must contain nodes and edges + if "nodes" not in v.keys(): + raise ValueError("Flow must have nodes") + if "edges" not in v.keys(): + raise ValueError("Flow must have edges") + + return v + + +class Flow(FlowBase, table=True): + id: UUID = Field(default_factory=uuid4, primary_key=True, unique=True) + data: Optional[Dict] = Field(default=None, sa_column=Column(JSON)) + style: Optional["FlowStyle"] = Relationship( + back_populates="flow", + # use "uselist=False" to make it a one-to-one relationship + sa_relationship_kwargs={"uselist": False}, + ) + + +class FlowCreate(FlowBase): + pass + + +class FlowRead(FlowBase): + id: UUID + + +class FlowReadWithStyle(FlowRead): + style: Optional["FlowStyleRead"] = None + + +class FlowUpdate(SQLModelSerializable): + name: Optional[str] = None + description: Optional[str] = None + data: Optional[Dict] = None diff --git a/src/backend/langflow/services/database/models/flow_style.py b/src/backend/langflow/services/database/models/flow_style.py new file mode 100644 index 000000000..3810c7cea --- /dev/null +++ b/src/backend/langflow/services/database/models/flow_style.py @@ -0,0 +1,33 @@ +# Path: src/backend/langflow/database/models/flowstyle.py + +from langflow.services.database.models.base import SQLModelSerializable +from sqlmodel import Field, Relationship +from uuid import UUID, uuid4 +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from langflow.services.database.models.flow import Flow + + +class FlowStyleBase(SQLModelSerializable): + color: str + emoji: str + flow_id: UUID = Field(default=None, foreign_key="flow.id") + + +class FlowStyle(FlowStyleBase, table=True): + id: UUID = Field(default_factory=uuid4, primary_key=True, unique=True) + flow: "Flow" = Relationship(back_populates="style") + + +class FlowStyleUpdate(SQLModelSerializable): + color: Optional[str] = None + emoji: Optional[str] = None + + +class FlowStyleCreate(FlowStyleBase): + pass + + +class FlowStyleRead(FlowStyleBase): + id: UUID