diff --git a/src/backend/langflow/api/v1/endpoints.py b/src/backend/langflow/api/v1/endpoints.py index 44a12a0d5..cea43c496 100644 --- a/src/backend/langflow/api/v1/endpoints.py +++ b/src/backend/langflow/api/v1/endpoints.py @@ -1,5 +1,5 @@ from http import HTTPStatus -from typing import Annotated, Any, List, Optional, Union +from typing import Annotated, List, Optional, Union import sqlalchemy as sa from fastapi import APIRouter, Body, Depends, HTTPException, UploadFile, status @@ -14,8 +14,10 @@ from langflow.api.v1.schemas import ( RunResponse, TaskStatusResponse, Tweaks, + UpdateCustomComponentRequest, UploadFileResponse, ) +from langflow.graph.schema import RunOutputs from langflow.interface.custom.custom_component import CustomComponent from langflow.interface.custom.directory_reader import DirectoryReader from langflow.interface.custom.utils import build_custom_component_template @@ -24,12 +26,7 @@ from langflow.services.auth.utils import api_key_security, get_current_active_us from langflow.services.cache.utils import save_uploaded_file from langflow.services.database.models.flow import Flow from langflow.services.database.models.user.model import User -from langflow.services.deps import ( - get_session, - get_session_service, - get_settings_service, - get_task_service, -) +from langflow.services.deps import get_session, get_session_service, get_settings_service, get_task_service from langflow.services.session.service import SessionService from langflow.services.task.service import TaskService @@ -56,8 +53,8 @@ def get_all( async def run_flow_with_caching( session: Annotated[Session, Depends(get_session)], flow_id: str, - inputs: Optional[List[InputValueRequest]] = None, - outputs: Optional[List[str]] = None, + inputs: Optional[List[InputValueRequest]] = [], + outputs: Optional[List[str]] = [], tweaks: Annotated[Optional[Tweaks], Body(embed=True)] = None, # noqa: F821 stream: Annotated[bool, Body(embed=True)] = False, # noqa: F821 session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821 @@ -102,25 +99,20 @@ async def run_flow_with_caching( This endpoint facilitates complex flow executions with customized inputs, outputs, and configurations, catering to diverse application requirements. """ try: - if inputs is not None: - input_values: list[dict[str, Union[str, list[str]]]] = [_input.model_dump() for _input in inputs] - else: - input_values = [{}] - if outputs is None: outputs = [] if session_id: session_data = await session_service.load_session(session_id, flow_id=flow_id) graph, artifacts = session_data if session_data else (None, None) - task_result: Any = None + task_result: List[RunOutputs] = [] if not graph: raise ValueError("Graph not found in the session") task_result, session_id = await run_graph( graph=graph, flow_id=flow_id, session_id=session_id, - inputs=input_values, + inputs=inputs, outputs=outputs, artifacts=artifacts, session_service=session_service, @@ -142,7 +134,7 @@ async def run_flow_with_caching( graph=graph_data, flow_id=flow_id, session_id=session_id, - inputs=input_values, + inputs=inputs, outputs=outputs, artifacts={}, session_service=session_service, @@ -283,7 +275,7 @@ async def reload_custom_component(path: str, user: User = Depends(get_current_ac @router.post("/custom_component/update", status_code=HTTPStatus.OK) async def custom_component_update( - code_request: CustomComponentRequest, + code_request: UpdateCustomComponentRequest, user: User = Depends(get_current_active_user), ): """ @@ -300,15 +292,21 @@ async def custom_component_update( dict: The updated custom component node. """ - component = CustomComponent(code=code_request.code) + try: + component = CustomComponent(code=code_request.code) - component_node, cc_instance = build_custom_component_template( - component, - user_id=user.id, - ) - updated_build_config = cc_instance.update_build_config( - code_request.template, code_request.field_value, code_request.field_name - ) - component_node["template"] = updated_build_config + component_node, cc_instance = build_custom_component_template( + component, + user_id=user.id, + ) - return component_node + updated_build_config = cc_instance.update_build_config( + build_config=code_request.get_template(), + field_value=code_request.field_value, + field_name=code_request.field, + ) + component_node["template"] = updated_build_config + + return component_node + except Exception as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index e28d9bca1..784f5b554 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -9,11 +9,11 @@ from pydantic import ( ConfigDict, Field, RootModel, - field_serializer, field_validator, model_serializer, ) +from langflow.graph.schema import RunOutputs from langflow.schema import dotdict from langflow.services.database.models.api_key.model import ApiKeyRead from langflow.services.database.models.base import orjson_dumps @@ -58,7 +58,7 @@ class ProcessResponse(BaseModel): class RunResponse(BaseModel): """Run response schema.""" - outputs: Optional[List[Any]] = None + outputs: Optional[List[RunOutputs]] = [] session_id: Optional[str] = None @model_serializer(mode="wrap") @@ -173,14 +173,16 @@ class StreamData(BaseModel): class CustomComponentRequest(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) code: str - field: Optional[str] = None - field_value: Optional[Any] = None - template: Optional[dict] = None frontend_node: Optional[dict] = None - @field_serializer("template") - def template_into_dotdict(v): - return dotdict(v) + +class UpdateCustomComponentRequest(CustomComponentRequest): + field: str + field_value: Optional[Union[str, int, float, bool, dict, list]] = None + template: dict + + def get_template(self): + return dotdict(self.template) class CustomComponentResponseError(BaseModel): diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index f8d20d599..109cf170c 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -9,7 +9,7 @@ from langflow.graph.edge.base import ContractEdge from langflow.graph.graph.constants import lazy_load_vertex_dict from langflow.graph.graph.state_manager import GraphStateManager from langflow.graph.graph.utils import process_flow -from langflow.graph.schema import INPUT_FIELD_NAME, InterfaceComponentTypes +from langflow.graph.schema import INPUT_FIELD_NAME, InterfaceComponentTypes, RunOutputs from langflow.graph.vertex.base import Vertex from langflow.graph.vertex.types import ( ChatVertex, @@ -184,13 +184,13 @@ class Graph: async def run( self, - inputs: list[Dict[str, Union[str, list[str]]]], + inputs: list[Dict[str, str]], + inputs_components: Optional[list[list[str]]] = None, outputs: Optional[list[str]] = None, session_id: Optional[str] = None, stream: bool = False, - ) -> List[List[Optional["ResultData"]]]: + ) -> List[RunOutputs]: """Runs the graph with the given inputs.""" - # inputs is {"message": "Hello, world!"} # we need to go through self.inputs and update the self._raw_params # of the vertices that are inputs @@ -198,32 +198,24 @@ class Graph: vertex_outputs = [] if not isinstance(inputs, list): inputs = [inputs] - for input_dict in inputs: - components: Union[str, list[str]] = input_dict.get("components", []) - + for run_inputs, components in zip(inputs, inputs_components or []): if components and not isinstance(components, list): raise ValueError(f"Invalid components value: {components}. Expected list") elif components is None: components = [] - if INPUT_FIELD_NAME not in input_dict: - input_value = "" - else: - _input_value = input_dict[INPUT_FIELD_NAME] - if isinstance(_input_value, str): - input_value = _input_value - else: - raise ValueError(f"Invalid input value: {input_value}. Expected string") - + if not isinstance(run_inputs.get(INPUT_FIELD_NAME, ""), str): + raise ValueError(f"Invalid input value: {run_inputs.get(INPUT_FIELD_NAME)}. Expected string") run_outputs = await self._run( - inputs={INPUT_FIELD_NAME: input_value}, + inputs=run_inputs, input_components=components, outputs=outputs or [], stream=stream, session_id=session_id or "", ) - logger.debug(f"Run outputs: {run_outputs}") - vertex_outputs.append(run_outputs) + run_output_object = RunOutputs(inputs=run_inputs, outputs=run_outputs) + logger.debug(f"Run outputs: {run_output_object}") + vertex_outputs.append(run_output_object) return vertex_outputs # vertices_layers is a list of lists ordered by the order the vertices diff --git a/src/backend/langflow/graph/schema.py b/src/backend/langflow/graph/schema.py index cf1afc3fe..dc299e220 100644 --- a/src/backend/langflow/graph/schema.py +++ b/src/backend/langflow/graph/schema.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Any, Optional +from typing import Any, List, Optional from pydantic import BaseModel, Field, field_serializer @@ -50,3 +50,8 @@ OUTPUT_COMPONENTS = [ ] INPUT_FIELD_NAME = "input_value" + + +class RunOutputs(BaseModel): + inputs: dict = Field(default_factory=dict) + outputs: List[Optional[ResultData]] = Field(default_factory=list) diff --git a/src/backend/langflow/interface/custom/custom_component/custom_component.py b/src/backend/langflow/interface/custom/custom_component/custom_component.py index 0b45eb9d2..b19fbbf30 100644 --- a/src/backend/langflow/interface/custom/custom_component/custom_component.py +++ b/src/backend/langflow/interface/custom/custom_component/custom_component.py @@ -366,9 +366,10 @@ class CustomComponent(Component): if not flow_id: raise ValueError(f"Flow {flow_name} not found") - + if isinstance(input_value, str): + input_value = [input_value] graph = await self.load_flow(flow_id, tweaks) - input_value_dict = [{"input_value": input_value}] + input_value_dict = [{"input_value": input_val} for input_val in input_value] return await graph.run(input_value_dict, stream=False) def list_flows(self, *, get_session: Optional[Callable] = None) -> List[Record]: diff --git a/src/backend/langflow/interface/custom/utils.py b/src/backend/langflow/interface/custom/utils.py index c9d1cfedd..636d1ccf7 100644 --- a/src/backend/langflow/interface/custom/utils.py +++ b/src/backend/langflow/interface/custom/utils.py @@ -248,7 +248,7 @@ def run_build_config( try: if custom_component.code is None: - return {} + raise ValueError("Code is None") elif isinstance(custom_component.code, str): custom_class = eval_custom_component_code(custom_component.code) else: diff --git a/src/backend/langflow/processing/process.py b/src/backend/langflow/processing/process.py index 9c312a9e0..6063afbbe 100644 --- a/src/backend/langflow/processing/process.py +++ b/src/backend/langflow/processing/process.py @@ -11,13 +11,14 @@ from loguru import logger from pydantic import BaseModel from langflow.graph.graph.base import Graph +from langflow.graph.schema import INPUT_FIELD_NAME, RunOutputs from langflow.graph.vertex.base import Vertex from langflow.interface.custom.custom_component import CustomComponent from langflow.interface.run import get_memory_key, update_memory_keys from langflow.services.session.service import SessionService if TYPE_CHECKING: - from langflow.api.v1.schemas import Tweaks + from langflow.api.v1.schemas import InputValueRequest, Tweaks def fix_memory_inputs(langchain_object): @@ -200,31 +201,43 @@ async def run_graph( flow_id: str, stream: bool, session_id: Optional[str] = None, - inputs: Optional[list[dict[str, Union[List[str], str]]]] = None, + inputs: Optional[List["InputValueRequest"]] = None, outputs: Optional[List[str]] = None, artifacts: Optional[Dict[str, Any]] = None, session_service: Optional[SessionService] = None, -): +) -> tuple[List[RunOutputs], str]: """Run the graph and generate the result""" + inputs = inputs or [] if isinstance(graph, dict): graph_data = graph graph = Graph.from_payload(graph, flow_id=flow_id) else: graph_data = graph._graph_data if session_id is None and session_service is not None: - session_id = session_service.generate_key(session_id=flow_id, data_graph=graph_data) - if inputs is None: - inputs = [{}] + session_id_str = session_service.generate_key(session_id=flow_id, data_graph=graph_data) + elif session_id is not None: + session_id_str = session_id + else: + raise ValueError("session_id or session_service must be provided") + components = [] + inputs_list = [] + for input_value_request in inputs: + if input_value_request.input_value is None: + logger.warning("InputValueRequest input_value cannot be None, defaulting to an empty string.") + input_value_request.input_value = "" + components.append(input_value_request.components or []) + inputs_list.append({INPUT_FIELD_NAME: input_value_request.input_value}) run_outputs = await graph.run( - inputs, + inputs_list, + components, outputs or [], stream=stream, - session_id=session_id or "", + session_id=session_id_str or "", ) - if session_id and session_service: - session_service.update_session(session_id, (graph, artifacts)) - return run_outputs, session_id + if session_id_str and session_service: + session_service.update_session(session_id_str, (graph, artifacts)) + return run_outputs, session_id_str def validate_input(