fix: validate 'streaming' vertex connections in graph (#5067)

This commit is contained in:
Ítalo Johnny 2024-12-04 16:53:54 -03:00 committed by GitHub
commit 505368ece7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1218,6 +1218,8 @@ class Graph:
if vertex.id in self.cycle_vertices:
self.run_manager.add_to_cycle_vertices(vertex.id)
self.assert_streaming_sequence()
def _get_edges_as_list_of_tuples(self) -> list[tuple[str, str]]:
"""Returns the edges of the graph as a list of tuples."""
return [(e["data"]["sourceHandle"]["id"], e["data"]["targetHandle"]["id"]) for e in self._edges]
@ -1789,6 +1791,18 @@ class Graph:
vertex_instance.set_top_level(self.top_level_vertices)
return vertex_instance
def assert_streaming_sequence(self) -> None:
for i in self.edges:
_source = self.get_vertex(i.source_id)
if "stream" in _source.params and _source.params["stream"] is True:
_target = self.get_vertex(i.target_id)
if _target.vertex_type != "ChatOutput":
msg = (
"Error: A 'streaming' vertex cannot be followed by a non-'chat output' vertex."
"Disable streaming to run the flow."
)
raise Exception(msg) # noqa: TRY002
def prepare(self, stop_component_id: str | None = None, start_component_id: str | None = None):
self.initialize()
if stop_component_id and start_component_id: