feat: Add support for loading flows and components from URLs (#5020)

* Add support for loading flows and components from a zip

* Translate GitHub URLs automatically
commit 6b0435906f
Author: Christophe Bornet
Date: 2025-01-08 16:31:20 +01:00 (committed by GitHub)
4 changed files with 238 additions and 50 deletions

@@ -1,14 +1,21 @@
import asyncio
import copy
import io
import json
import os
import re
import shutil
import zipfile
from collections import defaultdict
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import AnyStr
from uuid import UUID
import anyio
import httpx
import orjson
from aiofile import async_open
from emoji import demojize, purely_emoji
@@ -16,6 +23,7 @@ from loguru import logger
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import selectinload
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBUTES, ORJSON_OPTIONS
from langflow.initial_setup.constants import STARTER_FOLDER_DESCRIPTION, STARTER_FOLDER_NAME
@@ -549,66 +557,144 @@ async def load_flows_from_directory() -> None:
    if user is None:
        msg = "Superuser not found in the database"
        raise NoResultFound(msg)
    async for file_path in anyio.Path(flows_path).iterdir():
        if not await file_path.is_file() or file_path.suffix != ".json":
            continue
        logger.info(f"Loading flow from file: {file_path.name}")
        async with async_open(str(file_path), "r", encoding="utf-8") as f:
            content = await f.read()
        await upsert_flow_from_file(content, file_path.stem, session, user.id)


async def detect_github_url(url: str) -> str:
    if matched := re.match(r"https?://(?:www\.)?github\.com/([\w.-]+)/([\w.-]+)?/?$", url):
        owner, repo = matched.groups()
        repo = repo.removesuffix(".git")
        async with httpx.AsyncClient(follow_redirects=True) as client:
            response = await client.get(f"https://api.github.com/repos/{owner}/{repo}")
            response.raise_for_status()
            default_branch = response.json().get("default_branch")
            return f"https://github.com/{owner}/{repo}/archive/refs/heads/{default_branch}.zip"
    if matched := re.match(r"https?://(?:www\.)?github\.com/([\w.-]+)/([\w.-]+)/tree/([\w\\/.-]+)", url):
        owner, repo, branch = matched.groups()
        if branch[-1] == "/":
            branch = branch[:-1]
        return f"https://github.com/{owner}/{repo}/archive/refs/heads/{branch}.zip"
    if matched := re.match(r"https?://(?:www\.)?github\.com/([\w.-]+)/([\w.-]+)/releases/tag/([\w\\/.-]+)", url):
        owner, repo, tag = matched.groups()
        if tag[-1] == "/":
            tag = tag[:-1]
        return f"https://github.com/{owner}/{repo}/archive/refs/tags/{tag}.zip"
    if matched := re.match(r"https?://(?:www\.)?github\.com/([\w.-]+)/([\w.-]+)/commit/(\w+)/?$", url):
        owner, repo, commit = matched.groups()
        return f"https://github.com/{owner}/{repo}/archive/{commit}.zip"
    return url
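
A minimal sketch of the URL translation in isolation (the commit hash and URLs below are illustrative; only detect_github_url comes from this diff). A bare repository URL costs one GitHub API call to resolve the default branch, while tree/, releases/tag/, and commit/ URLs are rewritten locally; anything that does not match the GitHub patterns is returned unchanged and assumed to already point at a zip:

# Sketch: exercising detect_github_url outside Langflow.
import asyncio

from langflow.initial_setup.setup import detect_github_url

async def main() -> None:
    # Commit URL: rewritten locally, no API round trip ("abc123" is a made-up hash).
    print(await detect_github_url("https://github.com/langflow-ai/langflow-bundles/commit/abc123"))
    # Non-GitHub URL: passed through untouched.
    print(await detect_github_url("https://example.com/myzip.zip"))

asyncio.run(main())
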
async def load_bundles_from_urls() -> tuple[list[TemporaryDirectory], list[str]]:
    component_paths: set[str] = set()
    temp_dirs = []
    settings_service = get_settings_service()
    bundle_urls = settings_service.settings.bundle_urls
    if not bundle_urls:
        return [], []
    if not settings_service.auth_settings.AUTO_LOGIN:
        logger.warning("AUTO_LOGIN is disabled, not loading flows from URLs")
    async with session_scope() as session:
        user = await get_user_by_username(session, settings_service.auth_settings.SUPERUSER)
        if user is None:
            msg = "Superuser not found in the database"
            raise NoResultFound(msg)
        user_id = user.id
        for url in bundle_urls:
            url_ = await detect_github_url(url)
            async with httpx.AsyncClient(follow_redirects=True) as client:
                response = await client.get(url_)
                response.raise_for_status()
            with zipfile.ZipFile(io.BytesIO(response.content)) as zfile:
                dir_names = [f.filename for f in zfile.infolist() if f.is_dir() and "/" not in f.filename[:-1]]
                temp_dir = None
                for filename in zfile.namelist():
                    path = Path(filename)
                    for dir_name in dir_names:
                        if (
                            settings_service.auth_settings.AUTO_LOGIN
                            and path.is_relative_to(f"{dir_name}flows/")
                            and path.suffix == ".json"
                        ):
                            file_content = zfile.read(filename)
                            await upsert_flow_from_file(file_content, path.stem, session, user_id)
                        elif path.is_relative_to(f"{dir_name}components/"):
                            if temp_dir is None:
                                temp_dir = await asyncio.to_thread(TemporaryDirectory)
                                temp_dirs.append(temp_dir)
                            component_paths.add(str(Path(temp_dir.name) / f"{dir_name}components"))
                            await asyncio.to_thread(zfile.extract, filename, temp_dir.name)
    return temp_dirs, list(component_paths)
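
For reference, the code above implies a bundle archive layout along these lines (the names are hypothetical; GitHub's archive endpoint produces the single "<repo>-<ref>/" top-level directory that dir_names captures). Flow JSON files under flows/ are upserted into the database, and everything under components/ is extracted to a temporary directory that is later appended to components_path:

# Sketch of an expected bundle zip layout (names illustrative):
# my-bundle-main/
# ├── flows/
# │   └── c54f9130-f2fa-4a3e-b22a-3856d946351b.json   # upserted; a UUID filename becomes the flow id
# └── components/
#     └── embeddings/
#         └── my_component.py                          # extracted and added to the component search path
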
async def upsert_flow_from_file(file_content: AnyStr, filename: str, session: AsyncSession, user_id: UUID) -> None:
    flow = orjson.loads(file_content)
    flow_endpoint_name = flow.get("endpoint_name")
    if _is_valid_uuid(filename):
        flow["id"] = filename
    flow_id = flow.get("id")
    if isinstance(flow_id, str):
        try:
            flow_id = UUID(flow_id)
        except ValueError:
            logger.error(f"Invalid UUID string: {flow_id}")
            return
    existing = await find_existing_flow(session, flow_id, flow_endpoint_name)
    if existing:
        logger.debug(f"Found existing flow: {existing.name}")
        logger.info(f"Updating existing flow: {flow_id} with endpoint name {flow_endpoint_name}")
        for key, value in flow.items():
            if hasattr(existing, key):
                # flow dict from json and db representation are not 100% the same
                setattr(existing, key, value)
        existing.updated_at = datetime.now(tz=timezone.utc).astimezone()
        existing.user_id = user_id
        # Generally, folder_id should not be None, but we must check this due to the previous
        # behavior where flows could be added and folder_id was None, orphaning
        # them within Langflow.
        if existing.folder_id is None:
            folder_id = await get_default_folder_id(session, user_id)
            existing.folder_id = folder_id
        if isinstance(existing.id, str):
            try:
                existing.id = UUID(existing.id)
            except ValueError:
                logger.error(f"Invalid UUID string: {existing.id}")
                return
        session.add(existing)
    else:
        logger.info(f"Creating new flow: {flow_id} with endpoint name {flow_endpoint_name}")
        # Current behavior loads all new flows into default folder
        folder_id = await get_default_folder_id(session, user_id)
        flow["user_id"] = user_id
        flow["folder_id"] = folder_id
        flow = Flow.model_validate(flow, from_attributes=True)
        flow.updated_at = datetime.now(tz=timezone.utc).astimezone()
        session.add(flow)
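
To make the upsert contract concrete, here is a minimal flow payload as a Python dict (all field values are illustrative, not from this diff). If the JSON filename stem is itself a valid UUID it overrides any "id" in the payload; a flow found by id or endpoint_name is updated in place, otherwise a new Flow row is created in the default folder:

# Sketch: the fields upsert_flow_from_file actually inspects (payload is hypothetical).
minimal_flow = {
    "id": "c54f9130-f2fa-4a3e-b22a-3856d946351b",  # optional; a UUID filename stem wins over this
    "name": "My bundled flow",
    "endpoint_name": "my-flow",  # fallback key for finding an existing flow
    "data": {"nodes": [], "edges": []},  # the graph itself
}
# user_id and folder_id are filled in by upsert_flow_from_file, not by the bundle author.
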
async def find_existing_flow(session, flow_id, flow_endpoint_name):

@@ -6,6 +6,7 @@ import warnings
from contextlib import asynccontextmanager
from http import HTTPStatus
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import urlencode
import anyio
@@ -25,6 +26,7 @@ from langflow.api import health_check_router, log_router, router
from langflow.initial_setup.setup import (
    create_or_update_starter_projects,
    initialize_super_user_if_needed,
    load_bundles_from_urls,
    load_flows_from_directory,
)
from langflow.interface.types import get_and_cache_all_types_dict
@@ -34,6 +36,9 @@ from langflow.middleware import ContentSizeLimitMiddleware
from langflow.services.deps import get_settings_service, get_telemetry_service
from langflow.services.utils import initialize_services, teardown_services

if TYPE_CHECKING:
    from tempfile import TemporaryDirectory

# Ignore Pydantic deprecation warnings from Langchain
warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20)
@@ -101,10 +106,14 @@ def get_lifespan(*, fix_migration=False, version=None):
rprint(f"[bold green]Starting Langflow v{version}...[/bold green]")
else:
rprint("[bold green]Starting Langflow...[/bold green]")
temp_dirs: list[TemporaryDirectory] = []
try:
await initialize_services(fix_migration=fix_migration)
setup_llm_caching()
await initialize_super_user_if_needed()
temp_dirs, bundles_components_paths = await load_bundles_from_urls()
get_settings_service().settings.components_path.extend(bundles_components_paths)
all_types_dict = await get_and_cache_all_types_dict(get_settings_service())
await create_or_update_starter_projects(all_types_dict)
telemetry_service.start()
@@ -120,6 +129,8 @@ def get_lifespan(*, fix_migration=False, version=None):
logger.info("Cleaning up resources...")
await teardown_services()
await logger.complete()
temp_dir_cleanups = [asyncio.to_thread(temp_dir.cleanup) for temp_dir in temp_dirs]
await asyncio.gather(*temp_dir_cleanups)
# Final message
rprint("[bold red]Langflow shutdown complete[/bold red]")

@@ -101,6 +101,7 @@ class Settings(BaseSettings):
    components_path: list[str] = []
    langchain_cache: str = "InMemoryCache"
    load_flows_path: str | None = None
    bundle_urls: list[str] = []

    # Redis
    redis_host: str = "localhost"
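
A quick illustration of how the new setting could be supplied. Settings is a pydantic BaseSettings, so bundle_urls should also be configurable through the environment; the LANGFLOW_BUNDLE_URLS name below assumes Langflow's usual LANGFLOW_ env prefix (not shown in this diff), and the JSON-list encoding is how pydantic-settings parses list-valued fields:

# Sketch: pointing Langflow at a bundle repository before startup (env var name assumed).
import os

os.environ["LANGFLOW_BUNDLE_URLS"] = '["https://github.com/langflow-ai/langflow-bundles"]'
# On startup, load_bundles_from_urls() translates the repo URL to a zip of the default
# branch, upserts any flows/*.json, and extracts components/ onto the component search path.
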

@@ -1,17 +1,24 @@
import asyncio
import uuid
from datetime import datetime
from pathlib import Path
import anyio
import pytest
from aiofile import async_open
from langflow.custom.directory_reader.utils import abuild_custom_component_list_from_path
from langflow.initial_setup.constants import STARTER_FOLDER_NAME
from langflow.initial_setup.setup import (
    detect_github_url,
    get_project_data,
    load_bundles_from_urls,
    load_starter_projects,
    update_projects_components_with_latest_component_versions,
)
from langflow.interface.types import aget_all_types_dict
from langflow.services.database.models import Flow
from langflow.services.database.models.folder.model import Folder
from langflow.services.deps import get_settings_service, session_scope
from sqlalchemy.orm import selectinload
from sqlmodel import select
@@ -152,3 +159,86 @@ async def test_refresh_starter_projects():
assert "should_store_message" not in graph_data["nodes"][1]["data"]["node"]["template"]
assert "should_store_message" in new_change["nodes"][1]["data"]["node"]["template"]
@pytest.mark.parametrize(
("url", "expected"),
[
(
"https://github.com/langflow-ai/langflow-bundles",
"https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/main.zip",
),
(
"https://github.com/langflow-ai/langflow-bundles/",
"https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/main.zip",
),
(
"https://github.com/langflow-ai/langflow-bundles.git",
"https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/main.zip",
),
(
"https://github.com/langflow-ai/langflow-bundles/tree/some.branch-0_1",
"https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/some.branch-0_1.zip",
),
(
"https://github.com/langflow-ai/langflow-bundles/tree/some/branch",
"https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/some/branch.zip",
),
(
"https://github.com/langflow-ai/langflow-bundles/tree/some/branch/",
"https://github.com/langflow-ai/langflow-bundles/archive/refs/heads/some/branch.zip",
),
(
"https://github.com/langflow-ai/langflow-bundles/releases/tag/v1.0.0-0_1",
"https://github.com/langflow-ai/langflow-bundles/archive/refs/tags/v1.0.0-0_1.zip",
),
(
"https://github.com/langflow-ai/langflow-bundles/releases/tag/foo/v1.0.0",
"https://github.com/langflow-ai/langflow-bundles/archive/refs/tags/foo/v1.0.0.zip",
),
(
"https://github.com/langflow-ai/langflow-bundles/releases/tag/foo/v1.0.0/",
"https://github.com/langflow-ai/langflow-bundles/archive/refs/tags/foo/v1.0.0.zip",
),
(
"https://github.com/langflow-ai/langflow-bundles/commit/68428ce16729a385fe1bcc0f1ec91fd5f5f420b9",
"https://github.com/langflow-ai/langflow-bundles/archive/68428ce16729a385fe1bcc0f1ec91fd5f5f420b9.zip",
),
(
"https://github.com/langflow-ai/langflow-bundles/commit/68428ce16729a385fe1bcc0f1ec91fd5f5f420b9/",
"https://github.com/langflow-ai/langflow-bundles/archive/68428ce16729a385fe1bcc0f1ec91fd5f5f420b9.zip",
),
("https://example.com/myzip.zip", "https://example.com/myzip.zip"),
],
)
async def test_detect_github_url(url, expected):
assert await detect_github_url(url) == expected
@pytest.mark.usefixtures("client")
async def test_load_bundles_from_urls():
settings_service = get_settings_service()
settings_service.settings.bundle_urls = [
"https://github.com/langflow-ai/langflow-bundles/commit/68428ce16729a385fe1bcc0f1ec91fd5f5f420b9"
]
settings_service.auth_settings.AUTO_LOGIN = True
temp_dirs, components_paths = await load_bundles_from_urls()
try:
assert len(components_paths) == 1
assert "langflow-bundles-68428ce16729a385fe1bcc0f1ec91fd5f5f420b9/components" in components_paths[0]
async with async_open(Path(components_paths[0]) / "embeddings" / "openai2.py") as f:
content = await f.read()
assert "OpenAIEmbeddings2Component" in content
assert len(temp_dirs) == 1
async with session_scope() as session:
stmt = select(Flow).where(Flow.id == uuid.UUID("c54f9130-f2fa-4a3e-b22a-3856d946351b"))
flow = (await session.exec(stmt)).first()
assert flow is not None
finally:
for temp_dir in temp_dirs:
await asyncio.to_thread(temp_dir.cleanup)