feat: Update get_all endpoint to use async/await

This commit updates the `get_all` endpoint in the `endpoints.py` file to use `async` and `await` keywords. By making the `get_all` function asynchronous, it allows for better performance and responsiveness when retrieving all types from the langchain. This change improves the overall efficiency of the codebase.
This commit is contained in:
ogabrielluiz 2024-05-30 22:46:49 -03:00
commit 5dae23bcb1
5 changed files with 184 additions and 28 deletions

View file

@ -39,14 +39,14 @@ router = APIRouter(tags=["Base"])
@router.get("/all", dependencies=[Depends(get_current_active_user)])
def get_all(
async def get_all(
settings_service=Depends(get_settings_service),
):
from langflow.interface.types import get_all_types_dict
from langflow.interface.types import aget_all_types_dict
logger.debug("Building langchain types dict")
try:
all_types_dict = get_all_types_dict(settings_service.settings.components_path)
all_types_dict = await aget_all_types_dict(settings_service.settings.components_path)
return all_types_dict
except Exception as exc:
logger.exception(exc)

View file

@ -1,4 +1,5 @@
import ast
import asyncio
import os
import zlib
from pathlib import Path
@ -286,6 +287,87 @@ class DirectoryReader:
logger.debug("-------------------- Component menu list built --------------------")
return response
async def process_file_async(self, file_path):
try:
file_content = self.read_file_content(file_path)
except Exception as exc:
logger.exception(exc)
logger.error(f"Error while reading file {file_path}: {str(exc)}")
return False, f"Could not read {file_path}"
if file_content is None:
return False, f"Could not read {file_path}"
elif self.is_empty_file(file_content):
return False, "Empty file"
elif not self.validate_code(file_content):
return False, "Syntax error"
elif not self.validate_build(file_content):
return False, "Missing build function"
elif self._is_type_hint_used_in_args("Optional", file_content) and not self._is_type_hint_imported(
"Optional", file_content
):
return (
False,
"Type hint 'Optional' is used but not imported in the code.",
)
else:
if self.compress_code_field:
file_content = str(StringCompressor(file_content).compress_string())
return True, file_content
async def get_output_types_from_code_async(self, code: str):
return await asyncio.to_thread(self.get_output_types_from_code, code)
async def abuild_component_menu_list(self, file_paths):
response = {"menu": []}
logger.debug("-------------------- Async Building component menu list --------------------")
tasks = [self.process_file_async(file_path) for file_path in file_paths]
results = await asyncio.gather(*tasks)
for file_path, (validation_result, result_content) in zip(file_paths, results):
menu_name = os.path.basename(os.path.dirname(file_path))
filename = os.path.basename(file_path)
if not validation_result:
logger.error(f"Error while processing file {file_path}")
menu_result = self.find_menu(response, menu_name) or {
"name": menu_name,
"path": os.path.dirname(file_path),
"components": [],
}
component_name = filename.split(".")[0]
if "_" in component_name:
component_name_camelcase = " ".join(word.title() for word in component_name.split("_"))
else:
component_name_camelcase = component_name
if validation_result:
try:
output_types = await self.get_output_types_from_code_async(result_content)
except Exception as exc:
logger.exception(f"Error while getting output types from code: {str(exc)}")
output_types = [component_name_camelcase]
else:
output_types = [component_name_camelcase]
component_info = {
"name": component_name_camelcase,
"output_types": output_types,
"file": filename,
"code": result_content if validation_result else "",
"error": "" if validation_result else result_content,
}
menu_result["components"].append(component_info)
if menu_result not in response["menu"]:
response["menu"].append(menu_result)
logger.debug("-------------------- Component menu list built --------------------")
return response
@staticmethod
def get_output_types_from_code(code: str) -> list:
"""

View file

@ -51,6 +51,16 @@ def build_and_validate_all_files(reader: DirectoryReader, file_list):
return valid_components, invalid_components
async def abuild_and_validate_all_files(reader: DirectoryReader, file_list):
"""Build and validate all files"""
data = await reader.abuild_component_menu_list(file_list)
valid_components = reader.filter_loaded_components(data=data, with_errors=False)
invalid_components = reader.filter_loaded_components(data=data, with_errors=True)
return valid_components, invalid_components
def load_files_from_path(path: str):
"""Load all files from a given path"""
reader = DirectoryReader(path, False)
@ -71,6 +81,19 @@ def build_custom_component_list_from_path(path: str):
return merge_nested_dicts_with_renaming(valid_menu, invalid_menu)
async def abuild_custom_component_list_from_path(path: str):
"""Build a list of custom components for the langchain from a given path"""
file_list = load_files_from_path(path)
reader = DirectoryReader(path, False)
valid_components, invalid_components = await abuild_and_validate_all_files(reader, file_list)
valid_menu = build_valid_menu(valid_components)
invalid_menu = build_invalid_menu(invalid_components)
return merge_nested_dicts_with_renaming(valid_menu, invalid_menu)
def create_invalid_component_template(component, component_name):
"""Create a template for an invalid component."""
component_code = component["code"]

View file

@ -11,9 +11,9 @@ from loguru import logger
from pydantic import BaseModel
from langflow.custom import CustomComponent
from langflow.custom.attributes import ATTR_FUNC_MAPPING
from langflow.custom.code_parser.utils import extract_inner_type
from langflow.custom.directory_reader.utils import (
abuild_custom_component_list_from_path,
build_custom_component_list_from_path,
determine_component_name,
merge_nested_dicts_with_renaming,
@ -21,6 +21,7 @@ from langflow.custom.directory_reader.utils import (
from langflow.custom.eval import eval_custom_component_code
from langflow.custom.schema import MissingDefault
from langflow.field_typing.range_spec import RangeSpec
from langflow.helpers.custom import format_type
from langflow.schema import dotdict
from langflow.template.field.base import Input
from langflow.template.frontend_node.custom_components import CustomComponentFrontendNode
@ -147,7 +148,11 @@ def add_new_custom_field(
# Check field_config if any of the keys are in it
# if it is, update the value
display_name = field_config.pop("display_name", None)
field_type = field_config.pop("field_type", field_type)
if not field_type:
if "type" in field_config and field_config["type"] is not None:
field_type = field_config.pop("type")
elif "field_type" in field_config and field_config["field_type"] is not None:
field_type = field_config.pop("field_type")
field_contains_list = "list" in field_type.lower()
field_type = process_type(field_type)
field_value = field_config.pop("value", field_value)
@ -238,6 +243,15 @@ def get_field_dict(field: Union[Input, dict]):
return field
def run_build_inputs(custom_component: CustomComponent, user_id: Optional[Union[str, UUID]] = None):
"""Run the build inputs of a custom component."""
try:
return custom_component.build_inputs(user_id=user_id)
except Exception as exc:
logger.error(f"Error running build inputs: {exc}")
raise HTTPException(status_code=500, detail=str(exc)) from exc
def run_build_config(
custom_component: CustomComponent,
user_id: Optional[Union[str, UUID]] = None,
@ -284,26 +298,6 @@ def run_build_config(
raise exc
def sanitize_template_config(template_config):
"""Sanitize the template config"""
for key in template_config.copy():
if key not in ATTR_FUNC_MAPPING.keys():
template_config.pop(key, None)
return template_config
def build_frontend_node(template_config):
"""Build a frontend node for a custom component"""
try:
sanitized_template_config = sanitize_template_config(template_config)
return CustomComponentFrontendNode(**sanitized_template_config)
except Exception as exc:
logger.error(f"Error while building base frontend node: {exc}")
raise exc
def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code, field_config):
code_field = Input(
dynamic=True,
@ -322,13 +316,34 @@ def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code, field_c
return frontend_node
def build_custom_component_template_from_inputs(
custom_component: CustomComponent, user_id: Optional[Union[str, UUID]] = None
):
# The List of Inputs fills the role of the build_config and the entrypoint_args
frontend_node = CustomComponentFrontendNode.from_inputs(**custom_component.template_config)
field_config = run_build_inputs(
custom_component,
user_id=user_id,
)
frontend_node = add_code_field(frontend_node, custom_component.code, field_config.get("code", {}))
# But we now need to calculate the return_type of the methods in the outputs
for output in frontend_node.outputs:
return_types = custom_component.get_method_return_type(output.method)
return_types = [format_type(return_type) for return_type in return_types]
output.add_types(return_types)
return frontend_node.to_dict(add_name=False), custom_component
def build_custom_component_template(
custom_component: CustomComponent,
user_id: Optional[Union[str, UUID]] = None,
) -> Tuple[Dict[str, Any], CustomComponent]:
"""Build a custom component template for the langchain"""
"""Build a custom component template"""
try:
frontend_node = build_frontend_node(custom_component.template_config)
if "inputs" in custom_component.template_config:
return build_custom_component_template_from_inputs(custom_component, user_id=user_id)
frontend_node = CustomComponentFrontendNode(**custom_component.template_config)
field_config, custom_instance = run_build_config(
custom_component,
@ -398,6 +413,31 @@ def build_custom_components(components_paths: List[str]):
return custom_components_from_file
async def abuild_custom_components(components_paths: List[str]):
"""Build custom components from the specified paths."""
if not components_paths:
return {}
logger.info(f"Building custom components from {components_paths}")
custom_components_from_file: dict = {}
processed_paths = set()
for path in components_paths:
path_str = str(path)
if path_str in processed_paths:
continue
custom_component_dict = await abuild_custom_component_list_from_path(path_str)
if custom_component_dict:
category = next(iter(custom_component_dict))
logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}")
custom_components_from_file = merge_nested_dicts_with_renaming(
custom_components_from_file, custom_component_dict
)
processed_paths.add(path_str)
return custom_components_from_file
def update_field_dict(
custom_component_instance: "CustomComponent",
field_dict: Dict,
@ -446,6 +486,11 @@ def sanitize_field_config(field_config: Union[Dict, Input]):
"show",
]:
field_dict.pop(key, None)
# Remove field_type and type because they were extracted already
field_dict.pop("field_type", None)
field_dict.pop("type", None)
return field_dict

View file

@ -1,4 +1,10 @@
from langflow.custom.utils import build_custom_components
from langflow.custom.utils import abuild_custom_components, build_custom_components
async def aget_all_types_dict(components_paths):
"""Get all types dictionary combining native and custom components."""
custom_components_from_file = await abuild_custom_components(components_paths=components_paths)
return custom_components_from_file
def get_all_types_dict(components_paths):