feat: Sync flows from FS to DB if flow has fs_path (#7043)

* feat: Sync flows from FS to DB if flow has fs_path

* Changes following review

* Simplify flow_mtimes handling

* Move sync_flows_from_fs to setup.py

---------

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
This commit is contained in:
Christophe Bornet 2025-03-13 01:41:31 +01:00 committed by GitHub
commit 62884f6c8b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 45 additions and 2 deletions

View file

@ -23,7 +23,7 @@ from emoji import demojize, purely_emoji
from loguru import logger
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import selectinload
from sqlmodel import select
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBUTES, ORJSON_OPTIONS
@ -978,3 +978,36 @@ async def get_or_create_default_folder(session: AsyncSession, user_id: UUID) ->
msg = "Failed to get or create default folder"
raise ValueError(msg) from e
return FolderRead.model_validate(folder_obj, from_attributes=True)
async def sync_flows_from_fs():
flow_mtimes = {}
while True:
try:
async with session_scope() as session:
stmt = select(Flow).where(col(Flow.fs_path).is_not(None))
flows = (await session.exec(stmt)).all()
for flow in flows:
mtime = flow_mtimes.setdefault(flow.id, 0)
path = anyio.Path(flow.fs_path)
try:
if await path.exists():
new_mtime = (await path.stat()).st_mtime
if new_mtime > mtime:
update_data = orjson.loads(await path.read_text(encoding="utf-8"))
try:
for field_name in ("name", "description", "data", "locked"):
if new_value := update_data.get(field_name):
setattr(flow, field_name, new_value)
if folder_id := update_data.get("folder_id"):
flow.folder_id = UUID(folder_id)
await session.commit()
await session.refresh(flow)
except Exception: # noqa: BLE001
logger.exception(f"Couldn't update flow {flow.id} in database from path {path}")
flow_mtimes[flow.id] = new_mtime
except Exception: # noqa: BLE001
logger.exception(f"Error while handling flow file {path}")
except Exception: # noqa: BLE001
logger.exception("Error while syncing flows from database")
await asyncio.sleep(10)

View file

@ -29,12 +29,17 @@ from langflow.initial_setup.setup import (
initialize_super_user_if_needed,
load_bundles_from_urls,
load_flows_from_directory,
sync_flows_from_fs,
)
from langflow.interface.components import get_and_cache_all_types_dict
from langflow.interface.utils import setup_llm_caching
from langflow.logging.logger import configure
from langflow.middleware import ContentSizeLimitMiddleware
from langflow.services.deps import get_queue_service, get_settings_service, get_telemetry_service
from langflow.services.deps import (
get_queue_service,
get_settings_service,
get_telemetry_service,
)
from langflow.services.utils import initialize_services, teardown_services
if TYPE_CHECKING:
@ -118,6 +123,7 @@ def get_lifespan(*, fix_migration=False, version=None):
rprint("[bold green]Starting Langflow...[/bold green]")
temp_dirs: list[TemporaryDirectory] = []
sync_flows_from_fs_task = None
try:
await initialize_services(fix_migration=fix_migration)
setup_llm_caching()
@ -128,6 +134,7 @@ def get_lifespan(*, fix_migration=False, version=None):
await create_or_update_starter_projects(all_types_dict)
telemetry_service.start()
await load_flows_from_directory()
sync_flows_from_fs_task = asyncio.create_task(sync_flows_from_fs())
queue_service = get_queue_service()
if not queue_service.is_started(): # Start if not already started
queue_service.start()
@ -140,6 +147,9 @@ def get_lifespan(*, fix_migration=False, version=None):
finally:
# Clean shutdown
logger.info("Cleaning up resources...")
if sync_flows_from_fs_task:
sync_flows_from_fs_task.cancel()
await asyncio.wait([sync_flows_from_fs_task])
await teardown_services()
await logger.complete()
temp_dir_cleanups = [asyncio.to_thread(temp_dir.cleanup) for temp_dir in temp_dirs]