Merge branch 'zustand/io/migration' of github.com:logspace-ai/langflow into zustand/io/migration

This commit is contained in:
cristhianzl 2024-03-05 14:42:18 -03:00
commit 1ae8be9c0f
26 changed files with 598 additions and 206 deletions

View file

@ -159,10 +159,10 @@ async def build_vertex(
result_data_response.duration = duration
result_data_response.timedelta = timedelta
vertex.add_build_time(timedelta)
inactive_vertices = None
if graph.inactive_vertices:
inactive_vertices = list(graph.inactive_vertices)
graph.reset_inactive_vertices()
inactivated_vertices = None
inactivated_vertices = list(graph.inactivated_vertices)
graph.reset_inactivated_vertices()
graph.reset_activated_vertices()
chat_service.set_cache(flow_id, graph)
# graph.stop_vertex tells us if the user asked
@ -173,8 +173,8 @@ async def build_vertex(
next_vertices_ids = [graph.stop_vertex]
build_response = VertexBuildResponse(
inactivated_vertices=inactivated_vertices,
next_vertices_ids=next_vertices_ids,
inactive_vertices=inactive_vertices,
valid=valid,
params=params,
id=vertex.id,

View file

@ -13,6 +13,7 @@ from langflow.api.v1.schemas import (
ProcessResponse,
RunResponse,
TaskStatusResponse,
Tweaks,
UploadFileResponse,
)
from langflow.interface.custom.custom_component import CustomComponent
@ -56,19 +57,60 @@ def get_all(
async def run_flow_with_caching(
session: Annotated[Session, Depends(get_session)],
flow_id: str,
inputs: Optional[InputValueRequest] = None,
tweaks: Optional[dict] = None,
inputs: Optional[List[InputValueRequest]] = None,
outputs: Optional[List[str]] = None,
tweaks: Annotated[Optional[Tweaks], Body(embed=True)] = None, # noqa: F821
stream: Annotated[bool, Body(embed=True)] = False, # noqa: F821
session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821
api_key_user: User = Depends(api_key_security),
session_service: SessionService = Depends(get_session_service),
):
"""
Executes a specified flow by ID with optional input values, output selection, tweaks, and streaming capability.
This endpoint supports running flows with caching to enhance performance and efficiency.
### Parameters:
- `flow_id` (str): The unique identifier of the flow to be executed.
- `inputs` (List[InputValueRequest], optional): A list of inputs specifying the input values and components for the flow. Each input can target specific components and provide custom values.
- `outputs` (List[str], optional): A list of output names to retrieve from the executed flow. If not provided, all outputs are returned.
- `tweaks` (Optional[Tweaks], optional): A dictionary of tweaks to customize the flow execution. The tweaks can be used to modify the flow's parameters and components. Tweaks can be overridden by the input values.
- `stream` (bool, optional): Specifies whether the results should be streamed. Defaults to False.
- `session_id` (Union[None, str], optional): An optional session ID to utilize existing session data for the flow execution.
- `api_key_user` (User): The user associated with the current API key. Automatically resolved from the API key.
- `session_service` (SessionService): The session service object for managing flow sessions.
### Returns:
A `RunResponse` object containing the selected outputs (or all if not specified) of the executed flow and the session ID. The structure of the response accommodates multiple inputs, providing a nested list of outputs for each input.
### Raises:
HTTPException: Indicates issues with finding the specified flow, invalid input formats, or internal errors during flow execution.
### Example usage:
```json
POST /run/{flow_id}
Payload:
{
"inputs": [
{"components": ["component1"], "input_value": "value1"},
{"components": ["component3"], "input_value": "value2"}
],
"outputs": ["Component Name", "component_id"],
"tweaks": {"parameter_name": "value", "Component Name": {"parameter_name": "value"}, "component_id": {"parameter_name": "value"}}
"stream": false
}
```
This endpoint facilitates complex flow executions with customized inputs, outputs, and configurations, catering to diverse application requirements.
"""
try:
if inputs is not None:
input_values_dict: dict[str, Union[str, list[str]]] = inputs.model_dump()
else:
input_values_dict = {}
if outputs is None:
outputs = []
if session_id:
session_data = await session_service.load_session(
session_id, flow_id=flow_id
@ -82,6 +124,7 @@ async def run_flow_with_caching(
flow_id=flow_id,
session_id=session_id,
inputs=input_values_dict,
outputs=outputs,
artifacts=artifacts,
session_service=session_service,
stream=stream,
@ -107,6 +150,7 @@ async def run_flow_with_caching(
flow_id=flow_id,
session_id=session_id,
inputs=input_values_dict,
outputs=outputs,
artifacts={},
session_service=session_service,
stream=stream,

View file

@ -4,7 +4,7 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from uuid import UUID
from pydantic import BaseModel, Field, field_validator, model_serializer
from pydantic import BaseModel, Field, RootModel, field_validator, model_serializer
from langflow.services.database.models.api_key.model import ApiKeyRead
from langflow.services.database.models.base import orjson_dumps
@ -229,8 +229,8 @@ class ResultDataResponse(BaseModel):
class VertexBuildResponse(BaseModel):
id: Optional[str] = None
inactivated_vertices: Optional[List[str]] = None
next_vertices_ids: Optional[List[str]] = None
inactive_vertices: Optional[List[str]] = None
valid: bool
params: Optional[Any] = Field(default_factory=dict)
"""JSON string of the params."""
@ -245,4 +245,49 @@ class VerticesBuiltResponse(BaseModel):
class InputValueRequest(BaseModel):
input_value: str
components: Optional[List[str]] = None
input_value: Optional[str] = None
# add an example
model_config = {
"json_schema_extra": {
"examples": [
{
"components": ["components_id", "Component Name"],
"input_value": "input_value",
},
{"components": ["Component Name"], "input_value": "input_value"},
{"input_value": "input_value"},
]
}
}
class Tweaks(RootModel):
    """Dict-like root model holding per-flow tweaks.

    Keys are parameter names, component display names, or component ids;
    values are either a plain string or a ``{parameter_name: value}``
    mapping scoped to one component. Tweaks are overridden by input values.
    """

    root: dict[str, Union[str, dict[str, str]]] = Field(
        description="A dictionary of tweaks to adjust the flow's execution. Allows customizing flow behavior dynamically. All tweaks are overridden by the input values.",
    )
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "parameter_name": "value",
                    "Component Name": {"parameter_name": "value"},
                    "component_id": {"parameter_name": "value"},
                }
            ]
        }
    }

    # This should behave like a dict
    def __getitem__(self, key):
        return self.root[key]

    def __setitem__(self, key, value):
        self.root[key] = value

    def __delitem__(self, key):
        del self.root[key]

    def items(self):
        return self.root.items()

    # Backward-compatible additions rounding out the dict protocol, so
    # callers can length-check, membership-test, and read with defaults
    # without reaching into `.root` directly.
    def __len__(self):
        return len(self.root)

    def __contains__(self, key):
        return key in self.root

    def get(self, key, default=None):
        return self.root.get(key, default)

    def keys(self):
        return self.root.keys()

    def values(self):
        return self.root.values()

View file

@ -1,49 +0,0 @@
from typing import List
from langchain import document_loaders
from langchain_core.documents import Document
from langflow import CustomComponent
class UrlLoaderComponent(CustomComponent):
    """Generic URL loader wrapping a selectable LangChain document loader."""

    display_name: str = "Url Loader"
    description: str = "Generic Url Loader Component"

    def build_config(self):
        # Field definitions rendered by the frontend; `loader` is limited to
        # the URL-oriented loaders exposed by langchain.document_loaders.
        return {
            "web_path": {
                "display_name": "Url",
                "required": True,
            },
            "loader": {
                "display_name": "Loader",
                "is_list": True,
                "required": True,
                "options": [
                    "AZLyricsLoader",
                    "CollegeConfidentialLoader",
                    "GitbookLoader",
                    "HNLoader",
                    "IFixitLoader",
                    "IMSDbLoader",
                    "WebBaseLoader",
                ],
                "value": "WebBaseLoader",
            },
            "code": {"show": False},
        }

    def build(self, web_path: str, loader: str) -> List[Document]:
        """Instantiate the named loader for `web_path` and return its documents.

        Raises:
            ValueError: if `loader` does not name a usable class in
                langchain.document_loaders.
        """
        try:
            loader_instance = getattr(document_loaders, loader)(web_path=web_path)
        except Exception as e:
            raise ValueError(f"No loader found for: {web_path}") from e
        docs = loader_instance.load()
        # Guard against an empty result set: the original divided by
        # len(docs) unconditionally, raising ZeroDivisionError on no docs.
        avg_length = (
            sum(len(doc.page_content) for doc in docs if hasattr(doc, "page_content"))
            / len(docs)
            if docs
            else 0
        )
        # Note: fixed stray ")" after "documents" in the status message.
        self.status = f"""{len(docs)} documents
\nAvg. Document Length (characters): {int(avg_length)}
Documents: {docs[:3]}..."""
        return docs

View file

@ -0,0 +1,20 @@
from langflow import CustomComponent
from langflow.schema import Record
class GetNotifiedComponent(CustomComponent):
    """Reads the state published under a notification name by the Notify component."""

    display_name = "Get Notified"
    description = "A component to get notified by Notify component."

    def build_config(self):
        # Single configurable field: which notification name to listen on.
        name_field = {
            "display_name": "Name",
            "info": "The name of the notification to listen for.",
        }
        return {"name": name_field}

    def build(self, name: str) -> Record:
        """Fetch the current state stored under `name`, expose it as status, return it."""
        current_state = self.get_state(name)
        self.status = current_state
        return current_state

View file

@ -0,0 +1,41 @@
from typing import Optional
from langflow import CustomComponent
from langflow.schema import Record
class NotifyComponent(CustomComponent):
    """Publishes a record under a notification name for Get Notified to consume."""

    display_name = "Notify"
    description = "A component to generate a notification to Get Notified component."

    def build_config(self):
        return {
            "name": {"display_name": "Name", "info": "The name of the notification."},
            "record": {"display_name": "Record", "info": "The record to store."},
            "append": {
                "display_name": "Append",
                "info": "If True, the record will be appended to the notification.",
            },
        }

    def build(
        self, name: str, record: Optional[Record] = None, append: bool = False
    ) -> Record:
        """Coerce `record` to a Record, store it under `name`, and return it.

        Coercions: str -> Record(text=...), dict -> Record(data=...),
        other non-Record -> Record(text=str(...)), missing -> Record(text="").
        """
        if record and not isinstance(record, Record):
            if isinstance(record, str):
                record = Record(text=record)
            elif isinstance(record, dict):
                record = Record(data=record)
            else:
                record = Record(text=str(record))
        elif not record:
            record = Record(text="")
        if record:
            if append:
                self.append_state(name, record)
            else:
                self.update_state(name, record)
            # Set status inside this branch: the original assigned
            # `self.status = record` unconditionally afterwards, which
            # clobbered the "No record provided." message set below.
            self.status = record
        else:
            self.status = "No record provided."
        return record

View file

@ -25,6 +25,8 @@ class RecordsAsTextComponent(CustomComponent):
records: list[Record],
template: str = "Text: {text}\nData: {data}",
) -> Text:
if not records:
return ""
if isinstance(records, Record):
records = [records]

View file

@ -1,7 +1,6 @@
from typing import Union
from typing import Optional
from langflow import CustomComponent
from langflow.field_typing import Text
from langflow.schema import Record
@ -20,19 +19,23 @@ class SharedState(CustomComponent):
}
def build(
self, name: str, record: Union[Text, Record], append: bool = False
self, name: str, record: Optional[Record] = None, append: bool = False
) -> Record:
if append:
self.append_state(name, record)
else:
self.update_state(name, record)
if record:
if append:
self.append_state(name, record)
else:
self.update_state(name, record)
state = self.get_state(name)
if not isinstance(state, Record):
if state and not isinstance(state, Record):
if isinstance(state, str):
state = Record(text=state)
elif isinstance(state, dict):
state = Record(data=state)
else:
state = Record(text=str(state))
elif not state:
state = Record(text="")
self.status = state
return state

View file

@ -0,0 +1,31 @@
from typing import Optional
from langflow import CustomComponent
from langflow.field_typing import Text
from langflow.schema import Record
class TextToRecordComponent(CustomComponent):
    """Converts a piece of text, plus optional data, into a Record."""

    display_name = "Text to Record"
    description = "Converts text to a Record."

    def build_config(self):
        return {
            "text": {
                "display_name": "Text",
                "info": "The text to convert to a record.",
            },
            "data": {
                "display_name": "Data",
                "info": "The optional data to include in the record.",
            },
        }

    def build(
        self,
        text: Text,
        data: Optional[dict] = None,
    ) -> Record:
        """Build a Record from `text` and optional `data`.

        `data` defaults to None instead of a mutable `{}` (the original
        shared a single dict instance across all calls); a fresh empty
        dict is substituted per call.
        """
        record = Record(text=text, data=data if data is not None else {})
        self.status = record
        return record

View file

@ -17,9 +17,11 @@ from langflow.graph.vertex.types import (
FileToolVertex,
LLMVertex,
RoutingVertex,
StateVertex,
ToolkitVertex,
)
from langflow.interface.tools.constants import FILE_TOOLS
from langflow.schema import Record
from langflow.utils import payload
if TYPE_CHECKING:
@ -43,9 +45,10 @@ class Graph:
self.flow_id = flow_id
self._is_input_vertices: List[str] = []
self._is_output_vertices: List[str] = []
self._is_state_vertices: List[str] = []
self._has_session_id_vertices: List[str] = []
self._sorted_vertices_layers: List[List[str]] = []
self.run_id = None
self._run_id = None
self.top_level_vertices = []
for vertex in self._vertices:
@ -55,6 +58,8 @@ class Graph:
self._vertices = self._graph_data["nodes"]
self._edges = self._graph_data["edges"]
self.inactivated_vertices: set = set()
self.activated_vertices: List[str] = []
self.vertices_layers = []
self.vertices_to_run = set()
self.stop_vertex = None
@ -67,13 +72,67 @@ class Graph:
self.define_vertices_lists()
self.state_manager = GraphStateManager()
def get_state(self, name: str) -> Optional[Record]:
"""Returns the state of the graph."""
return self.state_manager.get_state(name, run_id=self._run_id)
def update_state(
self, name: str, record: Union[str, Record], caller: Optional[str] = None
) -> None:
"""Updates the state of the graph."""
if caller:
# If there is a caller which is a vertex_id, I want to activate
# all StateVertex in self.vertices that are not the caller
# essentially notifying all the other vertices that the state has changed
# This also has to activate their successors
self.activate_state_vertices(name, caller)
self.state_manager.update_state(name, record, run_id=self._run_id)
def activate_state_vertices(self, name: str, caller: str):
vertices_ids = []
for vertex_id in self._is_state_vertices:
if vertex_id == caller:
continue
vertex = self.get_vertex(vertex_id)
if (
isinstance(vertex._raw_params["name"], str)
and name in vertex._raw_params["name"]
and vertex_id != caller
and isinstance(vertex, StateVertex)
):
vertices_ids.append(vertex_id)
successors = self.get_all_successors(vertex, flat=True)
self.vertices_to_run.update(list(map(lambda x: x.id, successors)))
self.activated_vertices = vertices_ids
self.vertices_to_run.update(vertices_ids)
def reset_activated_vertices(self):
self.activated_vertices = []
def append_state(
self, name: str, record: Union[str, Record], caller: Optional[str] = None
) -> None:
"""Appends the state of the graph."""
if caller:
self.activate_state_vertices(name, caller)
self.state_manager.append_state(name, record, run_id=self._run_id)
@property
def run_id(self):
if not self._run_id:
raise ValueError("Run ID not set")
return self._run_id
def set_run_id(self, run_id: str):
for vertex in self.vertices:
self.state_manager.subscribe(run_id, vertex.update_graph_state)
self.run_id = run_id
self._run_id = run_id
def add_state(self, state: str):
self.state_manager.append_state(self.run_id, state)
self.state_manager.append_state(self._run_id, state)
@property
def sorted_vertices_layers(self) -> List[List[str]]:
@ -85,28 +144,44 @@ class Graph:
"""
Defines the lists of vertices that are inputs, outputs, and have session_id.
"""
attributes = ["is_input", "is_output", "has_session_id"]
attributes = ["is_input", "is_output", "has_session_id", "is_state"]
for vertex in self.vertices:
for attribute in attributes:
if getattr(vertex, attribute):
getattr(self, f"_{attribute}_vertices").append(vertex.id)
async def _run(
self, inputs: Dict[str, str], stream: bool
self,
inputs: Dict[str, str],
input_components: list[str],
outputs: list[str],
stream: bool,
session_id: str,
) -> List[Optional["ResultData"]]:
"""Runs the graph with the given inputs."""
for vertex_id in self._is_input_vertices:
vertex = self.get_vertex(vertex_id)
if input_components and (
vertex_id not in input_components
or vertex.display_name not in input_components
):
continue
if vertex is None:
raise ValueError(f"Vertex {vertex_id} not found")
vertex.update_raw_params(inputs)
vertex.update_raw_params(inputs, overwrite=True)
# Update all the vertices with the session_id
for vertex_id in self._has_session_id_vertices:
vertex = self.get_vertex(vertex_id)
if vertex is None:
raise ValueError(f"Vertex {vertex_id} not found")
vertex.update_raw_params({"session_id": session_id})
try:
await self.process()
self.increment_run_count()
except Exception as exc:
logger.exception(exc)
raise ValueError(f"Error running graph: {exc}") from exc
outputs = []
vertex_outputs = []
for vertex_id in self._is_output_vertices:
vertex = self.get_vertex(vertex_id)
if vertex is None:
@ -118,11 +193,16 @@ class Graph:
and hasattr(vertex, "consume_async_generator")
):
await vertex.consume_async_generator()
outputs.append(vertex.result)
return outputs
if vertex.display_name in outputs or vertex.id in outputs:
vertex_outputs.append(vertex.result)
return vertex_outputs
async def run(
self, inputs: Dict[str, Union[str, list[str]]], stream: bool
self,
inputs: Dict[str, Union[str, list[str]]],
outputs: list[str],
stream: bool,
session_id: str,
) -> List[Optional["ResultData"]]:
"""Runs the graph with the given inputs."""
@ -130,17 +210,21 @@ class Graph:
# we need to go through self.inputs and update the self._raw_params
# of the vertices that are inputs
# if the value is a list, we need to run multiple times
outputs = []
vertex_outputs = []
inputs_values = inputs.get(INPUT_FIELD_NAME, "")
if not isinstance(inputs_values, list):
inputs_values = [inputs_values]
for input_value in inputs_values:
run_outputs = await self._run(
{INPUT_FIELD_NAME: input_value}, stream=stream
inputs={INPUT_FIELD_NAME: input_value},
input_components=inputs.get("components", []),
outputs=outputs,
stream=stream,
session_id=session_id,
)
logger.debug(f"Run outputs: {run_outputs}")
outputs.extend(run_outputs)
return outputs
vertex_outputs.append(run_outputs)
return vertex_outputs
# vertices_layers is a list of lists ordered by the order the vertices
# should be built.
@ -155,7 +239,7 @@ class Graph:
return {
"runs": self._runs,
"updates": self._updates,
"inactive_vertices": self.inactive_vertices,
"inactivated_vertices": self.inactivated_vertices,
}
def build_graph_maps(self):
@ -163,8 +247,8 @@ class Graph:
self.in_degree_map = self.build_in_degree()
self.parent_child_map = self.build_parent_child_map()
def reset_inactive_vertices(self):
self.inactive_vertices = set()
def reset_inactivated_vertices(self):
self.inactivated_vertices = set()
def mark_all_vertices(self, state: str):
"""Marks all vertices in the graph."""
@ -562,6 +646,43 @@ class Graph:
for source_id in self.predecessor_map.get(vertex.id, [])
]
def get_all_successors(self, vertex, recursive=True, flat=True):
    """Return the successors of `vertex`, transitively when `recursive`.

    With `flat=True` the result is a single list of vertices; with
    `flat=False` each layer of successors becomes its own sub-list.
    """
    # NOTE(review): the recursive call below does not forward `recursive`
    # or `flat`, so nested calls always run with the defaults — with
    # flat=False the appended sub-results are therefore already flattened.
    # Confirm this mixed layering is intended.
    # NOTE(review): no visited set is kept, so a cyclic graph would recurse
    # without bound — presumably graphs here are acyclic; verify.
    # Recursively get the successors of the current vertex
    # successors = vertex.successors
    # if not successors:
    # return []
    # successors_result = []
    # for successor in successors:
    # # Just return a list of successors
    # if recursive:
    # next_successors = self.get_all_successors(successor)
    # successors_result.extend(next_successors)
    # successors_result.append(successor)
    # return successors_result
    # The above is the version without the flat parameter
    # The below is the version with the flat parameter
    # the flat parameter will define if each layer of successors
    # becomes one list or if the result is a list of lists
    # if flat is True, the result will be a list of vertices
    # if flat is False, the result will be a list of lists of vertices
    # each list will represent a layer of successors
    successors = vertex.successors
    if not successors:
        return []
    successors_result = []
    for successor in successors:
        if recursive:
            next_successors = self.get_all_successors(successor)
            if flat:
                successors_result.extend(next_successors)
            else:
                successors_result.append(next_successors)
        if flat:
            successors_result.append(successor)
        else:
            successors_result.append([successor])
    return successors_result
def get_successors(self, vertex):
"""Returns the successors of a vertex."""
return [
@ -596,14 +717,21 @@ class Graph:
# if we can't find a vertex, we raise an error
edges: List[ContractEdge] = []
edges_added = set()
for edge in self._edges:
source = self.get_vertex(edge["source"])
target = self.get_vertex(edge["target"])
if source is None:
raise ValueError(f"Source vertex {edge['source']} not found")
if target is None:
raise ValueError(f"Target vertex {edge['target']} not found")
if (source.id, target.id) in edges_added:
continue
edges.append(ContractEdge(source, target, edge))
edges_added.add((source.id, target.id))
return edges
def _get_vertex_class(
@ -616,6 +744,8 @@ class Graph:
return ChatVertex
elif node_name in ["ShouldRunNext"]:
return RoutingVertex
elif node_name in ["SharedState", "Notify", "GetNotified"]:
return StateVertex
elif node_base_type in lazy_load_vertex_dict.VERTEX_TYPE_MAP:
return lazy_load_vertex_dict.VERTEX_TYPE_MAP[node_base_type]
elif node_name in lazy_load_vertex_dict.VERTEX_TYPE_MAP:
@ -723,21 +853,28 @@ class Graph:
def layered_topological_sort(
self,
vertices: List[Vertex],
filter_graphs: bool = False,
) -> List[List[str]]:
"""Performs a layered topological sort of the vertices in the graph."""
vertices_ids = {vertex.id for vertex in vertices}
# Queue for vertices with no incoming edges
queue = deque(
vertex.id for vertex in vertices if self.in_degree_map[vertex.id] == 0
vertex.id
for vertex in vertices
# if filter_graphs then only vertex.is_input will be considered
if self.in_degree_map[vertex.id] == 0
and (not filter_graphs or vertex.is_input)
)
layers: List[List[str]] = []
visited = set(queue)
current_layer = 0
while queue:
layers.append([]) # Start a new layer
layer_size = len(queue)
for _ in range(layer_size):
vertex_id = queue.popleft()
visited.add(vertex_id)
layers[current_layer].append(vertex_id)
for neighbor in self.successor_map[vertex_id]:
# only vertices in `vertices_ids` should be considered
@ -748,8 +885,16 @@ class Graph:
continue
self.in_degree_map[neighbor] -= 1 # 'remove' edge
if self.in_degree_map[neighbor] == 0:
if self.in_degree_map[neighbor] == 0 and neighbor not in visited:
queue.append(neighbor)
# if > 0 it might mean not all predecessors have added to the queue
# so we should process the neighbors predecessors
elif self.in_degree_map[neighbor] > 0:
for predecessor in self.predecessor_map[neighbor]:
if predecessor not in queue and predecessor not in visited:
queue.append(predecessor)
current_layer += 1 # Next layer
new_layers = self.refine_layers(layers)
return new_layers
@ -821,9 +966,12 @@ class Graph:
vertices = self.sort_up_to_vertex(stop_component_id)
elif start_component_id:
vertices = self.sort_up_to_vertex(start_component_id, is_start=True)
else:
vertices = self.vertices
# without component_id we are probably running in the chat
# so we want to pick only graphs that start with ChatInput or
# TextInput
vertices_layers = self.layered_topological_sort(vertices)
vertices_layers = self.sort_by_avg_build_time(vertices_layers)
# vertices_layers = self.sort_chat_inputs_first(vertices_layers)
@ -833,7 +981,7 @@ class Graph:
# save the only the rest
self.vertices_layers = vertices_layers[1:]
self.vertices_to_run = {
vertex for vertex in chain.from_iterable(vertices_layers)
vertex_id for vertex_id in chain.from_iterable(vertices_layers)
}
# Return just the first layer
return first_layer

View file

@ -2,6 +2,8 @@ from collections import defaultdict
from threading import Lock
from typing import Callable
from loguru import logger
class GraphStateManager:
def __init__(self):
@ -9,21 +11,29 @@ class GraphStateManager:
self.observers = defaultdict(list)
self.lock = Lock()
def append_state(self, key, new_state):
def append_state(self, key, new_state, run_id: str):
with self.lock:
if key not in self.states:
self.states[key] = []
self.states[key].append(new_state)
if run_id not in self.states:
self.states[run_id] = {}
if key not in self.states[run_id]:
self.states[run_id][key] = []
elif not isinstance(self.states[key], list):
self.states[run_id][key] = [self.states[key]]
self.states[run_id][key].append(new_state)
self.notify_append_observers(key, new_state)
def update_state(self, key, new_state):
def update_state(self, key, new_state, run_id: str):
with self.lock:
self.states[key] = new_state
if run_id not in self.states:
self.states[run_id] = {}
if key not in self.states[run_id]:
self.states[run_id][key] = {}
self.states[run_id][key] = new_state
self.notify_observers(key, new_state)
def get_state(self, key):
def get_state(self, key, run_id: str):
with self.lock:
return self.states.get(key, None)
return self.states.get(run_id, {}).get(key, "")
def subscribe(self, key, observer: Callable):
with self.lock:
@ -36,4 +46,8 @@ class GraphStateManager:
def notify_append_observers(self, key, new_state):
for callback in self.observers[key]:
callback(key, new_state, append=True)
try:
callback(key, new_state, append=True)
except Exception as e:
logger.error(f"Error in observer {callback} for key {key}: {e}")
logger.warning("Callbacks not implemented yet")

View file

@ -58,6 +58,7 @@ class Vertex:
self.will_stream = False
self.updated_raw_params = False
self.id: str = data["id"]
self.is_state = False
self.is_input = any(
input_component_name in self.id for input_component_name in INPUT_COMPONENTS
)
@ -99,12 +100,9 @@ class Vertex:
def update_graph_state(self, key, new_state, append: bool):
if append:
if key in self.graph_state:
self.graph_state[key].append(new_state)
else:
self.graph_state[key] = [new_state]
self.graph.append_state(key, new_state, caller=self.id)
else:
self.graph_state[key] = new_state
self.graph.update_state(key, new_state, caller=self.id)
def set_state(self, state: str):
self.state = VertexStates[state]
@ -114,12 +112,12 @@ class Vertex:
):
# If the vertex is inactive and has only one in degree
# it means that it is not a merge point in the graph
self.graph.inactive_vertices.add(self.id)
self.graph.inactivated_vertices.add(self.id)
elif (
self.state == VertexStates.ACTIVE
and self.id in self.graph.inactive_vertices
and self.id in self.graph.inactivated_vertices
):
self.graph.inactive_vertices.remove(self.id)
self.graph.inactivated_vertices.remove(self.id)
@property
def avg_build_time(self):
@ -385,7 +383,7 @@ class Vertex:
self.params = params
self._raw_params = params.copy()
def update_raw_params(self, new_params: Dict[str, str]):
def update_raw_params(self, new_params: Dict[str, str], overwrite: bool = False):
"""
Update the raw parameters of the vertex with the given new parameters.
@ -400,6 +398,10 @@ class Vertex:
return
if any(isinstance(self._raw_params.get(key), Vertex) for key in new_params):
return
if not overwrite:
for key in new_params.copy():
if key not in self._raw_params:
new_params.pop(key)
self._raw_params.update(new_params)
self.updated_raw_params = True
@ -560,10 +562,14 @@ class Vertex:
self.params[key].extend(built)
else:
try:
if self.params[key] == built:
continue
self.params[key].append(built)
except AttributeError as e:
logger.exception(e)
raise ValueError(
f"Params {key} ({self.params[key]}) is not a list and cannot be extended with {built}"
f"Error building node {self.display_name}: {str(e)}"
) from e
@ -672,6 +678,10 @@ class Vertex:
if self.frozen and self._built:
return self.get_requester_result(requester)
elif self._built and requester is not None:
# This means that the vertex has already been built
# and we are just getting the result for the requester
return await self.get_requester_result(requester)
self._reset()
if self._is_chat_input() and inputs is not None:
@ -740,11 +750,3 @@ class Vertex:
if self._built_object is not None
else "Failed to build 😵‍💫"
)
class StatefulVertex(Vertex):
pass
class StatelessVertex(Vertex):
pass

View file

@ -1,6 +1,7 @@
import ast
import json
from typing import AsyncIterator, Callable, Dict, Iterator, List, Optional, Union
from typing import (AsyncIterator, Callable, Dict, Iterator, List, Optional,
Union)
import yaml
from langchain_core.messages import AIMessage
@ -8,14 +9,14 @@ from loguru import logger
from langflow.graph.schema import INPUT_FIELD_NAME, InterfaceComponentTypes
from langflow.graph.utils import UnbuiltObject, flatten_list, serialize_field
from langflow.graph.vertex.base import StatefulVertex, StatelessVertex
from langflow.graph.vertex.base import Vertex
from langflow.interface.utils import extract_input_variables_from_prompt
from langflow.schema import Record
from langflow.services.monitor.utils import log_vertex_build
from langflow.utils.schemas import ChatOutputResponse
class AgentVertex(StatelessVertex):
class AgentVertex(Vertex):
def __init__(self, data: Dict, graph, params: Optional[Dict] = None):
super().__init__(data, graph=graph, base_type="agents", params=params)
@ -58,12 +59,12 @@ class AgentVertex(StatelessVertex):
await self._build(user_id=user_id)
class ToolVertex(StatelessVertex):
class ToolVertex(Vertex):
def __init__(self, data: Dict, graph, params: Optional[Dict] = None):
super().__init__(data, graph=graph, base_type="tools", params=params)
class LLMVertex(StatelessVertex):
class LLMVertex(Vertex):
built_node_type = None
class_built_object = None
@ -86,7 +87,7 @@ class LLMVertex(StatelessVertex):
self.class_built_object = self._built_object
class ToolkitVertex(StatelessVertex):
class ToolkitVertex(Vertex):
def __init__(self, data: Dict, graph, params=None):
super().__init__(data, graph=graph, base_type="toolkits", params=params)
@ -100,7 +101,7 @@ class FileToolVertex(ToolVertex):
)
class WrapperVertex(StatelessVertex):
class WrapperVertex(Vertex):
def __init__(self, data: Dict, graph, params=None):
super().__init__(data, graph=graph, base_type="wrappers")
self.steps: List[Callable] = [self._custom_build]
@ -114,7 +115,7 @@ class WrapperVertex(StatelessVertex):
await self._build(user_id=user_id)
class DocumentLoaderVertex(StatefulVertex):
class DocumentLoaderVertex(Vertex):
def __init__(self, data: Dict, graph, params: Optional[Dict] = None):
super().__init__(data, graph=graph, base_type="documentloaders", params=params)
@ -134,12 +135,12 @@ class DocumentLoaderVertex(StatefulVertex):
return f"{self.vertex_type}()"
class EmbeddingVertex(StatefulVertex):
class EmbeddingVertex(Vertex):
def __init__(self, data: Dict, graph, params: Optional[Dict] = None):
super().__init__(data, graph=graph, base_type="embeddings", params=params)
class VectorStoreVertex(StatefulVertex):
class VectorStoreVertex(Vertex):
def __init__(self, data: Dict, graph, params=None):
super().__init__(data, graph=graph, base_type="vectorstores")
@ -181,17 +182,17 @@ class VectorStoreVertex(StatefulVertex):
self.remove_docs_and_texts_from_params()
class MemoryVertex(StatefulVertex):
class MemoryVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="memory")
class RetrieverVertex(StatefulVertex):
class RetrieverVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="retrievers")
class TextSplitterVertex(StatefulVertex):
class TextSplitterVertex(Vertex):
def __init__(self, data: Dict, graph, params: Optional[Dict] = None):
super().__init__(data, graph=graph, base_type="textsplitters", params=params)
@ -209,7 +210,7 @@ class TextSplitterVertex(StatefulVertex):
return f"{self.vertex_type}()"
class ChainVertex(StatelessVertex):
class ChainVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="chains")
self.steps = [self._custom_build]
@ -239,7 +240,7 @@ class ChainVertex(StatelessVertex):
return super()._built_object_repr()
class PromptVertex(StatelessVertex):
class PromptVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="prompts")
self.steps: List[Callable] = [self._custom_build]
@ -323,12 +324,12 @@ class PromptVertex(StatelessVertex):
return str(self._built_object)
class OutputParserVertex(StatelessVertex):
class OutputParserVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="output_parsers")
class CustomComponentVertex(StatelessVertex):
class CustomComponentVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="custom_components")
@ -337,7 +338,7 @@ class CustomComponentVertex(StatelessVertex):
return self.artifacts["repr"] or super()._built_object_repr()
class ChatVertex(StatelessVertex):
class ChatVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="custom_components", is_task=True)
self.steps = [self._build, self._run]
@ -459,7 +460,7 @@ class ChatVertex(StatelessVertex):
return self.vertex_type == InterfaceComponentTypes.ChatInput and self.is_input
class RoutingVertex(StatelessVertex):
class RoutingVertex(Vertex):
def __init__(self, data: Dict, graph):
super().__init__(data, graph=graph, base_type="custom_components")
self.use_result = True
@ -494,6 +495,18 @@ class RoutingVertex(StatelessVertex):
self._built_result = None
class StateVertex(Vertex):
    """Vertex type for state components (SharedState, Notify, GetNotified).

    Sets `is_state = True`, which the graph uses to collect these vertices
    into `_is_state_vertices` and to activate them when shared state changes.
    """

    def __init__(self, data: Dict, graph):
        super().__init__(data, graph=graph, base_type="custom_components")
        # Only the build step runs for state vertices.
        self.steps = [self._build]
        self.is_state = True

    @property
    def successors_ids(self) -> List[str]:
        # Direct successors plus the graph's currently activated vertices —
        # presumably so state-triggered vertices are scheduled after this
        # one builds; verify against the graph's scheduling logic.
        successors = self.graph.successor_map.get(self.id, [])
        return successors + self.graph.activated_vertices
def dict_to_codeblock(d: dict) -> str:
serialized = {key: serialize_field(val) for key, val in d.items()}
json_str = json.dumps(serialized, indent=4)

View file

@ -14,7 +14,6 @@ from uuid import UUID
import yaml
from cachetools import TTLCache, cachedmethod
from fastapi import HTTPException
from langchain_core.documents import Document
from pydantic import BaseModel
from sqlmodel import select
@ -76,6 +75,28 @@ class CustomComponent(Component):
"""The status of the component. This is displayed on the frontend. Defaults to None."""
_flows_records: Optional[List[Record]] = None
def update_state(self, name: str, value: Any):
try:
self.vertex.graph.update_state(
name=name, record=value, caller=self.vertex.id
)
except Exception as e:
raise ValueError(f"Error updating state: {e}")
def append_state(self, name: str, value: Any):
try:
self.vertex.graph.append_state(
name=name, record=value, caller=self.vertex.id
)
except Exception as e:
raise ValueError(f"Error appending state: {e}")
def get_state(self, name: str):
try:
return self.vertex.graph.get_state(name=name)
except Exception as e:
raise ValueError(f"Error getting state: {e}")
_tree: Optional[dict] = None
def __init__(self, **data):
@ -200,18 +221,7 @@ class CustomComponent(Component):
args = build_method["args"]
for arg in args:
if arg.get("type") == "prompt":
raise HTTPException(
status_code=400,
detail={
"error": "Type hint Error",
"traceback": (
"Prompt type is not supported in the build method."
" Try using PromptTemplate instead."
),
},
)
elif not arg.get("type") and arg.get("name") != "self":
if not arg.get("type") and arg.get("name") != "self":
# Set the type to Data
arg["type"] = "Data"
return args

View file

@ -1,8 +1,9 @@
from cachetools import LRUCache, cached
from langflow.interface.agents.base import agent_creator
from langflow.interface.chains.base import chain_creator
from langflow.interface.custom.directory_reader.utils import merge_nested_dicts_with_renaming
from langflow.interface.custom.directory_reader.utils import (
merge_nested_dicts_with_renaming,
)
from langflow.interface.custom.utils import build_custom_components
from langflow.interface.document_loaders.base import documentloader_creator
from langflow.interface.embeddings.base import embedding_creator
@ -13,7 +14,6 @@ from langflow.interface.retrievers.base import retriever_creator
from langflow.interface.text_splitters.base import textsplitter_creator
from langflow.interface.toolkits.base import toolkits_creator
from langflow.interface.tools.base import tool_creator
from langflow.interface.utilities.base import utility_creator
from langflow.interface.wrappers.base import wrapper_creator
@ -48,7 +48,7 @@ def build_langchain_types_dict(): # sourcery skip: dict-assign-update-to-union
# vectorstore_creator,
documentloader_creator,
textsplitter_creator,
utility_creator,
# utility_creator,
output_parser_creator,
retriever_creator,
]
@ -66,4 +66,6 @@ def get_all_types_dict(settings_service):
"""Get all types dictionary combining native and custom components."""
native_components = build_langchain_types_dict()
custom_components_from_file = build_custom_components(settings_service)
return merge_nested_dicts_with_renaming(native_components, custom_components_from_file)
return merge_nested_dicts_with_renaming(
native_components, custom_components_from_file
)

View file

@ -126,7 +126,9 @@ async def process_runnable(runnable: Runnable, inputs: Union[dict, List[dict]]):
elif isinstance(inputs, dict) and hasattr(runnable, "ainvoke"):
result = await runnable.ainvoke(inputs)
else:
raise ValueError(f"Runnable {runnable} does not support inputs of type {type(inputs)}")
raise ValueError(
f"Runnable {runnable} does not support inputs of type {type(inputs)}"
)
# Check if the result is a list of AIMessages
if isinstance(result, list) and all(isinstance(r, AIMessage) for r in result):
result = [r.content for r in result]
@ -135,7 +137,9 @@ async def process_runnable(runnable: Runnable, inputs: Union[dict, List[dict]]):
return result
async def process_inputs_dict(built_object: Union[Chain, VectorStore, Runnable], inputs: dict):
async def process_inputs_dict(
built_object: Union[Chain, VectorStore, Runnable], inputs: dict
):
if isinstance(built_object, Chain):
if inputs is None:
raise ValueError("Inputs must be provided for a Chain")
@ -170,7 +174,9 @@ async def process_inputs_list(built_object: Runnable, inputs: List[dict]):
return await process_runnable(built_object, inputs)
async def generate_result(built_object: Union[Chain, VectorStore, Runnable], inputs: Union[dict, List[dict]]):
async def generate_result(
built_object: Union[Chain, VectorStore, Runnable], inputs: Union[dict, List[dict]]
):
if isinstance(inputs, dict):
result = await process_inputs_dict(built_object, inputs)
elif isinstance(inputs, List) and isinstance(built_object, Runnable):
@ -198,6 +204,7 @@ async def run_graph(
stream: bool,
session_id: Optional[str] = None,
inputs: Optional[dict[str, Union[List[str], str]]] = None,
outputs: Optional[List[str]] = None,
artifacts: Optional[Dict[str, Any]] = None,
session_service: Optional[SessionService] = None,
):
@ -208,24 +215,35 @@ async def run_graph(
else:
graph_data = graph._graph_data
if not session_id and session_service is not None:
session_id = session_service.generate_key(session_id=flow_id, data_graph=graph_data)
session_id = session_service.generate_key(
session_id=flow_id, data_graph=graph_data
)
if inputs is None:
inputs = {}
outputs = await graph.run(inputs, stream=stream)
outputs = await graph.run(
inputs,
outputs,
stream=stream,
session_id=session_id,
)
if session_id and session_service:
session_service.update_session(session_id, (graph, artifacts))
return outputs, session_id
def validate_input(graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
def validate_input(
graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]]
) -> List[Dict[str, Any]]:
if not isinstance(graph_data, dict) or not isinstance(tweaks, dict):
raise ValueError("graph_data and tweaks should be dictionaries")
nodes = graph_data.get("data", {}).get("nodes") or graph_data.get("nodes")
if not isinstance(nodes, list):
raise ValueError("graph_data should contain a list of nodes under 'data' key or directly under 'nodes' key")
raise ValueError(
"graph_data should contain a list of nodes under 'data' key or directly under 'nodes' key"
)
return nodes
@ -234,7 +252,9 @@ def apply_tweaks(node: Dict[str, Any], node_tweaks: Dict[str, Any]) -> None:
template_data = node.get("data", {}).get("node", {}).get("template")
if not isinstance(template_data, dict):
logger.warning(f"Template data for node {node.get('id')} should be a dictionary")
logger.warning(
f"Template data for node {node.get('id')} should be a dictionary"
)
return
for tweak_name, tweak_value in node_tweaks.items():
@ -249,28 +269,33 @@ def apply_tweaks_on_vertex(vertex: Vertex, node_tweaks: Dict[str, Any]) -> None:
vertex.params[tweak_name] = tweak_value
def process_tweaks(graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
def process_tweaks(
graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""
This function is used to tweak the graph data using the node id and the tweaks dict.
:param graph_data: The dictionary containing the graph data. It must contain a 'data' key with
'nodes' as its child or directly contain 'nodes' key. Each node should have an 'id' and 'data'.
:param tweaks: A dictionary where the key is the node id and the value is a dictionary of the tweaks.
The inner dictionary contains the name of a certain parameter as the key and the value to be tweaked.
:param tweaks: The dictionary containing the tweaks. The keys can be the node id or the name of the tweak.
The values can be a dictionary containing the tweaks for the node or the value of the tweak.
:return: The modified graph_data dictionary.
:raises ValueError: If the input is not in the expected format.
"""
nodes = validate_input(graph_data, tweaks)
nodes_map = {node.get("id"): node for node in nodes}
all_nodes_tweaks = {}
for key, value in tweaks.items():
if isinstance(value, dict):
if node := nodes_map.get(key):
apply_tweaks(node, value)
else:
all_nodes_tweaks[key] = value
for node in nodes:
if isinstance(node, dict) and isinstance(node.get("id"), str):
node_id = node["id"]
if node_tweaks := tweaks.get(node_id):
apply_tweaks(node, node_tweaks)
else:
logger.warning("Each node should be a dictionary with an 'id' key of type str")
apply_tweaks(node, all_nodes_tweaks)
return graph_data
@ -282,6 +307,8 @@ def process_tweaks_on_graph(graph: Graph, tweaks: Dict[str, Dict[str, Any]]):
if node_tweaks := tweaks.get(node_id):
apply_tweaks_on_vertex(vertex, node_tweaks)
else:
logger.warning("Each node should be a Vertex with an 'id' attribute of type str")
logger.warning(
"Each node should be a Vertex with an 'id' attribute of type str"
)
return graph

View file

@ -58,13 +58,17 @@ class Settings(BaseSettings):
STORE: Optional[bool] = True
STORE_URL: Optional[str] = "https://api.langflow.store"
DOWNLOAD_WEBHOOK_URL: Optional[
str
] = "https://api.langflow.store/flows/trigger/ec611a61-8460-4438-b187-a4f65e5559d4"
LIKE_WEBHOOK_URL: Optional[str] = "https://api.langflow.store/flows/trigger/64275852-ec00-45c1-984e-3bff814732da"
DOWNLOAD_WEBHOOK_URL: Optional[str] = (
"https://api.langflow.store/flows/trigger/ec611a61-8460-4438-b187-a4f65e5559d4"
)
LIKE_WEBHOOK_URL: Optional[str] = (
"https://api.langflow.store/flows/trigger/64275852-ec00-45c1-984e-3bff814732da"
)
STORAGE_TYPE: str = "local"
CELERY_ENABLED: bool = False
@validator("CONFIG_DIR", pre=True, allow_reuse=True)
def set_langflow_dir(cls, value):
if not value:
@ -91,7 +95,9 @@ class Settings(BaseSettings):
@validator("DATABASE_URL", pre=True)
def set_database_url(cls, value, values):
if not value:
logger.debug("No database_url provided, trying LANGFLOW_DATABASE_URL env variable")
logger.debug(
"No database_url provided, trying LANGFLOW_DATABASE_URL env variable"
)
if langflow_database_url := os.getenv("LANGFLOW_DATABASE_URL"):
value = langflow_database_url
logger.debug("Using LANGFLOW_DATABASE_URL env variable.")
@ -101,7 +107,9 @@ class Settings(BaseSettings):
# so we need to migrate to the new format
# if there is a database in that location
if not values["CONFIG_DIR"]:
raise ValueError("CONFIG_DIR not set, please set it or provide a DATABASE_URL")
raise ValueError(
"CONFIG_DIR not set, please set it or provide a DATABASE_URL"
)
new_path = f"{values['CONFIG_DIR']}/langflow.db"
if Path("./langflow.db").exists():
@ -125,15 +133,22 @@ class Settings(BaseSettings):
if os.getenv("LANGFLOW_COMPONENTS_PATH"):
logger.debug("Adding LANGFLOW_COMPONENTS_PATH to components_path")
langflow_component_path = os.getenv("LANGFLOW_COMPONENTS_PATH")
if Path(langflow_component_path).exists() and langflow_component_path not in value:
if (
Path(langflow_component_path).exists()
and langflow_component_path not in value
):
if isinstance(langflow_component_path, list):
for path in langflow_component_path:
if path not in value:
value.append(path)
logger.debug(f"Extending {langflow_component_path} to components_path")
logger.debug(
f"Extending {langflow_component_path} to components_path"
)
elif langflow_component_path not in value:
value.append(langflow_component_path)
logger.debug(f"Appending {langflow_component_path} to components_path")
logger.debug(
f"Appending {langflow_component_path} to components_path"
)
if not value:
value = [BASE_COMPONENTS_PATH]
@ -145,7 +160,9 @@ class Settings(BaseSettings):
logger.debug(f"Components path: {value}")
return value
model_config = SettingsConfigDict(validate_assignment=True, extra="ignore", env_prefix="LANGFLOW_")
model_config = SettingsConfigDict(
validate_assignment=True, extra="ignore", env_prefix="LANGFLOW_"
)
# @model_validator()
# @classmethod

View file

@ -7,7 +7,7 @@ from sqlmodel import select
from langflow.api.utils import format_elapsed_time
from langflow.api.v1.schemas import ResultDataResponse, VertexBuildResponse
from langflow.graph.graph.base import Graph
from langflow.graph.vertex.base import StatelessVertex
from langflow.graph.vertex.base import Vertex
from langflow.services.database.models.flow.model import Flow
from langflow.services.deps import get_session
from langflow.services.monitor.utils import log_vertex_build
@ -63,7 +63,7 @@ async def build_vertex(
return
start_time = time.perf_counter()
try:
if isinstance(vertex, StatelessVertex) or not vertex._built:
if isinstance(vertex, Vertex) or not vertex._built:
await vertex.build(user_id=None, session_id=sid)
params = vertex._built_object_repr()
valid = True
@ -96,7 +96,9 @@ async def build_vertex(
)
# Emit the vertex build response
response = VertexBuildResponse(valid=valid, params=params, id=vertex.id, data=result_dict)
response = VertexBuildResponse(
valid=valid, params=params, id=vertex.id, data=result_dict
)
await sio.emit("vertex_build", data=response.model_dump(), to=sid)
except Exception as exc:

View file

@ -1,11 +1,14 @@
from typing import Any, Callable, Coroutine, Union
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Union
from loguru import logger
from langflow.services.base import Service
from langflow.services.task.backends.anyio import AnyIOBackend
from langflow.services.task.backends.base import TaskBackend
from langflow.services.task.utils import get_celery_worker_status
from langflow.utils.logger import configure
from loguru import logger
if TYPE_CHECKING:
from langflow.services.settings.service import SettingsService
def check_celery_availability():
@ -20,28 +23,31 @@ def check_celery_availability():
return status
try:
configure()
status = check_celery_availability()
USE_CELERY = status.get("availability") is not None
except ImportError:
USE_CELERY = False
class TaskService(Service):
name = "task_service"
def __init__(self):
self.backend = self.get_backend()
def __init__(self, settings_service: "SettingsService"):
self.settings_service = settings_service
try:
if self.settings_service.settings.CELERY_ENABLED:
USE_CELERY = True
status = check_celery_availability()
USE_CELERY = status.get("availability") is not None
else:
USE_CELERY = False
except ImportError:
USE_CELERY = False
self.use_celery = USE_CELERY
self.backend = self.get_backend()
@property
def backend_name(self) -> str:
return self.backend.name
def get_backend(self) -> TaskBackend:
if USE_CELERY:
if self.use_celery:
from langflow.services.task.backends.celery import CeleryBackend
logger.debug("Using Celery backend")
@ -68,7 +74,9 @@ class TaskService(Service):
result = await result
return task.id, result
async def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
async def launch_task(
self, task_func: Callable[..., Any], *args: Any, **kwargs: Any
) -> Any:
logger.debug(f"Launching task {task_func} with args {args} and kwargs {kwargs}")
logger.debug(f"Using backend {self.backend}")
task = self.backend.launch_task(task_func, *args, **kwargs)

View file

@ -18,7 +18,6 @@ import { getHealth } from "./controllers/API";
import Router from "./routes";
import useAlertStore from "./stores/alertStore";
import { useDarkStore } from "./stores/darkStore";
import useFlowStore from "./stores/flowStore";
import useFlowsManagerStore from "./stores/flowsManagerStore";
import { useStoreStore } from "./stores/storeStore";
import { useTypesStore } from "./stores/typesStore";
@ -70,7 +69,7 @@ export default function App() {
.catch(() => {
setFetchError(true);
});
}, 20000);
}, 20000); // 20 seconds
// Clean up the timer on component unmount
return () => {

View file

@ -450,9 +450,10 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
status: BuildStatus,
runId: string
) {
if (vertexBuildData && vertexBuildData.inactive_vertices) {
get().removeFromVerticesBuild(vertexBuildData.inactive_vertices);
if (vertexBuildData && vertexBuildData.inactivated_vertices) {
get().removeFromVerticesBuild(vertexBuildData.inactivated_vertices);
}
if (vertexBuildData.next_vertices_ids) {
// next_vertices_ids is a list of vertices that are going to be built next
// verticesLayers is a list of list of vertices ids, where each list is a layer of vertices
@ -513,6 +514,7 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
get().setIsBuilding(false);
},
onBuildStart: (idList) => {
console.log("onBuildStart", idList);
useFlowStore.getState().updateBuildStatus(idList, BuildStatus.BUILDING);
},
validateNodes: validateSubgraph,
@ -538,6 +540,16 @@ const useFlowStore = create<FlowStoreType>((set, get) => ({
set({ verticesBuild: vertices });
},
verticesBuild: null,
addToVerticesBuild: (vertices: string[]) => {
const verticesBuild = get().verticesBuild;
if (!verticesBuild) return;
set({
verticesBuild: {
...verticesBuild,
verticesIds: [...verticesBuild.verticesIds, ...vertices],
},
});
},
removeFromVerticesBuild: (vertices: string[]) => {
const verticesBuild = get().verticesBuild;
if (!verticesBuild) return;

View file

@ -92,7 +92,6 @@ const useFlowsManagerStore = create<FlowsManagerStoreType>((set, get) => ({
true
);
}
set({ saveLoading: true });
}, 500); // Delay of 500ms because chat message depends on it.
},
saveFlow: (flow: FlowType, silent?: boolean) => {

View file

@ -141,8 +141,8 @@ export type VerticesOrderTypeAPI = {
export type VertexBuildTypeAPI = {
id: string;
inactivated_vertices: Array<string> | null;
next_vertices_ids: Array<string>;
inactive_vertices: Array<string> | null;
run_id: string;
valid: boolean;
params: string;

View file

@ -107,6 +107,7 @@ export type FlowStoreType = {
runId: string;
} | null
) => void;
addToVerticesBuild: (vertices: string[]) => void;
removeFromVerticesBuild: (vertices: string[]) => void;
verticesBuild: {
verticesIds: string[];

View file

@ -32,6 +32,7 @@ function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI {
id: vertexId,
data: inactiveData,
params: "Inactive",
inactivated_vertices: null,
run_id: "",
next_vertices_ids: [],
inactive_vertices: null,