fix(vertex/base.py): fix import statement for UnbuiltResult class

feat(vertex/base.py): add PowerComponentTypes enum to represent power components
feat(vertex/base.py): add is_power_component attribute to Vertex class to determine if a vertex is a power component
feat(vertex/base.py): add get_result_dict method to Vertex class to return a dictionary with the result of the build process
feat(vertex/base.py): add get_built_result method to Vertex class to return the built result of a vertex
feat(vertex/base.py): add set_artifacts method to Vertex class
feat(vertex/base.py): add steps and steps_ran attributes to Vertex class to keep track of build steps
feat(vertex/base.py): add layer attribute to Vertex class to represent the layer of the vertex
feat(vertex/base.py): add set_top_level method to Vertex class to set the parent_is_top_level attribute
feat(vertex/base.py): add pinned attribute to Vertex class to indicate if the vertex is pinned
feat(vertex/base.py): add _reset method to Vertex class to reset the state of the vertex before building
feat(vertex/base.py): add build method to Vertex class to build the vertex and run build steps
feat(vertex/base.py): add get_requester_result method to Vertex class to get the result of the requester vertex
fix(vertex/base.py): fix add_edge method in Vertex class to check if the edge already exists before adding it
fix(vertex/base.py): fix __getstate__ method in Vertex class to include pinned attribute in the state
fix(vertex/base.py): fix _parse_data method in Vertex class to correctly set the pinned attribute
fix(vertex/base.py): fix _run method in Vertex class to handle different types of built objects and handle exceptions
This commit is contained in:
anovazzi1 2024-01-19 21:37:40 -03:00
commit cc62b225ca

View file

@ -1,9 +1,11 @@
import ast
import inspect
import types
from typing import TYPE_CHECKING, Any, Coroutine, Dict, List, Optional
from enum import Enum
from typing import (TYPE_CHECKING, Any, Callable, Coroutine, Dict, List,
Optional)
from langflow.graph.utils import UnbuiltObject
from langflow.graph.utils import UnbuiltObject, UnbuiltResult
from langflow.interface.initialize import loading
from langflow.interface.listing import lazy_load_dict
from langflow.utils.constants import DIRECT_TYPES
@ -11,10 +13,17 @@ from langflow.utils.util import sync_to_async
from loguru import logger
if TYPE_CHECKING:
from langflow.graph.edge.base import Edge
from langflow.graph.edge.base import ContractEdge
from langflow.graph.graph.base import Graph
class PowerComponentTypes(Enum):
# ChatInput and ChatOutput are the only ones that are
# power components
ChatInput = "ChatInput"
ChatOutput = "ChatOutput"
class Vertex:
def __init__(
self,
@ -30,16 +39,50 @@ class Vertex:
self.base_type: Optional[str] = base_type
self._parse_data()
self._built_object = UnbuiltObject()
self._built_result = None
self._built = False
self.artifacts: Dict[str, Any] = {}
self.steps: List[Callable] = [self._build]
self.steps_ran: List[Callable] = []
self.task_id: Optional[str] = None
self.is_task = is_task
self.params = params or {}
self.parent_node_id: Optional[str] = self._data.get("parent_node_id")
self.parent_is_top_level = False
self.layer = None
try:
self.is_power_component = PowerComponentTypes(self.vertex_type)
except ValueError:
self.is_power_component = False
# Build a result dict for each edge
# like so: {edge.target.id: {edge.target_param: self._built_object}}
async def get_result_dict(self, force: bool = False) -> Dict[str, Dict[str, Any]]:
"""
Returns a dictionary with the result of the build process.
"""
edge_results = {}
for edge in self.edges:
target = self.graph.get_vertex(edge.target_id)
if edge.is_fulfilled and isinstance(await edge.get_result(source=self, target=target, ), str):
if edge.target_id not in edge_results:
edge_results[edge.target_id] = {}
edge_results[edge.target_id][edge.target_param] = await edge.get_result(source=self, target=target)
return edge_results
def get_built_result(self):
# If the Vertex.type is a power component
# then we need to return the built object
# instead of the result dict
if isinstance(self._built_result, UnbuiltResult):
return {}
return self._built_result if isinstance(self._built_result, dict) else {"result": self._built_result}
def set_artifacts(self) -> None:
pass
@property
def edges(self) -> List["Edge"]:
def edges(self) -> List["ContractEdge"]:
return self.graph.get_vertex_edges(self.id)
def __getstate__(self):
@ -61,6 +104,7 @@ class Vertex:
self.base_type = state["base_type"]
self.is_task = state["is_task"]
self.id = state["id"]
self.pinned = state.get("pinned", False)
self._parse_data()
if "_built_object" in state:
self._built_object = state["_built_object"]
@ -68,10 +112,16 @@ class Vertex:
else:
self._built_object = UnbuiltObject()
self._built = False
if "_built_result" in state:
self._built_result = state["_built_result"]
else:
self._built_result = UnbuiltResult()
self.artifacts: Dict[str, Any] = {}
self.task_id: Optional[str] = None
self.parent_node_id = state["parent_node_id"]
self.parent_is_top_level = state["parent_is_top_level"]
self.layer = state["layer"]
self.steps = state["steps"]
def set_top_level(self, top_level_vertices: List[str]) -> None:
self.parent_is_top_level = self.parent_node_id in top_level_vertices
@ -79,6 +129,8 @@ class Vertex:
def _parse_data(self) -> None:
self.data = self._data["data"]
self.output = self.data["node"]["base_classes"]
self.pinned = self.data["node"].get("pinned", False)
template_dicts = {key: value for key, value in self.data["node"]["template"].items() if isinstance(value, dict)}
template_dicts = {key: value for key, value in self.data["node"]["template"].items() if isinstance(value, dict)}
self.required_inputs = [
@ -205,6 +257,7 @@ class Vertex:
# Add _type to params
self._raw_params = params
self.params = params
# TODO: Hash params dict
async def _build(self, user_id=None):
"""
@ -217,6 +270,27 @@ class Vertex:
self._built = True
def _run(self, user_id: str, inputs: Optional[dict] = None):
# user_id is just for compatibility with the other build methods
inputs = inputs or {}
# inputs = {key: value or "" for key, value in inputs.items()}
# if hasattr(self._built_object, "input_keys"):
# # test if all keys are in inputs
# # and if not add them with empty string
# # for key in self._built_object.input_keys:
# # if key not in inputs:
# # inputs[key] = ""
# if inputs == {} and hasattr(self._built_object, "prompt"):
# inputs = self._built_object.prompt.partial_variables
if isinstance(self._built_object, str):
self._built_result = self._built_object
elif hasattr(self._built_object, "run") and not isinstance(self._built_object, UnbuiltObject):
try:
result = self._built_object.run(inputs)
self._built_result = result
except Exception as exc:
logger.error(f"Error running {self.vertex_type}: {exc}")
async def _build_each_node_in_params_dict(self, user_id=None):
"""
Iterates over each node in the params dictionary and builds it.
@ -242,11 +316,12 @@ class Vertex:
"""
return all(self._is_node(node) for node in value)
async def get_result(self, user_id=None, timeout=None) -> Any:
async def get_result(self, requester: Optional["Vertex"] = None, user_id=None, timeout=None) -> Any:
# PLEASE REVIEW THIS IF STATEMENT
# Check if the Vertex was built already
if self._built:
return self._built_object
if self.is_task and self.task_id is not None:
task = self.get_task()
@ -261,7 +336,7 @@ class Vertex:
pass
# If there's no task_id, build the vertex locally
await self.build(user_id=user_id)
await self.build(requester=requester, user_id=user_id)
return self._built_object
async def _build_node_and_update_params(self, key, node, user_id=None):
@ -269,7 +344,7 @@ class Vertex:
Builds a given node and updates the params dictionary accordingly.
"""
result = await node.get_result(user_id)
result = await node.get_result(requester=self, user_id=user_id)
self._handle_func(key, result)
if isinstance(result, list):
self._extend_params_list_with_result(key, result)
@ -281,7 +356,7 @@ class Vertex:
"""
self.params[key] = []
for node in nodes:
built = await node.get_result(user_id)
built = await node.get_result(requester=self, user_id=user_id)
if isinstance(built, list):
if key not in self.params:
self.params[key] = []
@ -351,13 +426,46 @@ class Vertex:
logger.warning(message)
async def build(self, force: bool = False, user_id=None, *args, **kwargs) -> Any:
if not self._built or force:
await self._build(user_id, *args, **kwargs)
def _reset(self):
self._built = False
self._built_object = UnbuiltObject()
self._built_result = UnbuiltResult()
self.artifacts = {}
self.steps_ran = []
return self._built_object
async def build(
self,
user_id=None,
requester: Optional["Vertex"] = None,
**kwargs,
) -> Any:
if self.pinned:
return self.get_requester_result(requester)
self._reset()
def add_edge(self, edge: "Edge") -> None:
# Run steps
for step in self.steps:
if step not in self.steps_ran:
if inspect.iscoroutinefunction(step):
await step(user_id=user_id, **kwargs)
else:
step(user_id=user_id, **kwargs)
self.steps_ran.append(step)
return await self.get_requester_result(requester)
async def get_requester_result(self, requester: Optional["Vertex"]):
# If the requester is None, this means that
# the Vertex is the root of the graph
if requester is None:
return self._built_object
# Get the requester edge
requester_edge = next((edge for edge in self.edges if edge.target_id == requester.id), None)
# Return the result of the requester edge
return None if requester_edge is None else await requester_edge.get_result(source=self, target=requester)
def add_edge(self, edge: "ContractEdge") -> None:
if edge not in self.edges:
self.edges.append(edge)
@ -377,6 +485,7 @@ class Vertex:
# Add a message with an emoji, stars for sucess,
return "Built sucessfully ✨" if self._built_object is not None else "Failed to build 😵‍💫"
class StatefulVertex(Vertex):
pass