Add RoutingVertex class to handle routing logic in graph
This commit is contained in:
parent
9c8d470e5e
commit
03cac50323
1 changed files with 46 additions and 0 deletions
|
|
@ -383,6 +383,52 @@ class ChatVertex(StatelessVertex):
|
|||
await super()._run(*args, **kwargs)
|
||||
|
||||
|
||||
class RoutingVertex(StatelessVertex):
|
||||
def __init__(self, data: Dict, graph):
|
||||
super().__init__(data, graph=graph, base_type="routing")
|
||||
self.use_result = True
|
||||
self.steps = [self._build, self._run]
|
||||
|
||||
def _built_object_repr(self):
|
||||
if self.artifacts and "repr" in self.artifacts:
|
||||
return self.artifacts["repr"] or super()._built_object_repr()
|
||||
return super()._built_object_repr()
|
||||
|
||||
def _build(self, *args, **kwargs):
|
||||
super()._build(*args, **kwargs)
|
||||
|
||||
# After building, the _built_object should be a dict with
|
||||
# {"result": Any, "condition": bool}
|
||||
# if true, we need to set should_run attr in the target of true edge
|
||||
# to true and should_run attr in the target of false edge to false
|
||||
# TODO: Add support for multiple conditions
|
||||
|
||||
def _run(self, *args, **kwargs):
|
||||
if self._built_object:
|
||||
condition = self._built_object.get("condition")
|
||||
result = self._built_object.get("result")
|
||||
if condition is not None:
|
||||
for edge in self.edges:
|
||||
if edge.source_id == self.id:
|
||||
target_vertex = self.graph.get_vertex(edge.target_id)
|
||||
# source_handle.channel and condition should be the same
|
||||
channel_bool = edge.source_handle.channel == "true"
|
||||
if condition == channel_bool:
|
||||
target_vertex.should_run = True
|
||||
else:
|
||||
target_vertex.should_run = False
|
||||
else:
|
||||
raise ValueError(
|
||||
f"RoutingVertex {self.id} must have a condition in the _built_object"
|
||||
)
|
||||
|
||||
self._built_result = result
|
||||
else:
|
||||
raise ValueError(
|
||||
f"RoutingVertex {self.id} must have a _built_object with a condition and a result"
|
||||
)
|
||||
|
||||
|
||||
def dict_to_codeblock(d: dict) -> str:
|
||||
from langflow.api.utils import serialize_field
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue