import socket import threading import time import pytest from langflow.__main__ import app from langflow.services import deps @pytest.fixture(scope="module") def default_settings(): return [ "--backend-only", "--no-open-browser", ] def get_free_port(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("", 0)) return s.getsockname()[1] def run_flow(runner, port, components_path, default_settings): args = [ "run", "--port", str(port), "--components-path", str(components_path), *default_settings, ] result = runner.invoke(app, args) if result.exit_code != 0: msg = f"CLI failed with exit code {result.exit_code}: {result.output}" raise RuntimeError(msg) def test_components_path(runner, default_settings, tmp_path): # create a "components" folder temp_dir = tmp_path / "components" temp_dir.mkdir(exist_ok=True) port = get_free_port() thread = threading.Thread( target=run_flow, args=(runner, port, temp_dir, default_settings), daemon=True, ) thread.start() # Give the server some time to start time.sleep(5) settings_service = deps.get_settings_service() assert str(temp_dir) in settings_service.settings.components_path def test_superuser(runner): result = runner.invoke(app, ["superuser"], input="admin\nadmin\n") assert result.exit_code == 0, result.stdout assert "Superuser created successfully." in result.stdout