From 62a7dc77d6071a375a23b49cafd0dd8c56a53589 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 29 Feb 2024 23:39:36 -0300 Subject: [PATCH 01/45] Add state management methods to CustomComponent class --- .../custom_component/custom_component.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/backend/langflow/interface/custom/custom_component/custom_component.py b/src/backend/langflow/interface/custom/custom_component/custom_component.py index c3b06f90a..b8ec433f3 100644 --- a/src/backend/langflow/interface/custom/custom_component/custom_component.py +++ b/src/backend/langflow/interface/custom/custom_component/custom_component.py @@ -74,6 +74,24 @@ class CustomComponent(Component): status: Optional[Any] = None """The status of the component. This is displayed on the frontend. Defaults to None.""" + def update_state(self, name: str, value: Any): + try: + self.vertex.graph.state_manager.update_state(key=name, new_state=value) + except Exception as e: + raise ValueError(f"Error updating state: {e}") + + def append_state(self, name: str, value: Any): + try: + self.vertex.graph.state_manager.append_state(key=name, new_state=value) + except Exception as e: + raise ValueError(f"Error appending state: {e}") + + def get_state(self, name: str): + try: + return self.vertex.graph.state_manager.get_state(key=name) + except Exception as e: + raise ValueError(f"Error getting state: {e}") + _tree: Optional[dict] = None def __init__(self, **data): From f3a7570aaddfad29cfbf4834250927fca0c4ebdd Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Thu, 29 Feb 2024 23:39:42 -0300 Subject: [PATCH 02/45] Add SharedState component for state sharing --- .../components/utilities/SharedState.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 src/backend/langflow/components/utilities/SharedState.py diff --git a/src/backend/langflow/components/utilities/SharedState.py b/src/backend/langflow/components/utilities/SharedState.py new file mode 100644 index 000000000..7d29da9bb --- /dev/null +++ b/src/backend/langflow/components/utilities/SharedState.py @@ -0,0 +1,38 @@ +from typing import Union + +from langflow import CustomComponent +from langflow.field_typing import Text +from langflow.schema import Record + + +class SharedState(CustomComponent): + display_name = "Shared State" + description = "A component to share state between components." + + def build_config(self): + return { + "name": {"display_name": "Name", "info": "The name of the state."}, + "record": {"display_name": "Record", "info": "The record to store."}, + "append": { + "display_name": "Append", + "info": "If True, the record will be appended to the state.", + }, + } + + def build( + self, name: str, record: Union[Text, Record], append: bool = False + ) -> Record: + if append: + self.append_state(name, record) + else: + self.update_state(name, record) + + state = self.get_state(name) + if not isinstance(state, Record): + if isinstance(state, str): + state = Record(text=state) + elif isinstance(state, dict): + state = Record(data=state) + else: + state = Record(text=str(state)) + return state From 1ce317c252246f9d1ede2755a4ae5ba2ce5785ae Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 1 Mar 2024 07:35:46 -0300 Subject: [PATCH 03/45] Fix duplicate edges in Graph class --- src/backend/langflow/graph/graph/base.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 453076e1b..245a8a4ae 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -534,14 +534,21 @@ class Graph: # if we can't find a vertex, we raise an error edges: List[ContractEdge] = [] + edges_added = set() for edge in self._edges: source = self.get_vertex(edge["source"]) target = self.get_vertex(edge["target"]) + if source is None: raise ValueError(f"Source vertex {edge['source']} not found") if target is None: raise ValueError(f"Target vertex {edge['target']} not found") + + if (source.id, target.id) in edges_added: + continue + edges.append(ContractEdge(source, target, edge)) + edges_added.add((source.id, target.id)) return edges def _get_vertex_class( From 74b723c4da2c748ae251bc87db5a879d494a3699 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 1 Mar 2024 17:13:20 -0300 Subject: [PATCH 04/45] Refactor variable names and fix typo in code --- src/backend/langflow/api/v1/chat.py | 10 +++++----- src/backend/langflow/api/v1/schemas.py | 6 ++++-- src/backend/langflow/graph/graph/base.py | 8 ++++---- src/backend/langflow/graph/vertex/base.py | 6 +++--- src/frontend/src/stores/flowStore.ts | 16 ++++++++++++---- src/frontend/src/types/api/index.ts | 2 +- src/frontend/src/utils/buildUtils.ts | 16 +++++++--------- 7 files changed, 36 insertions(+), 28 deletions(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 258277bca..7126c19e3 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -153,14 +153,14 @@ async def build_vertex( result_data_response.duration = duration result_data_response.timedelta = timedelta vertex.add_build_time(timedelta) - inactive_vertices = None - if graph.inactive_vertices: - inactive_vertices = list(graph.inactive_vertices) - graph.reset_inactive_vertices() + inactivated_vertices = None + if graph.inactivated_vertices: + inactivated_vertices = list(graph.inactivated_vertices) + graph.reset_inactivated_vertices() chat_service.set_cache(flow_id, graph) build_response = VertexBuildResponse( - inactive_vertices=inactive_vertices, + inactivated_vertices=inactivated_vertices, valid=valid, params=params, id=vertex.id, diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 80d40007b..044d98507 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -158,7 +158,9 @@ class StreamData(BaseModel): data: dict def __str__(self) -> str: - return f"event: {self.event}\ndata: {orjson_dumps(self.data, indent_2=False)}\n\n" + return ( + f"event: {self.event}\ndata: {orjson_dumps(self.data, indent_2=False)}\n\n" + ) class CustomComponentCode(BaseModel): @@ -227,7 +229,7 @@ class ResultDataResponse(BaseModel): class VertexBuildResponse(BaseModel): id: Optional[str] = None - inactive_vertices: Optional[List[str]] = None + inactivated_vertices: Optional[List[str]] = None valid: bool params: Optional[str] """JSON string of the params.""" diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 245a8a4ae..0b5af21f6 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -54,7 +54,7 @@ class Graph: self._vertices = self._graph_data["nodes"] self._edges = self._graph_data["edges"] - self.inactive_vertices: set = set() + self.inactivated_vertices: set = set() self.edges: List[ContractEdge] = [] self.vertices: List[Vertex] = [] self._build_graph() @@ -137,7 +137,7 @@ class Graph: return { "runs": self._runs, "updates": self._updates, - "inactive_vertices": self.inactive_vertices, + "inactivated_vertices": self.inactivated_vertices, } def build_graph_maps(self): @@ -145,8 +145,8 @@ class Graph: self.in_degree_map = self.build_in_degree() self.parent_child_map = self.build_parent_child_map() - def reset_inactive_vertices(self): - self.inactive_vertices = set() + def reset_inactivated_vertices(self): + self.inactivated_vertices = set() def mark_all_vertices(self, state: str): """Marks all vertices in the graph.""" diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index 6425f5e08..a550b5b0f 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -103,12 +103,12 @@ class Vertex: ): # If the vertex is inactive and has only one in degree # it means that it is not a merge point in the graph - self.graph.inactive_vertices.add(self.id) + self.graph.inactivated_vertices.add(self.id) elif ( self.state == VertexStates.ACTIVE - and self.id in self.graph.inactive_vertices + and self.id in self.graph.inactivated_vertices ): - self.graph.inactive_vertices.remove(self.id) + self.graph.inactivated_vertices.remove(self.id) @property def avg_build_time(self): diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index 1725de2a0..9ee25623b 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -219,8 +219,16 @@ const useFlowStore = create((set, get) => ({ ); }, paste: (selection, position) => { - if(selection.nodes.some((node) => node.data.type === "ChatInput") && checkChatInput(get().nodes)){ - useAlertStore.getState().setErrorData({title: "Error pasting components", list: ["You can only have one ChatInput component in the flow"]}); + if ( + selection.nodes.some((node) => node.data.type === "ChatInput") && + checkChatInput(get().nodes) + ) { + useAlertStore + .getState() + .setErrorData({ + title: "Error pasting components", + list: ["You can only have one ChatInput component in the flow"], + }); return; } let minimumX = Infinity; @@ -440,8 +448,8 @@ const useFlowStore = create((set, get) => ({ status: BuildStatus, buildId: string ) { - if (vertexBuildData && vertexBuildData.inactive_vertices) { - get().removeFromVerticesBuild(vertexBuildData.inactive_vertices); + if (vertexBuildData && vertexBuildData.inactivated_vertices) { + get().removeFromVerticesBuild(vertexBuildData.inactivated_vertices); } get().addDataToFlowPool( { ...vertexBuildData, buildId }, diff --git a/src/frontend/src/types/api/index.ts b/src/frontend/src/types/api/index.ts index 6663692dc..27e5946b3 100644 --- a/src/frontend/src/types/api/index.ts +++ b/src/frontend/src/types/api/index.ts @@ -140,7 +140,7 @@ export type VerticesOrderTypeAPI = { export type VertexBuildTypeAPI = { id: string; - inactive_vertices: Array | null; + inactivated_vertices: Array | null; valid: boolean; params: string; data: VertexDataTypeAPI; diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index e5003caf2..1ce675eec 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -31,7 +31,7 @@ function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI { id: vertexId, data: inactiveData, params: "Inactive", - inactive_vertices: null, + inactivated_vertices: null, valid: false, timestamp: new Date().toISOString(), }; @@ -85,14 +85,12 @@ export async function updateVerticesOrder( } const verticesIds = verticesLayers.flat(); - useFlowStore - .getState() - .updateVerticesBuild({ - verticesLayers, - verticesIds, - verticesOrder, - runId, - }); + useFlowStore.getState().updateVerticesBuild({ + verticesLayers, + verticesIds, + verticesOrder, + runId, + }); resolve({ verticesLayers, verticesIds, verticesOrder, runId }); }); } From 94a00c0c5ecb3ddb3b406f7c57cf1541d1e980bc Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 1 Mar 2024 23:56:35 -0300 Subject: [PATCH 05/45] Refactor build_vertex function to reset both inactivated and activated vertices --- src/backend/langflow/api/v1/chat.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 7126c19e3..399098d47 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -154,13 +154,15 @@ async def build_vertex( result_data_response.timedelta = timedelta vertex.add_build_time(timedelta) inactivated_vertices = None - if graph.inactivated_vertices: - inactivated_vertices = list(graph.inactivated_vertices) - graph.reset_inactivated_vertices() + inactivated_vertices = list(graph.inactivated_vertices) + graph.reset_inactivated_vertices() + activated_vertices = list(graph.activated_vertices) + graph.reset_activated_vertices() chat_service.set_cache(flow_id, graph) build_response = VertexBuildResponse( inactivated_vertices=inactivated_vertices, + activated_vertices=activated_vertices, valid=valid, params=params, id=vertex.id, From 3d4ab248588712240ce6653afbf0607623bc3f6a Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 1 Mar 2024 23:57:01 -0300 Subject: [PATCH 06/45] Add activated_vertices to VertexBuildResponse and update state management in Graph --- src/backend/langflow/api/v1/schemas.py | 1 + src/backend/langflow/graph/graph/base.py | 50 ++++++++++++ src/backend/langflow/graph/vertex/base.py | 26 +++--- src/backend/langflow/graph/vertex/types.py | 92 +++++++++++++++------- 4 files changed, 125 insertions(+), 44 deletions(-) diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 044d98507..88610d637 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -230,6 +230,7 @@ class ResultDataResponse(BaseModel): class VertexBuildResponse(BaseModel): id: Optional[str] = None inactivated_vertices: Optional[List[str]] = None + activated_vertices: Optional[List[str]] = None valid: bool params: Optional[str] """JSON string of the params.""" diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 0b5af21f6..558fac152 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -16,9 +16,11 @@ from langflow.graph.vertex.types import ( FileToolVertex, LLMVertex, RoutingVertex, + StateVertex, ToolkitVertex, ) from langflow.interface.tools.constants import FILE_TOOLS +from langflow.schema import Record from langflow.utils import payload if TYPE_CHECKING: @@ -55,6 +57,7 @@ class Graph: self._vertices = self._graph_data["nodes"] self._edges = self._graph_data["edges"] self.inactivated_vertices: set = set() + self.activated_vertices: set = set() self.edges: List[ContractEdge] = [] self.vertices: List[Vertex] = [] self._build_graph() @@ -62,6 +65,37 @@ class Graph: self.define_vertices_lists() self.state_manager = GraphStateManager() + def update_state( + self, name: str, record: Union[str, Record], caller: Optional[str] = None + ) -> None: + """Updates the state of the graph.""" + if caller: + # If there is a caller which is a vertex_id, I want to activate + # all StateVertex in self.vertices that are not the caller + # essentially notifying all the other vertices that the state has changed + # This also has to activate their successors + caller_vertex = self.get_vertex(caller) + for vertex in self.vertices: + if vertex.id != caller and isinstance(vertex, StateVertex): + successors = self.get_all_successors(vertex) + self.activated_vertices.add(vertex.id) + for successor in successors: + self.activated_vertices.add(successor.id) + + self.state_manager.update_state(name, record) + + def reset_activated_vertices(self): + self.activated_vertices = set() + + def append_state( + self, name: str, record: Union[str, Record], caller: Optional[str] = None + ) -> None: + """Appends the state of the graph.""" + if caller: + self.state_manager.subscribe(name, caller) + + self.state_manager.append_state(name, record) + def set_run_id(self, run_id: str): for vertex in self.vertices: self.state_manager.subscribe(run_id, vertex.update_graph_state) @@ -500,6 +534,20 @@ class Graph: for source_id in self.predecessor_map.get(vertex.id, []) ] + def get_all_successors(self, vertex, recursive=True): + # Recursively get the successors of the current vertex + successors = vertex.successors + if not successors: + return [] + successors_result = [] + for successor in successors: + # Just return a list of successors + if recursive: + next_successors = self.get_all_successors(successor) + successors_result.extend(next_successors) + successors_result.append(successor) + return successors_result + def get_successors(self, vertex): """Returns the successors of a vertex.""" return [ @@ -561,6 +609,8 @@ class Graph: return ChatVertex elif node_name in ["ShouldRunNext"]: return RoutingVertex + elif node_name in ["SharedState"]: + return StateVertex elif node_base_type in lazy_load_vertex_dict.VERTEX_TYPE_MAP: return lazy_load_vertex_dict.VERTEX_TYPE_MAP[node_base_type] elif node_name in lazy_load_vertex_dict.VERTEX_TYPE_MAP: diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index a550b5b0f..406652388 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -88,12 +88,9 @@ class Vertex: def update_graph_state(self, key, new_state, append: bool): if append: - if key in self.graph_state: - self.graph_state[key].append(new_state) - else: - self.graph_state[key] = [new_state] + self.graph.append_state(key, new_state, caller=self.id) else: - self.graph_state[key] = new_state + self.graph.update_state(key, new_state, caller=self.id) def set_state(self, state: str): self.state = VertexStates[state] @@ -511,7 +508,16 @@ class Vertex: self.params[key] = [] self.params[key].extend(built) else: - self.params[key].append(built) + try: + if self.params[key] == built: + continue + + self.params[key].append(built) + except AttributeError as e: + logger.exception(e) + raise ValueError( + f"Params {key} ({self.params[key]}) is not a list and cannot be extended with {built}" + ) from e def _handle_func(self, key, result): """ @@ -670,11 +676,3 @@ class Vertex: if self._built_object is not None else "Failed to build 😵‍💫" ) - - -class StatefulVertex(Vertex): - pass - - -class StatelessVertex(Vertex): - pass diff --git a/src/backend/langflow/graph/vertex/types.py b/src/backend/langflow/graph/vertex/types.py index c4f33df40..ba1cd2998 100644 --- a/src/backend/langflow/graph/vertex/types.py +++ b/src/backend/langflow/graph/vertex/types.py @@ -1,6 +1,7 @@ import ast import json -from typing import AsyncIterator, Callable, Dict, Iterator, List, Optional, Union +from typing import (AsyncIterator, Callable, Dict, Iterator, List, Optional, + Union) import yaml from langchain_core.messages import AIMessage @@ -8,14 +9,14 @@ from loguru import logger from langflow.graph.schema import INPUT_FIELD_NAME from langflow.graph.utils import UnbuiltObject, flatten_list, serialize_field -from langflow.graph.vertex.base import StatefulVertex, StatelessVertex +from langflow.graph.vertex.base import Vertex from langflow.interface.utils import extract_input_variables_from_prompt from langflow.schema import Record from langflow.services.monitor.utils import log_vertex_build from langflow.utils.schemas import ChatOutputResponse -class AgentVertex(StatelessVertex): +class AgentVertex(Vertex): def __init__(self, data: Dict, graph, params: Optional[Dict] = None): super().__init__(data, graph=graph, base_type="agents", params=params) @@ -58,12 +59,12 @@ class AgentVertex(StatelessVertex): await self._build(user_id=user_id) -class ToolVertex(StatelessVertex): +class ToolVertex(Vertex): def __init__(self, data: Dict, graph, params: Optional[Dict] = None): super().__init__(data, graph=graph, base_type="tools", params=params) -class LLMVertex(StatelessVertex): +class LLMVertex(Vertex): built_node_type = None class_built_object = None @@ -86,7 +87,7 @@ class LLMVertex(StatelessVertex): self.class_built_object = self._built_object -class ToolkitVertex(StatelessVertex): +class ToolkitVertex(Vertex): def __init__(self, data: Dict, graph, params=None): super().__init__(data, graph=graph, base_type="toolkits", params=params) @@ -100,7 +101,7 @@ class FileToolVertex(ToolVertex): ) -class WrapperVertex(StatelessVertex): +class WrapperVertex(Vertex): def __init__(self, data: Dict, graph, params=None): super().__init__(data, graph=graph, base_type="wrappers") self.steps: List[Callable] = [self._custom_build] @@ -114,7 +115,7 @@ class WrapperVertex(StatelessVertex): await self._build(user_id=user_id) -class DocumentLoaderVertex(StatefulVertex): +class DocumentLoaderVertex(Vertex): def __init__(self, data: Dict, graph, params: Optional[Dict] = None): super().__init__(data, graph=graph, base_type="documentloaders", params=params) @@ -123,21 +124,23 @@ class DocumentLoaderVertex(StatefulVertex): # show how many documents are in the list? if not isinstance(self._built_object, UnbuiltObject): - avg_length = sum(len(doc.page_content) for doc in self._built_object if hasattr(doc, "page_content")) / len( - self._built_object - ) + avg_length = sum( + len(doc.page_content) + for doc in self._built_object + if hasattr(doc, "page_content") + ) / len(self._built_object) return f"""{self.display_name}({len(self._built_object)} documents) \nAvg. Document Length (characters): {int(avg_length)} Documents: {self._built_object[:3]}...""" return f"{self.vertex_type}()" -class EmbeddingVertex(StatefulVertex): +class EmbeddingVertex(Vertex): def __init__(self, data: Dict, graph, params: Optional[Dict] = None): super().__init__(data, graph=graph, base_type="embeddings", params=params) -class VectorStoreVertex(StatefulVertex): +class VectorStoreVertex(Vertex): def __init__(self, data: Dict, graph, params=None): super().__init__(data, graph=graph, base_type="vectorstores") @@ -179,17 +182,17 @@ class VectorStoreVertex(StatefulVertex): self.remove_docs_and_texts_from_params() -class MemoryVertex(StatefulVertex): +class MemoryVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="memory") -class RetrieverVertex(StatefulVertex): +class RetrieverVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="retrievers") -class TextSplitterVertex(StatefulVertex): +class TextSplitterVertex(Vertex): def __init__(self, data: Dict, graph, params: Optional[Dict] = None): super().__init__(data, graph=graph, base_type="textsplitters", params=params) @@ -198,14 +201,16 @@ class TextSplitterVertex(StatefulVertex): # show how many documents are in the list? if not isinstance(self._built_object, UnbuiltObject): - avg_length = sum(len(doc.page_content) for doc in self._built_object) / len(self._built_object) + avg_length = sum(len(doc.page_content) for doc in self._built_object) / len( + self._built_object + ) return f"""{self.vertex_type}({len(self._built_object)} documents) \nAvg. Document Length (characters): {int(avg_length)} \nDocuments: {self._built_object[:3]}...""" return f"{self.vertex_type}()" -class ChainVertex(StatelessVertex): +class ChainVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="chains") self.steps = [self._custom_build] @@ -235,7 +240,7 @@ class ChainVertex(StatelessVertex): return super()._built_object_repr() -class PromptVertex(StatelessVertex): +class PromptVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="prompts") self.steps: List[Callable] = [self._custom_build] @@ -245,18 +250,27 @@ class PromptVertex(StatelessVertex): user_id = kwargs.get("user_id", None) tools = kwargs.get("tools", []) if not self._built or force: - if "input_variables" not in self.params or self.params["input_variables"] is None: + if ( + "input_variables" not in self.params + or self.params["input_variables"] is None + ): self.params["input_variables"] = [] # Check if it is a ZeroShotPrompt and needs a tool if "ShotPrompt" in self.vertex_type: - tools = [tool_node.build(user_id=user_id) for tool_node in tools] if tools is not None else [] + tools = ( + [tool_node.build(user_id=user_id) for tool_node in tools] + if tools is not None + else [] + ) # flatten the list of tools if it is a list of lists # first check if it is a list if tools and isinstance(tools, list) and isinstance(tools[0], list): tools = flatten_list(tools) self.params["tools"] = tools prompt_params = [ - key for key, value in self.params.items() if isinstance(value, str) and key != "format_instructions" + key + for key, value in self.params.items() + if isinstance(value, str) and key != "format_instructions" ] else: prompt_params = ["template"] @@ -266,14 +280,20 @@ class PromptVertex(StatelessVertex): prompt_text = self.params[param] variables = extract_input_variables_from_prompt(prompt_text) self.params["input_variables"].extend(variables) - self.params["input_variables"] = list(set(self.params["input_variables"])) + self.params["input_variables"] = list( + set(self.params["input_variables"]) + ) elif isinstance(self.params, dict): self.params.pop("input_variables", None) await self._build(user_id=user_id) def _built_object_repr(self): - if not self.artifacts or self._built_object is None or not hasattr(self._built_object, "format"): + if ( + not self.artifacts + or self._built_object is None + or not hasattr(self._built_object, "format") + ): return super()._built_object_repr() elif isinstance(self._built_object, UnbuiltObject): return super()._built_object_repr() @@ -285,7 +305,9 @@ class PromptVertex(StatelessVertex): # so the prompt format doesn't break artifacts.pop("handle_keys", None) try: - if not hasattr(self._built_object, "template") and hasattr(self._built_object, "prompt"): + if not hasattr(self._built_object, "template") and hasattr( + self._built_object, "prompt" + ): template = self._built_object.prompt.template else: template = self._built_object.template @@ -293,17 +315,21 @@ class PromptVertex(StatelessVertex): if value: replace_key = "{" + key + "}" template = template.replace(replace_key, value) - return template if isinstance(template, str) else f"{self.vertex_type}({template})" + return ( + template + if isinstance(template, str) + else f"{self.vertex_type}({template})" + ) except KeyError: return str(self._built_object) -class OutputParserVertex(StatelessVertex): +class OutputParserVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="output_parsers") -class CustomComponentVertex(StatelessVertex): +class CustomComponentVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="custom_components") @@ -312,7 +338,7 @@ class CustomComponentVertex(StatelessVertex): return self.artifacts["repr"] or super()._built_object_repr() -class ChatVertex(StatelessVertex): +class ChatVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="custom_components", is_task=True) self.steps = [self._build, self._run] @@ -431,7 +457,7 @@ class ChatVertex(StatelessVertex): pass -class RoutingVertex(StatelessVertex): +class RoutingVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="custom_components") self.use_result = True @@ -457,6 +483,12 @@ class RoutingVertex(StatelessVertex): self._built_result = None +class StateVertex(Vertex): + def __init__(self, data: Dict, graph): + super().__init__(data, graph=graph, base_type="custom_components") + self.steps = [self._build] + + def dict_to_codeblock(d: dict) -> str: serialized = {key: serialize_field(val) for key, val in d.items()} json_str = json.dumps(serialized, indent=4) From 88b2ab328ff87f1ffde993aa2bbdd0a5f30ae3a8 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 1 Mar 2024 23:57:28 -0300 Subject: [PATCH 07/45] Update state and append state methods in SharedState and CustomComponent --- src/backend/langflow/components/utilities/SharedState.py | 2 +- .../interface/custom/custom_component/custom_component.py | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/backend/langflow/components/utilities/SharedState.py b/src/backend/langflow/components/utilities/SharedState.py index 7d29da9bb..845252dab 100644 --- a/src/backend/langflow/components/utilities/SharedState.py +++ b/src/backend/langflow/components/utilities/SharedState.py @@ -28,7 +28,7 @@ class SharedState(CustomComponent): self.update_state(name, record) state = self.get_state(name) - if not isinstance(state, Record): + if state and not isinstance(state, Record): if isinstance(state, str): state = Record(text=state) elif isinstance(state, dict): diff --git a/src/backend/langflow/interface/custom/custom_component/custom_component.py b/src/backend/langflow/interface/custom/custom_component/custom_component.py index b8ec433f3..fd8cd7cd2 100644 --- a/src/backend/langflow/interface/custom/custom_component/custom_component.py +++ b/src/backend/langflow/interface/custom/custom_component/custom_component.py @@ -76,13 +76,17 @@ class CustomComponent(Component): def update_state(self, name: str, value: Any): try: - self.vertex.graph.state_manager.update_state(key=name, new_state=value) + self.vertex.graph.update_state( + name=name, record=value, caller=self.vertex.id + ) except Exception as e: raise ValueError(f"Error updating state: {e}") def append_state(self, name: str, value: Any): try: - self.vertex.graph.state_manager.append_state(key=name, new_state=value) + self.vertex.graph.append_state( + key=name, record=value, caller=self.vertex.id + ) except Exception as e: raise ValueError(f"Error appending state: {e}") From 9dac7e7635b25f4de3c1ff64932e7634fd5d76d3 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 1 Mar 2024 23:57:40 -0300 Subject: [PATCH 08/45] Update vertex class in socket utils.py --- src/backend/langflow/services/socket/utils.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/backend/langflow/services/socket/utils.py b/src/backend/langflow/services/socket/utils.py index 4176d081c..a45b85cd6 100644 --- a/src/backend/langflow/services/socket/utils.py +++ b/src/backend/langflow/services/socket/utils.py @@ -7,7 +7,7 @@ from sqlmodel import select from langflow.api.utils import format_elapsed_time from langflow.api.v1.schemas import ResultDataResponse, VertexBuildResponse from langflow.graph.graph.base import Graph -from langflow.graph.vertex.base import StatelessVertex +from langflow.graph.vertex.base import Vertex from langflow.services.database.models.flow.model import Flow from langflow.services.deps import get_session from langflow.services.monitor.utils import log_vertex_build @@ -63,7 +63,7 @@ async def build_vertex( return start_time = time.perf_counter() try: - if isinstance(vertex, StatelessVertex) or not vertex._built: + if isinstance(vertex, Vertex) or not vertex._built: await vertex.build(user_id=None, session_id=sid) params = vertex._built_object_repr() valid = True @@ -96,7 +96,9 @@ async def build_vertex( ) # Emit the vertex build response - response = VertexBuildResponse(valid=valid, params=params, id=vertex.id, data=result_dict) + response = VertexBuildResponse( + valid=valid, params=params, id=vertex.id, data=result_dict + ) await sio.emit("vertex_build", data=response.model_dump(), to=sid) except Exception as exc: From 569be9e0802c14852cde8877d8c26a87137bff15 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Fri, 1 Mar 2024 23:57:59 -0300 Subject: [PATCH 09/45] Refactor buildUtils to handle activated vertices --- src/frontend/src/stores/flowStore.ts | 23 ++++++-- src/frontend/src/types/api/index.ts | 1 + src/frontend/src/types/zustand/flow/index.ts | 1 + src/frontend/src/utils/buildUtils.ts | 62 ++++++++++++++++++-- 4 files changed, 75 insertions(+), 12 deletions(-) diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index 9ee25623b..fb995273b 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -223,12 +223,10 @@ const useFlowStore = create((set, get) => ({ selection.nodes.some((node) => node.data.type === "ChatInput") && checkChatInput(get().nodes) ) { - useAlertStore - .getState() - .setErrorData({ - title: "Error pasting components", - list: ["You can only have one ChatInput component in the flow"], - }); + useAlertStore.getState().setErrorData({ + title: "Error pasting components", + list: ["You can only have one ChatInput component in the flow"], + }); return; } let minimumX = Infinity; @@ -451,6 +449,9 @@ const useFlowStore = create((set, get) => ({ if (vertexBuildData && vertexBuildData.inactivated_vertices) { get().removeFromVerticesBuild(vertexBuildData.inactivated_vertices); } + if (vertexBuildData && vertexBuildData.activated_vertices) { + get().addToVerticesBuild(vertexBuildData.activated_vertices); + } get().addDataToFlowPool( { ...vertexBuildData, buildId }, vertexBuildData.id @@ -507,6 +508,16 @@ const useFlowStore = create((set, get) => ({ set({ verticesBuild: vertices }); }, verticesBuild: null, + addToVerticesBuild: (vertices: string[]) => { + const verticesBuild = get().verticesBuild; + if (!verticesBuild) return; + set({ + verticesBuild: { + ...verticesBuild, + verticesIds: [...verticesBuild.verticesIds, ...vertices], + }, + }); + }, removeFromVerticesBuild: (vertices: string[]) => { const verticesBuild = get().verticesBuild; if (!verticesBuild) return; diff --git a/src/frontend/src/types/api/index.ts b/src/frontend/src/types/api/index.ts index 27e5946b3..076ecbf45 100644 --- a/src/frontend/src/types/api/index.ts +++ b/src/frontend/src/types/api/index.ts @@ -141,6 +141,7 @@ export type VerticesOrderTypeAPI = { export type VertexBuildTypeAPI = { id: string; inactivated_vertices: Array | null; + activated_vertices: Array | null; valid: boolean; params: string; data: VertexDataTypeAPI; diff --git a/src/frontend/src/types/zustand/flow/index.ts b/src/frontend/src/types/zustand/flow/index.ts index c969d8d6b..5b19cee88 100644 --- a/src/frontend/src/types/zustand/flow/index.ts +++ b/src/frontend/src/types/zustand/flow/index.ts @@ -105,6 +105,7 @@ export type FlowStoreType = { runId: string; } | null ) => void; + addToVerticesBuild: (vertices: string[]) => void; removeFromVerticesBuild: (vertices: string[]) => void; verticesBuild: { verticesIds: string[]; diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index 1ce675eec..1e09d5398 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -32,6 +32,7 @@ function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI { data: inactiveData, params: "Inactive", inactivated_vertices: null, + activated_vertices: null, valid: false, timestamp: new Date().toISOString(), }; @@ -110,7 +111,7 @@ export async function buildVertices({ if (!verticesBuild || nodeId) { verticesBuild = await updateVerticesOrder(flowId, nodeId); } - const verticesIds = verticesBuild?.verticesIds!; + let verticesIds = verticesBuild?.verticesIds!; const verticesLayers = verticesBuild?.verticesLayers!; const verticesOrder = verticesBuild?.verticesOrder!; const runId = verticesBuild?.runId!; @@ -128,26 +129,75 @@ export async function buildVertices({ useFlowStore.getState().updateBuildStatus(verticesIds, BuildStatus.TO_BUILD); useFlowStore.getState().setIsBuilding(true); + let dynamicVerticesLayers: Array> = [...verticesLayers]; + + const handleBuildUpdate = (data: VertexBuildTypeAPI, status: BuildStatus) => { + // Handle activated vertices + console.log("handleBuildUpdate", data, status); + if (data.activated_vertices && data.activated_vertices.length > 0) { + // Logic to determine the correct placement for activated vertices in dynamicVerticesLayers + // For simplicity, this example adds them to the next layer + // const nextLayerIndex = i + 1; i doesnt exist in this scope + // we don't want to add the activated vertices to the last layer + // because these vertices should be built right away + const thisVertexLayer = dynamicVerticesLayers.findIndex((layer) => + layer.includes(data.id) + ); + const nextLayerIndex = thisVertexLayer + 1; + console.log("nextLayerIndex", nextLayerIndex); + console.log("dynamicVerticesLayers", dynamicVerticesLayers); + if (dynamicVerticesLayers[nextLayerIndex]) { + // If the next layer exists, add the activated vertices to it + // dynamicVerticesLayers[nextLayerIndex] = dynamicVerticesLayers[ + // nextLayerIndex + // ].concat(data.activated_vertices); + // instead of adding them all at once, add them one by one + // add one per layer and if the next layer doesn't exist, create it + + for (const vertex of data.activated_vertices) { + console.log("vertex", vertex); + if (dynamicVerticesLayers[nextLayerIndex].includes(vertex)) { + continue; + } else if (dynamicVerticesLayers[nextLayerIndex].length > 0) { + dynamicVerticesLayers[nextLayerIndex].push(vertex); + } else { + dynamicVerticesLayers[nextLayerIndex] = [vertex]; + } + console.log("dynamicVerticesLayers", dynamicVerticesLayers); + } + } else { + dynamicVerticesLayers.push(data.activated_vertices); + console.log(dynamicVerticesLayers); + } + } + if (onBuildUpdate) onBuildUpdate(data, status, runId); + }; // Set each vertex state to building const buildResults: Array = []; - for (const layer of verticesLayers) { + for (let i = 0; i < dynamicVerticesLayers.length; i++) { + console.log(dynamicVerticesLayers); + const layer = dynamicVerticesLayers[i]; if (onBuildStart) onBuildStart(layer); for (const id of layer) { // Check if id is in the list of inactive nodes - if (!verticesIds.includes(id) && onBuildUpdate) { + // useFlowStore because it gets updated constantly + if ( + !useFlowStore.getState().verticesBuild?.verticesIds.includes(id) && + onBuildUpdate + ) { // If it is, skip building and set the state to inactive + console.log("inactive", id); onBuildUpdate(getInactiveVertexData(id), BuildStatus.INACTIVE, runId); buildResults.push(false); continue; } + await buildVertex({ flowId, id, input_value, - onBuildUpdate: (data: VertexBuildTypeAPI, status: BuildStatus) => { - if (onBuildUpdate) onBuildUpdate(data, status, runId); - }, + onBuildUpdate: handleBuildUpdate, onBuildError, verticesIds, buildResults, From 5ebcdf5d527c76437364dfd8ba00b954aa00df0b Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 00:47:04 -0300 Subject: [PATCH 10/45] Refactor SharedState build method to accept optional record parameter --- .../langflow/components/utilities/SharedState.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/backend/langflow/components/utilities/SharedState.py b/src/backend/langflow/components/utilities/SharedState.py index 845252dab..c3f0c0634 100644 --- a/src/backend/langflow/components/utilities/SharedState.py +++ b/src/backend/langflow/components/utilities/SharedState.py @@ -1,4 +1,4 @@ -from typing import Union +from typing import Optional from langflow import CustomComponent from langflow.field_typing import Text @@ -20,12 +20,13 @@ class SharedState(CustomComponent): } def build( - self, name: str, record: Union[Text, Record], append: bool = False + self, name: str, record: Optional[Record] = None, append: bool = False ) -> Record: - if append: - self.append_state(name, record) - else: - self.update_state(name, record) + if record: + if append: + self.append_state(name, record) + else: + self.update_state(name, record) state = self.get_state(name) if state and not isinstance(state, Record): From 801c5462fc56f8d4f3f1d75f6a3bd949fa29565e Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 00:47:15 -0300 Subject: [PATCH 11/45] Fix method argument type in CustomComponent class --- .../custom/custom_component/custom_component.py | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/src/backend/langflow/interface/custom/custom_component/custom_component.py b/src/backend/langflow/interface/custom/custom_component/custom_component.py index fd8cd7cd2..80e28efb8 100644 --- a/src/backend/langflow/interface/custom/custom_component/custom_component.py +++ b/src/backend/langflow/interface/custom/custom_component/custom_component.py @@ -14,7 +14,6 @@ from uuid import UUID import yaml from cachetools import TTLCache, cachedmethod -from fastapi import HTTPException from langchain_core.documents import Document from sqlmodel import select @@ -85,7 +84,7 @@ class CustomComponent(Component): def append_state(self, name: str, value: Any): try: self.vertex.graph.append_state( - key=name, record=value, caller=self.vertex.id + name=name, record=value, caller=self.vertex.id ) except Exception as e: raise ValueError(f"Error appending state: {e}") @@ -201,18 +200,7 @@ class CustomComponent(Component): args = build_method["args"] for arg in args: - if arg.get("type") == "prompt": - raise HTTPException( - status_code=400, - detail={ - "error": "Type hint Error", - "traceback": ( - "Prompt type is not supported in the build method." - " Try using PromptTemplate instead." - ), - }, - ) - elif not arg.get("type") and arg.get("name") != "self": + if not arg.get("type") and arg.get("name") != "self": # Set the type to Data arg["type"] = "Data" return args From d846d806ccd6c904927967afcc92f2c53e04fe5f Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 01:18:43 -0300 Subject: [PATCH 12/45] Add TextToRecordComponent to convert text to a Record --- .../components/utilities/TextToRecord.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 src/backend/langflow/components/utilities/TextToRecord.py diff --git a/src/backend/langflow/components/utilities/TextToRecord.py b/src/backend/langflow/components/utilities/TextToRecord.py new file mode 100644 index 000000000..e176e15f5 --- /dev/null +++ b/src/backend/langflow/components/utilities/TextToRecord.py @@ -0,0 +1,29 @@ +from typing import Optional + +from langflow import CustomComponent +from langflow.field_typing import Text +from langflow.schema import Record + + +class TextToRecordComponent(CustomComponent): + display_name = "Text to Record" + description = "Converts text to a Record." + + def build_config(self): + return { + "text": { + "display_name": "Text", + "info": "The text to convert to a record.", + }, + "data": { + "display_name": "Data", + "info": "The optional data to include in the record.", + }, + } + + def build( + self, + text: Text, + data: Optional[dict] = {}, + ) -> Record: + return Record(text=text, data=data) From 22884aeb7ea586648504f809394067998c087033 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 01:18:51 -0300 Subject: [PATCH 13/45] Add state activation for specific vertices --- src/backend/langflow/graph/graph/base.py | 30 ++++++++++++++++-------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index d82ccfeb8..90a6e4f9a 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -3,6 +3,8 @@ from collections import defaultdict, deque from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Type, Union from langchain.chains.base import Chain +from loguru import logger + from langflow.graph.edge.base import ContractEdge from langflow.graph.graph.constants import lazy_load_vertex_dict from langflow.graph.graph.state_manager import GraphStateManager @@ -20,7 +22,6 @@ from langflow.graph.vertex.types import ( from langflow.interface.tools.constants import FILE_TOOLS from langflow.schema import Record from langflow.utils import payload -from loguru import logger if TYPE_CHECKING: from langflow.graph.schema import ResultData @@ -43,6 +44,7 @@ class Graph: self.flow_id = flow_id self._is_input_vertices: List[str] = [] self._is_output_vertices: List[str] = [] + self._is_state_vertices: List[str] = [] self._has_session_id_vertices: List[str] = [] self._sorted_vertices_layers: List[List[str]] = [] self.run_id = None @@ -73,16 +75,23 @@ class Graph: # all StateVertex in self.vertices that are not the caller # essentially notifying all the other vertices that the state has changed # This also has to activate their successors - caller_vertex = self.get_vertex(caller) - for vertex in self.vertices: - if vertex.id != caller and isinstance(vertex, StateVertex): - successors = self.get_all_successors(vertex) - self.activated_vertices.add(vertex.id) - for successor in successors: - self.activated_vertices.add(successor.id) + self.activate_state_vertices(name, caller) self.state_manager.update_state(name, record) + def activate_state_vertices(self, name: str, caller: str): + for vertex_id in self._is_state_vertices: + vertex = self.get_vertex(vertex_id) + if ( + name in vertex._raw_params["name"] + and vertex_id != caller + and isinstance(vertex, StateVertex) + ): + successors = self.get_all_successors(vertex) + self.activated_vertices.add(vertex_id) + for successor in successors: + self.activated_vertices.add(successor.id) + def reset_activated_vertices(self): self.activated_vertices = set() @@ -91,7 +100,8 @@ class Graph: ) -> None: """Appends the state of the graph.""" if caller: - self.state_manager.subscribe(name, caller) + + self.activate_state_vertices(name, caller) self.state_manager.append_state(name, record) @@ -113,7 +123,7 @@ class Graph: """ Defines the lists of vertices that are inputs, outputs, and have session_id. """ - attributes = ["is_input", "is_output", "has_session_id"] + attributes = ["is_input", "is_output", "has_session_id", "is_state"] for vertex in self.vertices: for attribute in attributes: if getattr(vertex, attribute): From 2b215e66f4ccfb340125af887b0bcff0499ff86e Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 01:18:58 -0300 Subject: [PATCH 14/45] Add is_state attribute to Vertex class and StateVertex subclass --- src/backend/langflow/graph/vertex/base.py | 1 + src/backend/langflow/graph/vertex/types.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index 2f11e0ed3..097b99fbc 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -57,6 +57,7 @@ class Vertex: self.will_stream = False self.updated_raw_params = False self.id: str = data["id"] + self.is_state = False self.is_input = any( input_component_name in self.id for input_component_name in INPUT_COMPONENTS ) diff --git a/src/backend/langflow/graph/vertex/types.py b/src/backend/langflow/graph/vertex/types.py index ba1cd2998..516b06dee 100644 --- a/src/backend/langflow/graph/vertex/types.py +++ b/src/backend/langflow/graph/vertex/types.py @@ -1,7 +1,6 @@ import ast import json -from typing import (AsyncIterator, Callable, Dict, Iterator, List, Optional, - Union) +from typing import AsyncIterator, Callable, Dict, Iterator, List, Optional, Union import yaml from langchain_core.messages import AIMessage @@ -487,6 +486,7 @@ class StateVertex(Vertex): def __init__(self, data: Dict, graph): super().__init__(data, graph=graph, base_type="custom_components") self.steps = [self._build] + self.is_state = True def dict_to_codeblock(d: dict) -> str: From 6aec697c2a23cf542314612b96c181147a810c80 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 01:19:03 -0300 Subject: [PATCH 15/45] Add error handling for observer callbacks in GraphStateManager --- src/backend/langflow/graph/graph/state_manager.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/backend/langflow/graph/graph/state_manager.py b/src/backend/langflow/graph/graph/state_manager.py index 64476011b..46477d220 100644 --- a/src/backend/langflow/graph/graph/state_manager.py +++ b/src/backend/langflow/graph/graph/state_manager.py @@ -2,6 +2,8 @@ from collections import defaultdict from threading import Lock from typing import Callable +from loguru import logger + class GraphStateManager: def __init__(self): @@ -13,6 +15,8 @@ class GraphStateManager: with self.lock: if key not in self.states: self.states[key] = [] + elif not isinstance(self.states[key], list): + self.states[key] = [self.states[key]] self.states[key].append(new_state) self.notify_append_observers(key, new_state) @@ -36,4 +40,8 @@ class GraphStateManager: def notify_append_observers(self, key, new_state): for callback in self.observers[key]: - callback(key, new_state, append=True) + try: + callback(key, new_state, append=True) + except Exception as e: + logger.error(f"Error in observer {callback} for key {key}: {e}") + logger.warning("Callbacks not implemented yet") From d4cad8130208adef86a9e6de793eaa542a629bc8 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 01:19:16 -0300 Subject: [PATCH 16/45] Remove unused import and update timer duration --- src/frontend/src/App.tsx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/frontend/src/App.tsx b/src/frontend/src/App.tsx index 22fd6c5bb..b57dad461 100644 --- a/src/frontend/src/App.tsx +++ b/src/frontend/src/App.tsx @@ -18,7 +18,6 @@ import { getHealth } from "./controllers/API"; import Router from "./routes"; import useAlertStore from "./stores/alertStore"; import { useDarkStore } from "./stores/darkStore"; -import useFlowStore from "./stores/flowStore"; import useFlowsManagerStore from "./stores/flowsManagerStore"; import { useStoreStore } from "./stores/storeStore"; import { useTypesStore } from "./stores/typesStore"; @@ -70,7 +69,7 @@ export default function App() { .catch(() => { setFetchError(true); }); - }, 20000); + }, 20000); // 20 seconds // Clean up the timer on component unmount return () => { From 48ec2d2fa8a8ed8128aba789a9ed6196767129a3 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 15:18:58 -0300 Subject: [PATCH 17/45] Fix empty records bug and return empty string if state is not found --- src/backend/langflow/components/utilities/RecordsAsText.py | 2 ++ src/backend/langflow/graph/graph/state_manager.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/backend/langflow/components/utilities/RecordsAsText.py b/src/backend/langflow/components/utilities/RecordsAsText.py index debf3eed2..18bf8be8c 100644 --- a/src/backend/langflow/components/utilities/RecordsAsText.py +++ b/src/backend/langflow/components/utilities/RecordsAsText.py @@ -25,6 +25,8 @@ class RecordsAsTextComponent(CustomComponent): records: list[Record], template: str = "Text: {text}\nData: {data}", ) -> Text: + if not records: + return "" if isinstance(records, Record): records = [records] diff --git a/src/backend/langflow/graph/graph/state_manager.py b/src/backend/langflow/graph/graph/state_manager.py index 46477d220..3fcbb68a3 100644 --- a/src/backend/langflow/graph/graph/state_manager.py +++ b/src/backend/langflow/graph/graph/state_manager.py @@ -27,7 +27,7 @@ class GraphStateManager: def get_state(self, key): with self.lock: - return self.states.get(key, None) + return self.states.get(key, "") def subscribe(self, key, observer: Callable): with self.lock: From 8ac3f4579e7eee2710f6356b7812d9407561c998 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 15:19:34 -0300 Subject: [PATCH 18/45] Refactor activated_vertices to activated_layers --- src/backend/langflow/api/v1/chat.py | 4 +- src/backend/langflow/api/v1/schemas.py | 2 +- src/backend/langflow/graph/graph/base.py | 51 +++++++++++++++++++----- src/frontend/src/stores/flowStore.ts | 14 +++---- src/frontend/src/types/api/index.ts | 2 +- src/frontend/src/utils/buildUtils.ts | 42 ++++++------------- 6 files changed, 63 insertions(+), 52 deletions(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 399098d47..8782efb29 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -156,13 +156,13 @@ async def build_vertex( inactivated_vertices = None inactivated_vertices = list(graph.inactivated_vertices) graph.reset_inactivated_vertices() - activated_vertices = list(graph.activated_vertices) + activated_layers = graph.activated_layers graph.reset_activated_vertices() chat_service.set_cache(flow_id, graph) build_response = VertexBuildResponse( inactivated_vertices=inactivated_vertices, - activated_vertices=activated_vertices, + activated_layers=activated_layers, valid=valid, params=params, id=vertex.id, diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 88610d637..f570923e1 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -230,7 +230,7 @@ class ResultDataResponse(BaseModel): class VertexBuildResponse(BaseModel): id: Optional[str] = None inactivated_vertices: Optional[List[str]] = None - activated_vertices: Optional[List[str]] = None + activated_layers: Optional[List[List[str]]] = None valid: bool params: Optional[str] """JSON string of the params.""" diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 90a6e4f9a..95ab6db6b 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -58,7 +58,7 @@ class Graph: self._vertices = self._graph_data["nodes"] self._edges = self._graph_data["edges"] self.inactivated_vertices: set = set() - self.activated_vertices: set = set() + self.activated_layers: List[List[str]] = [] self.edges: List[ContractEdge] = [] self.vertices: List[Vertex] = [] self._build_graph() @@ -80,20 +80,26 @@ class Graph: self.state_manager.update_state(name, record) def activate_state_vertices(self, name: str, caller: str): + layers = [] for vertex_id in self._is_state_vertices: + if vertex_id == caller: + continue vertex = self.get_vertex(vertex_id) if ( - name in vertex._raw_params["name"] + isinstance(vertex._raw_params["name"], str) + and name in vertex._raw_params["name"] and vertex_id != caller and isinstance(vertex, StateVertex) ): - successors = self.get_all_successors(vertex) - self.activated_vertices.add(vertex_id) - for successor in successors: - self.activated_vertices.add(successor.id) + layers.append([vertex_id]) + successors = self.get_all_successors(vertex, flat=False) + for layer in successors: + + layers.append([v.id for v in layer]) + self.activated_layers = layers def reset_activated_vertices(self): - self.activated_vertices = set() + self.activated_layers = [] def append_state( self, name: str, record: Union[str, Record], caller: Optional[str] = None @@ -555,18 +561,41 @@ class Graph: for source_id in self.predecessor_map.get(vertex.id, []) ] - def get_all_successors(self, vertex, recursive=True): + def get_all_successors(self, vertex, recursive=True, flat=True): # Recursively get the successors of the current vertex + # successors = vertex.successors + # if not successors: + # return [] + # successors_result = [] + # for successor in successors: + # # Just return a list of successors + # if recursive: + # next_successors = self.get_all_successors(successor) + # successors_result.extend(next_successors) + # successors_result.append(successor) + # return successors_result + # The above is the version without the flat parameter + # The below is the version with the flat parameter + # the flat parameter will define if each layer of successors + # becomes one list or if the result is a list of lists + # if flat is True, the result will be a list of vertices + # if flat is False, the result will be a list of lists of vertices + # each list will represent a layer of successors successors = vertex.successors if not successors: return [] successors_result = [] for successor in successors: - # Just return a list of successors if recursive: next_successors = self.get_all_successors(successor) - successors_result.extend(next_successors) - successors_result.append(successor) + if flat: + successors_result.extend(next_successors) + else: + successors_result.append(next_successors) + if flat: + successors_result.append(successor) + else: + successors_result.append([successor]) return successors_result def get_successors(self, vertex): diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index f93b72858..d7780c857 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -223,12 +223,10 @@ const useFlowStore = create((set, get) => ({ selection.nodes.some((node) => node.data.type === "ChatInput") && checkChatInput(get().nodes) ) { - useAlertStore - .getState() - .setErrorData({ - title: "Error pasting components", - list: ["You can only have one ChatInput component in the flow"], - }); + useAlertStore.getState().setErrorData({ + title: "Error pasting components", + list: ["You can only have one ChatInput component in the flow"], + }); return; } let minimumX = Infinity; @@ -451,8 +449,8 @@ const useFlowStore = create((set, get) => ({ if (vertexBuildData && vertexBuildData.inactivated_vertices) { get().removeFromVerticesBuild(vertexBuildData.inactivated_vertices); } - if (vertexBuildData && vertexBuildData.activated_vertices) { - get().addToVerticesBuild(vertexBuildData.activated_vertices); + if (vertexBuildData && vertexBuildData.activated_layers) { + get().addToVerticesBuild(vertexBuildData.activated_layers.flat()); } get().addDataToFlowPool( { ...vertexBuildData, buildId }, diff --git a/src/frontend/src/types/api/index.ts b/src/frontend/src/types/api/index.ts index 076ecbf45..469ca3d9f 100644 --- a/src/frontend/src/types/api/index.ts +++ b/src/frontend/src/types/api/index.ts @@ -141,7 +141,7 @@ export type VerticesOrderTypeAPI = { export type VertexBuildTypeAPI = { id: string; inactivated_vertices: Array | null; - activated_vertices: Array | null; + activated_layers: Array> | null; valid: boolean; params: string; data: VertexDataTypeAPI; diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index 63da6b7e8..b89fc6a5a 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -32,7 +32,7 @@ function getInactiveVertexData(vertexId: string): VertexBuildTypeAPI { data: inactiveData, params: "Inactive", inactivated_vertices: null, - activated_vertices: null, + activated_layers: null, valid: false, timestamp: new Date().toISOString(), }; @@ -131,41 +131,25 @@ export async function buildVertices({ const handleBuildUpdate = (data: VertexBuildTypeAPI, status: BuildStatus) => { // Handle activated vertices console.log("handleBuildUpdate", data, status); - if (data.activated_vertices && data.activated_vertices.length > 0) { - // Logic to determine the correct placement for activated vertices in dynamicVerticesLayers - // For simplicity, this example adds them to the next layer - // const nextLayerIndex = i + 1; i doesnt exist in this scope - // we don't want to add the activated vertices to the last layer - // because these vertices should be built right away + if (data.activated_layers && data.activated_layers.length > 0) { const thisVertexLayer = dynamicVerticesLayers.findIndex((layer) => layer.includes(data.id) ); - const nextLayerIndex = thisVertexLayer + 1; + let nextLayerIndex = thisVertexLayer + 1; + console.log("nextLayerIndex", nextLayerIndex); console.log("dynamicVerticesLayers", dynamicVerticesLayers); - if (dynamicVerticesLayers[nextLayerIndex]) { - // If the next layer exists, add the activated vertices to it - // dynamicVerticesLayers[nextLayerIndex] = dynamicVerticesLayers[ - // nextLayerIndex - // ].concat(data.activated_vertices); - // instead of adding them all at once, add them one by one - // add one per layer and if the next layer doesn't exist, create it - for (const vertex of data.activated_vertices) { - console.log("vertex", vertex); - if (dynamicVerticesLayers[nextLayerIndex].includes(vertex)) { - continue; - } else if (dynamicVerticesLayers[nextLayerIndex].length > 0) { - dynamicVerticesLayers[nextLayerIndex].push(vertex); - } else { - dynamicVerticesLayers[nextLayerIndex] = [vertex]; - } - console.log("dynamicVerticesLayers", dynamicVerticesLayers); + data.activated_layers.forEach((newLayer) => { + if (!dynamicVerticesLayers[nextLayerIndex]) { + dynamicVerticesLayers[nextLayerIndex] = []; } - } else { - dynamicVerticesLayers.push(data.activated_vertices); - console.log(dynamicVerticesLayers); - } + dynamicVerticesLayers[nextLayerIndex] = [ + ...dynamicVerticesLayers[nextLayerIndex], + ...newLayer, + ]; + nextLayerIndex += 1; + }); } if (onBuildUpdate) onBuildUpdate(data, status, runId); }; From 8e65ba5f17a8fde900d46f3fa0ced45c0ca1ad02 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 15:20:21 -0300 Subject: [PATCH 19/45] Fix handling of empty state in SharedState.py --- src/backend/langflow/components/utilities/SharedState.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/backend/langflow/components/utilities/SharedState.py b/src/backend/langflow/components/utilities/SharedState.py index 3e715cf1a..d3c5e20fb 100644 --- a/src/backend/langflow/components/utilities/SharedState.py +++ b/src/backend/langflow/components/utilities/SharedState.py @@ -35,4 +35,6 @@ class SharedState(CustomComponent): state = Record(data=state) else: state = Record(text=str(state)) + elif not state: + state = Record(text="") return state From 85576d420418a36b7c70e5eb340d7a6d826b56d8 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 17:37:12 -0300 Subject: [PATCH 20/45] Add status attribute to SharedState class --- src/backend/langflow/components/utilities/SharedState.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/backend/langflow/components/utilities/SharedState.py b/src/backend/langflow/components/utilities/SharedState.py index d3c5e20fb..95de17774 100644 --- a/src/backend/langflow/components/utilities/SharedState.py +++ b/src/backend/langflow/components/utilities/SharedState.py @@ -37,4 +37,5 @@ class SharedState(CustomComponent): state = Record(text=str(state)) elif not state: state = Record(text="") + self.status = state return state From 7207c4c0a2c9b9b9171b9316a4a3d6b5fd413153 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 17:38:01 -0300 Subject: [PATCH 21/45] Add layer to dynamicVerticesLayers array --- src/frontend/src/utils/buildUtils.ts | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/frontend/src/utils/buildUtils.ts b/src/frontend/src/utils/buildUtils.ts index b89fc6a5a..1fe55d209 100644 --- a/src/frontend/src/utils/buildUtils.ts +++ b/src/frontend/src/utils/buildUtils.ts @@ -139,7 +139,8 @@ export async function buildVertices({ console.log("nextLayerIndex", nextLayerIndex); console.log("dynamicVerticesLayers", dynamicVerticesLayers); - + // This adds layers to the dynamicVerticesLayers array + // starting from the index of the current layer + 1 data.activated_layers.forEach((newLayer) => { if (!dynamicVerticesLayers[nextLayerIndex]) { dynamicVerticesLayers[nextLayerIndex] = []; @@ -150,6 +151,21 @@ export async function buildVertices({ ]; nextLayerIndex += 1; }); + // Let's implement one that just adds all layers to the end of the array + // data.activated_layers.forEach((newLayer) => { + // // filter the newLayer to remove any vertices that are already in the dynamicVerticesLayers + // // after thisVertexLayer + // newLayer = newLayer.filter((vertex) => { + // return !dynamicVerticesLayers + // .slice(thisVertexLayer) + // .flat() + // .includes(vertex); + // }); + // if (newLayer.length > 0) { + // console.log("newLayer after filter", newLayer); + // dynamicVerticesLayers.push(newLayer); + // } + // }); } if (onBuildUpdate) onBuildUpdate(data, status, runId); }; @@ -157,9 +173,9 @@ export async function buildVertices({ // Set each vertex state to building const buildResults: Array = []; for (let i = 0; i < dynamicVerticesLayers.length; i++) { - console.log(dynamicVerticesLayers); const layer = dynamicVerticesLayers[i]; if (onBuildStart) onBuildStart(layer); + for (const id of layer) { // Check if id is in the list of inactive nodes // useFlowStore because it gets updated constantly From 0fe8d1116de1bc9318810cf3d37e6c1aeb878086 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 17:38:08 -0300 Subject: [PATCH 22/45] Add console log for onBuildStart event --- src/frontend/src/stores/flowStore.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index d7780c857..8bd3df4d8 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -484,6 +484,7 @@ const useFlowStore = create((set, get) => ({ setErrorData({ list, title }); }, onBuildStart: (idList) => { + console.log("onBuildStart", idList); useFlowStore.getState().updateBuildStatus(idList, BuildStatus.BUILDING); }, validateNodes: validateSubgraph, From e9772d06fc630654d4ba75e5675afb74781501c6 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 17:38:21 -0300 Subject: [PATCH 23/45] Refactor graph state management and add get_state method --- src/backend/langflow/graph/graph/base.py | 49 +++++++++++++++---- .../langflow/graph/graph/state_manager.py | 24 +++++---- .../custom_component/custom_component.py | 2 +- 3 files changed, 56 insertions(+), 19 deletions(-) diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 95ab6db6b..df270a201 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -47,7 +47,7 @@ class Graph: self._is_state_vertices: List[str] = [] self._has_session_id_vertices: List[str] = [] self._sorted_vertices_layers: List[List[str]] = [] - self.run_id = None + self._run_id = None self.top_level_vertices = [] for vertex in self._vertices: @@ -66,6 +66,10 @@ class Graph: self.define_vertices_lists() self.state_manager = GraphStateManager() + def get_state(self, name: str) -> Optional[Record]: + """Returns the state of the graph.""" + return self.state_manager.get_state(name, run_id=self._run_id) + def update_state( self, name: str, record: Union[str, Record], caller: Optional[str] = None ) -> None: @@ -77,7 +81,7 @@ class Graph: # This also has to activate their successors self.activate_state_vertices(name, caller) - self.state_manager.update_state(name, record) + self.state_manager.update_state(name, record, run_id=self._run_id) def activate_state_vertices(self, name: str, caller: str): layers = [] @@ -109,15 +113,21 @@ class Graph: self.activate_state_vertices(name, caller) - self.state_manager.append_state(name, record) + self.state_manager.append_state(name, record, run_id=self._run_id) + + @property + def run_id(self): + if not self._run_id: + raise ValueError("Run ID not set") + return self._run_id def set_run_id(self, run_id: str): for vertex in self.vertices: self.state_manager.subscribe(run_id, vertex.update_graph_state) - self.run_id = run_id + self._run_id = run_id def add_state(self, state: str): - self.state_manager.append_state(self.run_id, state) + self.state_manager.append_state(self._run_id, state) @property def sorted_vertices_layers(self) -> List[List[str]]: @@ -760,21 +770,28 @@ class Graph: def layered_topological_sort( self, vertices: List[Vertex], + filter_graphs: bool = False, ) -> List[List[str]]: """Performs a layered topological sort of the vertices in the graph.""" vertices_ids = {vertex.id for vertex in vertices} # Queue for vertices with no incoming edges queue = deque( - vertex.id for vertex in vertices if self.in_degree_map[vertex.id] == 0 + vertex.id + for vertex in vertices + # if filter_graphs then only vertex.is_input will be considered + if self.in_degree_map[vertex.id] == 0 + and (not filter_graphs or vertex.is_input) ) layers: List[List[str]] = [] - + visited = set(queue) current_layer = 0 while queue: layers.append([]) # Start a new layer layer_size = len(queue) for _ in range(layer_size): vertex_id = queue.popleft() + visited.add(vertex_id) + layers[current_layer].append(vertex_id) for neighbor in self.successor_map[vertex_id]: # only vertices in `vertices_ids` should be considered @@ -785,8 +802,16 @@ class Graph: continue self.in_degree_map[neighbor] -= 1 # 'remove' edge - if self.in_degree_map[neighbor] == 0: + if self.in_degree_map[neighbor] == 0 and neighbor not in visited: queue.append(neighbor) + + # if > 0 it might mean not all predecessors have added to the queue + # so we should process the neighbors predecessors + elif self.in_degree_map[neighbor] > 0: + for predecessor in self.predecessor_map[neighbor]: + if predecessor not in queue and predecessor not in visited: + queue.append(predecessor) + current_layer += 1 # Next layer new_layers = self.refine_layers(layers) return new_layers @@ -851,9 +876,15 @@ class Graph: self.mark_all_vertices("ACTIVE") if component_id: vertices = self.sort_up_to_vertex(component_id) + vertices_layers = self.layered_topological_sort(vertices) else: vertices = self.vertices - vertices_layers = self.layered_topological_sort(vertices) + # without component_id we are probably running in the chat + # so we want to pick only graphs that start with ChatInput or + # TextInput + vertices_layers = self.layered_topological_sort( + vertices, filter_graphs=True + ) vertices_layers = self.sort_by_avg_build_time(vertices_layers) vertices_layers = self.sort_chat_inputs_first(vertices_layers) self.increment_run_count() diff --git a/src/backend/langflow/graph/graph/state_manager.py b/src/backend/langflow/graph/graph/state_manager.py index 3fcbb68a3..ed5844d87 100644 --- a/src/backend/langflow/graph/graph/state_manager.py +++ b/src/backend/langflow/graph/graph/state_manager.py @@ -11,23 +11,29 @@ class GraphStateManager: self.observers = defaultdict(list) self.lock = Lock() - def append_state(self, key, new_state): + def append_state(self, key, new_state, run_id: str): with self.lock: - if key not in self.states: - self.states[key] = [] + if run_id not in self.states: + self.states[run_id] = {} + if key not in self.states[run_id]: + self.states[run_id][key] = [] elif not isinstance(self.states[key], list): - self.states[key] = [self.states[key]] - self.states[key].append(new_state) + self.states[run_id][key] = [self.states[key]] + self.states[run_id][key].append(new_state) self.notify_append_observers(key, new_state) - def update_state(self, key, new_state): + def update_state(self, key, new_state, run_id: str): with self.lock: - self.states[key] = new_state + if run_id not in self.states: + self.states[run_id] = {} + if key not in self.states[run_id]: + self.states[run_id][key] = {} + self.states[run_id][key] = new_state self.notify_observers(key, new_state) - def get_state(self, key): + def get_state(self, key, run_id: str): with self.lock: - return self.states.get(key, "") + return self.states.get(run_id, {}).get(key, "") def subscribe(self, key, observer: Callable): with self.lock: diff --git a/src/backend/langflow/interface/custom/custom_component/custom_component.py b/src/backend/langflow/interface/custom/custom_component/custom_component.py index 11c4c941d..1596533a2 100644 --- a/src/backend/langflow/interface/custom/custom_component/custom_component.py +++ b/src/backend/langflow/interface/custom/custom_component/custom_component.py @@ -92,7 +92,7 @@ class CustomComponent(Component): def get_state(self, name: str): try: - return self.vertex.graph.state_manager.get_state(key=name) + return self.vertex.graph.get_state(name=name) except Exception as e: raise ValueError(f"Error getting state: {e}") From 131cebcea6cf7c3b1a5bfad9eec4844ad6cfe7b3 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 17:38:28 -0300 Subject: [PATCH 24/45] Add status update to TextToRecordComponent --- src/backend/langflow/components/utilities/TextToRecord.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/backend/langflow/components/utilities/TextToRecord.py b/src/backend/langflow/components/utilities/TextToRecord.py index e176e15f5..277434da6 100644 --- a/src/backend/langflow/components/utilities/TextToRecord.py +++ b/src/backend/langflow/components/utilities/TextToRecord.py @@ -26,4 +26,6 @@ class TextToRecordComponent(CustomComponent): text: Text, data: Optional[dict] = {}, ) -> Record: - return Record(text=text, data=data) + record = Record(text=text, data=data) + self.status = record + return record From 9f2d67f3bcabb68f92a26f914531d0ef4237aa2c Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 17:38:36 -0300 Subject: [PATCH 25/45] Refactor Vertex class in base.py --- src/backend/langflow/graph/vertex/base.py | 25 ++++++++--------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index 097b99fbc..8f754325e 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -2,26 +2,13 @@ import ast import inspect import types from enum import Enum -from typing import ( - TYPE_CHECKING, - Any, - AsyncIterator, - Callable, - Coroutine, - Dict, - Iterator, - List, - Optional, -) +from typing import (TYPE_CHECKING, Any, AsyncIterator, Callable, Coroutine, + Dict, Iterator, List, Optional) from loguru import logger -from langflow.graph.schema import ( - INPUT_COMPONENTS, - OUTPUT_COMPONENTS, - InterfaceComponentTypes, - ResultData, -) +from langflow.graph.schema import (INPUT_COMPONENTS, OUTPUT_COMPONENTS, + InterfaceComponentTypes, ResultData) from langflow.graph.utils import UnbuiltObject, UnbuiltResult from langflow.graph.vertex.utils import generate_result from langflow.interface.initialize import loading @@ -639,6 +626,10 @@ class Vertex: if self.pinned and self._built: return self.get_requester_result(requester) + elif self._built and requester is not None: + # This means that the vertex has already been built + # and we are just getting the result for the requester + return await self.get_requester_result(requester) self._reset() if self.is_input and inputs is not None: From bc3402586d5451ea815cecca56bf996192f09caf Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 18:11:07 -0300 Subject: [PATCH 26/45] Fix bug in PromptComponent --- src/backend/langflow/components/prompts/Prompt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/langflow/components/prompts/Prompt.py b/src/backend/langflow/components/prompts/Prompt.py index d8afbcb43..975998919 100644 --- a/src/backend/langflow/components/prompts/Prompt.py +++ b/src/backend/langflow/components/prompts/Prompt.py @@ -23,7 +23,7 @@ class PromptComponent(CustomComponent): prompt_template = PromptTemplate.from_template(Text(template)) attributes_to_check = ["text", "page_content"] - for key, value in kwargs.items(): + for key, value in kwargs.copy().items(): for attribute in attributes_to_check: if hasattr(value, attribute): kwargs[key] = getattr(value, attribute) From e54fe60a6b02a9f68ec76e534d6526d836450f47 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sat, 2 Mar 2024 18:11:18 -0300 Subject: [PATCH 27/45] Add GetNotified and Notify components --- .../components/utilities/GetNotifified.py | 20 ++++++++++ .../langflow/components/utilities/Notify.py | 40 +++++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 src/backend/langflow/components/utilities/GetNotifified.py create mode 100644 src/backend/langflow/components/utilities/Notify.py diff --git a/src/backend/langflow/components/utilities/GetNotifified.py b/src/backend/langflow/components/utilities/GetNotifified.py new file mode 100644 index 000000000..d97c30df2 --- /dev/null +++ b/src/backend/langflow/components/utilities/GetNotifified.py @@ -0,0 +1,20 @@ +from langflow import CustomComponent +from langflow.schema import Record + + +class GetNotifiedComponent(CustomComponent): + display_name = "Get Notified" + description = "A component to get notified by Notify component." + + def build_config(self): + return { + "name": { + "display_name": "Name", + "info": "The name of the notification to listen for.", + }, + } + + def build(self, name: str) -> Record: + state = self.get_state(name) + self.status = state + return state diff --git a/src/backend/langflow/components/utilities/Notify.py b/src/backend/langflow/components/utilities/Notify.py new file mode 100644 index 000000000..a43a4bb0c --- /dev/null +++ b/src/backend/langflow/components/utilities/Notify.py @@ -0,0 +1,40 @@ +from typing import Optional + +from langflow import CustomComponent +from langflow.schema import Record + + +class NotifyComponent(CustomComponent): + display_name = "Notify" + description = "A component to generate a notification to Get Notified component." + + def build_config(self): + return { + "name": {"display_name": "Name", "info": "The name of the notification."}, + "record": {"display_name": "Record", "info": "The record to store."}, + "append": { + "display_name": "Append", + "info": "If True, the record will be appended to the notification.", + }, + } + + def build( + self, name: str, record: Optional[Record] = None, append: bool = False + ) -> Record: + if state and not isinstance(state, Record): + if isinstance(state, str): + state = Record(text=state) + elif isinstance(state, dict): + state = Record(data=state) + else: + state = Record(text=str(state)) + elif not state: + state = Record(text="") + if record: + if append: + self.append_state(name, record) + else: + self.update_state(name, record) + else: + state = "No record provided." + self.status = state From 178a98dc8e6534c25c0fb584676b9cd58b1e43ed Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 3 Mar 2024 22:47:34 -0300 Subject: [PATCH 28/45] Refactor graph activation logic and add successors_ids property to StateVertex --- src/backend/langflow/api/v1/chat.py | 2 +- src/backend/langflow/graph/graph/base.py | 25 ++++++++++------------ src/backend/langflow/graph/vertex/types.py | 5 +++++ 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index 09ac2aac8..edd4d4724 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -162,7 +162,7 @@ async def build_vertex( inactivated_vertices = None inactivated_vertices = list(graph.inactivated_vertices) graph.reset_inactivated_vertices() - activated_layers = graph.activated_layers + activated_layers = graph.activated_vertices graph.reset_activated_vertices() chat_service.set_cache(flow_id, graph) diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index ece989651..06d917eb5 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -59,7 +59,7 @@ class Graph: self._vertices = self._graph_data["nodes"] self._edges = self._graph_data["edges"] self.inactivated_vertices: set = set() - self.activated_layers: List[List[str]] = [] + self.activated_vertices: List[str] = [] self.vertices_layers = [] self.vertices_to_run = set() self.stop_vertex = None @@ -90,7 +90,7 @@ class Graph: self.state_manager.update_state(name, record, run_id=self._run_id) def activate_state_vertices(self, name: str, caller: str): - layers = [] + vertices_ids = [] for vertex_id in self._is_state_vertices: if vertex_id == caller: continue @@ -101,15 +101,14 @@ class Graph: and vertex_id != caller and isinstance(vertex, StateVertex) ): - layers.append([vertex_id]) - successors = self.get_all_successors(vertex, flat=False) - for layer in successors: - - layers.append([v.id for v in layer]) - self.activated_layers = layers + vertices_ids.append(vertex_id) + successors = self.get_all_successors(vertex, flat=True) + self.vertices_to_run.update(list(map(lambda x: x.id, successors))) + self.activated_vertices = vertices_ids + self.vertices_to_run.update(vertices_ids) def reset_activated_vertices(self): - self.activated_layers = [] + self.activated_vertices = [] def append_state( self, name: str, record: Union[str, Record], caller: Optional[str] = None @@ -922,15 +921,13 @@ class Graph: vertices = self.sort_up_to_vertex(stop_component_id) elif start_component_id: vertices = self.sort_up_to_vertex(start_component_id, is_start=True) - else: vertices = self.vertices # without component_id we are probably running in the chat # so we want to pick only graphs that start with ChatInput or # TextInput - vertices_layers = self.layered_topological_sort( - vertices, filter_graphs=True - ) + + vertices_layers = self.layered_topological_sort(vertices) vertices_layers = self.sort_by_avg_build_time(vertices_layers) # vertices_layers = self.sort_chat_inputs_first(vertices_layers) self.increment_run_count() @@ -938,7 +935,7 @@ class Graph: # save the only the rest self.vertices_layers = vertices_layers[1:] self.vertices_to_run = { - vertex for vertex in chain.from_iterable(vertices_layers) + vertex_id for vertex_id in chain.from_iterable(vertices_layers) } # Return just the first layer return first_layer diff --git a/src/backend/langflow/graph/vertex/types.py b/src/backend/langflow/graph/vertex/types.py index a759be958..927cbda0f 100644 --- a/src/backend/langflow/graph/vertex/types.py +++ b/src/backend/langflow/graph/vertex/types.py @@ -500,6 +500,11 @@ class StateVertex(Vertex): self.steps = [self._build] self.is_state = True + @property + def successors_ids(self) -> List[str]: + successors = self.graph.successor_map.get(self.id, []) + return successors + self.graph.activated_vertices + def dict_to_codeblock(d: dict) -> str: serialized = {key: serialize_field(val) for key, val in d.items()} From d1557d8c1e1ed25fd218efc63864b9b91bc8486a Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 3 Mar 2024 22:47:44 -0300 Subject: [PATCH 29/45] Remove unused variable and reset activated vertices in build_vertex function --- src/backend/langflow/api/v1/chat.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/backend/langflow/api/v1/chat.py b/src/backend/langflow/api/v1/chat.py index edd4d4724..9f0259771 100644 --- a/src/backend/langflow/api/v1/chat.py +++ b/src/backend/langflow/api/v1/chat.py @@ -162,7 +162,6 @@ async def build_vertex( inactivated_vertices = None inactivated_vertices = list(graph.inactivated_vertices) graph.reset_inactivated_vertices() - activated_layers = graph.activated_vertices graph.reset_activated_vertices() chat_service.set_cache(flow_id, graph) From 9a93e5e417a0e45c837e26e51563c2583f6aa803 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 3 Mar 2024 23:19:23 -0300 Subject: [PATCH 30/45] Add GetNotified and Notify components --- .../{GetNotifified.py => GetNotified.py} | 0 .../langflow/components/utilities/Notify.py | 21 ++++++++++--------- src/backend/langflow/graph/graph/base.py | 2 +- 3 files changed, 12 insertions(+), 11 deletions(-) rename src/backend/langflow/components/utilities/{GetNotifified.py => GetNotified.py} (100%) diff --git a/src/backend/langflow/components/utilities/GetNotifified.py b/src/backend/langflow/components/utilities/GetNotified.py similarity index 100% rename from src/backend/langflow/components/utilities/GetNotifified.py rename to src/backend/langflow/components/utilities/GetNotified.py diff --git a/src/backend/langflow/components/utilities/Notify.py b/src/backend/langflow/components/utilities/Notify.py index a43a4bb0c..2155b079e 100644 --- a/src/backend/langflow/components/utilities/Notify.py +++ b/src/backend/langflow/components/utilities/Notify.py @@ -21,20 +21,21 @@ class NotifyComponent(CustomComponent): def build( self, name: str, record: Optional[Record] = None, append: bool = False ) -> Record: - if state and not isinstance(state, Record): - if isinstance(state, str): - state = Record(text=state) - elif isinstance(state, dict): - state = Record(data=state) + if record and not isinstance(record, Record): + if isinstance(record, str): + record = Record(text=record) + elif isinstance(record, dict): + record = Record(data=record) else: - state = Record(text=str(state)) - elif not state: - state = Record(text="") + record = Record(text=str(record)) + elif not record: + record = Record(text="") if record: if append: self.append_state(name, record) else: self.update_state(name, record) else: - state = "No record provided." - self.status = state + self.status = "No record provided." + self.status = record + return record diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 06d917eb5..06ae4ee8f 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -699,7 +699,7 @@ class Graph: return ChatVertex elif node_name in ["ShouldRunNext"]: return RoutingVertex - elif node_name in ["SharedState"]: + elif node_name in ["SharedState", "Notify", "GetNotified"]: return StateVertex elif node_base_type in lazy_load_vertex_dict.VERTEX_TYPE_MAP: return lazy_load_vertex_dict.VERTEX_TYPE_MAP[node_base_type] From a31f824457eb2d55ea9257a395c8b76945ca2c3d Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Sun, 3 Mar 2024 23:37:16 -0300 Subject: [PATCH 31/45] Fix issue with returning string in Vertex class --- src/backend/langflow/graph/vertex/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index e40eabdaa..186030729 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -141,6 +141,8 @@ class Vertex: if not isinstance(result, (dict, str)) and hasattr(result, "content"): return result.content return result + if isinstance(self._built_object, str): + self._built_result = self._built_object if isinstance(self._built_result, UnbuiltResult): return {} From 815e9cfb5990abb62988aa4542df13d005ad0745 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 4 Mar 2024 15:17:19 -0300 Subject: [PATCH 32/45] Refactor graph processing and error handling --- src/backend/langflow/graph/graph/base.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index 06ae4ee8f..a0b91b440 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -170,7 +170,12 @@ class Graph: vertex = self.get_vertex(vertex_id) if vertex is None: raise ValueError(f"Vertex {vertex_id} not found") - if not stream and hasattr(vertex, "consume_async_generator"): + + if ( + not vertex.result + and not stream + and hasattr(vertex, "consume_async_generator") + ): await vertex.consume_async_generator() outputs.append(vertex.result) return outputs @@ -524,15 +529,19 @@ class Graph: async def process(self) -> "Graph": """Processes the graph with vertices in each layer run in parallel.""" vertices_layers = self.sorted_vertices_layers - + vertex_task_run_count = {} for layer_index, layer in enumerate(vertices_layers): tasks = [] for vertex_id in layer: vertex = self.get_vertex(vertex_id) task = asyncio.create_task( - vertex.build(), name=f"layer-{layer_index}-vertex-{vertex_id}" + vertex.build(), + name=f"{vertex.display_name} Run {vertex_task_run_count.get(vertex_id, 0)}", ) tasks.append(task) + vertex_task_run_count[vertex_id] = ( + vertex_task_run_count.get(vertex_id, 0) + 1 + ) logger.debug(f"Running layer {layer_index} with {len(tasks)} tasks") await self._execute_tasks(tasks) logger.debug("Graph processing complete") @@ -551,6 +560,10 @@ class Graph: # coroutine has not attribute get_name task_name = tasks[i].get_name() logger.error(f"Task {task_name} failed with exception: {e}") + # Cancel all remaining tasks + for t in tasks[i:]: + t.cancel() + raise e return results def topological_sort(self) -> List[Vertex]: @@ -931,6 +944,7 @@ class Graph: vertices_layers = self.sort_by_avg_build_time(vertices_layers) # vertices_layers = self.sort_chat_inputs_first(vertices_layers) self.increment_run_count() + self._sorted_vertices_layers = vertices_layers first_layer = vertices_layers[0] # save the only the rest self.vertices_layers = vertices_layers[1:] From d403ca7a6c972c6377e47657941287f6e4a2f229 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 4 Mar 2024 15:17:57 -0300 Subject: [PATCH 33/45] Add session_id and component_id to ChatOutputResponse schema --- src/backend/langflow/api/v1/schemas.py | 2 +- .../langflow/components/utilities/RunFlow.py | 58 +++++++++++++++++++ src/backend/langflow/graph/schema.py | 3 +- src/backend/langflow/graph/vertex/base.py | 29 ++++++++++ .../custom_component/custom_component.py | 2 +- src/backend/langflow/schema/schema.py | 10 ++-- src/backend/langflow/utils/schemas.py | 2 + .../src/CustomNodes/GenericNode/index.tsx | 49 +++++++++++++--- 8 files changed, 138 insertions(+), 17 deletions(-) create mode 100644 src/backend/langflow/components/utilities/RunFlow.py diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 9968e839c..433d57a37 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -232,7 +232,7 @@ class VertexBuildResponse(BaseModel): inactivated_vertices: Optional[List[str]] = None next_vertices_ids: Optional[List[str]] = None valid: bool - params: Optional[str] + params: Optional[Any] = Field(default_factory=dict) """JSON string of the params.""" data: ResultDataResponse """Mapping of vertex ids to result dict containing the param name and result value.""" diff --git a/src/backend/langflow/components/utilities/RunFlow.py b/src/backend/langflow/components/utilities/RunFlow.py new file mode 100644 index 000000000..d0e49ac90 --- /dev/null +++ b/src/backend/langflow/components/utilities/RunFlow.py @@ -0,0 +1,58 @@ +from typing import List, Optional + +from langflow import CustomComponent +from langflow.field_typing import NestedDict, Text +from langflow.graph.schema import ResultData +from langflow.schema import Record + + +class RunFlowComponent(CustomComponent): + display_name = "Run Flow" + description = "A component to run a flow." + + def get_flow_names(self) -> List[str]: + flow_records = self.list_flows() + return [flow_record.data["name"] for flow_record in flow_records] + + def build_config(self): + return { + "input_value": { + "display_name": "Input Value", + "multiline": True, + }, + "flow_name": { + "display_name": "Flow Name", + "info": "The name of the flow to run.", + "options": self.get_flow_names, + }, + "tweaks": { + "display_name": "Tweaks", + "info": "Tweaks to apply to the flow.", + }, + } + + def build_records_from_result_data(self, result_data: ResultData) -> Record: + messages = result_data.messages + records = [] + for message in messages: + record = Record(text=message.get("text", ""), data={"result": result_data}) + records.append(record) + return records + + async def build( + self, input_value: Text, flow_name: str, tweaks: NestedDict + ) -> Record: + + results: List[Optional[ResultData]] = await self.run_flow( + input_value=input_value, flow_name=flow_name, tweaks=tweaks + ) + if isinstance(results, list): + records = [] + for result in results: + if result: + records.extend(self.build_records_from_result_data(result)) + else: + records = self.build_records_from_result_data(results) + + self.status = records + return records diff --git a/src/backend/langflow/graph/schema.py b/src/backend/langflow/graph/schema.py index f53a0833f..a9f06ac1e 100644 --- a/src/backend/langflow/graph/schema.py +++ b/src/backend/langflow/graph/schema.py @@ -4,12 +4,13 @@ from typing import Any, Optional from pydantic import BaseModel, Field, field_serializer from langflow.graph.utils import serialize_field -from langflow.utils.schemas import ContainsEnumMeta +from langflow.utils.schemas import ChatOutputResponse, ContainsEnumMeta class ResultData(BaseModel): results: Optional[Any] = Field(default_factory=dict) artifacts: Optional[Any] = Field(default_factory=dict) + messages: Optional[list[ChatOutputResponse]] = Field(default_factory=list) timedelta: Optional[float] = None duration: Optional[str] = None diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index 186030729..635b6f7f3 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -28,6 +28,7 @@ from langflow.interface.initialize import loading from langflow.interface.listing import lazy_load_dict from langflow.services.deps import get_storage_service from langflow.utils.constants import DIRECT_TYPES +from langflow.utils.schemas import ChatOutputResponse from langflow.utils.util import sync_to_async if TYPE_CHECKING: @@ -411,15 +412,43 @@ class Vertex: self._built = True + def extract_messages_from_artifacts(self, artifacts: Dict[str, Any]) -> List[str]: + """ + Extracts messages from the artifacts. + + Args: + artifacts (Dict[str, Any]): The artifacts to extract messages from. + + Returns: + List[str]: The extracted messages. + """ + messages = [] + for key, artifact in artifacts.items(): + if not isinstance(artifact, dict): + continue + if "message" in artifact: + chat_output_response = ChatOutputResponse( + message=artifact["message"], + sender=artifact.get("sender"), + sender_name=artifact.get("sender_name"), + session_id=artifact.get("session_id"), + component_id=self.id, + ) + messages.append(chat_output_response.model_dump(exclude_none=True)) + + return messages + def _finalize_build(self): result_dict = self.get_built_result() # We need to set the artifacts to pass information # to the frontend self.set_artifacts() artifacts = self.artifacts + messages = self.extract_messages_from_artifacts(artifacts) result_dict = ResultData( results=result_dict, artifacts=artifacts, + messages=messages, ) self.set_result(result_dict) diff --git a/src/backend/langflow/interface/custom/custom_component/custom_component.py b/src/backend/langflow/interface/custom/custom_component/custom_component.py index 1596533a2..ced2c7a78 100644 --- a/src/backend/langflow/interface/custom/custom_component/custom_component.py +++ b/src/backend/langflow/interface/custom/custom_component/custom_component.py @@ -132,7 +132,7 @@ class CustomComponent(Component): return yaml.dump(self.repr_value) if isinstance(self.repr_value, str): return self.repr_value - return str(self.repr_value) + return self.repr_value def build_config(self): return self.field_config diff --git a/src/backend/langflow/schema/schema.py b/src/backend/langflow/schema/schema.py index c99aaf127..639c9da96 100644 --- a/src/backend/langflow/schema/schema.py +++ b/src/backend/langflow/schema/schema.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Optional from langchain_core.documents import Document from pydantic import BaseModel @@ -13,7 +13,7 @@ class Record(BaseModel): data (dict, optional): Additional data associated with the record. """ - text: str + text: Optional[str] = "" data: dict = {} @classmethod @@ -52,8 +52,6 @@ class Record(BaseModel): Returns the text of the record. Returns: - str: The text of the record. + str: The text and data of the record. """ - return self.text - - + return self.model_dump_json(indent=2) diff --git a/src/backend/langflow/utils/schemas.py b/src/backend/langflow/utils/schemas.py index 354cb5949..3e6f17a5a 100644 --- a/src/backend/langflow/utils/schemas.py +++ b/src/backend/langflow/utils/schemas.py @@ -11,7 +11,9 @@ class ChatOutputResponse(BaseModel): message: Union[str, List[Union[str, Dict]]] sender: Optional[str] = "Machine" sender_name: Optional[str] = "AI" + session_id: Optional[str] = None stream_url: Optional[str] = None + component_id: Optional[str] = None @classmethod def from_message( diff --git a/src/frontend/src/CustomNodes/GenericNode/index.tsx b/src/frontend/src/CustomNodes/GenericNode/index.tsx index a7126fff7..72bedfcb3 100644 --- a/src/frontend/src/CustomNodes/GenericNode/index.tsx +++ b/src/frontend/src/CustomNodes/GenericNode/index.tsx @@ -9,6 +9,7 @@ import Loading from "../../components/ui/loading"; import { Textarea } from "../../components/ui/textarea"; import Xmark from "../../components/ui/xmark"; import { + RUN_TIMESTAMP_PREFIX, STATUS_BUILD, STATUS_BUILDING, priorityFields, @@ -59,6 +60,8 @@ export default function GenericNode({ useState(null); const [handles, setHandles] = useState(0); + const [validationString, setValidationString] = useState(""); + const takeSnapshot = useFlowsManagerStore((state) => state.takeSnapshot); function countHandles(): void { @@ -131,6 +134,18 @@ export default function GenericNode({ } }, [flowPool[data.id], data.id]); + useEffect(() => { + if (validationStatus?.params) { + // if it is not a string turn it into a string + let newValidationString = validationStatus.params; + if (typeof newValidationString !== "string") { + newValidationString = JSON.stringify(validationStatus.params); + } + + setValidationString(newValidationString); + } + }, [validationStatus, validationStatus?.params]); + const showNode = data.showNode ?? true; const nameEditable = true; @@ -493,14 +508,32 @@ export default function GenericNode({ ) : !validationStatus ? ( {STATUS_BUILD} ) : ( -
- {typeof validationStatus.params === "string" - ? `${durationString}\n${validationStatus.params}` - .split("\n") - .map((line, index) => ( -
{line}
- )) - : durationString} +
+
+ {lastRunTime && ( +
+
{RUN_TIMESTAMP_PREFIX}
+
+ {lastRunTime} +
+
+ )} +
+
+
Duration:
+
+ {validationStatus?.data.duration} +
+
+
+ + Output + +
+ {validationString.split("\n").map((line, index) => ( +
{line}
+ ))} +
) } From 16c8d5be0c34c713f227c961133d73f7fc4f75e2 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 4 Mar 2024 15:18:04 -0300 Subject: [PATCH 34/45] Refactor DocumentLoaderVertex to use record instead of doc and update display message format --- src/backend/langflow/graph/vertex/types.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/backend/langflow/graph/vertex/types.py b/src/backend/langflow/graph/vertex/types.py index 927cbda0f..ac9e1d0bc 100644 --- a/src/backend/langflow/graph/vertex/types.py +++ b/src/backend/langflow/graph/vertex/types.py @@ -1,6 +1,7 @@ import ast import json -from typing import AsyncIterator, Callable, Dict, Iterator, List, Optional, Union +from typing import (AsyncIterator, Callable, Dict, Iterator, List, Optional, + Union) import yaml from langchain_core.messages import AIMessage @@ -124,13 +125,13 @@ class DocumentLoaderVertex(Vertex): if not isinstance(self._built_object, UnbuiltObject): avg_length = sum( - len(doc.page_content) - for doc in self._built_object - if hasattr(doc, "page_content") + len(record.text) + for record in self._built_object + if hasattr(record, "text") ) / len(self._built_object) - return f"""{self.display_name}({len(self._built_object)} documents) - \nAvg. Document Length (characters): {int(avg_length)} - Documents: {self._built_object[:3]}...""" + return f"""{self.display_name}({len(self._built_object)} records) + \nAvg. Record Length (characters): {int(avg_length)} + Records: {self._built_object[:3]}...""" return f"{self.vertex_type}()" From c6045809f2a16cdf6cee5c1242b0a5adae62d3a2 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 4 Mar 2024 18:52:50 -0300 Subject: [PATCH 35/45] Update vertex raw parameters and add session ID --- src/backend/langflow/graph/graph/base.py | 14 ++++++++++---- src/backend/langflow/graph/vertex/base.py | 6 +++++- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index a0b91b440..97246aee1 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -151,14 +151,20 @@ class Graph: getattr(self, f"_{attribute}_vertices").append(vertex.id) async def _run( - self, inputs: Dict[str, str], stream: bool + self, inputs: Dict[str, str], stream: bool, session_id: str ) -> List[Optional["ResultData"]]: """Runs the graph with the given inputs.""" for vertex_id in self._is_input_vertices: vertex = self.get_vertex(vertex_id) if vertex is None: raise ValueError(f"Vertex {vertex_id} not found") - vertex.update_raw_params(inputs) + vertex.update_raw_params(inputs, overwrite=True) + # Update all the vertices with the session_id + for vertex_id in self._has_session_id_vertices: + vertex = self.get_vertex(vertex_id) + if vertex is None: + raise ValueError(f"Vertex {vertex_id} not found") + vertex.update_raw_params({"session_id": session_id}) try: await self.process() self.increment_run_count() @@ -181,7 +187,7 @@ class Graph: return outputs async def run( - self, inputs: Dict[str, Union[str, list[str]]], stream: bool + self, inputs: Dict[str, Union[str, list[str]]], stream: bool, session_id: str ) -> List[Optional["ResultData"]]: """Runs the graph with the given inputs.""" @@ -195,7 +201,7 @@ class Graph: inputs_values = [inputs_values] for input_value in inputs_values: run_outputs = await self._run( - {INPUT_FIELD_NAME: input_value}, stream=stream + {INPUT_FIELD_NAME: input_value}, stream=stream, session_id=session_id ) logger.debug(f"Run outputs: {run_outputs}") outputs.extend(run_outputs) diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index 635b6f7f3..f4e4823be 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -383,7 +383,7 @@ class Vertex: self.params = params self._raw_params = params.copy() - def update_raw_params(self, new_params: Dict[str, str]): + def update_raw_params(self, new_params: Dict[str, str], overwrite: bool = False): """ Update the raw parameters of the vertex with the given new parameters. @@ -398,6 +398,10 @@ class Vertex: return if any(isinstance(self._raw_params.get(key), Vertex) for key in new_params): return + if not overwrite: + for key in new_params.copy(): + if key not in self._raw_params: + new_params.pop(key) self._raw_params.update(new_params) self.updated_raw_params = True From 4b14aef8a73428046651b05f5400298d8987fbc6 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 4 Mar 2024 18:52:59 -0300 Subject: [PATCH 36/45] Refactor process.py for readability and maintainability --- src/backend/langflow/processing/process.py | 42 ++++++++++++++++------ 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/src/backend/langflow/processing/process.py b/src/backend/langflow/processing/process.py index 408db0a53..e2af56d21 100644 --- a/src/backend/langflow/processing/process.py +++ b/src/backend/langflow/processing/process.py @@ -126,7 +126,9 @@ async def process_runnable(runnable: Runnable, inputs: Union[dict, List[dict]]): elif isinstance(inputs, dict) and hasattr(runnable, "ainvoke"): result = await runnable.ainvoke(inputs) else: - raise ValueError(f"Runnable {runnable} does not support inputs of type {type(inputs)}") + raise ValueError( + f"Runnable {runnable} does not support inputs of type {type(inputs)}" + ) # Check if the result is a list of AIMessages if isinstance(result, list) and all(isinstance(r, AIMessage) for r in result): result = [r.content for r in result] @@ -135,7 +137,9 @@ async def process_runnable(runnable: Runnable, inputs: Union[dict, List[dict]]): return result -async def process_inputs_dict(built_object: Union[Chain, VectorStore, Runnable], inputs: dict): +async def process_inputs_dict( + built_object: Union[Chain, VectorStore, Runnable], inputs: dict +): if isinstance(built_object, Chain): if inputs is None: raise ValueError("Inputs must be provided for a Chain") @@ -170,7 +174,9 @@ async def process_inputs_list(built_object: Runnable, inputs: List[dict]): return await process_runnable(built_object, inputs) -async def generate_result(built_object: Union[Chain, VectorStore, Runnable], inputs: Union[dict, List[dict]]): +async def generate_result( + built_object: Union[Chain, VectorStore, Runnable], inputs: Union[dict, List[dict]] +): if isinstance(inputs, dict): result = await process_inputs_dict(built_object, inputs) elif isinstance(inputs, List) and isinstance(built_object, Runnable): @@ -208,24 +214,30 @@ async def run_graph( else: graph_data = graph._graph_data if not session_id and session_service is not None: - session_id = session_service.generate_key(session_id=flow_id, data_graph=graph_data) + session_id = session_service.generate_key( + session_id=flow_id, data_graph=graph_data + ) if inputs is None: inputs = {} - outputs = await graph.run(inputs, stream=stream) + outputs = await graph.run(inputs, stream=stream, session_id=session_id) if session_id and session_service: session_service.update_session(session_id, (graph, artifacts)) return outputs, session_id -def validate_input(graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]: +def validate_input( + graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]] +) -> List[Dict[str, Any]]: if not isinstance(graph_data, dict) or not isinstance(tweaks, dict): raise ValueError("graph_data and tweaks should be dictionaries") nodes = graph_data.get("data", {}).get("nodes") or graph_data.get("nodes") if not isinstance(nodes, list): - raise ValueError("graph_data should contain a list of nodes under 'data' key or directly under 'nodes' key") + raise ValueError( + "graph_data should contain a list of nodes under 'data' key or directly under 'nodes' key" + ) return nodes @@ -234,7 +246,9 @@ def apply_tweaks(node: Dict[str, Any], node_tweaks: Dict[str, Any]) -> None: template_data = node.get("data", {}).get("node", {}).get("template") if not isinstance(template_data, dict): - logger.warning(f"Template data for node {node.get('id')} should be a dictionary") + logger.warning( + f"Template data for node {node.get('id')} should be a dictionary" + ) return for tweak_name, tweak_value in node_tweaks.items(): @@ -249,7 +263,9 @@ def apply_tweaks_on_vertex(vertex: Vertex, node_tweaks: Dict[str, Any]) -> None: vertex.params[tweak_name] = tweak_value -def process_tweaks(graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]]) -> Dict[str, Any]: +def process_tweaks( + graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]] +) -> Dict[str, Any]: """ This function is used to tweak the graph data using the node id and the tweaks dict. @@ -270,7 +286,9 @@ def process_tweaks(graph_data: Dict[str, Any], tweaks: Dict[str, Dict[str, Any]] if node_tweaks := tweaks.get(node_id): apply_tweaks(node, node_tweaks) else: - logger.warning("Each node should be a dictionary with an 'id' key of type str") + logger.warning( + "Each node should be a dictionary with an 'id' key of type str" + ) return graph_data @@ -282,6 +300,8 @@ def process_tweaks_on_graph(graph: Graph, tweaks: Dict[str, Dict[str, Any]]): if node_tweaks := tweaks.get(node_id): apply_tweaks_on_vertex(vertex, node_tweaks) else: - logger.warning("Each node should be a Vertex with an 'id' attribute of type str") + logger.warning( + "Each node should be a Vertex with an 'id' attribute of type str" + ) return graph From 76a52c0a766765e869b8d2a77ce9ac3bf9cf7128 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 4 Mar 2024 19:19:44 -0300 Subject: [PATCH 37/45] Remove unnecessary code and delay in flowsManagerStore --- src/frontend/src/stores/flowStore.ts | 4 +--- src/frontend/src/stores/flowsManagerStore.ts | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/frontend/src/stores/flowStore.ts b/src/frontend/src/stores/flowStore.ts index 48431e069..cbf183289 100644 --- a/src/frontend/src/stores/flowStore.ts +++ b/src/frontend/src/stores/flowStore.ts @@ -453,9 +453,7 @@ const useFlowStore = create((set, get) => ({ if (vertexBuildData && vertexBuildData.inactivated_vertices) { get().removeFromVerticesBuild(vertexBuildData.inactivated_vertices); } - if (vertexBuildData && vertexBuildData.activated_layers) { - get().addToVerticesBuild(vertexBuildData.activated_layers.flat()); - } + if (vertexBuildData.next_vertices_ids) { // next_vertices_ids is a list of vertices that are going to be built next // verticesLayers is a list of list of vertices ids, where each list is a layer of vertices diff --git a/src/frontend/src/stores/flowsManagerStore.ts b/src/frontend/src/stores/flowsManagerStore.ts index 49849ba54..fac806c5f 100644 --- a/src/frontend/src/stores/flowsManagerStore.ts +++ b/src/frontend/src/stores/flowsManagerStore.ts @@ -92,7 +92,6 @@ const useFlowsManagerStore = create((set, get) => ({ true ); } - set({ saveLoading: true }); }, 500); // Delay of 500ms because chat message depends on it. }, saveFlow: (flow: FlowType, silent?: boolean) => { From 971b591cd66eb02d4d54fc81125bee07ecd8d8f4 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 4 Mar 2024 23:29:32 -0300 Subject: [PATCH 38/45] Delete UrlLoaderComponent and Add DocumentToRecordComponent --- .../langflow/components/data/UrlLoader.py | 49 ------------------- .../DocumentToRecord.py | 0 2 files changed, 49 deletions(-) delete mode 100644 src/backend/langflow/components/data/UrlLoader.py rename src/backend/langflow/components/{langchain_utilities => utilities}/DocumentToRecord.py (100%) diff --git a/src/backend/langflow/components/data/UrlLoader.py b/src/backend/langflow/components/data/UrlLoader.py deleted file mode 100644 index c1346f142..000000000 --- a/src/backend/langflow/components/data/UrlLoader.py +++ /dev/null @@ -1,49 +0,0 @@ -from typing import List - -from langchain import document_loaders -from langchain_core.documents import Document - -from langflow import CustomComponent - - -class UrlLoaderComponent(CustomComponent): - display_name: str = "Url Loader" - description: str = "Generic Url Loader Component" - - def build_config(self): - return { - "web_path": { - "display_name": "Url", - "required": True, - }, - "loader": { - "display_name": "Loader", - "is_list": True, - "required": True, - "options": [ - "AZLyricsLoader", - "CollegeConfidentialLoader", - "GitbookLoader", - "HNLoader", - "IFixitLoader", - "IMSDbLoader", - "WebBaseLoader", - ], - "value": "WebBaseLoader", - }, - "code": {"show": False}, - } - - def build(self, web_path: str, loader: str) -> List[Document]: - try: - loader_instance = getattr(document_loaders, loader)(web_path=web_path) - except Exception as e: - raise ValueError(f"No loader found for: {web_path}") from e - docs = loader_instance.load() - avg_length = sum( - len(doc.page_content) for doc in docs if hasattr(doc, "page_content") - ) / len(docs) - self.status = f"""{len(docs)} documents) - \nAvg. Document Length (characters): {int(avg_length)} - Documents: {docs[:3]}...""" - return docs diff --git a/src/backend/langflow/components/langchain_utilities/DocumentToRecord.py b/src/backend/langflow/components/utilities/DocumentToRecord.py similarity index 100% rename from src/backend/langflow/components/langchain_utilities/DocumentToRecord.py rename to src/backend/langflow/components/utilities/DocumentToRecord.py From fa4b8214bf85cd2fc8180bb24a5b1f0bef8124dd Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Mon, 4 Mar 2024 23:31:00 -0300 Subject: [PATCH 39/45] Refactor imports in types.py --- src/backend/langflow/interface/types.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/backend/langflow/interface/types.py b/src/backend/langflow/interface/types.py index 3dcddf99f..9a1375869 100644 --- a/src/backend/langflow/interface/types.py +++ b/src/backend/langflow/interface/types.py @@ -1,8 +1,9 @@ from cachetools import LRUCache, cached - from langflow.interface.agents.base import agent_creator from langflow.interface.chains.base import chain_creator -from langflow.interface.custom.directory_reader.utils import merge_nested_dicts_with_renaming +from langflow.interface.custom.directory_reader.utils import ( + merge_nested_dicts_with_renaming, +) from langflow.interface.custom.utils import build_custom_components from langflow.interface.document_loaders.base import documentloader_creator from langflow.interface.embeddings.base import embedding_creator @@ -13,7 +14,6 @@ from langflow.interface.retrievers.base import retriever_creator from langflow.interface.text_splitters.base import textsplitter_creator from langflow.interface.toolkits.base import toolkits_creator from langflow.interface.tools.base import tool_creator -from langflow.interface.utilities.base import utility_creator from langflow.interface.wrappers.base import wrapper_creator @@ -48,7 +48,7 @@ def build_langchain_types_dict(): # sourcery skip: dict-assign-update-to-union # vectorstore_creator, documentloader_creator, textsplitter_creator, - utility_creator, + # utility_creator, output_parser_creator, retriever_creator, ] @@ -66,4 +66,6 @@ def get_all_types_dict(settings_service): """Get all types dictionary combining native and custom components.""" native_components = build_langchain_types_dict() custom_components_from_file = build_custom_components(settings_service) - return merge_nested_dicts_with_renaming(native_components, custom_components_from_file) + return merge_nested_dicts_with_renaming( + native_components, custom_components_from_file + ) From 78d0c122d370cadf1357f3619a12b2ea3f3928ac Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 5 Mar 2024 11:56:34 -0300 Subject: [PATCH 40/45] Update settings and task service --- .../langflow/services/settings/base.py | 37 ++++++++++++----- src/backend/langflow/services/task/service.py | 40 +++++++++++-------- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/src/backend/langflow/services/settings/base.py b/src/backend/langflow/services/settings/base.py index 93f8b3085..3a1ec956e 100644 --- a/src/backend/langflow/services/settings/base.py +++ b/src/backend/langflow/services/settings/base.py @@ -58,13 +58,17 @@ class Settings(BaseSettings): STORE: Optional[bool] = True STORE_URL: Optional[str] = "https://api.langflow.store" - DOWNLOAD_WEBHOOK_URL: Optional[ - str - ] = "https://api.langflow.store/flows/trigger/ec611a61-8460-4438-b187-a4f65e5559d4" - LIKE_WEBHOOK_URL: Optional[str] = "https://api.langflow.store/flows/trigger/64275852-ec00-45c1-984e-3bff814732da" + DOWNLOAD_WEBHOOK_URL: Optional[str] = ( + "https://api.langflow.store/flows/trigger/ec611a61-8460-4438-b187-a4f65e5559d4" + ) + LIKE_WEBHOOK_URL: Optional[str] = ( + "https://api.langflow.store/flows/trigger/64275852-ec00-45c1-984e-3bff814732da" + ) STORAGE_TYPE: str = "local" + CELERY_ENABLED: bool = False + @validator("CONFIG_DIR", pre=True, allow_reuse=True) def set_langflow_dir(cls, value): if not value: @@ -91,7 +95,9 @@ class Settings(BaseSettings): @validator("DATABASE_URL", pre=True) def set_database_url(cls, value, values): if not value: - logger.debug("No database_url provided, trying LANGFLOW_DATABASE_URL env variable") + logger.debug( + "No database_url provided, trying LANGFLOW_DATABASE_URL env variable" + ) if langflow_database_url := os.getenv("LANGFLOW_DATABASE_URL"): value = langflow_database_url logger.debug("Using LANGFLOW_DATABASE_URL env variable.") @@ -101,7 +107,9 @@ class Settings(BaseSettings): # so we need to migrate to the new format # if there is a database in that location if not values["CONFIG_DIR"]: - raise ValueError("CONFIG_DIR not set, please set it or provide a DATABASE_URL") + raise ValueError( + "CONFIG_DIR not set, please set it or provide a DATABASE_URL" + ) new_path = f"{values['CONFIG_DIR']}/langflow.db" if Path("./langflow.db").exists(): @@ -125,15 +133,22 @@ class Settings(BaseSettings): if os.getenv("LANGFLOW_COMPONENTS_PATH"): logger.debug("Adding LANGFLOW_COMPONENTS_PATH to components_path") langflow_component_path = os.getenv("LANGFLOW_COMPONENTS_PATH") - if Path(langflow_component_path).exists() and langflow_component_path not in value: + if ( + Path(langflow_component_path).exists() + and langflow_component_path not in value + ): if isinstance(langflow_component_path, list): for path in langflow_component_path: if path not in value: value.append(path) - logger.debug(f"Extending {langflow_component_path} to components_path") + logger.debug( + f"Extending {langflow_component_path} to components_path" + ) elif langflow_component_path not in value: value.append(langflow_component_path) - logger.debug(f"Appending {langflow_component_path} to components_path") + logger.debug( + f"Appending {langflow_component_path} to components_path" + ) if not value: value = [BASE_COMPONENTS_PATH] @@ -145,7 +160,9 @@ class Settings(BaseSettings): logger.debug(f"Components path: {value}") return value - model_config = SettingsConfigDict(validate_assignment=True, extra="ignore", env_prefix="LANGFLOW_") + model_config = SettingsConfigDict( + validate_assignment=True, extra="ignore", env_prefix="LANGFLOW_" + ) # @model_validator() # @classmethod diff --git a/src/backend/langflow/services/task/service.py b/src/backend/langflow/services/task/service.py index 3f7a81f2c..e7f87d3f9 100644 --- a/src/backend/langflow/services/task/service.py +++ b/src/backend/langflow/services/task/service.py @@ -1,11 +1,14 @@ -from typing import Any, Callable, Coroutine, Union +from typing import TYPE_CHECKING, Any, Callable, Coroutine, Union + +from loguru import logger from langflow.services.base import Service from langflow.services.task.backends.anyio import AnyIOBackend from langflow.services.task.backends.base import TaskBackend from langflow.services.task.utils import get_celery_worker_status -from langflow.utils.logger import configure -from loguru import logger + +if TYPE_CHECKING: + from langflow.services.settings.service import SettingsService def check_celery_availability(): @@ -20,28 +23,31 @@ def check_celery_availability(): return status -try: - configure() - status = check_celery_availability() - - USE_CELERY = status.get("availability") is not None -except ImportError: - USE_CELERY = False - - class TaskService(Service): name = "task_service" - def __init__(self): - self.backend = self.get_backend() + def __init__(self, settings_service: "SettingsService"): + self.settings_service = settings_service + try: + if self.settings_service.settings.CELERY_ENABLED: + USE_CELERY = True + status = check_celery_availability() + + USE_CELERY = status.get("availability") is not None + else: + USE_CELERY = False + except ImportError: + USE_CELERY = False + self.use_celery = USE_CELERY + self.backend = self.get_backend() @property def backend_name(self) -> str: return self.backend.name def get_backend(self) -> TaskBackend: - if USE_CELERY: + if self.use_celery: from langflow.services.task.backends.celery import CeleryBackend logger.debug("Using Celery backend") @@ -68,7 +74,9 @@ class TaskService(Service): result = await result return task.id, result - async def launch_task(self, task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: + async def launch_task( + self, task_func: Callable[..., Any], *args: Any, **kwargs: Any + ) -> Any: logger.debug(f"Launching task {task_func} with args {args} and kwargs {kwargs}") logger.debug(f"Using backend {self.backend}") task = self.backend.launch_task(task_func, *args, **kwargs) From a4f5ff0daf795e98c651ba7adb920adb16099595 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 5 Mar 2024 12:07:28 -0300 Subject: [PATCH 41/45] tweaks now accept passing the name of parameter which will update all nodes --- src/backend/langflow/processing/process.py | 23 +++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/backend/langflow/processing/process.py b/src/backend/langflow/processing/process.py index e2af56d21..5db886282 100644 --- a/src/backend/langflow/processing/process.py +++ b/src/backend/langflow/processing/process.py @@ -271,24 +271,25 @@ def process_tweaks( :param graph_data: The dictionary containing the graph data. It must contain a 'data' key with 'nodes' as its child or directly contain 'nodes' key. Each node should have an 'id' and 'data'. - :param tweaks: A dictionary where the key is the node id and the value is a dictionary of the tweaks. - The inner dictionary contains the name of a certain parameter as the key and the value to be tweaked. - + :param tweaks: The dictionary containing the tweaks. The keys can be the node id or the name of the tweak. + The values can be a dictionary containing the tweaks for the node or the value of the tweak. :return: The modified graph_data dictionary. :raises ValueError: If the input is not in the expected format. """ nodes = validate_input(graph_data, tweaks) + nodes_map = {node.get("id"): node for node in nodes} + + all_nodes_tweaks = {} + for key, value in tweaks.items(): + if isinstance(value, dict): + if node := nodes_map.get(key): + apply_tweaks(node, value) + else: + all_nodes_tweaks[key] = value for node in nodes: - if isinstance(node, dict) and isinstance(node.get("id"), str): - node_id = node["id"] - if node_tweaks := tweaks.get(node_id): - apply_tweaks(node, node_tweaks) - else: - logger.warning( - "Each node should be a dictionary with an 'id' key of type str" - ) + apply_tweaks(node, all_nodes_tweaks) return graph_data From e1ded9c1067b4dc20c643c14c779dd5dca55df91 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 5 Mar 2024 12:24:49 -0300 Subject: [PATCH 42/45] Add support for specifying outputs in run_flow_with_caching and run_graph --- src/backend/langflow/api/v1/endpoints.py | 6 +++++ src/backend/langflow/graph/graph/base.py | 26 ++++++++++++++-------- src/backend/langflow/processing/process.py | 8 ++++++- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/src/backend/langflow/api/v1/endpoints.py b/src/backend/langflow/api/v1/endpoints.py index bd07d38b2..09114f3ae 100644 --- a/src/backend/langflow/api/v1/endpoints.py +++ b/src/backend/langflow/api/v1/endpoints.py @@ -57,6 +57,7 @@ async def run_flow_with_caching( session: Annotated[Session, Depends(get_session)], flow_id: str, inputs: Optional[InputValueRequest] = None, + outputs: Optional[List[str]] = None, tweaks: Optional[dict] = None, stream: Annotated[bool, Body(embed=True)] = False, # noqa: F821 session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821 @@ -69,6 +70,9 @@ async def run_flow_with_caching( else: input_values_dict = {} + if outputs is None: + outputs = [] + if session_id: session_data = await session_service.load_session( session_id, flow_id=flow_id @@ -82,6 +86,7 @@ async def run_flow_with_caching( flow_id=flow_id, session_id=session_id, inputs=input_values_dict, + outputs=outputs, artifacts=artifacts, session_service=session_service, stream=stream, @@ -107,6 +112,7 @@ async def run_flow_with_caching( flow_id=flow_id, session_id=session_id, inputs=input_values_dict, + outputs=outputs, artifacts={}, session_service=session_service, stream=stream, diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index f08cd1193..f8d740f51 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -151,7 +151,7 @@ class Graph: getattr(self, f"_{attribute}_vertices").append(vertex.id) async def _run( - self, inputs: Dict[str, str], stream: bool, session_id: str + self, inputs: Dict[str, str], outputs: list[str], stream: bool, session_id: str ) -> List[Optional["ResultData"]]: """Runs the graph with the given inputs.""" for vertex_id in self._is_input_vertices: @@ -171,7 +171,7 @@ class Graph: except Exception as exc: logger.exception(exc) raise ValueError(f"Error running graph: {exc}") from exc - outputs = [] + vertex_outputs = [] for vertex_id in self._is_output_vertices: vertex = self.get_vertex(vertex_id) if vertex is None: @@ -183,11 +183,16 @@ class Graph: and hasattr(vertex, "consume_async_generator") ): await vertex.consume_async_generator() - outputs.append(vertex.result) - return outputs + if vertex.display_name in outputs or vertex.id in outputs: + vertex_outputs.append(vertex.result) + return vertex_outputs async def run( - self, inputs: Dict[str, Union[str, list[str]]], stream: bool, session_id: str + self, + inputs: Dict[str, Union[str, list[str]]], + outputs: list[str], + stream: bool, + session_id: str, ) -> List[Optional["ResultData"]]: """Runs the graph with the given inputs.""" @@ -195,17 +200,20 @@ class Graph: # we need to go through self.inputs and update the self._raw_params # of the vertices that are inputs # if the value is a list, we need to run multiple times - outputs = [] + vertex_outputs = [] inputs_values = inputs.get(INPUT_FIELD_NAME, "") if not isinstance(inputs_values, list): inputs_values = [inputs_values] for input_value in inputs_values: run_outputs = await self._run( - {INPUT_FIELD_NAME: input_value}, stream=stream, session_id=session_id + {INPUT_FIELD_NAME: input_value}, + outputs, + stream=stream, + session_id=session_id, ) logger.debug(f"Run outputs: {run_outputs}") - outputs.extend(run_outputs) - return outputs + vertex_outputs.append(run_outputs) + return vertex_outputs # vertices_layers is a list of lists ordered by the order the vertices # should be built. diff --git a/src/backend/langflow/processing/process.py b/src/backend/langflow/processing/process.py index 5db886282..8ef4ae50f 100644 --- a/src/backend/langflow/processing/process.py +++ b/src/backend/langflow/processing/process.py @@ -204,6 +204,7 @@ async def run_graph( stream: bool, session_id: Optional[str] = None, inputs: Optional[dict[str, Union[List[str], str]]] = None, + outputs: Optional[List[str]] = None, artifacts: Optional[Dict[str, Any]] = None, session_service: Optional[SessionService] = None, ): @@ -220,7 +221,12 @@ async def run_graph( if inputs is None: inputs = {} - outputs = await graph.run(inputs, stream=stream, session_id=session_id) + outputs = await graph.run( + inputs, + outputs, + stream=stream, + session_id=session_id, + ) if session_id and session_service: session_service.update_session(session_id, (graph, artifacts)) return outputs, session_id From 6b66834990bae39c333289d0d367fa248cbd6b91 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 5 Mar 2024 12:40:56 -0300 Subject: [PATCH 43/45] Refactor API endpoints and schemas --- src/backend/langflow/api/v1/endpoints.py | 24 ++++++++---------------- src/backend/langflow/api/v1/schemas.py | 17 ++++++++++++++++- src/backend/langflow/graph/graph/base.py | 17 ++++++++++++++--- 3 files changed, 38 insertions(+), 20 deletions(-) diff --git a/src/backend/langflow/api/v1/endpoints.py b/src/backend/langflow/api/v1/endpoints.py index 09114f3ae..f2f0d401a 100644 --- a/src/backend/langflow/api/v1/endpoints.py +++ b/src/backend/langflow/api/v1/endpoints.py @@ -7,28 +7,20 @@ from loguru import logger from sqlmodel import Session, select from langflow.api.utils import update_frontend_node_with_template_values -from langflow.api.v1.schemas import ( - CustomComponentCode, - InputValueRequest, - ProcessResponse, - RunResponse, - TaskStatusResponse, - UploadFileResponse, -) +from langflow.api.v1.schemas import (CustomComponentCode, InputValueRequest, + ProcessResponse, RunResponse, + TaskStatusResponse, UploadFileResponse) from langflow.interface.custom.custom_component import CustomComponent from langflow.interface.custom.directory_reader import DirectoryReader from langflow.interface.custom.utils import build_custom_component_template from langflow.processing.process import process_tweaks, run_graph -from langflow.services.auth.utils import api_key_security, get_current_active_user +from langflow.services.auth.utils import (api_key_security, + get_current_active_user) from langflow.services.cache.utils import save_uploaded_file from langflow.services.database.models.flow import Flow from langflow.services.database.models.user.model import User -from langflow.services.deps import ( - get_session, - get_session_service, - get_settings_service, - get_task_service, -) +from langflow.services.deps import (get_session, get_session_service, + get_settings_service, get_task_service) from langflow.services.session.service import SessionService from langflow.services.task.service import TaskService @@ -56,7 +48,7 @@ def get_all( async def run_flow_with_caching( session: Annotated[Session, Depends(get_session)], flow_id: str, - inputs: Optional[InputValueRequest] = None, + inputs: Optional[List[InputValueRequest]] = None, outputs: Optional[List[str]] = None, tweaks: Optional[dict] = None, stream: Annotated[bool, Body(embed=True)] = False, # noqa: F821 diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 433d57a37..97d4722ec 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -245,4 +245,19 @@ class VerticesBuiltResponse(BaseModel): class InputValueRequest(BaseModel): - input_value: str + components: Optional[List[str]] = None + input_value: Optional[List[str]] = None + + # add an example + model_config = { + "json_schema_extra": { + "examples": [ + {"components": ["components_id"], "input_value": ["input_value"]}, + {"components": ["Component Name"], "input_value": ["input_value"]}, + {"input_value": ["input_value"]}, + { + "input_value": ["input_value1", "input_value2"], + }, + ] + } + } diff --git a/src/backend/langflow/graph/graph/base.py b/src/backend/langflow/graph/graph/base.py index f8d740f51..32c1369d8 100644 --- a/src/backend/langflow/graph/graph/base.py +++ b/src/backend/langflow/graph/graph/base.py @@ -151,11 +151,21 @@ class Graph: getattr(self, f"_{attribute}_vertices").append(vertex.id) async def _run( - self, inputs: Dict[str, str], outputs: list[str], stream: bool, session_id: str + self, + inputs: Dict[str, str], + input_components: list[str], + outputs: list[str], + stream: bool, + session_id: str, ) -> List[Optional["ResultData"]]: """Runs the graph with the given inputs.""" for vertex_id in self._is_input_vertices: vertex = self.get_vertex(vertex_id) + if input_components and ( + vertex_id not in input_components + or vertex.display_name not in input_components + ): + continue if vertex is None: raise ValueError(f"Vertex {vertex_id} not found") vertex.update_raw_params(inputs, overwrite=True) @@ -206,8 +216,9 @@ class Graph: inputs_values = [inputs_values] for input_value in inputs_values: run_outputs = await self._run( - {INPUT_FIELD_NAME: input_value}, - outputs, + inputs={INPUT_FIELD_NAME: input_value}, + input_components=inputs.get("components", []), + outputs=outputs, stream=stream, session_id=session_id, ) From b4d859db667a15636b351a89ffb70f548d4dd382 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 5 Mar 2024 12:57:18 -0300 Subject: [PATCH 44/45] Update InputValueRequest schema with additional component name --- src/backend/langflow/api/v1/schemas.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index 97d4722ec..d641d3052 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -252,7 +252,10 @@ class InputValueRequest(BaseModel): model_config = { "json_schema_extra": { "examples": [ - {"components": ["components_id"], "input_value": ["input_value"]}, + { + "components": ["components_id", "Component Name"], + "input_value": ["input_value"], + }, {"components": ["Component Name"], "input_value": ["input_value"]}, {"input_value": ["input_value"]}, { From 5e1488471d37050f6a8f187d8672a043439218a9 Mon Sep 17 00:00:00 2001 From: Gabriel Luiz Freitas Almeida Date: Tue, 5 Mar 2024 14:20:21 -0300 Subject: [PATCH 45/45] Refactor API endpoints and add new schemas --- src/backend/langflow/api/v1/endpoints.py | 62 +++++++++++++++++++++--- src/backend/langflow/api/v1/schemas.py | 43 +++++++++++++--- 2 files changed, 89 insertions(+), 16 deletions(-) diff --git a/src/backend/langflow/api/v1/endpoints.py b/src/backend/langflow/api/v1/endpoints.py index f2f0d401a..c28d06bcd 100644 --- a/src/backend/langflow/api/v1/endpoints.py +++ b/src/backend/langflow/api/v1/endpoints.py @@ -7,20 +7,29 @@ from loguru import logger from sqlmodel import Session, select from langflow.api.utils import update_frontend_node_with_template_values -from langflow.api.v1.schemas import (CustomComponentCode, InputValueRequest, - ProcessResponse, RunResponse, - TaskStatusResponse, UploadFileResponse) +from langflow.api.v1.schemas import ( + CustomComponentCode, + InputValueRequest, + ProcessResponse, + RunResponse, + TaskStatusResponse, + Tweaks, + UploadFileResponse, +) from langflow.interface.custom.custom_component import CustomComponent from langflow.interface.custom.directory_reader import DirectoryReader from langflow.interface.custom.utils import build_custom_component_template from langflow.processing.process import process_tweaks, run_graph -from langflow.services.auth.utils import (api_key_security, - get_current_active_user) +from langflow.services.auth.utils import api_key_security, get_current_active_user from langflow.services.cache.utils import save_uploaded_file from langflow.services.database.models.flow import Flow from langflow.services.database.models.user.model import User -from langflow.services.deps import (get_session, get_session_service, - get_settings_service, get_task_service) +from langflow.services.deps import ( + get_session, + get_session_service, + get_settings_service, + get_task_service, +) from langflow.services.session.service import SessionService from langflow.services.task.service import TaskService @@ -50,12 +59,49 @@ async def run_flow_with_caching( flow_id: str, inputs: Optional[List[InputValueRequest]] = None, outputs: Optional[List[str]] = None, - tweaks: Optional[dict] = None, + tweaks: Annotated[Optional[Tweaks], Body(embed=True)] = None, # noqa: F821 stream: Annotated[bool, Body(embed=True)] = False, # noqa: F821 session_id: Annotated[Union[None, str], Body(embed=True)] = None, # noqa: F821 api_key_user: User = Depends(api_key_security), session_service: SessionService = Depends(get_session_service), ): + """ + Executes a specified flow by ID with optional input values, output selection, tweaks, and streaming capability. + This endpoint supports running flows with caching to enhance performance and efficiency. + + ### Parameters: + - `flow_id` (str): The unique identifier of the flow to be executed. + - `inputs` (List[InputValueRequest], optional): A list of inputs specifying the input values and components for the flow. Each input can target specific components and provide custom values. + - `outputs` (List[str], optional): A list of output names to retrieve from the executed flow. If not provided, all outputs are returned. + - `tweaks` (Optional[Tweaks], optional): A dictionary of tweaks to customize the flow execution. The tweaks can be used to modify the flow's parameters and components. Tweaks can be overridden by the input values. + - `stream` (bool, optional): Specifies whether the results should be streamed. Defaults to False. + - `session_id` (Union[None, str], optional): An optional session ID to utilize existing session data for the flow execution. + - `api_key_user` (User): The user associated with the current API key. Automatically resolved from the API key. + - `session_service` (SessionService): The session service object for managing flow sessions. + + ### Returns: + A `RunResponse` object containing the selected outputs (or all if not specified) of the executed flow and the session ID. The structure of the response accommodates multiple inputs, providing a nested list of outputs for each input. + + ### Raises: + HTTPException: Indicates issues with finding the specified flow, invalid input formats, or internal errors during flow execution. + + ### Example usage: + ```json + POST /run/{flow_id} + Payload: + { + "inputs": [ + {"components": ["component1"], "input_value": "value1"}, + {"components": ["component3"], "input_value": "value2"} + ], + "outputs": ["Component Name", "component_id"], + "tweaks": {"parameter_name": "value", "Component Name": {"parameter_name": "value"}, "component_id": {"parameter_name": "value"}} + "stream": false + } + ``` + + This endpoint facilitates complex flow executions with customized inputs, outputs, and configurations, catering to diverse application requirements. + """ try: if inputs is not None: input_values_dict: dict[str, Union[str, list[str]]] = inputs.model_dump() diff --git a/src/backend/langflow/api/v1/schemas.py b/src/backend/langflow/api/v1/schemas.py index d641d3052..c47082540 100644 --- a/src/backend/langflow/api/v1/schemas.py +++ b/src/backend/langflow/api/v1/schemas.py @@ -4,7 +4,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Union from uuid import UUID -from pydantic import BaseModel, Field, field_validator, model_serializer +from pydantic import BaseModel, Field, RootModel, field_validator, model_serializer from langflow.services.database.models.api_key.model import ApiKeyRead from langflow.services.database.models.base import orjson_dumps @@ -246,7 +246,7 @@ class VerticesBuiltResponse(BaseModel): class InputValueRequest(BaseModel): components: Optional[List[str]] = None - input_value: Optional[List[str]] = None + input_value: Optional[str] = None # add an example model_config = { @@ -254,13 +254,40 @@ class InputValueRequest(BaseModel): "examples": [ { "components": ["components_id", "Component Name"], - "input_value": ["input_value"], - }, - {"components": ["Component Name"], "input_value": ["input_value"]}, - {"input_value": ["input_value"]}, - { - "input_value": ["input_value1", "input_value2"], + "input_value": "input_value", }, + {"components": ["Component Name"], "input_value": "input_value"}, + {"input_value": "input_value"}, ] } } + + +class Tweaks(RootModel): + root: dict[str, Union[str, dict[str, str]]] = Field( + description="A dictionary of tweaks to adjust the flow's execution. Allows customizing flow behavior dynamically. All tweaks are overridden by the input values.", + ) + model_config = { + "json_schema_extra": { + "examples": [ + { + "parameter_name": "value", + "Component Name": {"parameter_name": "value"}, + "component_id": {"parameter_name": "value"}, + } + ] + } + } + + # This should behave like a dict + def __getitem__(self, key): + return self.root[key] + + def __setitem__(self, key, value): + self.root[key] = value + + def __delitem__(self, key): + del self.root[key] + + def items(self): + return self.root.items()