feat: read flows from local directory at startup (#1989)

* feat: read flows from local directory at startup

* cleanup

* add check

* fix

* fix

* fix by endpoint name

---------

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Nicolò Boschi 2024-05-30 17:58:57 +02:00 committed by GitHub
commit 7abfb0b232
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 95 additions and 4 deletions

View file

@@ -1,7 +1,10 @@
import logging
import os
from collections import defaultdict
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from uuid import UUID
import orjson
from emoji import demojize, purely_emoji # type: ignore
@@ -13,9 +16,13 @@ from langflow.interface.types import get_all_components
from langflow.services.auth.utils import create_super_user
from langflow.services.database.models.flow.model import Flow, FlowCreate
from langflow.services.database.models.folder.model import Folder, FolderCreate
from langflow.services.database.models.user.crud import get_user_by_username
from langflow.services.deps import get_settings_service, session_scope
from langflow.services.database.models.folder.utils import create_default_folder_if_it_doesnt_exist
from langflow.services.deps import get_settings_service, session_scope, get_variable_service
STARTER_FOLDER_NAME = "Starter Projects"
STARTER_FOLDER_DESCRIPTION = "Starter projects to help you get started in Langflow."
@@ -207,6 +214,63 @@ def create_starter_folder(session):
return session.exec(select(Folder).where(Folder.name == STARTER_FOLDER_NAME)).first()
def _is_valid_uuid(val):
try:
uuid_obj = UUID(val)
except ValueError:
return False
return str(uuid_obj) == val
def load_flows_from_directory():
    """Load flow JSON files from the configured directory into the database.

    Reads ``settings.load_flows_path``; every ``*.json`` file in it is parsed
    and either updates an existing flow (matched by endpoint name first, then
    id — see ``find_existing_flow``) or creates a new one. All flows are owned
    by the superuser, so this only runs when AUTO_LOGIN is enabled.
    """
    settings_service = get_settings_service()
    flows_path = settings_service.settings.load_flows_path
    if not flows_path:
        # Feature not configured — nothing to do.
        return
    if not settings_service.auth_settings.AUTO_LOGIN:
        # Use the module logger (not the root logging module) for consistency
        # with the rest of this function.
        logger.warning("AUTO_LOGIN is disabled, not loading flows from directory")
        return

    with session_scope() as session:
        user_id = get_user_by_username(session, settings_service.auth_settings.SUPERUSER).id
        files = [f for f in os.listdir(flows_path) if os.path.isfile(os.path.join(flows_path, f))]
        for filename in files:
            if not filename.endswith(".json"):
                continue
            logger.info(f"Loading flow from file: {filename}")
            with open(os.path.join(flows_path, filename), "r", encoding="utf-8") as file:
                flow = orjson.loads(file.read())
                # Strip only the trailing ".json" — str.replace would also
                # mangle names like "my.json.flow.json".
                no_json_name = filename[: -len(".json")]
                flow_endpoint_name = flow.get("endpoint_name")
                # A file named <uuid>.json pins the flow id to that UUID.
                if _is_valid_uuid(no_json_name):
                    flow["id"] = no_json_name
                flow_id = flow.get("id")

                existing = find_existing_flow(session, flow_id, flow_endpoint_name)
                if existing:
                    logger.info(f"Updating existing flow: {flow_id} with endpoint name {flow_endpoint_name}")
                    for key, value in flow.items():
                        setattr(existing, key, value)
                    existing.updated_at = datetime.utcnow()
                    # Re-assign ownership in case the stored flow belonged to
                    # a different user.
                    existing.user_id = user_id
                    session.add(existing)
                    session.commit()
                else:
                    logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}")
                    flow["user_id"] = user_id
                    flow = Flow.model_validate(flow, from_attributes=True)
                    flow.updated_at = datetime.utcnow()
                    session.add(flow)
                    session.commit()
def find_existing_flow(session, flow_id, flow_endpoint_name):
    """Return the stored Flow matching *flow_endpoint_name* or *flow_id*.

    The endpoint name, when given, takes precedence over the id; returns
    None when neither query finds a row.
    """
    # Endpoint names are meant to be unique, so a name match wins over an
    # id match.
    if flow_endpoint_name:
        by_endpoint = session.exec(
            select(Flow).where(Flow.endpoint_name == flow_endpoint_name)
        ).first()
        if by_endpoint is not None:
            return by_endpoint

    by_id = session.exec(select(Flow).where(Flow.id == flow_id)).first()
    if by_id is not None:
        return by_id
    return None
def create_or_update_starter_projects():
components_paths = get_settings_service().settings.components_path
try:

View file

@@ -14,7 +14,7 @@ from rich import print as rprint
from starlette.middleware.base import BaseHTTPMiddleware
from langflow.api import router
from langflow.initial_setup.setup import create_or_update_starter_projects, initialize_super_user_if_needed
from langflow.initial_setup.setup import create_or_update_starter_projects, initialize_super_user_if_needed, load_flows_from_directory
from langflow.interface.utils import setup_llm_caching
from langflow.services.plugins.langfuse_plugin import LangfuseInstance
from langflow.services.utils import initialize_services, teardown_services
@@ -55,6 +55,7 @@ def get_lifespan(fix_migration=False, socketio_server=None):
LangfuseInstance.update()
initialize_super_user_if_needed()
create_or_update_starter_projects()
load_flows_from_directory()
yield
except Exception as exc:
if "langflow migration --fix" not in str(exc):

View file

@@ -71,6 +71,7 @@ class Settings(BaseSettings):
remove_api_keys: bool = False
components_path: List[str] = []
langchain_cache: str = "InMemoryCache"
load_flows_path: Optional[str] = None
# Redis
redis_host: str = "localhost"

View file

@@ -1,4 +1,5 @@
import os
from typing import Optional
import yaml
from loguru import logger
@@ -7,7 +8,6 @@ from langflow.services.base import Service
from langflow.services.settings.auth import AuthSettings
from langflow.services.settings.base import Settings
class SettingsService(Service):
name = "settings_service"
@@ -27,7 +27,6 @@ class SettingsService(Service):
with open(file_path, "r") as f:
settings_dict = yaml.safe_load(f)
settings_dict = {k.upper(): v for k, v in settings_dict.items()}
for key in settings_dict:
if key not in Settings.model_fields.keys():

View file

@@ -1,4 +1,6 @@
import json
import os.path
import shutil
# we need to import tmpdir
import tempfile
@@ -92,6 +94,12 @@ class Config:
result_backend = "redis://localhost:6379/0"
@pytest.fixture(name="load_flows_dir")
def load_flows_dir():
    """Yield the path of a temporary directory used as LANGFLOW_LOAD_FLOWS_PATH.

    The directory is created fresh for each test that requests the fixture
    and removed when the test finishes.
    """
    # Hold the TemporaryDirectory via its context manager: with only a local
    # reference (the original code), the object can be garbage-collected —
    # deleting the directory — while the test is still using it, and cleanup
    # is left to finalization instead of being explicit.
    with tempfile.TemporaryDirectory() as tempdir:
        yield tempdir
@pytest.fixture(name="distributed_env")
def setup_env(monkeypatch):
monkeypatch.setenv("LANGFLOW_CACHE_TYPE", "redis")
@@ -209,7 +217,7 @@ def json_vector_store():
@pytest.fixture(name="client", autouse=True)
def client_fixture(session: Session, monkeypatch, request):
def client_fixture(session: Session, monkeypatch, request, load_flows_dir):
# Set the database url to a test database
if "noclient" in request.keywords:
yield
@@ -218,6 +226,11 @@ def client_fixture(session: Session, monkeypatch, request):
db_path = Path(db_dir) / "test.db"
monkeypatch.setenv("LANGFLOW_DATABASE_URL", f"sqlite:///{db_path}")
monkeypatch.setenv("LANGFLOW_AUTO_LOGIN", "false")
if "load_flows" in request.keywords:
shutil.copyfile(pytest.BASIC_EXAMPLE_PATH,
os.path.join(load_flows_dir, "c54f9130-f2fa-4a3e-b22a-3856d946351b.json"))
monkeypatch.setenv("LANGFLOW_LOAD_FLOWS_PATH", load_flows_dir)
monkeypatch.setenv("LANGFLOW_AUTO_LOGIN", "true")
from langflow.main import create_app

View file

@@ -1,3 +1,5 @@
import os
from typing import Optional, List
from uuid import UUID, uuid4
import orjson
@@ -11,6 +13,7 @@ from langflow.services.database.models.base import orjson_dumps
from langflow.services.database.models.flow import Flow, FlowCreate, FlowUpdate
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_db_service
from langflow.services.settings.base import Settings
@pytest.fixture(scope="module")
@@ -252,3 +255,13 @@ def test_read_only_starter_projects(client: TestClient, active_user, logged_in_h
starter_projects = load_starter_projects()
assert response.status_code == 200
assert len(response.json()) == len(starter_projects)
@pytest.mark.load_flows
def test_load_flows(client: TestClient, load_flows_dir):
    """A flow JSON dropped in the load-flows directory is served after startup.

    The ``load_flows`` marker makes the client fixture copy the basic example
    into *load_flows_dir* as ``c54f9130-....json`` and enable AUTO_LOGIN, so
    the app imports it at startup under that id.
    """
    # AUTO_LOGIN is on for this marker, so this sets the auth credentials.
    client.get("/api/v1/auto_login")
    # Leading slash added for consistency with the auto_login call above.
    response = client.get("/api/v1/flows/c54f9130-f2fa-4a3e-b22a-3856d946351b")
    assert response.status_code == 200
    assert response.json()["name"] == "BasicExample"