From 7abfb0b232e021c4e437bba7d4d7b4c7cb683772 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicol=C3=B2=20Boschi?= Date: Thu, 30 May 2024 17:58:57 +0200 Subject: [PATCH] feat: read flows from local directory at startup (#1989) * feat: read flows from local directory at startup * cleanup * add check * fix * fix * fix by endpoint name --------- Co-authored-by: Gabriel Luiz Freitas Almeida --- .../base/langflow/initial_setup/setup.py | 64 +++++++++++++++++++ src/backend/base/langflow/main.py | 3 +- .../base/langflow/services/settings/base.py | 1 + .../langflow/services/settings/service.py | 3 +- tests/conftest.py | 15 ++++- tests/test_database.py | 13 ++++ 6 files changed, 95 insertions(+), 4 deletions(-) diff --git a/src/backend/base/langflow/initial_setup/setup.py b/src/backend/base/langflow/initial_setup/setup.py index 798ffdd73..27574950c 100644 --- a/src/backend/base/langflow/initial_setup/setup.py +++ b/src/backend/base/langflow/initial_setup/setup.py @@ -1,7 +1,10 @@ +import logging +import os from collections import defaultdict from copy import deepcopy from datetime import datetime, timezone from pathlib import Path +from uuid import UUID import orjson from emoji import demojize, purely_emoji # type: ignore @@ -13,9 +16,13 @@ from langflow.interface.types import get_all_components from langflow.services.auth.utils import create_super_user from langflow.services.database.models.flow.model import Flow, FlowCreate from langflow.services.database.models.folder.model import Folder, FolderCreate +from langflow.services.database.models.user.crud import get_user_by_username +from langflow.services.deps import get_settings_service, session_scope + from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist from langflow.services.deps import get_settings_service, session_scope, get_variable_service + STARTER_FOLDER_NAME = "Starter Projects" STARTER_FOLDER_DESCRIPTION = "Starter projects to help you get started in Langflow." @@ -207,6 +214,63 @@ def create_starter_folder(session): return session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first() +def _is_valid_uuid(val): + try: + uuid_obj = UUID(val) + except ValueError: + return False + return str(uuid_obj) == val + +def load_flows_from_directory(): + settings_service = get_settings_service() + flows_path = settings_service.settings.load_flows_path + if not flows_path: + return + if not settings_service.auth_settings.AUTO_LOGIN: + logging.warning("AUTO_LOGIN is disabled, not loading flows from directory") + return + + with session_scope() as session: + user_id = get_user_by_username(session, settings_service.auth_settings.SUPERUSER).id + files = [f for f in os.listdir(flows_path) if os.path.isfile(os.path.join(flows_path, f))] + for filename in files: + if not filename.endswith(".json"): + continue + logger.info(f"Loading flow from file: {filename}") + with open(os.path.join(flows_path, filename), "r", encoding="utf-8") as file: + flow = orjson.loads(file.read()) + no_json_name = filename.replace(".json", "") + flow_endpoint_name = flow.get("endpoint_name") + if _is_valid_uuid(no_json_name): + flow["id"] = no_json_name + flow_id = flow.get("id") + + existing = find_existing_flow(session, flow_id, flow_endpoint_name) + if existing: + logger.info(f"Updating existing flow: {flow_id} with endpoint name {flow_endpoint_name}") + for key, value in flow.items(): + setattr(existing, key, value) + existing.updated_at = datetime.utcnow() + existing.user_id = user_id + session.add(existing) + session.commit() + else: + logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}") + flow["user_id"] = user_id + flow = Flow.model_validate(flow, from_attributes=True) + flow.updated_at = datetime.utcnow() + session.add(flow) + session.commit() + +def find_existing_flow(session, flow_id, flow_endpoint_name): + if flow_endpoint_name: + stmt = select(Flow).where(Flow.endpoint_name == flow_endpoint_name) + if existing := session.exec(stmt).first(): + return existing + stmt = select(Flow).where(Flow.id == flow_id) + if existing := session.exec(stmt).first(): + return existing + return None def create_or_update_starter_projects(): components_paths = get_settings_service().settings.components_path try: diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index d8619e2ea..93d0e7f04 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -14,7 +14,7 @@ from rich import print as rprint from starlette.middleware.base import BaseHTTPMiddleware from langflow.api import router -from langflow.initial_setup.setup import create_or_update_starter_projects, initialize_super_user_if_needed +from langflow.initial_setup.setup import create_or_update_starter_projects, initialize_super_user_if_needed, load_flows_from_directory from langflow.interface.utils import setup_llm_caching from langflow.services.plugins.langfuse_plugin import LangfuseInstance from langflow.services.utils import initialize_services, teardown_services @@ -55,6 +55,7 @@ def get_lifespan(fix_migration=False, socketio_server=None): LangfuseInstance.update() initialize_super_user_if_needed() create_or_update_starter_projects() + load_flows_from_directory() yield except Exception as exc: if "langflow migration --fix" not in str(exc): diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index f7c6440f2..0f9d0d029 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -71,6 +71,7 @@ class Settings(BaseSettings): remove_api_keys: bool = False components_path: List[str] = [] langchain_cache: str = "InMemoryCache" + load_flows_path: Optional[str] = None # Redis redis_host: str = "localhost" diff --git a/src/backend/base/langflow/services/settings/service.py b/src/backend/base/langflow/services/settings/service.py index f7ef2980d..95088e829 100644 --- a/src/backend/base/langflow/services/settings/service.py +++ b/src/backend/base/langflow/services/settings/service.py @@ -1,4 +1,5 @@ import os +from typing import Optional import yaml from loguru import logger @@ -7,7 +8,6 @@ from langflow.services.base import Service from langflow.services.settings.auth import AuthSettings from langflow.services.settings.base import Settings - class SettingsService(Service): name = "settings_service" @@ -27,7 +27,6 @@ class SettingsService(Service): with open(file_path, "r") as f: settings_dict = yaml.safe_load(f) - settings_dict = {k.upper(): v for k, v in settings_dict.items()} for key in settings_dict: if key not in Settings.model_fields.keys(): diff --git a/tests/conftest.py b/tests/conftest.py index d876aa316..6d371ac51 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,6 @@ import json +import os.path +import shutil # we need to import tmpdir import tempfile @@ -92,6 +94,12 @@ class Config: result_backend = "redis://localhost:6379/0" +@pytest.fixture(name="load_flows_dir") +def load_flows_dir(): + tempdir = tempfile.TemporaryDirectory() + yield tempdir.name + + @pytest.fixture(name="distributed_env") def setup_env(monkeypatch): monkeypatch.setenv("LANGFLOW_CACHE_TYPE", "redis") @@ -209,7 +217,7 @@ def json_vector_store(): @pytest.fixture(name="client", autouse=True) -def client_fixture(session: Session, monkeypatch, request): +def client_fixture(session: Session, monkeypatch, request, load_flows_dir): # Set the database url to a test database if "noclient" in request.keywords: yield @@ -218,6 +226,11 @@ def client_fixture(session: Session, monkeypatch, request): db_path = Path(db_dir) / "test.db" monkeypatch.setenv("LANGFLOW_DATABASE_URL", f"sqlite:///{db_path}") monkeypatch.setenv("LANGFLOW_AUTO_LOGIN", "false") + if "load_flows" in request.keywords: + shutil.copyfile(pytest.BASIC_EXAMPLE_PATH, + os.path.join(load_flows_dir, "c54f9130-f2fa-4a3e-b22a-3856d946351b.json")) + monkeypatch.setenv("LANGFLOW_LOAD_FLOWS_PATH", load_flows_dir) + monkeypatch.setenv("LANGFLOW_AUTO_LOGIN", "true") from langflow.main import create_app diff --git a/tests/test_database.py b/tests/test_database.py index 83503cd30..bc5bc3e7a 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -1,3 +1,5 @@ +import os +from typing import Optional, List from uuid import UUID, uuid4 import orjson @@ -11,6 +13,7 @@ from langflow.services.database.models.base import orjson_dumps from langflow.services.database.models.flow import Flow, FlowCreate, FlowUpdate from langflow.services.database.utils import session_getter from langflow.services.deps import get_db_service +from langflow.services.settings.base import Settings @pytest.fixture(scope="module") @@ -252,3 +255,13 @@ def test_read_only_starter_projects(client: TestClient, active_user, logged_in_h starter_projects = load_starter_projects() assert response.status_code == 200 assert len(response.json()) == len(starter_projects) + + +@pytest.mark.load_flows +def test_load_flows(client: TestClient, load_flows_dir): + client.get("/api/v1/auto_login") + response = client.get("api/v1/flows/c54f9130-f2fa-4a3e-b22a-3856d946351b") + assert response.status_code == 200 + assert response.json()["name"] == "BasicExample" + +