From cc62b225ca6b5e6cde4cbd6e03bd1635bd66292f Mon Sep 17 00:00:00 2001 From: anovazzi1 Date: Fri, 19 Jan 2024 21:37:40 -0300 Subject: [PATCH] fix(vertex/base.py): fix import statement for UnbuiltResult class feat(vertex/base.py): add PowerComponentTypes enum to represent power components feat(vertex/base.py): add is_power_component attribute to Vertex class to determine if a vertex is a power component feat(vertex/base.py): add get_result_dict method to Vertex class to return a dictionary with the result of the build process feat(vertex/base.py): add get_built_result method to Vertex class to return the built result of a vertex feat(vertex/base.py): add set_artifacts method to Vertex class feat(vertex/base.py): add steps and steps_ran attributes to Vertex class to keep track of build steps feat(vertex/base.py): add layer attribute to Vertex class to represent the layer of the vertex feat(vertex/base.py): add set_top_level method to Vertex class to set the parent_is_top_level attribute feat(vertex/base.py): add pinned attribute to Vertex class to indicate if the vertex is pinned feat(vertex/base.py): add _reset method to Vertex class to reset the state of the vertex before building feat(vertex/base.py): add build method to Vertex class to build the vertex and run build steps feat(vertex/base.py): add get_requester_result method to Vertex class to get the result of the requester vertex fix(vertex/base.py): fix add_edge method in Vertex class to check if the edge already exists before adding it fix(vertex/base.py): fix __getstate__ method in Vertex class to include pinned attribute in the state fix(vertex/base.py): fix _parse_data method in Vertex class to correctly set the pinned attribute fix(vertex/base.py): fix _run method in Vertex class to handle different types of built objects and handle exceptions --- src/backend/langflow/graph/vertex/base.py | 137 +++++++++++++++++++--- 1 file changed, 123 insertions(+), 14 deletions(-) diff --git a/src/backend/langflow/graph/vertex/base.py b/src/backend/langflow/graph/vertex/base.py index e34e0a082..9284e2eca 100644 --- a/src/backend/langflow/graph/vertex/base.py +++ b/src/backend/langflow/graph/vertex/base.py @@ -1,9 +1,11 @@ import ast import inspect import types -from typing import TYPE_CHECKING, Any, Coroutine, Dict, List, Optional +from enum import Enum +from typing import (TYPE_CHECKING, Any, Callable, Coroutine, Dict, List, + Optional) -from langflow.graph.utils import UnbuiltObject +from langflow.graph.utils import UnbuiltObject, UnbuiltResult from langflow.interface.initialize import loading from langflow.interface.listing import lazy_load_dict from langflow.utils.constants import DIRECT_TYPES @@ -11,10 +13,17 @@ from langflow.utils.util import sync_to_async from loguru import logger if TYPE_CHECKING: - from langflow.graph.edge.base import Edge + from langflow.graph.edge.base import ContractEdge from langflow.graph.graph.base import Graph +class PowerComponentTypes(Enum): + # ChatInput and ChatOutput are the only ones that are + # power components + ChatInput = "ChatInput" + ChatOutput = "ChatOutput" + + class Vertex: def __init__( self, @@ -30,16 +39,50 @@ class Vertex: self.base_type: Optional[str] = base_type self._parse_data() self._built_object = UnbuiltObject() + self._built_result = None self._built = False self.artifacts: Dict[str, Any] = {} + self.steps: List[Callable] = [self._build] + self.steps_ran: List[Callable] = [] self.task_id: Optional[str] = None self.is_task = is_task self.params = params or {} self.parent_node_id: Optional[str] = self._data.get("parent_node_id") self.parent_is_top_level = False + self.layer = None + try: + self.is_power_component = PowerComponentTypes(self.vertex_type) + except ValueError: + self.is_power_component = False + + # Build a result dict for each edge + # like so: {edge.target.id: {edge.target_param: self._built_object}} + async def get_result_dict(self, force: bool = False) -> Dict[str, Dict[str, Any]]: + """ + Returns a dictionary with the result of the build process. + """ + edge_results = {} + for edge in self.edges: + target = self.graph.get_vertex(edge.target_id) + if edge.is_fulfilled and isinstance(await edge.get_result(source=self, target=target, ), str): + if edge.target_id not in edge_results: + edge_results[edge.target_id] = {} + edge_results[edge.target_id][edge.target_param] = await edge.get_result(source=self, target=target) + return edge_results + + def get_built_result(self): + # If the Vertex.type is a power component + # then we need to return the built object + # instead of the result dict + if isinstance(self._built_result, UnbuiltResult): + return {} + return self._built_result if isinstance(self._built_result, dict) else {"result": self._built_result} + + def set_artifacts(self) -> None: + pass @property - def edges(self) -> List["Edge"]: + def edges(self) -> List["ContractEdge"]: return self.graph.get_vertex_edges(self.id) def __getstate__(self): @@ -61,6 +104,7 @@ class Vertex: self.base_type = state["base_type"] self.is_task = state["is_task"] self.id = state["id"] + self.pinned = state.get("pinned", False) self._parse_data() if "_built_object" in state: self._built_object = state["_built_object"] @@ -68,10 +112,16 @@ class Vertex: else: self._built_object = UnbuiltObject() self._built = False + if "_built_result" in state: + self._built_result = state["_built_result"] + else: + self._built_result = UnbuiltResult() self.artifacts: Dict[str, Any] = {} self.task_id: Optional[str] = None self.parent_node_id = state["parent_node_id"] self.parent_is_top_level = state["parent_is_top_level"] + self.layer = state["layer"] + self.steps = state["steps"] def set_top_level(self, top_level_vertices: List[str]) -> None: self.parent_is_top_level = self.parent_node_id in top_level_vertices @@ -79,6 +129,8 @@ class Vertex: def _parse_data(self) -> None: self.data = self._data["data"] self.output = self.data["node"]["base_classes"] + self.pinned = self.data["node"].get("pinned", False) + template_dicts = {key: value for key, value in self.data["node"]["template"].items() if isinstance(value, dict)} template_dicts = {key: value for key, value in self.data["node"]["template"].items() if isinstance(value, dict)} self.required_inputs = [ @@ -205,6 +257,7 @@ class Vertex: # Add _type to params self._raw_params = params self.params = params + # TODO: Hash params dict async def _build(self, user_id=None): """ @@ -217,6 +270,27 @@ class Vertex: self._built = True + def _run(self, user_id: str, inputs: Optional[dict] = None): + # user_id is just for compatibility with the other build methods + inputs = inputs or {} + # inputs = {key: value or "" for key, value in inputs.items()} + # if hasattr(self._built_object, "input_keys"): + # # test if all keys are in inputs + # # and if not add them with empty string + # # for key in self._built_object.input_keys: + # # if key not in inputs: + # # inputs[key] = "" + # if inputs == {} and hasattr(self._built_object, "prompt"): + # inputs = self._built_object.prompt.partial_variables + if isinstance(self._built_object, str): + self._built_result = self._built_object + elif hasattr(self._built_object, "run") and not isinstance(self._built_object, UnbuiltObject): + try: + result = self._built_object.run(inputs) + self._built_result = result + except Exception as exc: + logger.error(f"Error running {self.vertex_type}: {exc}") + async def _build_each_node_in_params_dict(self, user_id=None): """ Iterates over each node in the params dictionary and builds it. @@ -242,11 +316,12 @@ class Vertex: """ return all(self._is_node(node) for node in value) - async def get_result(self, user_id=None, timeout=None) -> Any: + async def get_result(self, requester: Optional["Vertex"] = None, user_id=None, timeout=None) -> Any: + # PLEASE REVIEW THIS IF STATEMENT # Check if the Vertex was built already if self._built: return self._built_object - + if self.is_task and self.task_id is not None: task = self.get_task() @@ -261,7 +336,7 @@ class Vertex: pass # If there's no task_id, build the vertex locally - await self.build(user_id=user_id) + await self.build(requester=requester, user_id=user_id) return self._built_object async def _build_node_and_update_params(self, key, node, user_id=None): @@ -269,7 +344,7 @@ class Vertex: Builds a given node and updates the params dictionary accordingly. """ - result = await node.get_result(user_id) + result = await node.get_result(requester=self, user_id=user_id) self._handle_func(key, result) if isinstance(result, list): self._extend_params_list_with_result(key, result) @@ -281,7 +356,7 @@ class Vertex: """ self.params[key] = [] for node in nodes: - built = await node.get_result(user_id) + built = await node.get_result(requester=self, user_id=user_id) if isinstance(built, list): if key not in self.params: self.params[key] = [] @@ -351,13 +426,46 @@ class Vertex: logger.warning(message) - async def build(self, force: bool = False, user_id=None, *args, **kwargs) -> Any: - if not self._built or force: - await self._build(user_id, *args, **kwargs) + def _reset(self): + self._built = False + self._built_object = UnbuiltObject() + self._built_result = UnbuiltResult() + self.artifacts = {} + self.steps_ran = [] - return self._built_object + async def build( + self, + user_id=None, + requester: Optional["Vertex"] = None, + **kwargs, + ) -> Any: + if self.pinned: + return self.get_requester_result(requester) + self._reset() - def add_edge(self, edge: "Edge") -> None: + # Run steps + for step in self.steps: + if step not in self.steps_ran: + if inspect.iscoroutinefunction(step): + await step(user_id=user_id, **kwargs) + else: + step(user_id=user_id, **kwargs) + self.steps_ran.append(step) + + return await self.get_requester_result(requester) + + async def get_requester_result(self, requester: Optional["Vertex"]): + # If the requester is None, this means that + # the Vertex is the root of the graph + if requester is None: + return self._built_object + + # Get the requester edge + requester_edge = next((edge for edge in self.edges if edge.target_id == requester.id), None) + # Return the result of the requester edge + return None if requester_edge is None else await requester_edge.get_result(source=self, target=requester) + + def add_edge(self, edge: "ContractEdge") -> None: if edge not in self.edges: self.edges.append(edge) @@ -377,6 +485,7 @@ class Vertex: # Add a message with an emoji, stars for sucess, return "Built sucessfully ✨" if self._built_object is not None else "Failed to build 😵‍💫" + class StatefulVertex(Vertex): pass