🐛 fix(base.py): add exception logging to provide more information in case of KeyError
🐛 fix(utils.py): fix bug where ungrouped nodes were not added to the queue for future processing ✨ feat(utils.py): add functionality to keep track of processed nodes to avoid duplicate processing ✨ feat(utils.py): add functionality to process nodes in a breadth-first manner to ensure correct order of processing ✨ feat(utils.py): add functionality to update source handles correctly when ungrouping nodes
This commit is contained in:
parent
58548cc242
commit
01438d82e0
2 changed files with 22 additions and 3 deletions
|
|
@ -50,6 +50,7 @@ class Graph:
|
|||
edges = payload["edges"]
|
||||
return cls(nodes, edges)
|
||||
except KeyError as exc:
|
||||
logger.exception(exc)
|
||||
raise ValueError(
|
||||
f"Invalid payload. Expected keys 'nodes' and 'edges'. Found {list(payload.keys())}"
|
||||
) from exc
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
from collections import deque
|
||||
import copy
|
||||
|
||||
|
||||
|
|
@ -50,20 +51,37 @@ def ungroup_node(group_node_data, base_flow):
|
|||
base_flow["nodes"] = nodes
|
||||
base_flow["edges"] = edges
|
||||
|
||||
return nodes
|
||||
|
||||
|
||||
def process_flow(flow_object):
|
||||
cloned_flow = copy.deepcopy(flow_object)
|
||||
processed_nodes = set() # To keep track of processed nodes
|
||||
|
||||
def process_node(node):
|
||||
node_id = node.get("id")
|
||||
|
||||
# If node already processed, skip
|
||||
if node_id in processed_nodes:
|
||||
return
|
||||
|
||||
if (
|
||||
node.get("data")
|
||||
and node["data"].get("node")
|
||||
and node["data"]["node"].get("flow")
|
||||
):
|
||||
process_flow(node["data"]["node"]["flow"]["data"])
|
||||
ungroup_node(node["data"], cloned_flow)
|
||||
new_nodes = ungroup_node(node["data"], cloned_flow)
|
||||
# Add new nodes to the queue for future processing
|
||||
nodes_to_process.extend(new_nodes)
|
||||
|
||||
for node in cloned_flow["nodes"]:
|
||||
# Mark node as processed
|
||||
processed_nodes.add(node_id)
|
||||
|
||||
nodes_to_process = deque(cloned_flow["nodes"])
|
||||
|
||||
while nodes_to_process:
|
||||
node = nodes_to_process.popleft()
|
||||
process_node(node)
|
||||
|
||||
return cloned_flow
|
||||
|
|
@ -204,7 +222,7 @@ def get_updated_edges(base_flow, g_nodes, group_node_id):
|
|||
new_edge = update_target_handle(new_edge, g_nodes, group_node_id)
|
||||
|
||||
if new_edge["source"] == group_node_id:
|
||||
new_edge = update_source_handle(new_edge, g_nodes)
|
||||
new_edge = update_source_handle(new_edge, base_flow)
|
||||
|
||||
if edge["target"] == group_node_id or edge["source"] == group_node_id:
|
||||
updated_edges.append(new_edge)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue