Merge branch 'zustand/io/migration' of github.com:logspace-ai/langflow into zustand/io/migration

This commit is contained in:
igorrCarvalho 2024-03-12 16:30:14 -03:00
commit 141339d9d3
7 changed files with 81 additions and 70 deletions

View file

@@ -1,5 +1,5 @@
from http import HTTPStatus
from typing import Annotated, Any, List, Optional, Union
from typing import Annotated, List, Optional, Union
import sqlalchemy as sa
from fastapi import APIRouter, Body, Depends, HTTPException, UploadFile, status
@@ -14,8 +14,10 @@ from langflow.api.v1.schemas import (
RunResponse,
TaskStatusResponse,
Tweaks,
UpdateCustomComponentRequest,
UploadFileResponse,
)
from langflow.graph.schema import RunOutputs
from langflow.interface.custom.custom_component import CustomComponent
from langflow.interface.custom.directory_reader import DirectoryReader
from langflow.interface.custom.utils import build_custom_component_template
@@ -24,12 +26,7 @@ from langflow.services.auth.utils import api_key_security, get_current_active_us
from langflow.services.cache.utils import save_uploaded_file
from langflow.services.database.models.flow import Flow
from langflow.services.database.models.user.model import User
from langflow.services.deps import (
get_session,
get_session_service,
get_settings_service,
get_task_service,
)
from langflow.services.deps import get_session, get_session_service, get_settings_service, get_task_service
from langflow.services.session.service import SessionService
from langflow.services.task.service import TaskService
@@ -56,8 +53,8 @@ def get_all(
async def run_flow_with_caching(
session: Annotated[Session, Depends(get_session)],
flow_id: str,
inputs: Optional[List[InputValueRequest]] = None,
outputs: Optional[List[str]] = None,
inputs: Optional[List[InputValueRequest]] = [],
outputs: Optional[List[str]] = [],
tweaks: Annotated[Optional[Tweaks], Body(embed=True)] = None, # noqa: F821
stream: Annotated[bool, Body(embed=True)] = False, # noqa: F821
session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821
@@ -102,25 +99,20 @@ async def run_flow_with_caching(
This endpoint facilitates complex flow executions with customized inputs, outputs, and configurations, catering to diverse application requirements.
"""
try:
if inputs is not None:
input_values: list[dict[str, Union[str, list[str]]]] = [_input.model_dump() for _input in inputs]
else:
input_values = [{}]
if outputs is None:
outputs = []
if session_id:
session_data = await session_service.load_session(session_id, flow_id=flow_id)
graph, artifacts = session_data if session_data else (None, None)
task_result: Any = None
task_result: List[RunOutputs] = []
if not graph:
raise ValueError("Graph not found in the session")
task_result, session_id = await run_graph(
graph=graph,
flow_id=flow_id,
session_id=session_id,
inputs=input_values,
inputs=inputs,
outputs=outputs,
artifacts=artifacts,
session_service=session_service,
@@ -142,7 +134,7 @@ async def run_flow_with_caching(
graph=graph_data,
flow_id=flow_id,
session_id=session_id,
inputs=input_values,
inputs=inputs,
outputs=outputs,
artifacts={},
session_service=session_service,
@@ -283,7 +275,7 @@ async def reload_custom_component(path: str, user: User = Depends(get_current_ac
@router.post("/custom_component/update", status_code=HTTPStatus.OK)
async def custom_component_update(
code_request: CustomComponentRequest,
code_request: UpdateCustomComponentRequest,
user: User = Depends(get_current_active_user),
):
"""
@@ -300,15 +292,21 @@ async def custom_component_update(
dict: The updated custom component node.
"""
component = CustomComponent(code=code_request.code)
try:
component = CustomComponent(code=code_request.code)
component_node, cc_instance = build_custom_component_template(
component,
user_id=user.id,
)
updated_build_config = cc_instance.update_build_config(
code_request.template, code_request.field_value, code_request.field_name
)
component_node["template"] = updated_build_config
component_node, cc_instance = build_custom_component_template(
component,
user_id=user.id,
)
return component_node
updated_build_config = cc_instance.update_build_config(
build_config=code_request.get_template(),
field_value=code_request.field_value,
field_name=code_request.field,
)
component_node["template"] = updated_build_config
return component_node
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc

View file

@@ -9,11 +9,11 @@ from pydantic import (
ConfigDict,
Field,
RootModel,
field_serializer,
field_validator,
model_serializer,
)
from langflow.graph.schema import RunOutputs
from langflow.schema import dotdict
from langflow.services.database.models.api_key.model import ApiKeyRead
from langflow.services.database.models.base import orjson_dumps
@@ -58,7 +58,7 @@ class ProcessResponse(BaseModel):
class RunResponse(BaseModel):
"""Run response schema."""
outputs: Optional[List[Any]] = None
outputs: Optional[List[RunOutputs]] = []
session_id: Optional[str] = None
@model_serializer(mode="wrap")
@@ -173,14 +173,16 @@ class StreamData(BaseModel):
class CustomComponentRequest(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
code: str
field: Optional[str] = None
field_value: Optional[Any] = None
template: Optional[dict] = None
frontend_node: Optional[dict] = None
@field_serializer("template")
def template_into_dotdict(v):
return dotdict(v)
class UpdateCustomComponentRequest(CustomComponentRequest):
    """Request payload for updating a custom component's build configuration.

    Extends CustomComponentRequest with the specific field being edited,
    its new value, and the component's current template.
    """

    # Name of the template field being updated (required, unlike the base class).
    field: str
    # New value for the field; restricted to JSON-serializable primitives and containers.
    field_value: Optional[Union[str, int, float, bool, dict, list]] = None
    # Current template of the component node (required).
    template: dict

    def get_template(self):
        # Wrap the raw template dict in dotdict — presumably to allow
        # attribute-style access downstream; confirm against callers.
        return dotdict(self.template)
class CustomComponentResponseError(BaseModel):

View file

@@ -9,7 +9,7 @@ from langflow.graph.edge.base import ContractEdge
from langflow.graph.graph.constants import lazy_load_vertex_dict
from langflow.graph.graph.state_manager import GraphStateManager
from langflow.graph.graph.utils import process_flow
from langflow.graph.schema import INPUT_FIELD_NAME, InterfaceComponentTypes
from langflow.graph.schema import INPUT_FIELD_NAME, InterfaceComponentTypes, RunOutputs
from langflow.graph.vertex.base import Vertex
from langflow.graph.vertex.types import (
ChatVertex,
@@ -184,13 +184,13 @@ class Graph:
async def run(
self,
inputs: list[Dict[str, Union[str, list[str]]]],
inputs: list[Dict[str, str]],
inputs_components: Optional[list[list[str]]] = None,
outputs: Optional[list[str]] = None,
session_id: Optional[str] = None,
stream: bool = False,
) -> List[List[Optional["ResultData"]]]:
) -> List[RunOutputs]:
"""Runs the graph with the given inputs."""
# inputs is {"message": "Hello, world!"}
# we need to go through self.inputs and update the self._raw_params
# of the vertices that are inputs
@@ -198,32 +198,24 @@
vertex_outputs = []
if not isinstance(inputs, list):
inputs = [inputs]
for input_dict in inputs:
components: Union[str, list[str]] = input_dict.get("components", [])
for run_inputs, components in zip(inputs, inputs_components or []):
if components and not isinstance(components, list):
raise ValueError(f"Invalid components value: {components}. Expected list")
elif components is None:
components = []
if INPUT_FIELD_NAME not in input_dict:
input_value = ""
else:
_input_value = input_dict[INPUT_FIELD_NAME]
if isinstance(_input_value, str):
input_value = _input_value
else:
raise ValueError(f"Invalid input value: {input_value}. Expected string")
if not isinstance(run_inputs.get(INPUT_FIELD_NAME, ""), str):
raise ValueError(f"Invalid input value: {run_inputs.get(INPUT_FIELD_NAME)}. Expected string")
run_outputs = await self._run(
inputs={INPUT_FIELD_NAME: input_value},
inputs=run_inputs,
input_components=components,
outputs=outputs or [],
stream=stream,
session_id=session_id or "",
)
logger.debug(f"Run outputs: {run_outputs}")
vertex_outputs.append(run_outputs)
run_output_object = RunOutputs(inputs=run_inputs, outputs=run_outputs)
logger.debug(f"Run outputs: {run_output_object}")
vertex_outputs.append(run_output_object)
return vertex_outputs
# vertices_layers is a list of lists ordered by the order the vertices

View file

@@ -1,5 +1,5 @@
from enum import Enum
from typing import Any, Optional
from typing import Any, List, Optional
from pydantic import BaseModel, Field, field_serializer
@@ -50,3 +50,8 @@ OUTPUT_COMPONENTS = [
]
INPUT_FIELD_NAME = "input_value"
class RunOutputs(BaseModel):
    """Pairs the input payload of a single graph run with the outputs it produced."""

    # Raw input values for the run (e.g. {"input_value": "..."}); defaults to empty.
    inputs: dict = Field(default_factory=dict)
    # Results for the run; entries may be None when no result was produced.
    outputs: List[Optional[ResultData]] = Field(default_factory=list)

View file

@@ -366,9 +366,10 @@ class CustomComponent(Component):
if not flow_id:
raise ValueError(f"Flow {flow_name} not found")
if isinstance(input_value, str):
input_value = [input_value]
graph = await self.load_flow(flow_id, tweaks)
input_value_dict = [{"input_value": input_value}]
input_value_dict = [{"input_value": input_val} for input_val in input_value]
return await graph.run(input_value_dict, stream=False)
def list_flows(self, *, get_session: Optional[Callable] = None) -> List[Record]:

View file

@@ -248,7 +248,7 @@ def run_build_config(
try:
if custom_component.code is None:
return {}
raise ValueError("Code is None")
elif isinstance(custom_component.code, str):
custom_class = eval_custom_component_code(custom_component.code)
else:

View file

@@ -11,13 +11,14 @@ from loguru import logger
from pydantic import BaseModel
from langflow.graph.graph.base import Graph
from langflow.graph.schema import INPUT_FIELD_NAME, RunOutputs
from langflow.graph.vertex.base import Vertex
from langflow.interface.custom.custom_component import CustomComponent
from langflow.interface.run import get_memory_key, update_memory_keys
from langflow.services.session.service import SessionService
if TYPE_CHECKING:
from langflow.api.v1.schemas import Tweaks
from langflow.api.v1.schemas import InputValueRequest, Tweaks
def fix_memory_inputs(langchain_object):
@@ -200,31 +201,43 @@ async def run_graph(
flow_id: str,
stream: bool,
session_id: Optional[str] = None,
inputs: Optional[list[dict[str, Union[List[str], str]]]] = None,
inputs: Optional[List["InputValueRequest"]] = None,
outputs: Optional[List[str]] = None,
artifacts: Optional[Dict[str, Any]] = None,
session_service: Optional[SessionService] = None,
):
) -> tuple[List[RunOutputs], str]:
"""Run the graph and generate the result"""
inputs = inputs or []
if isinstance(graph, dict):
graph_data = graph
graph = Graph.from_payload(graph, flow_id=flow_id)
else:
graph_data = graph._graph_data
if session_id is None and session_service is not None:
session_id = session_service.generate_key(session_id=flow_id, data_graph=graph_data)
if inputs is None:
inputs = [{}]
session_id_str = session_service.generate_key(session_id=flow_id, data_graph=graph_data)
elif session_id is not None:
session_id_str = session_id
else:
raise ValueError("session_id or session_service must be provided")
components = []
inputs_list = []
for input_value_request in inputs:
if input_value_request.input_value is None:
logger.warning("InputValueRequest input_value cannot be None, defaulting to an empty string.")
input_value_request.input_value = ""
components.append(input_value_request.components or [])
inputs_list.append({INPUT_FIELD_NAME: input_value_request.input_value})
run_outputs = await graph.run(
inputs,
inputs_list,
components,
outputs or [],
stream=stream,
session_id=session_id or "",
session_id=session_id_str or "",
)
if session_id and session_service:
session_service.update_session(session_id, (graph, artifacts))
return run_outputs, session_id
if session_id_str and session_service:
session_service.update_session(session_id_str, (graph, artifacts))
return run_outputs, session_id_str
def validate_input(