Adds teardown logic for database and other services (#852)
This commit is contained in:
commit
cde00160b8
13 changed files with 108 additions and 26 deletions
|
|
@ -356,7 +356,7 @@ def superuser(
|
|||
with session_getter(db_manager) as session:
|
||||
from langflow.services.auth.utils import create_super_user
|
||||
|
||||
if create_super_user(session, username, password):
|
||||
if create_super_user(db=session, username=username, password=password):
|
||||
# Verify that the superuser was created
|
||||
from langflow.services.database.models.user.user import User
|
||||
|
||||
|
|
|
|||
|
|
@ -61,14 +61,13 @@ async def chat(
|
|||
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))
|
||||
except Exception as exc:
|
||||
logger.error(f"Error in chat websocket: {exc}")
|
||||
if isinstance(exc, HTTPException):
|
||||
exc = exc.detail
|
||||
messsage = exc.detail if isinstance(exc, HTTPException) else str(exc)
|
||||
if "Could not validate credentials" in str(exc):
|
||||
await websocket.close(
|
||||
code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized"
|
||||
)
|
||||
else:
|
||||
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=str(exc))
|
||||
await websocket.close(code=status.WS_1011_INTERNAL_ERROR, reason=messsage)
|
||||
|
||||
|
||||
@router.post("/build/init/{flow_id}", response_model=InitResponse, status_code=201)
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ from langflow.api import router
|
|||
|
||||
from langflow.interface.utils import setup_llm_caching
|
||||
from langflow.services.database.utils import initialize_database
|
||||
from langflow.services.manager import initialize_services
|
||||
from langflow.services.manager import initialize_services, teardown_services
|
||||
from langflow.utils.logger import configure
|
||||
|
||||
|
||||
|
|
@ -40,6 +40,7 @@ def create_app():
|
|||
app.on_event("startup")(initialize_services)
|
||||
app.on_event("startup")(initialize_database)
|
||||
app.on_event("startup")(setup_llm_caching)
|
||||
app.on_event("shutdown")(teardown_services)
|
||||
return app
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -37,7 +37,12 @@ async def api_key_security(
|
|||
result: Optional[Union[ApiKey, User]] = None
|
||||
if settings_manager.auth_settings.AUTO_LOGIN:
|
||||
# Get the first user
|
||||
settings_manager.auth_settings.FIRST_SUPERUSER
|
||||
if not settings_manager.auth_settings.FIRST_SUPERUSER:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Missing first superuser credentials",
|
||||
)
|
||||
|
||||
result = get_user_by_username(
|
||||
db, settings_manager.auth_settings.FIRST_SUPERUSER
|
||||
)
|
||||
|
|
@ -80,6 +85,9 @@ async def get_current_user(
|
|||
if isinstance(token, Coroutine):
|
||||
token = await token
|
||||
|
||||
if settings_manager.auth_settings.SECRET_KEY is None:
|
||||
raise credentials_exception
|
||||
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
token,
|
||||
|
|
@ -150,22 +158,16 @@ def create_token(data: dict, expires_delta: timedelta):
|
|||
|
||||
|
||||
def create_super_user(
|
||||
username: str,
|
||||
password: str,
|
||||
db: Session = Depends(get_session),
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
) -> User:
|
||||
settings_manager = get_settings_manager()
|
||||
|
||||
super_user = get_user_by_username(
|
||||
db, username or settings_manager.auth_settings.FIRST_SUPERUSER
|
||||
)
|
||||
super_user = get_user_by_username(db, username)
|
||||
|
||||
if not super_user:
|
||||
super_user = User(
|
||||
username=username or settings_manager.auth_settings.FIRST_SUPERUSER,
|
||||
password=get_password_hash(
|
||||
password or settings_manager.auth_settings.FIRST_SUPERUSER_PASSWORD
|
||||
),
|
||||
username=username,
|
||||
password=get_password_hash(password),
|
||||
is_superuser=True,
|
||||
is_active=True,
|
||||
last_login_at=None,
|
||||
|
|
@ -179,7 +181,15 @@ def create_super_user(
|
|||
|
||||
|
||||
def create_user_longterm_token(db: Session = Depends(get_session)) -> dict:
|
||||
super_user = create_super_user(db)
|
||||
settings_manager = get_settings_manager()
|
||||
username = settings_manager.auth_settings.FIRST_SUPERUSER
|
||||
password = settings_manager.auth_settings.FIRST_SUPERUSER_PASSWORD
|
||||
if not username or not password:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail="Missing first superuser credentials",
|
||||
)
|
||||
super_user = create_super_user(db=db, username=username, password=password)
|
||||
|
||||
access_token_expires_longterm = timedelta(days=365)
|
||||
access_token = create_token(
|
||||
|
|
|
|||
|
|
@ -1,2 +1,8 @@
|
|||
class Service:
|
||||
from abc import ABC
|
||||
|
||||
|
||||
class Service(ABC):
|
||||
name: str
|
||||
|
||||
def teardown(self):
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from langflow.services.base import Service
|
||||
from langflow.services.database.models.user.crud import get_user_by_username
|
||||
from langflow.services.database.utils import Result, TableResults
|
||||
from langflow.services.utils import get_settings_manager
|
||||
from sqlalchemy import inspect
|
||||
|
|
@ -159,3 +160,23 @@ class DatabaseManager(Service):
|
|||
)
|
||||
|
||||
logger.debug("Database and tables created successfully")
|
||||
|
||||
def teardown(self):
|
||||
logger.debug("Tearing down database")
|
||||
try:
|
||||
settings_manager = get_settings_manager()
|
||||
# remove the default superuser if auto_login is enabled
|
||||
# using the FIRST_SUPERUSER to get the user
|
||||
if settings_manager.auth_settings.AUTO_LOGIN:
|
||||
logger.debug("Removing default superuser")
|
||||
username = settings_manager.auth_settings.FIRST_SUPERUSER
|
||||
with Session(self.engine) as session:
|
||||
user = get_user_by_username(session, username)
|
||||
session.delete(user)
|
||||
session.commit()
|
||||
logger.debug("Default superuser removed")
|
||||
|
||||
except Exception as exc:
|
||||
logger.error(f"Error tearing down database: {exc}")
|
||||
|
||||
self.engine.dispose()
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
from langflow.services.schema import ServiceType
|
||||
from typing import TYPE_CHECKING, List, Optional
|
||||
from langflow.utils.logger import logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from langflow.services.factory import ServiceFactory
|
||||
|
|
@ -42,6 +43,7 @@ class ServiceManager:
|
|||
"""
|
||||
Create a new service given its name, handling dependencies.
|
||||
"""
|
||||
logger.debug(f"Create service {service_name}")
|
||||
self._validate_service_creation(service_name)
|
||||
|
||||
# Create dependencies first
|
||||
|
|
@ -74,9 +76,21 @@ class ServiceManager:
|
|||
Update a service by its name.
|
||||
"""
|
||||
if service_name in self.services:
|
||||
logger.debug(f"Update service {service_name}")
|
||||
self.services.pop(service_name, None)
|
||||
self.get(service_name)
|
||||
|
||||
def teardown(self):
|
||||
"""
|
||||
Teardown all the services.
|
||||
"""
|
||||
for service in self.services.values():
|
||||
logger.debug(f"Teardown service {service.name}")
|
||||
service.teardown()
|
||||
self.services = {}
|
||||
self.factories = {}
|
||||
self.dependencies = {}
|
||||
|
||||
|
||||
service_manager = ServiceManager()
|
||||
|
||||
|
|
@ -121,7 +135,7 @@ def initialize_session_manager():
|
|||
"""
|
||||
Initialize the session manager.
|
||||
"""
|
||||
from langflow.services.session import factory as session_manager_factory
|
||||
from langflow.services.session import factory as session_manager_factory # type: ignore
|
||||
from langflow.services.cache import factory as cache_factory
|
||||
|
||||
initialize_settings_manager()
|
||||
|
|
@ -134,3 +148,10 @@ def initialize_session_manager():
|
|||
session_manager_factory.SessionManagerFactory(),
|
||||
dependencies=[ServiceType.CACHE_MANAGER],
|
||||
)
|
||||
|
||||
|
||||
def teardown_services():
|
||||
"""
|
||||
Teardown all the services.
|
||||
"""
|
||||
service_manager.teardown()
|
||||
|
|
|
|||
|
|
@ -11,10 +11,11 @@ from langflow.utils.logger import logger
|
|||
class AuthSettings(BaseSettings):
|
||||
# Login settings
|
||||
CONFIG_DIR: str
|
||||
SECRET_KEY: Optional[str] = Field(
|
||||
None,
|
||||
SECRET_KEY: str = Field(
|
||||
default="",
|
||||
description="Secret key for JWT. If not provided, a random one will be generated.",
|
||||
env="LANGFLOW_SECRET_KEY",
|
||||
allow_mutation=False,
|
||||
)
|
||||
ALGORITHM: str = "HS256"
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
|
||||
|
|
|
|||
|
|
@ -35,5 +35,10 @@ class SettingsManager(Service):
|
|||
)
|
||||
|
||||
settings = Settings(**settings_dict)
|
||||
auth_settings = AuthSettings(CONFIG_DIR=settings.CONFIG_DIR)
|
||||
if not settings.CONFIG_DIR:
|
||||
raise ValueError("CONFIG_DIR must be set in settings")
|
||||
|
||||
auth_settings = AuthSettings(
|
||||
CONFIG_DIR=settings.CONFIG_DIR,
|
||||
)
|
||||
return cls(settings, auth_settings)
|
||||
|
|
|
|||
|
|
@ -43,5 +43,5 @@ def write_secret_to_file(path: Path, value: str) -> None:
|
|||
|
||||
|
||||
def read_secret_from_file(path: Path) -> str:
|
||||
with path.open("rb") as f:
|
||||
with path.open("r") as f:
|
||||
return f.read()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue