diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 91b4bcd65..10b9d9e38 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -1,10 +1,11 @@ import time import uuid -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Annotated, Optional from fastapi import ( APIRouter, BackgroundTasks, + Body, Depends, HTTPException, WebSocket, @@ -21,6 +22,7 @@ from langflow.api.utils import ( format_exception_message, ) from langflow.api.v1.schemas import ( + InputValueRequest, ResultDataResponse, StreamData, VertexBuildResponse, @@ -139,10 +141,12 @@ async def build_vertex( flow_id: str, vertex_id: str, background_tasks: BackgroundTasks, + inputs: Annotated[InputValueRequest, Body(embed=True)] = None, chat_service: "ChatService" = Depends(get_chat_service), current_user=Depends(get_current_active_user), ): """Build a vertex instead of the entire graph.""" + {"inputs": {"input_value": "some value"}} start_time = time.perf_counter() try: start_time = time.perf_counter() @@ -163,7 +167,8 @@ async def build_vertex( vertex = graph.get_vertex(vertex_id) try: if not vertex.pinned or not vertex._built: - await vertex.build(user_id=current_user.id) + inputs_dict = inputs.model_dump() if inputs else {} + await vertex.build(user_id=current_user.id, inputs=inputs_dict) if vertex.result is not None: params = vertex._built_object_repr() @@ -176,7 +181,7 @@ async def build_vertex( result_data_response = ResultDataResponse(**result_dict.model_dump()) except Exception as exc: - logger.error(f"Error building vertex: {exc}") + logger.exception(f"Error building vertex: {exc}") params = format_exception_message(exc) valid = False result_data_response = ResultDataResponse(results={}) diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 0092efa4e..c85f946da 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -261,3 +261,7 @@ class VertexBuildResponse(BaseModel): class VerticesBuiltResponse(BaseModel): vertices: List[VertexBuildResponse] + + +class InputValueRequest(BaseModel): + input_value: str diff --git a/src/backend/langflow/components/documentloaders/GatherRecords.py b/src/backend/langflow/components/documentloaders/GatherRecords.py index e25a2d189..745d0655b 100644 --- a/src/backend/langflow/components/documentloaders/GatherRecords.py +++ b/src/backend/langflow/components/documentloaders/GatherRecords.py @@ -1,6 +1,6 @@ from concurrent import futures from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from langflow import CustomComponent from langflow.schema import Record @@ -12,21 +12,30 @@ class GatherRecordsComponent(CustomComponent): def build_config(self) -> Dict[str, Any]: return { + "path": {"display_name": "Path"}, + "types": { + "display_name": "Types", + "info": "File types to load. Leave empty to load all types.", + }, + "depth": {"display_name": "Depth", "info": "Depth to search for files."}, + "max_concurrency": {"display_name": "Max Concurrency", "advanced": True}, "load_hidden": { - "display_name": "Load Hidden Files", - "value": False, + "display_name": "Load Hidden", "advanced": True, + "info": "If true, hidden files will be loaded.", }, - "max_concurrency": { - "display_name": "Max Concurrency", - "value": 10, + "recursive": { + "display_name": "Recursive", "advanced": True, + "info": "If true, the search will be recursive.", + }, + "silent_errors": { + "display_name": "Silent Errors", + "advanced": True, + "info": "If true, errors will not raise an exception.", }, - "path": {"display_name": "Local Directory"}, - "recursive": {"display_name": "Recursive", "value": True, "advanced": True}, "use_multithreading": { "display_name": "Use Multithreading", - "value": True, "advanced": True, }, } @@ -61,7 +70,9 @@ class GatherRecordsComponent(CustomComponent): glob = "**/*" if recursive else "*" paths = walk_level(path_obj, depth) if depth else path_obj.glob(glob) - file_paths = [str(p) for p in paths if p.is_file() and match_types(p) and is_not_hidden(p)] + file_paths = [ + str(p) for p in paths if p.is_file() and match_types(p) and is_not_hidden(p) + ] return file_paths @@ -91,13 +102,20 @@ class GatherRecordsComponent(CustomComponent): use_multithreading: bool, ) -> List[Record]: if use_multithreading: - records = self.parallel_load_records(file_paths, silent_errors, max_concurrency) + records = self.parallel_load_records( + file_paths, silent_errors, max_concurrency + ) else: - records = [self.parse_file_to_record(file_path, silent_errors) for file_path in file_paths] + records = [ + self.parse_file_to_record(file_path, silent_errors) + for file_path in file_paths + ] records = list(filter(None, records)) return records - def parallel_load_records(self, file_paths: List[str], silent_errors: bool, max_concurrency: int) -> List[Record]: + def parallel_load_records( + self, file_paths: List[str], silent_errors: bool, max_concurrency: int + ) -> List[Record]: with futures.ThreadPoolExecutor(max_workers=max_concurrency) as executor: loaded_files = executor.map( lambda file_path: self.parse_file_to_record(file_path, silent_errors), @@ -108,7 +126,7 @@ class GatherRecordsComponent(CustomComponent): def build( self, path: str, - types: List[str] = None, + types: Optional[List[str]] = None, depth: int = 0, max_concurrency: int = 2, load_hidden: bool = False, @@ -116,14 +134,23 @@ class GatherRecordsComponent(CustomComponent): silent_errors: bool = False, use_multithreading: bool = True, ) -> List[Record]: + if types is None: + types = [] resolved_path = self.resolve_path(path) - file_paths = self.retrieve_file_paths(resolved_path, types, load_hidden, recursive, depth) + file_paths = self.retrieve_file_paths( + resolved_path, types, load_hidden, recursive, depth + ) loaded_records = [] if use_multithreading: - loaded_records = self.parallel_load_records(file_paths, silent_errors, max_concurrency) + loaded_records = self.parallel_load_records( + file_paths, silent_errors, max_concurrency + ) else: - loaded_records = [self.parse_file_to_record(file_path, silent_errors) for file_path in file_paths] + loaded_records = [ + self.parse_file_to_record(file_path, silent_errors) + for file_path in file_paths + ] loaded_records = list(filter(None, loaded_records)) self.status = loaded_records return loaded_records diff --git a/src/backend/langflow/components/io/MessageHistory.py b/src/backend/langflow/components/io/MessageHistory.py index 345de4ba0..e9c3f1a87 100644 --- a/src/backend/langflow/components/io/MessageHistory.py +++ b/src/backend/langflow/components/io/MessageHistory.py @@ -12,7 +12,7 @@ class MessageHistoryComponent(CustomComponent): def build_config(self): return { "sender": { - "options": ["Machine", "User"], + "options": ["Machine", "User", "Machine and User"], "display_name": "Sender Type", }, "sender_name": {"display_name": "Sender Name"}, @@ -38,6 +38,8 @@ class MessageHistoryComponent(CustomComponent): session_id: Optional[str] = None, n_messages: int = 5, ) -> List[Record]: + if sender == "Machine and User": + sender = None messages = get_messages( sender=sender, sender_name=sender_name, diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index b917889a4..fe2b29a02 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -2,13 +2,16 @@ import ast import inspect import types from enum import Enum -from typing import (TYPE_CHECKING, Any, Callable, Coroutine, Dict, List, - Optional) +from typing import TYPE_CHECKING, Any, Callable, Coroutine, Dict, List, Optional from loguru import logger -from langflow.graph.schema import (INPUT_COMPONENTS, OUTPUT_COMPONENTS, - InterfaceComponentTypes, ResultData) +from langflow.graph.schema import ( + INPUT_COMPONENTS, + OUTPUT_COMPONENTS, + InterfaceComponentTypes, + ResultData, +) from langflow.graph.utils import UnbuiltObject, UnbuiltResult from langflow.graph.vertex.utils import generate_result from langflow.interface.initialize import loading @@ -389,6 +392,8 @@ class Vertex: ValueError: If any key in new_params is not found in self._raw_params. """ # First check if the input_value in _raw_params is not a vertex + if not new_params: + return if any(isinstance(self._raw_params.get(key), Vertex) for key in new_params): return self._raw_params.update(new_params) @@ -454,7 +459,7 @@ class Vertex: await self._build_node_and_update_params(key, value, user_id) elif isinstance(value, list) and self._is_list_of_nodes(value): await self._build_list_of_nodes_and_update_params(key, value, user_id) - elif key not in self.params: + elif key not in self.params or self.updated_raw_params: self.params[key] = value def _is_node(self, value): @@ -608,6 +613,7 @@ class Vertex: async def build( self, user_id=None, + inputs: Optional[Dict[str, Any]] = None, requester: Optional["Vertex"] = None, **kwargs, ) -> Any: @@ -620,6 +626,9 @@ class Vertex: return self.get_requester_result(requester) self._reset() + if self.is_input: + self.update_raw_params(inputs) + # Run steps for step in self.steps: if step not in self.steps_ran: diff --git a/src/backend/langflow/interface/custom/directory_reader/directory_reader.py b/src/backend/langflow/interface/custom/directory_reader/directory_reader.py index 57bacc9bc..278d014c3 100644 --- a/src/backend/langflow/interface/custom/directory_reader/directory_reader.py +++ b/src/backend/langflow/interface/custom/directory_reader/directory_reader.py @@ -1,6 +1,7 @@ import ast import os import zlib +from pathlib import Path from loguru import logger @@ -79,9 +80,13 @@ class DirectoryReader: except Exception as e: logger.error(f"Error while loading component: {e}") continue - items.append({"name": menu["name"], "path": menu["path"], "components": components}) + items.append( + {"name": menu["name"], "path": menu["path"], "components": components} + ) filtered = [menu for menu in items if menu["components"]] - logger.debug(f'Filtered components {"with errors" if with_errors else ""}: {len(filtered)}') + logger.debug( + f'Filtered components {"with errors" if with_errors else ""}: {len(filtered)}' + ) return {"menu": filtered} def validate_code(self, file_content): @@ -114,15 +119,24 @@ class DirectoryReader: Walk through the directory path and return a list of all .py files. """ if not (safe_path := self.get_safe_path()): - raise CustomComponentPathValueError(f"The path needs to start with '{self.base_path}'.") + raise CustomComponentPathValueError( + f"The path needs to start with '{self.base_path}'." + ) file_list = [] - for root, _, files in os.walk(safe_path): - file_list.extend( - os.path.join(root, filename) - for filename in files - if filename.endswith(".py") and not filename.startswith("__") - ) + safe_path_obj = Path(safe_path) + for file_path in safe_path_obj.rglob("*.py"): + # The other condtion is that it should be + # in the safe_path/[folder]/[file].py format + # any folders below [folder] will be ignored + # basically the parent folder of the file should be a + # folder in the safe_path + if ( + file_path.is_file() + and file_path.parent.parent == safe_path_obj + and not file_path.name.startswith("__") + ): + file_list.append(str(file_path)) return file_list def find_menu(self, response, menu_name): @@ -159,7 +173,9 @@ class DirectoryReader: for node in ast.walk(module): if isinstance(node, ast.FunctionDef): for arg in node.args.args: - if self._is_type_hint_in_arg_annotation(arg.annotation, type_hint_name): + if self._is_type_hint_in_arg_annotation( + arg.annotation, type_hint_name + ): return True except SyntaxError: # Returns False if the code is not valid Python @@ -177,14 +193,16 @@ class DirectoryReader: and annotation.value.id == type_hint_name ) - def is_type_hint_used_but_not_imported(self, type_hint_name: str, code: str) -> bool: + def is_type_hint_used_but_not_imported( + self, type_hint_name: str, code: str + ) -> bool: """ Check if a type hint is used but not imported in the given code. """ try: - return self._is_type_hint_used_in_args(type_hint_name, code) and not self._is_type_hint_imported( + return self._is_type_hint_used_in_args( type_hint_name, code - ) + ) and not self._is_type_hint_imported(type_hint_name, code) except SyntaxError: # Returns True if there's something wrong with the code # TODO : Find a better way to handle this @@ -205,9 +223,9 @@ class DirectoryReader: return False, "Syntax error" elif not self.validate_build(file_content): return False, "Missing build function" - elif self._is_type_hint_used_in_args("Optional", file_content) and not self._is_type_hint_imported( + elif self._is_type_hint_used_in_args( "Optional", file_content - ): + ) and not self._is_type_hint_imported("Optional", file_content): return ( False, "Type hint 'Optional' is used but not imported in the code.", @@ -223,7 +241,9 @@ class DirectoryReader: from the .py files in the directory. """ response = {"menu": []} - logger.debug("-------------------- Building component menu list --------------------") + logger.debug( + "-------------------- Building component menu list --------------------" + ) for file_path in file_paths: menu_name = os.path.basename(os.path.dirname(file_path)) @@ -243,7 +263,9 @@ class DirectoryReader: # first check if it's already CamelCase if "_" in component_name: - component_name_camelcase = " ".join(word.title() for word in component_name.split("_")) + component_name_camelcase = " ".join( + word.title() for word in component_name.split("_") + ) else: component_name_camelcase = component_name @@ -251,7 +273,9 @@ class DirectoryReader: try: output_types = self.get_output_types_from_code(result_content) except Exception as exc: - logger.exception(f"Error while getting output types from code: {str(exc)}") + logger.exception( + f"Error while getting output types from code: {str(exc)}" + ) output_types = [component_name_camelcase] else: output_types = [component_name_camelcase] @@ -267,7 +291,9 @@ class DirectoryReader: if menu_result not in response["menu"]: response["menu"].append(menu_result) - logger.debug("-------------------- Component menu list built --------------------") + logger.debug( + "-------------------- Component menu list built --------------------" + ) return response @staticmethod diff --git a/src/backend/langflow/interface/custom/directory_reader/utils.py b/src/backend/langflow/interface/custom/directory_reader/utils.py index f7378b8d0..34defe5c3 100644 --- a/src/backend/langflow/interface/custom/directory_reader/utils.py +++ b/src/backend/langflow/interface/custom/directory_reader/utils.py @@ -1,11 +1,18 @@ -from langflow.interface.custom.directory_reader import DirectoryReader -from langflow.template.frontend_node.custom_components import CustomComponentFrontendNode from loguru import logger +from langflow.interface.custom.directory_reader import DirectoryReader +from langflow.template.frontend_node.custom_components import ( + CustomComponentFrontendNode, +) + def merge_nested_dicts_with_renaming(dict1, dict2): for key, value in dict2.items(): - if key in dict1 and isinstance(value, dict) and isinstance(dict1.get(key), dict): + if ( + key in dict1 + and isinstance(value, dict) + and isinstance(dict1.get(key), dict) + ): for sub_key, sub_value in value.items(): # if sub_key in dict1[key]: # new_key = get_new_key(dict1[key], sub_key) @@ -62,7 +69,9 @@ def build_custom_component_list_from_path(path: str): file_list = load_files_from_path(path) reader = DirectoryReader(path, False) - valid_components, invalid_components = build_and_validate_all_files(reader, file_list) + valid_components, invalid_components = build_and_validate_all_files( + reader, file_list + ) valid_menu = build_valid_menu(valid_components) invalid_menu = build_invalid_menu(invalid_components) @@ -109,7 +118,9 @@ def build_invalid_menu_items(menu_item): menu_items[component_name] = component_template logger.debug(f"Added {component_name} to invalid menu.") except Exception as exc: - logger.exception(f"Error while creating custom component [{component_name}]: {str(exc)}") + logger.exception( + f"Error while creating custom component [{component_name}]: {str(exc)}" + ) return menu_items @@ -136,12 +147,14 @@ def determine_component_name(component): def build_menu_items(menu_item): """Build menu items for a given menu.""" menu_items = {} + logger.debug(f"Building menu items for {menu_item['name']}") + logger.debug(f"Loading {len(menu_item['components'])} components") for component_name, component_template, component in menu_item["components"]: try: menu_items[component_name] = component_template - logger.debug(f"Added {component_name} to valid menu.") except Exception as exc: logger.error(f"Error loading Component: {component['output_types']}") - logger.exception(f"Error while building custom component {component['output_types']}: {exc}") - return menu_items + logger.exception( + f"Error while building custom component {component['output_types']}: {exc}" + ) return menu_items diff --git a/src/backend/langflow/services/database/service.py b/src/backend/langflow/services/database/service.py index 693da8143..9765d858f 100644 --- a/src/backend/langflow/services/database/service.py +++ b/src/backend/langflow/services/database/service.py @@ -5,16 +5,17 @@ from typing import TYPE_CHECKING import sqlalchemy as sa from alembic import command, util from alembic.config import Config +from loguru import logger +from sqlalchemy import inspect +from sqlalchemy.exc import OperationalError +from sqlmodel import Session, SQLModel, create_engine, select, text + from langflow.services.base import Service from langflow.services.database import models # noqa from langflow.services.database.models.user.crud import get_user_by_username from langflow.services.database.utils import Result, TableResults from langflow.services.deps import get_settings_service from langflow.services.utils import teardown_superuser -from loguru import logger -from sqlalchemy import inspect -from sqlalchemy.exc import OperationalError -from sqlmodel import Session, SQLModel, create_engine, select, text if TYPE_CHECKING: from sqlalchemy.engine import Engine @@ -39,7 +40,7 @@ class DatabaseService(Service): connect_args = {"check_same_thread": False} else: connect_args = {} - return create_engine(self.database_url, connect_args=connect_args, max_overflow=-1) + return create_engine(self.database_url, connect_args=connect_args) def __enter__(self): self._session = Session(self.engine) diff --git a/src/frontend/src/CustomNodes/GenericNode/index.tsx b/src/frontend/src/CustomNodes/GenericNode/index.tsx index 5632f3c7a..a536ec6d6 100644 --- a/src/frontend/src/CustomNodes/GenericNode/index.tsx +++ b/src/frontend/src/CustomNodes/GenericNode/index.tsx @@ -465,7 +465,7 @@ export default function GenericNode({ if (buildStatus === BuildStatus.BUILDING || isBuilding) return; setValidationStatus(null); - buildFlow(data.id); + buildFlow({nodeId: data.id}); }} >