From 62884f6c8b213e3b5fd866b76d98f4ec892af8ec Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Thu, 13 Mar 2025 01:41:31 +0100 Subject: [PATCH] feat: Sync flows from FS to DB if flow has fs_path (#7043) * feat: Sync flows from FS to DB if flow has fs_path * Changes following review * Simplify flow_mtimes handling * Move sync_flows_from_fs to setup.py --------- Co-authored-by: Gabriel Luiz Freitas Almeida --- .../base/langflow/initial_setup/setup.py | 35 ++++++++++++++++++- src/backend/base/langflow/main.py | 12 ++++++- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/src/backend/base/langflow/initial_setup/setup.py b/src/backend/base/langflow/initial_setup/setup.py index b0ea40430..ef9cdae9b 100644 --- a/src/backend/base/langflow/initial_setup/setup.py +++ b/src/backend/base/langflow/initial_setup/setup.py @@ -23,7 +23,7 @@ from emoji import demojize, purely_emoji from loguru import logger from sqlalchemy.exc import NoResultFound from sqlalchemy.orm import selectinload -from sqlmodel import select +from sqlmodel import col, select from sqlmodel.ext.asyncio.session import AsyncSession from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBUTES, ORJSON_OPTIONS @@ -978,3 +978,36 @@ async def get_or_create_default_folder(session: AsyncSession, user_id: UUID) -> msg = "Failed to get or create default folder" raise ValueError(msg) from e return FolderRead.model_validate(folder_obj, from_attributes=True) + + +async def sync_flows_from_fs(): + flow_mtimes = {} + while True: + try: + async with session_scope() as session: + stmt = select(Flow).where(col(Flow.fs_path).is_not(None)) + flows = (await session.exec(stmt)).all() + for flow in flows: + mtime = flow_mtimes.setdefault(flow.id, 0) + path = anyio.Path(flow.fs_path) + try: + if await path.exists(): + new_mtime = (await path.stat()).st_mtime + if new_mtime > mtime: + update_data = orjson.loads(await path.read_text(encoding="utf-8")) + try: + for field_name in ("name", "description", "data", "locked"): + if new_value := update_data.get(field_name): + setattr(flow, field_name, new_value) + if folder_id := update_data.get("folder_id"): + flow.folder_id = UUID(folder_id) + await session.commit() + await session.refresh(flow) + except Exception: # noqa: BLE001 + logger.exception(f"Couldn't update flow {flow.id} in database from path {path}") + flow_mtimes[flow.id] = new_mtime + except Exception: # noqa: BLE001 + logger.exception(f"Error while handling flow file {path}") + except Exception: # noqa: BLE001 + logger.exception("Error while syncing flows from database") + await asyncio.sleep(10) diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index c5297e1c7..684564ae0 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -29,12 +29,17 @@ from langflow.initial_setup.setup import ( initialize_super_user_if_needed, load_bundles_from_urls, load_flows_from_directory, + sync_flows_from_fs, ) from langflow.interface.components import get_and_cache_all_types_dict from langflow.interface.utils import setup_llm_caching from langflow.logging.logger import configure from langflow.middleware import ContentSizeLimitMiddleware -from langflow.services.deps import get_queue_service, get_settings_service, get_telemetry_service +from langflow.services.deps import ( + get_queue_service, + get_settings_service, + get_telemetry_service, +) from langflow.services.utils import initialize_services, teardown_services if TYPE_CHECKING: @@ -118,6 +123,7 @@ def get_lifespan(*, fix_migration=False, version=None): rprint("[bold green]Starting Langflow...[/bold green]") temp_dirs: list[TemporaryDirectory] = [] + sync_flows_from_fs_task = None try: await initialize_services(fix_migration=fix_migration) setup_llm_caching() @@ -128,6 +134,7 @@ def get_lifespan(*, fix_migration=False, version=None): await create_or_update_starter_projects(all_types_dict) telemetry_service.start() await load_flows_from_directory() + sync_flows_from_fs_task = asyncio.create_task(sync_flows_from_fs()) queue_service = get_queue_service() if not queue_service.is_started(): # Start if not already started queue_service.start() @@ -140,6 +147,9 @@ def get_lifespan(*, fix_migration=False, version=None): finally: # Clean shutdown logger.info("Cleaning up resources...") + if sync_flows_from_fs_task: + sync_flows_from_fs_task.cancel() + await asyncio.wait([sync_flows_from_fs_task]) await teardown_services() await logger.complete() temp_dir_cleanups = [asyncio.to_thread(temp_dir.cleanup) for temp_dir in temp_dirs]