From 058d00b4cadb128e5c5b1ac7154c1bb5b14dcdc4 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 20 Jun 2024 10:46:00 -0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=9D=20(main.py):=20Introduce=20asyncio?= =?UTF-8?q?=20tasks=20to=20asynchronously=20fetch=20and=20cache=20all=20ty?= =?UTF-8?q?pes=20dictionary=20for=20improved=20performance=20and=20respons?= =?UTF-8?q?iveness?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 📝 (endpoints.py): Refactor code to use get_and_cache_all_types_dict function for better code organization and readability 📝 (setup.py): Refactor code to use get_all_components_coro parameter in create_or_update_starter_projects function for better flexibility and testability 📝 (types.py): Refactor code to introduce get_and_cache_all_types_dict function for improved separation of concerns and reusability --- src/backend/base/langflow/api/v1/endpoints.py | 16 +++++++------- .../base/langflow/initial_setup/setup.py | 7 +++--- src/backend/base/langflow/interface/types.py | 22 +++++++++++++++++++ src/backend/base/langflow/main.py | 7 ++++-- 4 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/backend/base/langflow/api/v1/endpoints.py b/src/backend/base/langflow/api/v1/endpoints.py index b2b17949a..cbd5e3154 100644 --- a/src/backend/base/langflow/api/v1/endpoints.py +++ b/src/backend/base/langflow/api/v1/endpoints.py @@ -5,6 +5,9 @@ from uuid import UUID import sqlalchemy as sa from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request, UploadFile, status +from loguru import logger +from sqlmodel import Session, select + from langflow.api.utils import update_frontend_node_with_template_values from langflow.api.v1.schemas import ( ConfigResponse, @@ -38,8 +41,6 @@ from langflow.services.deps import ( ) from langflow.services.session.service import SessionService from langflow.services.task.service import TaskService -from loguru import logger -from sqlmodel import Session, select if TYPE_CHECKING: from langflow.services.cache.base import CacheService @@ -54,15 +55,14 @@ async def get_all( cache_service: "CacheService" = Depends(dependency=get_cache_service), force_refresh: bool = False, ): - from langflow.interface.types import aget_all_types_dict + from langflow.interface.types import get_and_cache_all_types_dict try: async with Lock() as lock: - all_types_dict = await cache_service.get(key="all_types_dict", lock=lock) - if not all_types_dict or force_refresh: - logger.debug("Building langchain types dict") - all_types_dict = await aget_all_types_dict(settings_service.settings.components_path) - await cache_service.set(key="all_types_dict", value=all_types_dict, lock=lock) + all_types_dict = await get_and_cache_all_types_dict( + settings_service=settings_service, cache_service=cache_service, force_refresh=force_refresh, lock=lock + ) + return all_types_dict except Exception as exc: logger.exception(exc) diff --git a/src/backend/base/langflow/initial_setup/setup.py b/src/backend/base/langflow/initial_setup/setup.py index 79ec7d9d0..6d1572da6 100644 --- a/src/backend/base/langflow/initial_setup/setup.py +++ b/src/backend/base/langflow/initial_setup/setup.py @@ -6,6 +6,7 @@ from collections import defaultdict from copy import deepcopy from datetime import datetime, timezone from pathlib import Path +from typing import Awaitable from uuid import UUID import orjson @@ -15,7 +16,6 @@ from sqlmodel import select from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBUTES, ORJSON_OPTIONS from langflow.graph.graph.base import Graph -from langflow.interface.types import aget_all_components from langflow.services.auth.utils import create_super_user from langflow.services.database.models.flow.model import Flow, FlowCreate from langflow.services.database.models.folder.model import Folder, FolderCreate @@ -537,10 +537,9 @@ def find_existing_flow(session, flow_id, flow_endpoint_name): return None -async def create_or_update_starter_projects(): - components_paths = get_settings_service().settings.components_path +async def create_or_update_starter_projects(get_all_components_coro: Awaitable[dict]): try: - all_types_dict = await aget_all_components(components_paths, as_dict=True) + all_types_dict = await get_all_components_coro except Exception as e: logger.exception(f"Error loading components: {e}") raise e diff --git a/src/backend/base/langflow/interface/types.py b/src/backend/base/langflow/interface/types.py index 46e3f3a00..35b913565 100644 --- a/src/backend/base/langflow/interface/types.py +++ b/src/backend/base/langflow/interface/types.py @@ -1,7 +1,15 @@ +import asyncio import json +from git import TYPE_CHECKING +from loguru import logger + from langflow.custom.utils import abuild_custom_components, build_custom_components +if TYPE_CHECKING: + from langflow.services.cache.base import CacheService + from langflow.services.settings.manager import SettingsService + async def aget_all_types_dict(components_paths): """Get all types dictionary combining native and custom components.""" @@ -47,3 +55,17 @@ def get_all_components(components_paths, as_dict=False): else: components.append(component) return components + + +async def get_and_cache_all_types_dict( + settings_service: "SettingsService", + cache_service: "CacheService", + force_refresh: bool = False, + lock: asyncio.Lock | None = None, +): + all_types_dict = await cache_service.get(key="all_types_dict", lock=lock) + if not all_types_dict or force_refresh: + logger.debug("Building langchain types dict") + all_types_dict = await aget_all_types_dict(settings_service.settings.components_path) + await cache_service.set(key="all_types_dict", value=all_types_dict, lock=lock) + return all_types_dict diff --git a/src/backend/base/langflow/main.py b/src/backend/base/langflow/main.py index 4c8772c9f..e6c33c197 100644 --- a/src/backend/base/langflow/main.py +++ b/src/backend/base/langflow/main.py @@ -1,3 +1,4 @@ +import asyncio import warnings from contextlib import asynccontextmanager from pathlib import Path @@ -21,8 +22,9 @@ from langflow.initial_setup.setup import ( initialize_super_user_if_needed, load_flows_from_directory, ) +from langflow.interface.types import get_and_cache_all_types_dict from langflow.interface.utils import setup_llm_caching -from langflow.services.deps import get_settings_service +from langflow.services.deps import get_cache_service, get_settings_service from langflow.services.plugins.langfuse_plugin import LangfuseInstance from langflow.services.utils import initialize_services, teardown_services from langflow.utils.logger import configure @@ -57,7 +59,8 @@ def get_lifespan(fix_migration=False, socketio_server=None, version=None): setup_llm_caching() LangfuseInstance.update() initialize_super_user_if_needed() - await create_or_update_starter_projects() + task = asyncio.create_task(get_and_cache_all_types_dict(get_settings_service(), get_cache_service())) + asyncio.create_task(create_or_update_starter_projects(task)) load_flows_from_directory() yield except Exception as exc: