Merge remote-tracking branch 'origin/zustand/io/migration' into cz/fixTestsIo

This commit is contained in:
anovazzi1 2024-02-28 16:22:33 -03:00
commit a43187c49d
20 changed files with 332 additions and 189 deletions

View file

@ -1,10 +1,11 @@
import time
import uuid
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, Annotated, Optional
from fastapi import (
APIRouter,
BackgroundTasks,
Body,
Depends,
HTTPException,
WebSocket,
@ -21,6 +22,7 @@ from langflow.api.utils import (
format_exception_message,
)
from langflow.api.v1.schemas import (
InputValueRequest,
ResultDataResponse,
StreamData,
VertexBuildResponse,
@ -139,10 +141,12 @@ async def build_vertex(
flow_id: str,
vertex_id: str,
background_tasks: BackgroundTasks,
inputs: Annotated[InputValueRequest, Body(embed=True)] = None,
chat_service: "ChatService" = Depends(get_chat_service),
current_user=Depends(get_current_active_user),
):
"""Build a vertex instead of the entire graph."""
{"inputs": {"input_value": "some value"}}
start_time = time.perf_counter()
try:
start_time = time.perf_counter()
@ -163,7 +167,8 @@ async def build_vertex(
vertex = graph.get_vertex(vertex_id)
try:
if not vertex.pinned or not vertex._built:
await vertex.build(user_id=current_user.id)
inputs_dict = inputs.model_dump() if inputs else {}
await vertex.build(user_id=current_user.id, inputs=inputs_dict)
if vertex.result is not None:
params = vertex._built_object_repr()
@ -176,7 +181,7 @@ async def build_vertex(
result_data_response = ResultDataResponse(**result_dict.model_dump())
except Exception as exc:
logger.error(f"Error building vertex: {exc}")
logger.exception(f"Error building vertex: {exc}")
params = format_exception_message(exc)
valid = False
result_data_response = ResultDataResponse(results={})

View file

@ -261,3 +261,7 @@ class VertexBuildResponse(BaseModel):
class VerticesBuiltResponse(BaseModel):
vertices: List[VertexBuildResponse]
class InputValueRequest(BaseModel):
input_value: str

View file

@ -1,6 +1,6 @@
from concurrent import futures
from pathlib import Path
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from langflow import CustomComponent
from langflow.schema import Record
@ -12,21 +12,30 @@ class GatherRecordsComponent(CustomComponent):
def build_config(self) -> Dict[str, Any]:
return {
"path": {"display_name": "Path"},
"types": {
"display_name": "Types",
"info": "File types to load. Leave empty to load all types.",
},
"depth": {"display_name": "Depth", "info": "Depth to search for files."},
"max_concurrency": {"display_name": "Max Concurrency", "advanced": True},
"load_hidden": {
"display_name": "Load Hidden Files",
"value": False,
"display_name": "Load Hidden",
"advanced": True,
"info": "If true, hidden files will be loaded.",
},
"max_concurrency": {
"display_name": "Max Concurrency",
"value": 10,
"recursive": {
"display_name": "Recursive",
"advanced": True,
"info": "If true, the search will be recursive.",
},
"silent_errors": {
"display_name": "Silent Errors",
"advanced": True,
"info": "If true, errors will not raise an exception.",
},
"path": {"display_name": "Local Directory"},
"recursive": {"display_name": "Recursive", "value": True, "advanced": True},
"use_multithreading": {
"display_name": "Use Multithreading",
"value": True,
"advanced": True,
},
}
@ -61,7 +70,9 @@ class GatherRecordsComponent(CustomComponent):
glob = "**/*" if recursive else "*"
paths = walk_level(path_obj, depth) if depth else path_obj.glob(glob)
file_paths = [str(p) for p in paths if p.is_file() and match_types(p) and is_not_hidden(p)]
file_paths = [
str(p) for p in paths if p.is_file() and match_types(p) and is_not_hidden(p)
]
return file_paths
@ -91,13 +102,20 @@ class GatherRecordsComponent(CustomComponent):
use_multithreading: bool,
) -> List[Record]:
if use_multithreading:
records = self.parallel_load_records(file_paths, silent_errors, max_concurrency)
records = self.parallel_load_records(
file_paths, silent_errors, max_concurrency
)
else:
records = [self.parse_file_to_record(file_path, silent_errors) for file_path in file_paths]
records = [
self.parse_file_to_record(file_path, silent_errors)
for file_path in file_paths
]
records = list(filter(None, records))
return records
def parallel_load_records(self, file_paths: List[str], silent_errors: bool, max_concurrency: int) -> List[Record]:
def parallel_load_records(
self, file_paths: List[str], silent_errors: bool, max_concurrency: int
) -> List[Record]:
with futures.ThreadPoolExecutor(max_workers=max_concurrency) as executor:
loaded_files = executor.map(
lambda file_path: self.parse_file_to_record(file_path, silent_errors),
@ -108,7 +126,7 @@ class GatherRecordsComponent(CustomComponent):
def build(
self,
path: str,
types: List[str] = None,
types: Optional[List[str]] = None,
depth: int = 0,
max_concurrency: int = 2,
load_hidden: bool = False,
@ -116,14 +134,23 @@ class GatherRecordsComponent(CustomComponent):
silent_errors: bool = False,
use_multithreading: bool = True,
) -> List[Record]:
if types is None:
types = []
resolved_path = self.resolve_path(path)
file_paths = self.retrieve_file_paths(resolved_path, types, load_hidden, recursive, depth)
file_paths = self.retrieve_file_paths(
resolved_path, types, load_hidden, recursive, depth
)
loaded_records = []
if use_multithreading:
loaded_records = self.parallel_load_records(file_paths, silent_errors, max_concurrency)
loaded_records = self.parallel_load_records(
file_paths, silent_errors, max_concurrency
)
else:
loaded_records = [self.parse_file_to_record(file_path, silent_errors) for file_path in file_paths]
loaded_records = [
self.parse_file_to_record(file_path, silent_errors)
for file_path in file_paths
]
loaded_records = list(filter(None, loaded_records))
self.status = loaded_records
return loaded_records

View file

@ -12,7 +12,7 @@ class MessageHistoryComponent(CustomComponent):
def build_config(self):
return {
"sender": {
"options": ["Machine", "User"],
"options": ["Machine", "User", "Machine and User"],
"display_name": "Sender Type",
},
"sender_name": {"display_name": "Sender Name"},
@ -38,6 +38,8 @@ class MessageHistoryComponent(CustomComponent):
session_id: Optional[str] = None,
n_messages: int = 5,
) -> List[Record]:
if sender == "Machine and User":
sender = None
messages = get_messages(
sender=sender,
sender_name=sender_name,

View file

@ -2,13 +2,16 @@ import ast
import inspect
import types
from enum import Enum
from typing import (TYPE_CHECKING, Any, Callable, Coroutine, Dict, List,
Optional)
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Dict, List, Optional
from loguru import logger
from langflow.graph.schema import (INPUT_COMPONENTS, OUTPUT_COMPONENTS,
InterfaceComponentTypes, ResultData)
from langflow.graph.schema import (
INPUT_COMPONENTS,
OUTPUT_COMPONENTS,
InterfaceComponentTypes,
ResultData,
)
from langflow.graph.utils import UnbuiltObject, UnbuiltResult
from langflow.graph.vertex.utils import generate_result
from langflow.interface.initialize import loading
@ -389,6 +392,8 @@ class Vertex:
ValueError: If any key in new_params is not found in self._raw_params.
"""
# First check if the input_value in _raw_params is not a vertex
if not new_params:
return
if any(isinstance(self._raw_params.get(key), Vertex) for key in new_params):
return
self._raw_params.update(new_params)
@ -454,7 +459,7 @@ class Vertex:
await self._build_node_and_update_params(key, value, user_id)
elif isinstance(value, list) and self._is_list_of_nodes(value):
await self._build_list_of_nodes_and_update_params(key, value, user_id)
elif key not in self.params:
elif key not in self.params or self.updated_raw_params:
self.params[key] = value
def _is_node(self, value):
@ -608,6 +613,7 @@ class Vertex:
async def build(
self,
user_id=None,
inputs: Optional[Dict[str, Any]] = None,
requester: Optional["Vertex"] = None,
**kwargs,
) -> Any:
@ -620,6 +626,9 @@ class Vertex:
return self.get_requester_result(requester)
self._reset()
if self.is_input:
self.update_raw_params(inputs)
# Run steps
for step in self.steps:
if step not in self.steps_ran:

View file

@ -1,6 +1,7 @@
import ast
import os
import zlib
from pathlib import Path
from loguru import logger
@ -79,9 +80,13 @@ class DirectoryReader:
except Exception as e:
logger.error(f"Error while loading component: {e}")
continue
items.append({"name": menu["name"], "path": menu["path"], "components": components})
items.append(
{"name": menu["name"], "path": menu["path"], "components": components}
)
filtered = [menu for menu in items if menu["components"]]
logger.debug(f'Filtered components {"with errors" if with_errors else ""}: {len(filtered)}')
logger.debug(
f'Filtered components {"with errors" if with_errors else ""}: {len(filtered)}'
)
return {"menu": filtered}
def validate_code(self, file_content):
@ -114,15 +119,24 @@ class DirectoryReader:
Walk through the directory path and return a list of all .py files.
"""
if not (safe_path := self.get_safe_path()):
raise CustomComponentPathValueError(f"The path needs to start with '{self.base_path}'.")
raise CustomComponentPathValueError(
f"The path needs to start with '{self.base_path}'."
)
file_list = []
for root, _, files in os.walk(safe_path):
file_list.extend(
os.path.join(root, filename)
for filename in files
if filename.endswith(".py") and not filename.startswith("__")
)
safe_path_obj = Path(safe_path)
for file_path in safe_path_obj.rglob("*.py"):
# The other condtion is that it should be
# in the safe_path/[folder]/[file].py format
# any folders below [folder] will be ignored
# basically the parent folder of the file should be a
# folder in the safe_path
if (
file_path.is_file()
and file_path.parent.parent == safe_path_obj
and not file_path.name.startswith("__")
):
file_list.append(str(file_path))
return file_list
def find_menu(self, response, menu_name):
@ -159,7 +173,9 @@ class DirectoryReader:
for node in ast.walk(module):
if isinstance(node, ast.FunctionDef):
for arg in node.args.args:
if self._is_type_hint_in_arg_annotation(arg.annotation, type_hint_name):
if self._is_type_hint_in_arg_annotation(
arg.annotation, type_hint_name
):
return True
except SyntaxError:
# Returns False if the code is not valid Python
@ -177,14 +193,16 @@ class DirectoryReader:
and annotation.value.id == type_hint_name
)
def is_type_hint_used_but_not_imported(self, type_hint_name: str, code: str) -> bool:
def is_type_hint_used_but_not_imported(
self, type_hint_name: str, code: str
) -> bool:
"""
Check if a type hint is used but not imported in the given code.
"""
try:
return self._is_type_hint_used_in_args(type_hint_name, code) and not self._is_type_hint_imported(
return self._is_type_hint_used_in_args(
type_hint_name, code
)
) and not self._is_type_hint_imported(type_hint_name, code)
except SyntaxError:
# Returns True if there's something wrong with the code
# TODO : Find a better way to handle this
@ -205,9 +223,9 @@ class DirectoryReader:
return False, "Syntax error"
elif not self.validate_build(file_content):
return False, "Missing build function"
elif self._is_type_hint_used_in_args("Optional", file_content) and not self._is_type_hint_imported(
elif self._is_type_hint_used_in_args(
"Optional", file_content
):
) and not self._is_type_hint_imported("Optional", file_content):
return (
False,
"Type hint 'Optional' is used but not imported in the code.",
@ -223,7 +241,9 @@ class DirectoryReader:
from the .py files in the directory.
"""
response = {"menu": []}
logger.debug("-------------------- Building component menu list --------------------")
logger.debug(
"-------------------- Building component menu list --------------------"
)
for file_path in file_paths:
menu_name = os.path.basename(os.path.dirname(file_path))
@ -243,7 +263,9 @@ class DirectoryReader:
# first check if it's already CamelCase
if "_" in component_name:
component_name_camelcase = " ".join(word.title() for word in component_name.split("_"))
component_name_camelcase = " ".join(
word.title() for word in component_name.split("_")
)
else:
component_name_camelcase = component_name
@ -251,7 +273,9 @@ class DirectoryReader:
try:
output_types = self.get_output_types_from_code(result_content)
except Exception as exc:
logger.exception(f"Error while getting output types from code: {str(exc)}")
logger.exception(
f"Error while getting output types from code: {str(exc)}"
)
output_types = [component_name_camelcase]
else:
output_types = [component_name_camelcase]
@ -267,7 +291,9 @@ class DirectoryReader:
if menu_result not in response["menu"]:
response["menu"].append(menu_result)
logger.debug("-------------------- Component menu list built --------------------")
logger.debug(
"-------------------- Component menu list built --------------------"
)
return response
@staticmethod

View file

@ -1,11 +1,18 @@
from langflow.interface.custom.directory_reader import DirectoryReader
from langflow.template.frontend_node.custom_components import CustomComponentFrontendNode
from loguru import logger
from langflow.interface.custom.directory_reader import DirectoryReader
from langflow.template.frontend_node.custom_components import (
CustomComponentFrontendNode,
)
def merge_nested_dicts_with_renaming(dict1, dict2):
for key, value in dict2.items():
if key in dict1 and isinstance(value, dict) and isinstance(dict1.get(key), dict):
if (
key in dict1
and isinstance(value, dict)
and isinstance(dict1.get(key), dict)
):
for sub_key, sub_value in value.items():
# if sub_key in dict1[key]:
# new_key = get_new_key(dict1[key], sub_key)
@ -62,7 +69,9 @@ def build_custom_component_list_from_path(path: str):
file_list = load_files_from_path(path)
reader = DirectoryReader(path, False)
valid_components, invalid_components = build_and_validate_all_files(reader, file_list)
valid_components, invalid_components = build_and_validate_all_files(
reader, file_list
)
valid_menu = build_valid_menu(valid_components)
invalid_menu = build_invalid_menu(invalid_components)
@ -109,7 +118,9 @@ def build_invalid_menu_items(menu_item):
menu_items[component_name] = component_template
logger.debug(f"Added {component_name} to invalid menu.")
except Exception as exc:
logger.exception(f"Error while creating custom component [{component_name}]: {str(exc)}")
logger.exception(
f"Error while creating custom component [{component_name}]: {str(exc)}"
)
return menu_items
@ -136,12 +147,14 @@ def determine_component_name(component):
def build_menu_items(menu_item):
"""Build menu items for a given menu."""
menu_items = {}
logger.debug(f"Building menu items for {menu_item['name']}")
logger.debug(f"Loading {len(menu_item['components'])} components")
for component_name, component_template, component in menu_item["components"]:
try:
menu_items[component_name] = component_template
logger.debug(f"Added {component_name} to valid menu.")
except Exception as exc:
logger.error(f"Error loading Component: {component['output_types']}")
logger.exception(f"Error while building custom component {component['output_types']}: {exc}")
return menu_items
logger.exception(
f"Error while building custom component {component['output_types']}: {exc}"
)
return menu_items

View file

@ -5,16 +5,17 @@ from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import command, util
from alembic.config import Config
from loguru import logger
from sqlalchemy import inspect
from sqlalchemy.exc import OperationalError
from sqlmodel import Session, SQLModel, create_engine, select, text
from langflow.services.base import Service
from langflow.services.database import models # noqa
from langflow.services.database.models.user.crud import get_user_by_username
from langflow.services.database.utils import Result, TableResults
from langflow.services.deps import get_settings_service
from langflow.services.utils import teardown_superuser
from loguru import logger
from sqlalchemy import inspect
from sqlalchemy.exc import OperationalError
from sqlmodel import Session, SQLModel, create_engine, select, text
if TYPE_CHECKING:
from sqlalchemy.engine import Engine
@ -39,7 +40,7 @@ class DatabaseService(Service):
connect_args = {"check_same_thread": False}
else:
connect_args = {}
return create_engine(self.database_url, connect_args=connect_args, max_overflow=-1)
return create_engine(self.database_url, connect_args=connect_args)
def __enter__(self):
self._session = Session(self.engine)

View file

@ -465,7 +465,7 @@ export default function GenericNode({
if (buildStatus === BuildStatus.BUILDING || isBuilding)
return;
setValidationStatus(null);
buildFlow(data.id);
buildFlow({nodeId: data.id});
}}
>
<div>

View file

@ -19,12 +19,12 @@ export default function IOInputField({
<Textarea
className="w-full"
placeholder={"Enter text..."}
value={node.data.node!.template["value"].value}
value={node.data.node!.template["input_value"].value}
onChange={(e) => {
e.target.value;
if (node) {
let newNode = cloneDeep(node);
newNode.data.node!.template["value"].value = e.target.value;
newNode.data.node!.template["input_value"].value = e.target.value;
setNode(node.id, newNode);
}
}}
@ -49,12 +49,12 @@ export default function IOInputField({
<Textarea
className="w-full custom-scroll"
placeholder={"Enter text..."}
value={node.data.node!.template["value"]}
value={node.data.node!.template["input_value"]}
onChange={(e) => {
e.target.value;
if (node) {
let newNode = cloneDeep(node);
newNode.data.node!.template["value"].value = e.target.value;
newNode.data.node!.template["input_value"].value = e.target.value;
setNode(node.id, newNode);
}
}}

View file

@ -30,12 +30,12 @@ export default function IOOutputView({
<Textarea
className="w-full custom-scroll"
placeholder={"Enter text..."}
value={node.data.node!.template["value"]}
value={node.data.node!.template["input_value"]}
onChange={(e) => {
e.target.value;
if (node) {
let newNode = cloneDeep(node);
newNode.data.node!.template["value"].value = e.target.value;
newNode.data.node!.template["input_value"].value = e.target.value;
setNode(node.id, newNode);
}
}}

View file

@ -1,9 +1,13 @@
import { cloneDeep } from "lodash";
import { useEffect, useState } from "react";
import { CHAT_FORM_DIALOG_SUBTITLE, outputsModalTitle, textInputModalTitle } from "../../constants/constants";
import {
CHAT_FORM_DIALOG_SUBTITLE,
outputsModalTitle,
textInputModalTitle,
} from "../../constants/constants";
import BaseModal from "../../modals/baseModal";
import useAlertStore from "../../stores/alertStore";
import useFlowStore from "../../stores/flowStore";
import useFlowsManagerStore from "../../stores/flowsManagerStore";
import { updateVerticesOrder } from "../../utils/buildUtils";
import { cn } from "../../utils/utils";
import AccordionComponent from "../AccordionComponent";
import IOInputField from "../IOInputField";
@ -40,27 +44,30 @@ export default function IOView({ children, open, setOpen }): JSX.Element {
{ type: string; id: string } | undefined
>(undefined);
const { getNode, setNode, buildFlow, getFlow } = useFlowStore();
const { setErrorData } = useAlertStore();
const buildFlow = useFlowStore((state) => state.buildFlow);
const setIsBuilding = useFlowStore((state) => state.setIsBuilding);
const [lockChat, setLockChat] = useState(false);
const [chatValue, setChatValue] = useState("");
const isBuilding = useFlowStore((state) => state.isBuilding);
const currentFlow = useFlowsManagerStore((state) => state.currentFlow);
async function updateVertices() {
return updateVerticesOrder(currentFlow!.id, null);
}
useEffect(() => {
if (open) {
updateVertices();
}
}, [open, currentFlow]);
async function sendMessage(count = 1): Promise<void> {
if (isBuilding) return;
const { nodes, edges } = getFlow();
setIsBuilding(true);
setLockChat(true);
setChatValue("");
const chatInputNode = nodes.find((node) => node.id === chatInput?.id);
if (chatInputNode) {
let newNode = cloneDeep(chatInputNode);
newNode.data.node!.template["input_value"].value = chatValue;
setNode(chatInput!.id, newNode);
}
for (let i = 0; i < count; i++) {
await buildFlow().catch((err) => {
await buildFlow({ input_value: chatValue }).catch((err) => {
console.error(err);
setLockChat(false);
});
@ -104,7 +111,7 @@ export default function IOView({ children, open, setOpen }): JSX.Element {
<Tabs
value={selectedTab.toString()}
className={
"flex h-full flex-col overflow-y-auto custom-scroll rounded-md border bg-muted text-center"
"flex h-full flex-col overflow-y-auto rounded-md border bg-muted text-center custom-scroll"
}
onValueChange={(value) => {
setSelectedTab(Number(value));
@ -266,24 +273,27 @@ export default function IOView({ children, open, setOpen }): JSX.Element {
{selectedViewField.type}
</div>
<div className="h-full">
{inputs.some(
(input) => input.id === selectedViewField.id
) ? (
<IOInputField
inputType={selectedViewField.type!}
inputId={selectedViewField.id!}
/>
) : (
<IOOutputView
outputType={selectedViewField.type!}
outputId={selectedViewField.id!}
/>
)}
{inputs.some(
(input) => input.id === selectedViewField.id
) ? (
<IOInputField
inputType={selectedViewField.type!}
inputId={selectedViewField.id!}
/>
) : (
<IOOutputView
outputType={selectedViewField.type!}
outputId={selectedViewField.id!}
/>
)}
</div>
</div>
)}
<div
className={cn("flex w-full h-full",selectedViewField ? "hidden" : "")}
className={cn(
"flex h-full w-full",
selectedViewField ? "hidden" : ""
)}
>
<NewChatView
sendMessage={sendMessage}

View file

@ -23,8 +23,6 @@ export default function BuildTrigger({
const nodes = useFlowStore((state) => state.nodes);
const edges = useFlowStore((state) => state.edges);
const setErrorData = useAlertStore((state) => state.setErrorData);
const setSuccessData = useAlertStore((state) => state.setSuccessData);
const setFlowState = useFlowStore((state) => state.setFlowState);
const eventClick = isBuilding ? "pointer-events-none" : "";
const [progress, setProgress] = useState(0);
@ -47,7 +45,7 @@ export default function BuildTrigger({
setIsBuilding(true);
await enforceMinimumLoadingTime(startTime, minimumLoadingTime);
await buildFlow();
await buildFlow({});
} catch (error) {
console.error("Error:", error);
} finally {

View file

@ -4,6 +4,7 @@ import { Textarea } from "../../../components/ui/textarea";
import { chatInputType } from "../../../types/components";
import { classNames } from "../../../utils/utils";
import { chatInputPlaceholder, chatInputPlaceholderSend } from "../../../constants/constants";
import useFlowsManagerStore from "../../../stores/flowsManagerStore";
export default function ChatInput({
lockChat,
@ -14,20 +15,21 @@ export default function ChatInput({
noInput,
}: chatInputType): JSX.Element {
const [repeat, setRepeat] = useState(1);
const saveLoading = useFlowsManagerStore((state) => state.saveLoading);
useEffect(() => {
if (!lockChat && inputRef.current) {
inputRef.current.focus();
}
}, [lockChat, inputRef]);
function handleChange(value: number) {
/* function handleChange(value: number) {
console.log(value);
if (value > 0) {
setRepeat(value);
} else {
setRepeat(1);
}
}
} */
useEffect(() => {
if (inputRef.current) {
@ -41,13 +43,13 @@ export default function ChatInput({
<div className="relative w-full">
<Textarea
onKeyDown={(event) => {
if (event.key === "Enter" && !lockChat && !event.shiftKey) {
if (event.key === "Enter" && !lockChat && !saveLoading && !event.shiftKey) {
sendMessage(repeat);
}
}}
rows={1}
ref={inputRef}
disabled={lockChat || noInput}
disabled={lockChat || noInput || saveLoading}
style={{
resize: "none",
bottom: `${inputRef?.current?.scrollHeight}px`,
@ -58,12 +60,12 @@ export default function ChatInput({
: "hidden"
}`,
}}
value={lockChat ? "Thinking..." : chatValue}
value={lockChat ? "Thinking..." : (saveLoading ? "Saving..." : chatValue)}
onChange={(event): void => {
setChatValue(event.target.value);
}}
className={classNames(
lockChat
(lockChat || saveLoading)
? " form-modal-lock-true bg-input"
: noInput
? "form-modal-no-input bg-input"
@ -87,10 +89,10 @@ export default function ChatInput({
? "text-primary"
: "bg-chat-send text-background"
)}
disabled={lockChat}
disabled={lockChat || saveLoading}
onClick={(): void => sendMessage(repeat)}
>
{lockChat ? (
{lockChat || saveLoading ? (
<IconComponent
name="Lock"
className="form-modal-lock-icon"

View file

@ -123,7 +123,7 @@ function ApiInterceptor() {
async function clearBuildVerticesState(error) {
if (error?.response?.status === 500) {
const vertices = useFlowStore.getState().verticesBuild;
useFlowStore.getState().updateBuildStatus(vertices, BuildStatus.BUILT);
useFlowStore.getState().updateBuildStatus(vertices?.verticesIds ?? [], BuildStatus.BUILT);
useFlowStore.getState().setIsBuilding(false);
}
}

View file

@ -869,9 +869,10 @@ export async function getVerticesOrder(
export async function postBuildVertex(
flowId: string,
vertexId: string
vertexId: string,
input_value: string,
): Promise<AxiosResponse<VertexBuildTypeAPI>> {
return await api.post(`${BASE_URL_API}build/${flowId}/vertices/${vertexId}`);
return await api.post(`${BASE_URL_API}build/${flowId}/vertices/${vertexId}`, input_value ? {inputs: {input_value: input_value}} : undefined);
}
export async function downloadImage({ flowId, fileName }): Promise<any> {

View file

@ -9,9 +9,12 @@ import {
applyNodeChanges,
} from "reactflow";
import { create } from "zustand";
import { FLOW_BUILD_SUCCESS_ALERT, MISSED_ERROR_ALERT } from "../constants/alerts_constants";
import {
FLOW_BUILD_SUCCESS_ALERT,
MISSED_ERROR_ALERT,
} from "../constants/alerts_constants";
import { BuildStatus } from "../constants/enums";
import { getFlowPool, updateFlowInDatabase } from "../controllers/API";
import { getFlowPool } from "../controllers/API";
import { VertexBuildTypeAPI } from "../types/api";
import {
NodeDataType,
@ -19,7 +22,12 @@ import {
sourceHandleType,
targetHandleType,
} from "../types/flow";
import { ChatOutputType, FlowPoolObjectType, FlowStoreType, chatInputType } from "../types/zustand/flow";
import {
ChatOutputType,
FlowPoolObjectType,
FlowStoreType,
chatInputType,
} from "../types/zustand/flow";
import { buildVertices } from "../utils/buildUtils";
import {
cleanEdges,
@ -59,23 +67,25 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
}
get().setFlowPool(newFlowPool);
},
updateFlowPool:(nodeId:string,data:FlowPoolObjectType| ChatOutputType | chatInputType,buildId?:string)=>{
updateFlowPool: (
nodeId: string,
data: FlowPoolObjectType | ChatOutputType | chatInputType,
buildId?: string
) => {
let newFlowPool = cloneDeep({ ...get().flowPool });
if (!newFlowPool[nodeId]){
if (!newFlowPool[nodeId]) {
return;
}
else {
let index = newFlowPool[nodeId].length-1;
if(buildId){
index = newFlowPool[nodeId].findIndex((flow)=>flow.id===buildId);
} else {
let index = newFlowPool[nodeId].length - 1;
if (buildId) {
index = newFlowPool[nodeId].findIndex((flow) => flow.id === buildId);
}
//check if the data is a flowpool object
if((data as FlowPoolObjectType).data?.artifacts!==undefined){
newFlowPool[nodeId][index] = (data as FlowPoolObjectType);
if ((data as FlowPoolObjectType).data?.artifacts !== undefined) {
newFlowPool[nodeId][index] = data as FlowPoolObjectType;
}
//update data artifact
else
{
else {
newFlowPool[nodeId][index].data.artifacts = data;
}
}
@ -394,7 +404,13 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
});
});
},
buildFlow: async (nodeId?: string) => {
buildFlow: async ({
nodeId,
input_value,
}: {
nodeId?: string;
input_value?: string;
}) => {
get().setIsBuilding(true);
const currentFlow = useFlowsManagerStore.getState().currentFlow;
const setSuccessData = useAlertStore.getState().setSuccessData;
@ -417,25 +433,19 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
function handleBuildUpdate(
vertexBuildData: VertexBuildTypeAPI,
status: BuildStatus,
buildId:string
buildId: string
) {
if (vertexBuildData && vertexBuildData.inactive_vertices) {
get().removeFromVerticesBuild(vertexBuildData.inactive_vertices);
}
get().addDataToFlowPool({...vertexBuildData,buildId}, vertexBuildData.id);
get().addDataToFlowPool(
{ ...vertexBuildData, buildId },
vertexBuildData.id
);
useFlowStore.getState().updateBuildStatus([vertexBuildData.id], status);
}
await updateFlowInDatabase({
data: {
nodes: get().nodes,
edges: get().edges,
viewport: get().reactFlowInstance?.getViewport()!,
},
id: currentFlow!.id,
name: currentFlow!.name,
description: currentFlow!.description,
});
await buildVertices({
input_value,
flowId: currentFlow!.id,
nodeId,
onGetOrderSuccess: () => {
@ -473,16 +483,22 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
viewport: get().reactFlowInstance?.getViewport()!,
};
},
updateVerticesBuild: (vertices: string[]) => {
updateVerticesBuild: (
vertices: { verticesIds: string[], verticesOrder: string[][], verticesLayers: string[][], runId: string } | null
) => {
set({ verticesBuild: vertices });
},
verticesBuild: [],
verticesBuild: null,
removeFromVerticesBuild: (vertices: string[]) => {
const verticesBuild = get().verticesBuild;
if (!verticesBuild) return;
set({
verticesBuild: get().verticesBuild.filter(
(vertex) => !vertices.includes(vertex)
),
verticesBuild: {
...verticesBuild,
verticesIds: get().verticesBuild!.verticesIds.filter(
(vertex) => !vertices.includes(vertex)
),
},
});
},
updateBuildStatus: (nodeIdList: string[], status: BuildStatus) => {

View file

@ -83,6 +83,7 @@ const useFlowsManagerStore = create<FlowsManagerStoreType>((set, get) => ({
if (saveTimeoutId) {
clearTimeout(saveTimeoutId);
}
set({ saveLoading: true });
// Set up a new timeout.
saveTimeoutId = setTimeout(() => {
if (get().currentFlow) {
@ -92,7 +93,7 @@ const useFlowsManagerStore = create<FlowsManagerStoreType>((set, get) => ({
);
}
set({ saveLoading: true });
}, 1000); // Delay of 1000ms.
}, 500); // Delay of 500ms because chat message depends on it.
},
saveFlow: (flow: FlowType, silent?: boolean) => {
set({ saveLoading: true });

View file

@ -86,11 +86,11 @@ export type FlowStoreType = {
getFilterEdge: any[];
onConnect: (connection: Connection) => void;
unselectAll: () => void;
buildFlow: (nodeId?: string) => Promise<void>;
buildFlow: ({nodeId, input_value}: {nodeId?: string, input_value?: string}) => Promise<void>;
getFlow: () => { nodes: Node[]; edges: Edge[]; viewport: Viewport };
updateVerticesBuild: (vertices: string[]) => void;
removeFromVerticesBuild: (vertices: string[]) => void;
verticesBuild: string[];
updateVerticesBuild: (vertices: {verticesIds: string[], verticesLayers: string[][], verticesOrder: string[][], runId: string} | null) => void;
removeFromVerticesBuild: (vertices: string[]) => void;
verticesBuild: {verticesIds: string[], verticesLayers: string[][], verticesOrder: string[][], runId: string} | null;
updateBuildStatus: (nodeId: string[], status: BuildStatus) => void;
revertBuiltStatusFromBuilding: () => void;
flowBuildStatus: { [key: string]: BuildStatus };

View file

@ -7,9 +7,14 @@ import { VertexBuildTypeAPI } from "../types/api";
type BuildVerticesParams = {
flowId: string; // Assuming FlowType is the type for your flow
input_value?: any; // Replace any with the actual type if it's not any
nodeId?: string | null; // Assuming nodeId is of type string, and it's optional
onGetOrderSuccess?: () => void;
onBuildUpdate?: (data: VertexBuildTypeAPI, status: BuildStatus,buildId:string) => void; // Replace any with the actual type if it's not any
onBuildUpdate?: (
data: VertexBuildTypeAPI,
status: BuildStatus,
buildId: string
) => void; // Replace any with the actual type if it's not any
onBuildComplete?: (allNodesValid: boolean) => void;
onBuildError?: (title, list, idList: string[]) => void;
onBuildStart?: (idList: string[]) => void;
@ -34,8 +39,54 @@ function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI {
return inactiveVertexData;
}
export async function updateVerticesOrder(flowId: string, nodeId: string | null): Promise<{ verticesLayers: string[][], verticesIds: string[], verticesOrder: string[][], runId: string }> {
return new Promise(async (resolve, reject) => {
const setErrorData = useAlertStore.getState().setErrorData;
let orderResponse;
try {
orderResponse = await getVerticesOrder(flowId, nodeId);
} catch (error: any) {
console.log(error);
setErrorData({
title: "Oops! Looks like you missed something",
list: [error.response?.data?.detail ?? "Unknown Error"],
});
useFlowStore.getState().setIsBuilding(false);
throw new Error("Invalid nodes");
}
let verticesOrder: Array<Array<string>> = orderResponse.data.ids;
const runId = orderResponse.data.run_id;
let verticesLayers: Array<Array<string>> = [];
if (nodeId) {
for (let i = 0; i < verticesOrder.length; i += 1) {
const innerArray = verticesOrder[i];
const idIndex = innerArray.indexOf(nodeId);
if (idIndex !== -1) {
// If there's a nodeId, we want to run just that component and not the entire layer
// because a layer contains dependencies for the next layer
// and we are stopping at the layer that contains the nodeId
verticesLayers.push([innerArray[idIndex]]);
break; // Stop searching after finding the first occurrence
}
// If the targetId is not found, include the entire inner array
verticesLayers.push(innerArray);
}
} else {
verticesLayers = verticesOrder;
}
const verticesIds = verticesLayers.flat();
useFlowStore
.getState()
.updateVerticesBuild({ verticesLayers, verticesIds, verticesOrder, runId });
resolve({ verticesLayers, verticesIds, verticesOrder, runId });
});
}
export async function buildVertices({
flowId,
input_value,
nodeId = null,
onGetOrderSuccess,
onBuildUpdate,
@ -44,24 +95,18 @@ export async function buildVertices({
onBuildStart,
validateNodes,
}: BuildVerticesParams) {
const setErrorData = useAlertStore.getState().setErrorData;
let orderResponse;
try {
orderResponse = await getVerticesOrder(flowId, nodeId);
} catch (error:any) {
console.log(error);
setErrorData({
title: "Oops! Looks like you missed something",
list: [error.response?.data?.detail ?? "Unknown Error"],
});
useFlowStore.getState().setIsBuilding(false);
throw new Error("Invalid nodes");
let verticesBuild = useFlowStore.getState().verticesBuild;
if (!verticesBuild || nodeId) {
verticesBuild = await updateVerticesOrder(flowId, nodeId);
}
if (onGetOrderSuccess) onGetOrderSuccess();
let verticesOrder: Array<Array<string>> = orderResponse.data.ids;
const runId = orderResponse.data.run_id;
let vertices_layers: Array<Array<string>> = [];
const verticesIds = verticesBuild?.verticesIds!;
const verticesLayers = verticesBuild?.verticesLayers!;
const verticesOrder = verticesBuild?.verticesOrder!;
const runId = verticesBuild?.runId!;
let stop = false;
if (onGetOrderSuccess) onGetOrderSuccess();
if (validateNodes) {
try {
validateNodes(verticesOrder.flatMap((id) => id));
@ -69,48 +114,29 @@ export async function buildVertices({
return;
}
}
if (nodeId) {
for (let i = 0; i < verticesOrder.length; i += 1) {
const innerArray = verticesOrder[i];
const idIndex = innerArray.indexOf(nodeId);
if (idIndex !== -1) {
// If there's a nodeId, we want to run just that component and not the entire layer
// because a layer contains dependencies for the next layer
// and we are stopping at the layer that contains the nodeId
vertices_layers.push([innerArray[idIndex]]);
break; // Stop searching after finding the first occurrence
}
// If the targetId is not found, include the entire inner array
vertices_layers.push(innerArray);
}
} else {
vertices_layers = verticesOrder;
}
const verticesIds = vertices_layers.flat();
useFlowStore.getState().updateBuildStatus(verticesIds, BuildStatus.TO_BUILD);
useFlowStore.getState().updateVerticesBuild(verticesIds);
useFlowStore.getState().setIsBuilding(true);
// Set each vertex state to building
const buildResults: Array<boolean> = [];
for (const layer of vertices_layers) {
for (const layer of verticesLayers) {
if (onBuildStart) onBuildStart(layer);
for (const id of layer) {
// Check if id is in the list of inactive nodes
if (
!useFlowStore.getState().verticesBuild.includes(id) &&
onBuildUpdate
) {
if (!verticesIds.includes(id) && onBuildUpdate) {
// If it is, skip building and set the state to inactive
onBuildUpdate(getInactiveVertexData(id), BuildStatus.INACTIVE,runId);
onBuildUpdate(getInactiveVertexData(id), BuildStatus.INACTIVE, runId);
buildResults.push(false);
continue;
}
await buildVertex({
flowId,
id,
onBuildUpdate:(data: VertexBuildTypeAPI, status: BuildStatus) => {if(onBuildUpdate) onBuildUpdate(data, status,runId)},
input_value,
onBuildUpdate: (data: VertexBuildTypeAPI, status: BuildStatus) => {
if (onBuildUpdate) onBuildUpdate(data, status, runId);
},
onBuildError,
verticesIds,
buildResults,
@ -137,6 +163,7 @@ export async function buildVertices({
async function buildVertex({
flowId,
id,
input_value,
onBuildUpdate,
onBuildError,
verticesIds,
@ -145,6 +172,7 @@ async function buildVertex({
}: {
flowId: string;
id: string;
input_value: string;
onBuildUpdate?: (data: any, status: BuildStatus) => void;
onBuildError?: (title, list, idList: string[]) => void;
verticesIds: string[];
@ -152,7 +180,7 @@ async function buildVertex({
stopBuild: () => void;
}) {
try {
const buildRes = await postBuildVertex(flowId, id);
const buildRes = await postBuildVertex(flowId, id, input_value);
const buildData: VertexBuildTypeAPI = buildRes.data;
if (onBuildUpdate) {
if (!buildData.valid) {