From 66996868fd3d21d7edad5c912ad2fb576a0416eb Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Mon, 7 Apr 2025 15:49:05 +0200 Subject: [PATCH] feat: Add configurable polling interval for syncing flows from FS (#7065) * Add configurable polling interval for syncing flows from FS * Update src/backend/base/langflow/services/settings/base.py Co-authored-by: Gabriel Luiz Freitas Almeida --------- Co-authored-by: Gabriel Luiz Freitas Almeida --- src/backend/base/langflow/alembic/env.py | 7 +-- .../base/langflow/initial_setup/setup.py | 3 +- .../base/langflow/services/settings/base.py | 2 + src/backend/tests/unit/api/v1/test_flows.py | 2 +- src/backend/tests/unit/test_initial_setup.py | 48 +++++++++++++++++++ 5 files changed, 55 insertions(+), 7 deletions(-) diff --git a/src/backend/base/langflow/alembic/env.py b/src/backend/base/langflow/alembic/env.py index affada9ba..31b88d875 100644 --- a/src/backend/base/langflow/alembic/env.py +++ b/src/backend/base/langflow/alembic/env.py @@ -49,7 +49,7 @@ def run_migrations_offline() -> None: literal_binds=True, dialect_opts={"paramstyle": "named"}, render_as_batch=True, - prepare_threshold=None + prepare_threshold=None, ) with context.begin_transaction(): @@ -73,10 +73,7 @@ def _sqlite_do_begin(conn): def _do_run_migrations(connection): context.configure( - connection=connection, - target_metadata=target_metadata, - render_as_batch=True, - prepare_threshold=None + connection=connection, target_metadata=target_metadata, render_as_batch=True, prepare_threshold=None ) with context.begin_transaction(): diff --git a/src/backend/base/langflow/initial_setup/setup.py b/src/backend/base/langflow/initial_setup/setup.py index 10ef707e1..59b2c6769 100644 --- a/src/backend/base/langflow/initial_setup/setup.py +++ b/src/backend/base/langflow/initial_setup/setup.py @@ -982,6 +982,7 @@ async def get_or_create_default_folder(session: AsyncSession, user_id: UUID) -> async def sync_flows_from_fs(): flow_mtimes = {} + fs_flows_polling_interval = get_settings_service().settings.fs_flows_polling_interval / 1000 while True: try: async with session_scope() as session: @@ -1010,4 +1011,4 @@ async def sync_flows_from_fs(): logger.exception(f"Error while handling flow file {path}") except Exception: # noqa: BLE001 logger.exception("Error while syncing flows from database") - await asyncio.sleep(10) + await asyncio.sleep(fs_flows_polling_interval) diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index 3b9fbfb8d..e89252e9c 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -222,6 +222,8 @@ class Settings(BaseSettings): """The maximum number of builds to keep per vertex. Older builds will be deleted.""" webhook_polling_interval: int = 5000 """The polling interval for the webhook in ms.""" + fs_flows_polling_interval: int = 10000 + """The polling interval in milliseconds for synchronizing flows from the file system.""" # MCP Server mcp_server_enabled: bool = True diff --git a/src/backend/tests/unit/api/v1/test_flows.py b/src/backend/tests/unit/api/v1/test_flows.py index d6e7db2ee..920415539 100644 --- a/src/backend/tests/unit/api/v1/test_flows.py +++ b/src/backend/tests/unit/api/v1/test_flows.py @@ -8,7 +8,7 @@ from langflow.services.database.models import Flow async def test_create_flow(client: AsyncClient, logged_in_headers): - flow_file = Path(tempfile.tempdir) / f"{uuid.uuid4()!s}.json" + flow_file = Path(tempfile.tempdir) / f"{uuid.uuid4()}.json" try: basic_case = { "name": "string", diff --git a/src/backend/tests/unit/test_initial_setup.py b/src/backend/tests/unit/test_initial_setup.py index 2935927a3..8657b5e0d 100644 --- a/src/backend/tests/unit/test_initial_setup.py +++ b/src/backend/tests/unit/test_initial_setup.py @@ -1,10 +1,13 @@ import asyncio +import os +import tempfile import uuid from datetime import datetime from unittest.mock import AsyncMock, patch import pytest from anyio import Path +from httpx import AsyncClient from langflow.custom.directory_reader.utils import abuild_custom_component_list_from_path from langflow.initial_setup.constants import STARTER_FOLDER_NAME from langflow.initial_setup.setup import ( @@ -253,3 +256,48 @@ async def test_load_bundles_from_urls(): finally: for temp_dir in temp_dirs: await asyncio.to_thread(temp_dir.cleanup) + + +@pytest.fixture +def set_fs_flows_polling_interval(): + os.environ["LANGFLOW_FS_FLOWS_POLLING_INTERVAL"] = "100" + yield + os.unsetenv("LANGFLOW_FS_FLOWS_POLLING_INTERVAL") + + +@pytest.mark.usefixtures("set_fs_flows_polling_interval") +async def test_sync_flows_from_fs(client: AsyncClient, logged_in_headers): + flow_file = Path(tempfile.tempdir) / f"{uuid.uuid4()}.json" + try: + basic_case = { + "name": "string", + "description": "string", + "data": {}, + "locked": False, + "fs_path": str(flow_file), + } + await client.post("api/v1/flows/", json=basic_case, headers=logged_in_headers) + + content = await flow_file.read_text(encoding="utf-8") + fs_flow = Flow.model_validate_json(content) + fs_flow.name = "new name" + fs_flow.description = "new description" + fs_flow.data = {"nodes": {}, "edges": {}} + fs_flow.locked = True + + await flow_file.write_text(fs_flow.model_dump_json(), encoding="utf-8") + + result = {} + for i in range(10): + response = await client.get(f"api/v1/flows/{fs_flow.id}", headers=logged_in_headers) + result = response.json() + if result["name"] == "new name": + break + assert i != 9, "flow name should have been updated" + await asyncio.sleep(0.1) + + assert result["description"] == "new description" + assert result["data"] == {"nodes": {}, "edges": {}} + assert result["locked"] is True + finally: + await flow_file.unlink(missing_ok=True)