feat: Add configurable polling interval for syncing flows from FS (#7065)

* Add configurable polling interval for syncing flows from FS

* Update src/backend/base/langflow/services/settings/base.py

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>

---------

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Christophe Bornet 2025-04-07 15:49:05 +02:00 committed by GitHub
commit 66996868fd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 55 additions and 7 deletions

View file

@ -49,7 +49,7 @@ def run_migrations_offline() -> None:
literal_binds=True,
dialect_opts={"paramstyle": "named"},
render_as_batch=True,
prepare_threshold=None
prepare_threshold=None,
)
with context.begin_transaction():
@ -73,10 +73,7 @@ def _sqlite_do_begin(conn):
def _do_run_migrations(connection):
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=True,
prepare_threshold=None
connection=connection, target_metadata=target_metadata, render_as_batch=True, prepare_threshold=None
)
with context.begin_transaction():

View file

@ -982,6 +982,7 @@ async def get_or_create_default_folder(session: AsyncSession, user_id: UUID) ->
async def sync_flows_from_fs():
flow_mtimes = {}
fs_flows_polling_interval = get_settings_service().settings.fs_flows_polling_interval / 1000
while True:
try:
async with session_scope() as session:
@ -1010,4 +1011,4 @@ async def sync_flows_from_fs():
logger.exception(f"Error while handling flow file {path}")
except Exception: # noqa: BLE001
logger.exception("Error while syncing flows from database")
await asyncio.sleep(10)
await asyncio.sleep(fs_flows_polling_interval)

View file

@ -222,6 +222,8 @@ class Settings(BaseSettings):
"""The maximum number of builds to keep per vertex. Older builds will be deleted."""
webhook_polling_interval: int = 5000
"""The polling interval for the webhook in ms."""
fs_flows_polling_interval: int = 10000
"""The polling interval in milliseconds for synchronizing flows from the file system."""
# MCP Server
mcp_server_enabled: bool = True

View file

@ -8,7 +8,7 @@ from langflow.services.database.models import Flow
async def test_create_flow(client: AsyncClient, logged_in_headers):
flow_file = Path(tempfile.tempdir) / f"{uuid.uuid4()!s}.json"
flow_file = Path(tempfile.tempdir) / f"{uuid.uuid4()}.json"
try:
basic_case = {
"name": "string",

View file

@ -1,10 +1,13 @@
import asyncio
import os
import tempfile
import uuid
from datetime import datetime
from unittest.mock import AsyncMock, patch
import pytest
from anyio import Path
from httpx import AsyncClient
from langflow.custom.directory_reader.utils import abuild_custom_component_list_from_path
from langflow.initial_setup.constants import STARTER_FOLDER_NAME
from langflow.initial_setup.setup import (
@ -253,3 +256,48 @@ async def test_load_bundles_from_urls():
finally:
for temp_dir in temp_dirs:
await asyncio.to_thread(temp_dir.cleanup)
@pytest.fixture
def set_fs_flows_polling_interval():
    """Shorten the FS-flows polling interval to 100 ms for the duration of a test.

    Exposes the setting through the ``LANGFLOW_FS_FLOWS_POLLING_INTERVAL``
    environment variable, then restores the previous state on teardown.
    """
    previous = os.environ.get("LANGFLOW_FS_FLOWS_POLLING_INTERVAL")
    os.environ["LANGFLOW_FS_FLOWS_POLLING_INTERVAL"] = "100"
    try:
        yield
    finally:
        # NOTE: os.unsetenv() does not update the os.environ mapping, so the
        # original cleanup left the variable visible to os.environ readers.
        # Mutate os.environ directly and restore any pre-existing value.
        if previous is None:
            os.environ.pop("LANGFLOW_FS_FLOWS_POLLING_INTERVAL", None)
        else:
            os.environ["LANGFLOW_FS_FLOWS_POLLING_INTERVAL"] = previous
@pytest.mark.usefixtures("set_fs_flows_polling_interval")
async def test_sync_flows_from_fs(client: AsyncClient, logged_in_headers):
    """End-to-end check that edits to a flow's backing file are synced into the DB.

    Creates a flow with an ``fs_path``, mutates the file on disk, then polls the
    API until the background FS-sync task picks up the change.
    """
    flow_file = Path(tempfile.tempdir) / f"{uuid.uuid4()}.json"
    try:
        # Create a flow that is mirrored to a file on disk.
        basic_case = {
            "name": "string",
            "description": "string",
            "data": {},
            "locked": False,
            "fs_path": str(flow_file),
        }
        await client.post("api/v1/flows/", json=basic_case, headers=logged_in_headers)

        # Edit the on-disk copy; the sync task should push these changes back.
        fs_flow = Flow.model_validate_json(await flow_file.read_text(encoding="utf-8"))
        fs_flow.name = "new name"
        fs_flow.description = "new description"
        fs_flow.data = {"nodes": {}, "edges": {}}
        fs_flow.locked = True
        await flow_file.write_text(fs_flow.model_dump_json(), encoding="utf-8")

        # Poll the API (up to ~1 s) until the renamed flow shows up; fail on
        # the final attempt if it never does.
        max_attempts = 10
        result = {}
        for attempt in range(max_attempts):
            response = await client.get(f"api/v1/flows/{fs_flow.id}", headers=logged_in_headers)
            result = response.json()
            if result["name"] == "new name":
                break
            assert attempt != max_attempts - 1, "flow name should have been updated"
            await asyncio.sleep(0.1)

        # The remaining fields must have been synced along with the name.
        assert result["description"] == "new description"
        assert result["data"] == {"nodes": {}, "edges": {}}
        assert result["locked"] is True
    finally:
        await flow_file.unlink(missing_ok=True)