From 6b0435906fefd9f18e4526a09bd8c07281339e08 Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Wed, 8 Jan 2025 16:31:20 +0100 Subject: [PATCH] feat: Add support for loading flows and components from URLs (#5020) * Add support for loading flows and components from a zip * Translate github URLs automatically --- .../base/langflow/initial_setup/setup.py | 184 +++++++++++++----- src/backend/base/langflow/main.py | 11 ++ .../base/langflow/services/settings/base.py | 1 + src/backend/tests/unit/test_initial_setup.py | 92 ++++++++- 4 files changed, 238 insertions(+), 50 deletions(-) diff --git a/src/backend/base/langflow/initial_setup/setup.py b/src/backend/base/langflow/initial_setup/setup.py index 143754134..481f3169d 100644 --- a/src/backend/base/langflow/initial_setup/setup.py +++ b/src/backend/base/langflow/initial_setup/setup.py @@ -1,14 +1,21 @@ import asyncio import copy +import io import json import os +import re import shutil +import zipfile from collections import defaultdict from copy import deepcopy from datetime import datetime, timezone +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import AnyStr from uuid import UUID import anyio +import httpx import orjson from aiofile import async_open from emoji import demojize, purely_emoji @@ -16,6 +23,7 @@ from loguru import logger from sqlalchemy.exc import NoResultFound from sqlalchemy.orm import selectinload from sqlmodel import select +from sqlmodel.ext.asyncio.session import AsyncSession from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBUTES, ORJSON_OPTIONS from langflow.initial_setup.constants import STARTER_FOLDER_DESCRIPTION, STARTER_FOLDER_NAME @@ -549,66 +557,144 @@ async def load_flows_from_directory() -> None: if user is None: msg = "Superuser not found in the database" raise NoResultFound(msg) - user_id = user.id - flows_path_ = anyio.Path(flows_path) - files = [f async for f in flows_path_.iterdir() if await f.is_file()] - for file_path in files: - if file_path.suffix != ".json": + async for file_path in anyio.Path(flows_path).iterdir(): + if not await file_path.is_file() or file_path.suffix != ".json": continue logger.info(f"Loading flow from file: {file_path.name}") async with async_open(str(file_path), "r", encoding="utf-8") as f: content = await f.read() - flow = orjson.loads(content) - no_json_name = file_path.stem - flow_endpoint_name = flow.get("endpoint_name") - if _is_valid_uuid(no_json_name): - flow["id"] = no_json_name - flow_id = flow.get("id") + await upsert_flow_from_file(content, file_path.stem, session, user.id) - if isinstance(flow_id, str): - try: - flow_id = UUID(flow_id) - except ValueError: - logger.error(f"Invalid UUID string: {flow_id}") - return - existing = await find_existing_flow(session, flow_id, flow_endpoint_name) - if existing: - logger.debug(f"Found existing flow: {existing.name}") - logger.info(f"Updating existing flow: {flow_id} with endpoint name {flow_endpoint_name}") - for key, value in flow.items(): - if hasattr(existing, key): - # flow dict from json and db representation are not 100% the same - setattr(existing, key, value) - existing.updated_at = datetime.now(tz=timezone.utc).astimezone() - existing.user_id = user_id +async def detect_github_url(url: str) -> str: + if matched := re.match(r"https?://(?:www\.)?github\.com/([\w.-]+)/([\w.-]+)?/?$", url): + owner, repo = matched.groups() - # Generally, folder_id should not be None, but we must check this due to the previous - # behavior where flows could be added and folder_id was None, orphaning - # them within Langflow. - if existing.folder_id is None: - folder_id = await get_default_folder_id(session, user_id) - existing.folder_id = folder_id + repo = repo.removesuffix(".git") - if isinstance(existing.id, str): - try: - existing.id = UUID(existing.id) - except ValueError: - logger.error(f"Invalid UUID string: {existing.id}") - return + async with httpx.AsyncClient(follow_redirects=True) as client: + response = await client.get(f"https://api.github.com/repos/{owner}/{repo}") + response.raise_for_status() + default_branch = response.json().get("default_branch") + return f"https://github.com/{owner}/{repo}/archive/refs/heads/{default_branch}.zip" - session.add(existing) - else: - logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}") + if matched := re.match(r"https?://(?:www\.)?github\.com/([\w.-]+)/([\w.-]+)/tree/([\w\\/.-]+)", url): + owner, repo, branch = matched.groups() + if branch[-1] == "/": + branch = branch[:-1] + return f"https://github.com/{owner}/{repo}/archive/refs/heads/{branch}.zip" - # Current behavior loads all new flows into default folder - folder_id = await get_default_folder_id(session, user_id) - flow["user_id"] = user_id - flow["folder_id"] = folder_id - flow = Flow.model_validate(flow, from_attributes=True) - flow.updated_at = datetime.now(tz=timezone.utc).astimezone() + if matched := re.match(r"https?://(?:www\.)?github\.com/([\w.-]+)/([\w.-]+)/releases/tag/([\w\\/.-]+)", url): + owner, repo, tag = matched.groups() + if tag[-1] == "/": + tag = tag[:-1] + return f"https://github.com/{owner}/{repo}/archive/refs/tags/{tag}.zip" - session.add(flow) + if matched := re.match(r"https?://(?:www\.)?github\.com/([\w.-]+)/([\w.-]+)/commit/(\w+)/?$", url): + owner, repo, commit = matched.groups() + return f"https://github.com/{owner}/{repo}/archive/{commit}.zip" + + return url + + +async def load_bundles_from_urls() -> tuple[list[TemporaryDirectory], list[str]]: + component_paths: set[str] = set() + temp_dirs = [] + settings_service = get_settings_service() + bundle_urls = settings_service.settings.bundle_urls + if not bundle_urls: + return [], [] + if not settings_service.auth_settings.AUTO_LOGIN: + logger.warning("AUTO_LOGIN is disabled, not loading flows from URLs") + + async with session_scope() as session: + user = await get_user_by_username(session, settings_service.auth_settings.SUPERUSER) + if user is None: + msg = "Superuser not found in the database" + raise NoResultFound(msg) + user_id = user.id + + for url in bundle_urls: + url_ = await detect_github_url(url) + + async with httpx.AsyncClient(follow_redirects=True) as client: + response = await client.get(url_) + response.raise_for_status() + + with zipfile.ZipFile(io.BytesIO(response.content)) as zfile: + dir_names = [f.filename for f in zfile.infolist() if f.is_dir() and "/" not in f.filename[:-1]] + temp_dir = None + for filename in zfile.namelist(): + path = Path(filename) + for dir_name in dir_names: + if ( + settings_service.auth_settings.AUTO_LOGIN + and path.is_relative_to(f"{dir_name}flows/") + and path.suffix == ".json" + ): + file_content = zfile.read(filename) + await upsert_flow_from_file(file_content, path.stem, session, user_id) + elif path.is_relative_to(f"{dir_name}components/"): + if temp_dir is None: + temp_dir = await asyncio.to_thread(TemporaryDirectory) + temp_dirs.append(temp_dir) + component_paths.add(str(Path(temp_dir.name) / f"{dir_name}components")) + await asyncio.to_thread(zfile.extract, filename, temp_dir.name) + + return temp_dirs, list(component_paths) + + +async def upsert_flow_from_file(file_content: AnyStr, filename: str, session: AsyncSession, user_id: UUID) -> None: + flow = orjson.loads(file_content) + flow_endpoint_name = flow.get("endpoint_name") + if _is_valid_uuid(filename): + flow["id"] = filename + flow_id = flow.get("id") + + if isinstance(flow_id, str): + try: + flow_id = UUID(flow_id) + except ValueError: + logger.error(f"Invalid UUID string: {flow_id}") + return + + existing = await find_existing_flow(session, flow_id, flow_endpoint_name) + if existing: + logger.debug(f"Found existing flow: {existing.name}") + logger.info(f"Updating existing flow: {flow_id} with endpoint name {flow_endpoint_name}") + for key, value in flow.items(): + if hasattr(existing, key): + # flow dict from json and db representation are not 100% the same + setattr(existing, key, value) + existing.updated_at = datetime.now(tz=timezone.utc).astimezone() + existing.user_id = user_id + + # Generally, folder_id should not be None, but we must check this due to the previous + # behavior where flows could be added and folder_id was None, orphaning + # them within Langflow. + if existing.folder_id is None: + folder_id = await get_default_folder_id(session, user_id) + existing.folder_id = folder_id + + if isinstance(existing.id, str): + try: + existing.id = UUID(existing.id) + except ValueError: + logger.error(f"Invalid UUID string: {existing.id}") + return + + session.add(existing) + else: + logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}") + + # Current behavior loads all new flows into default folder + folder_id = await get_default_folder_id(session, user_id) + flow["user_id"] = user_id + flow["folder_id"] = folder_id + flow = Flow.model_validate(flow, from_attributes=True) + flow.updated_at = datetime.now(tz=timezone.utc).astimezone() + + session.add(flow) async def find_existing_flow(session, flow_id, flow_endpoint_name): diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index e2cd6944e..e102ac940 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -6,6 +6,7 @@ import warnings from contextlib import asynccontextmanager from http import HTTPStatus from pathlib import Path +from typing import TYPE_CHECKING from urllib.parse import urlencode import anyio @@ -25,6 +26,7 @@ from langflow.api import health_check_router, log_router, router from langflow.initial_setup.setup import ( create_or_update_starter_projects, initialize_super_user_if_needed, + load_bundles_from_urls, load_flows_from_directory, ) from langflow.interface.types import get_and_cache_all_types_dict @@ -34,6 +36,9 @@ from langflow.middleware import ContentSizeLimitMiddleware from langflow.services.deps import get_settings_service, get_telemetry_service from langflow.services.utils import initialize_services, teardown_services +if TYPE_CHECKING: + from tempfile import TemporaryDirectory + # Ignore Pydantic deprecation warnings from Langchain warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20) @@ -101,10 +106,14 @@ def get_lifespan(*, fix_migration=False, version=None): rprint(f"[bold green]Starting Langflow v{version}...[/bold green]") else: rprint("[bold green]Starting Langflow...[/bold green]") + + temp_dirs: list[TemporaryDirectory] = [] try: await initialize_services(fix_migration=fix_migration) setup_llm_caching() await initialize_super_user_if_needed() + temp_dirs, bundles_components_paths = await load_bundles_from_urls() + get_settings_service().settings.components_path.extend(bundles_components_paths) all_types_dict = await get_and_cache_all_types_dict(get_settings_service()) await create_or_update_starter_projects(all_types_dict) telemetry_service.start() @@ -120,6 +129,8 @@ def get_lifespan(*, fix_migration=False, version=None): logger.info("Cleaning up resources...") await teardown_services() await logger.complete() + temp_dir_cleanups = [asyncio.to_thread(temp_dir.cleanup) for temp_dir in temp_dirs] + await asyncio.gather(*temp_dir_cleanups) # Final message rprint("[bold red]Langflow shutdown complete[/bold red]") diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index 3833b4b90..9e1da87b7 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -101,6 +101,7 @@ class Settings(BaseSettings): components_path: list[str] = [] langchain_cache: str = "InMemoryCache" load_flows_path: str | None = None + bundle_urls: list[str] = [] # Redis redis_host: str = "localhost" diff --git a/src/backend/tests/unit/test_initial_setup.py b/src/backend/tests/unit/test_initial_setup.py index 4fd736df3..9adb0d18d 100644 --- a/src/backend/tests/unit/test_initial_setup.py +++ b/src/backend/tests/unit/test_initial_setup.py @@ -1,17 +1,24 @@ +import asyncio +import uuid from datetime import datetime +from pathlib import Path import anyio import pytest +from aiofile import async_open from langflow.custom.directory_reader.utils import abuild_custom_component_list_from_path from langflow.initial_setup.constants import STARTER_FOLDER_NAME from langflow.initial_setup.setup import ( + detect_github_url, get_project_data, + load_bundles_from_urls, load_starter_projects, update_projects_components_with_latest_component_versions, ) from langflow.interface.types import aget_all_types_dict +from langflow.services.database.models import Flow from langflow.services.database.models.folder.model import Folder -from langflow.services.deps import session_scope +from langflow.services.deps import get_settings_service, session_scope from sqlalchemy.orm import selectinload from sqlmodel import select @@ -152,3 +159,86 @@ async def test_refresh_starter_projects(): assert "should_store_message" not in graph_data["nodes"][1]["data"]["node"]["template"] assert "should_store_message" in new_change["nodes"][1]["data"]["node"]["template"] + + +@pytest.mark.parametrize( + ("url", "expected"), + [ + ( + "https://github.com/langflow-ai/langflow-bundles", + "https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/main.zip", + ), + ( + "https://github.com/langflow-ai/langflow-bundles/", + "https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/main.zip", + ), + ( + "https://github.com/langflow-ai/langflow-bundles.git", + "https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/main.zip", + ), + ( + "https://github.com/langflow-ai/langflow-bundles/tree/some.branch-0_1", + "https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/some.branch-0_1.zip", + ), + ( + "https://github.com/langflow-ai/langflow-bundles/tree/some/branch", + "https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/some/branch.zip", + ), + ( + "https://github.com/langflow-ai/langflow-bundles/tree/some/branch/", + "https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/some/branch.zip", + ), + ( + "https://github.com/langflow-ai/langflow-bundles/releases/tag/v1.0.0-0_1", + "https://github.com/langflow-ai/langflow-bundles/archive/refs/tags/v1.0.0-0_1.zip", + ), + ( + "https://github.com/langflow-ai/langflow-bundles/releases/tag/foo/v1.0.0", + "https://github.com/langflow-ai/langflow-bundles/archive/refs/tags/foo/v1.0.0.zip", + ), + ( + "https://github.com/langflow-ai/langflow-bundles/releases/tag/foo/v1.0.0/", + "https://github.com/langflow-ai/langflow-bundles/archive/refs/tags/foo/v1.0.0.zip", + ), + ( + "https://github.com/langflow-ai/langflow-bundles/commit/68428ce16729a385fe1bcc0f1ec91fd5f5f420b9", + "https://github.com/langflow-ai/langflow-bundles/archive/68428ce16729a385fe1bcc0f1ec91fd5f5f420b9.zip", + ), + ( + "https://github.com/langflow-ai/langflow-bundles/commit/68428ce16729a385fe1bcc0f1ec91fd5f5f420b9/", + "https://github.com/langflow-ai/langflow-bundles/archive/68428ce16729a385fe1bcc0f1ec91fd5f5f420b9.zip", + ), + ("https://example.com/myzip.zip", "https://example.com/myzip.zip"), + ], +) +async def test_detect_github_url(url, expected): + assert await detect_github_url(url) == expected + + +@pytest.mark.usefixtures("client") +async def test_load_bundles_from_urls(): + settings_service = get_settings_service() + settings_service.settings.bundle_urls = [ + "https://github.com/langflow-ai/langflow-bundles/commit/68428ce16729a385fe1bcc0f1ec91fd5f5f420b9" + ] + settings_service.auth_settings.AUTO_LOGIN = True + + temp_dirs, components_paths = await load_bundles_from_urls() + + try: + assert len(components_paths) == 1 + assert "langflow-bundles-68428ce16729a385fe1bcc0f1ec91fd5f5f420b9/components" in components_paths[0] + + async with async_open(Path(components_paths[0]) / "embeddings" / "openai2.py") as f: + content = await f.read() + assert "OpenAIEmbeddings2Component" in content + + assert len(temp_dirs) == 1 + + async with session_scope() as session: + stmt = select(Flow).where(Flow.id == uuid.UUID("c54f9130-f2fa-4a3e-b22a-3856d946351b")) + flow = (await session.exec(stmt)).first() + assert flow is not None + finally: + for temp_dir in temp_dirs: + await asyncio.to_thread(temp_dir.cleanup)