📝 (main.py): Introduce asyncio tasks to asynchronously fetch and cache all types dictionary for improved performance and responsiveness

📝 (endpoints.py): Refactor code to use get_and_cache_all_types_dict function for better code organization and readability
📝 (setup.py): Refactor code to use get_all_components_coro parameter in create_or_update_starter_projects function for better flexibility and testability
📝 (types.py): Refactor code to introduce get_and_cache_all_types_dict function for improved separation of concerns and reusability
This commit is contained in:
Gabriel Luiz Freitas Almeida 2024-06-20 10:46:00 -03:00
commit 058d00b4ca
4 changed files with 38 additions and 14 deletions

View file

@ -5,6 +5,9 @@ from uuid import UUID
import sqlalchemy as sa
from fastapi import APIRouter, BackgroundTasks, Body, Depends, HTTPException, Request, UploadFile, status
from loguru import logger
from sqlmodel import Session, select
from langflow.api.utils import update_frontend_node_with_template_values
from langflow.api.v1.schemas import (
ConfigResponse,
@ -38,8 +41,6 @@ from langflow.services.deps import (
)
from langflow.services.session.service import SessionService
from langflow.services.task.service import TaskService
from loguru import logger
from sqlmodel import Session, select
if TYPE_CHECKING:
from langflow.services.cache.base import CacheService
@ -54,15 +55,14 @@ async def get_all(
cache_service: "CacheService" = Depends(dependency=get_cache_service),
force_refresh: bool = False,
):
from langflow.interface.types import aget_all_types_dict
from langflow.interface.types import get_and_cache_all_types_dict
try:
async with Lock() as lock:
all_types_dict = await cache_service.get(key="all_types_dict", lock=lock)
if not all_types_dict or force_refresh:
logger.debug("Building langchain types dict")
all_types_dict = await aget_all_types_dict(settings_service.settings.components_path)
await cache_service.set(key="all_types_dict", value=all_types_dict, lock=lock)
all_types_dict = await get_and_cache_all_types_dict(
settings_service=settings_service, cache_service=cache_service, force_refresh=force_refresh, lock=lock
)
return all_types_dict
except Exception as exc:
logger.exception(exc)

View file

@ -6,6 +6,7 @@ from collections import defaultdict
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import Awaitable
from uuid import UUID
import orjson
@ -15,7 +16,6 @@ from sqlmodel import select
from langflow.base.constants import FIELD_FORMAT_ATTRIBUTES, NODE_FORMAT_ATTRIBUTES, ORJSON_OPTIONS
from langflow.graph.graph.base import Graph
from langflow.interface.types import aget_all_components
from langflow.services.auth.utils import create_super_user
from langflow.services.database.models.flow.model import Flow, FlowCreate
from langflow.services.database.models.folder.model import Folder, FolderCreate
@ -537,10 +537,9 @@ def find_existing_flow(session, flow_id, flow_endpoint_name):
return None
async def create_or_update_starter_projects():
components_paths = get_settings_service().settings.components_path
async def create_or_update_starter_projects(get_all_components_coro: Awaitable[dict]):
try:
all_types_dict = await aget_all_components(components_paths, as_dict=True)
all_types_dict = await get_all_components_coro
except Exception as e:
logger.exception(f"Error loading components: {e}")
raise e

View file

@ -1,7 +1,15 @@
import asyncio
import json
from git import TYPE_CHECKING
from loguru import logger
from langflow.custom.utils import abuild_custom_components, build_custom_components
if TYPE_CHECKING:
from langflow.services.cache.base import CacheService
from langflow.services.settings.manager import SettingsService
async def aget_all_types_dict(components_paths):
"""Get all types dictionary combining native and custom components."""
@ -47,3 +55,17 @@ def get_all_components(components_paths, as_dict=False):
else:
components.append(component)
return components
async def get_and_cache_all_types_dict(
settings_service: "SettingsService",
cache_service: "CacheService",
force_refresh: bool = False,
lock: asyncio.Lock | None = None,
):
all_types_dict = await cache_service.get(key="all_types_dict", lock=lock)
if not all_types_dict or force_refresh:
logger.debug("Building langchain types dict")
all_types_dict = await aget_all_types_dict(settings_service.settings.components_path)
await cache_service.set(key="all_types_dict", value=all_types_dict, lock=lock)
return all_types_dict

View file

@ -1,3 +1,4 @@
import asyncio
import warnings
from contextlib import asynccontextmanager
from pathlib import Path
@ -21,8 +22,9 @@ from langflow.initial_setup.setup import (
initialize_super_user_if_needed,
load_flows_from_directory,
)
from langflow.interface.types import get_and_cache_all_types_dict
from langflow.interface.utils import setup_llm_caching
from langflow.services.deps import get_settings_service
from langflow.services.deps import get_cache_service, get_settings_service
from langflow.services.plugins.langfuse_plugin import LangfuseInstance
from langflow.services.utils import initialize_services, teardown_services
from langflow.utils.logger import configure
@ -57,7 +59,8 @@ def get_lifespan(fix_migration=False, socketio_server=None, version=None):
setup_llm_caching()
LangfuseInstance.update()
initialize_super_user_if_needed()
await create_or_update_starter_projects()
task = asyncio.create_task(get_and_cache_all_types_dict(get_settings_service(), get_cache_service()))
asyncio.create_task(create_or_update_starter_projects(task))
load_flows_from_directory()
yield
except Exception as exc: