diff --git a/gui/GraphEditorWidget.cpp b/gui/GraphEditorWidget.cpp index 9470e1f..763ad14 100644 --- a/gui/GraphEditorWidget.cpp +++ b/gui/GraphEditorWidget.cpp @@ -802,13 +802,13 @@ void GraphEditorWidget::onContextMenuRequested(const QPoint &pos) { QPointF scenePos = m_view->mapToScene(pos); m_lastContextMenuScenePos = scenePos; + bool hitNode = false; uint32_t hitPwNodeId = 0; QtNodes::NodeId hitQtNodeId = 0; for (auto nodeId : m_model->allNodeIds()) { const WarpNodeData *data = m_model->warpNodeData(nodeId); - if (!data) { + if (!data) continue; - } QPointF nodePos = m_model->nodeData(nodeId, QtNodes::NodeRole::Position).toPointF(); QSize nodeSize = @@ -817,12 +817,13 @@ void GraphEditorWidget::onContextMenuRequested(const QPoint &pos) { if (nodeRect.contains(scenePos)) { hitPwNodeId = data->info.id.value; hitQtNodeId = nodeId; + hitNode = true; break; } } QPoint screenPos = m_view->mapToGlobal(pos); - if (hitPwNodeId != 0) { + if (hitNode) { showNodeContextMenu(screenPos, hitPwNodeId, hitQtNodeId); } else { showCanvasContextMenu(screenPos, scenePos); @@ -1673,13 +1674,29 @@ void GraphEditorWidget::updateMeters() { const WarpNodeData *data = m_model->warpNodeData(nodeId); if (!data || !row.meter) continue; - auto peak = m_client->NodeMeterPeak(data->info.id); - if (peak.ok()) { - float level = std::max(peak.value.peak_left, peak.value.peak_right); - row.meter->setLevel(level); - m_model->setNodePeakLevel(nodeId, level); - if (level > 0.001f) + + const AppGroupData *group = m_model->appGroupData(nodeId); + if (group) { + float maxLevel = 0.0f; + for (uint32_t memberPwId : group->memberPwIds) { + auto peak = m_client->NodeMeterPeak(warppipe::NodeId{memberPwId}); + if (peak.ok()) + maxLevel = std::max(maxLevel, + std::max(peak.value.peak_left, peak.value.peak_right)); + } + row.meter->setLevel(maxLevel); + m_model->setNodePeakLevel(nodeId, maxLevel); + if (maxLevel > 0.001f) anyActive = true; + } else { + auto peak = m_client->NodeMeterPeak(data->info.id); + if (peak.ok()) { + float level = std::max(peak.value.peak_left, peak.value.peak_right); + row.meter->setLevel(level); + m_model->setNodePeakLevel(nodeId, level); + if (level > 0.001f) + anyActive = true; + } } } @@ -1720,8 +1737,16 @@ void GraphEditorWidget::rebuildNodeMeters() { if (!data) continue; - new_pw_ids[data->info.id.value] = true; - m_client->EnsureNodeMeter(data->info.id); + const AppGroupData *group = m_model->appGroupData(nodeId); + if (group) { + for (uint32_t memberPwId : group->memberPwIds) { + new_pw_ids[memberPwId] = true; + m_client->EnsureNodeMeter(warppipe::NodeId{memberPwId}); + } + } else { + new_pw_ids[data->info.id.value] = true; + m_client->EnsureNodeMeter(data->info.id); + } auto *row = new QWidget(); auto *rowLayout = new QHBoxLayout(row); @@ -2299,8 +2324,21 @@ void GraphEditorWidget::updateNodeDetailsPanel(QtNodes::NodeId nodeId) { QString::fromStdString(info.media_role)); } - addField(QStringLiteral("NODE ID"), - QString::number(info.id.value)); + const AppGroupData *groupData = m_model->appGroupData(nodeId); + if (groupData) { + addField(QStringLiteral("STREAMS"), + QString::number(groupData->memberPwIds.size())); + QString memberIds; + for (uint32_t pwId : groupData->memberPwIds) { + if (!memberIds.isEmpty()) + memberIds += QStringLiteral(", "); + memberIds += QString::number(pwId); + } + addField(QStringLiteral("MEMBER NODE IDS"), memberIds); + } else { + addField(QStringLiteral("NODE ID"), + QString::number(info.id.value)); + } if (!data->inputPorts.empty()) { layout->addSpacing(8); diff --git a/gui/WarpGraphModel.cpp b/gui/WarpGraphModel.cpp index f066cbd..e69298b 100644 --- a/gui/WarpGraphModel.cpp +++ b/gui/WarpGraphModel.cpp @@ -130,28 +130,57 @@ void WarpGraphModel::addConnection( } if (m_client) { - auto outIt = m_nodes.find(connectionId.outNodeId); - auto inIt = m_nodes.find(connectionId.inNodeId); - if (outIt == m_nodes.end() || inIt == m_nodes.end()) { - return; + auto outGroupIt = m_appGroups.find(connectionId.outNodeId); + auto inGroupIt = m_appGroups.find(connectionId.inNodeId); + + std::vector outPorts; + std::vector inPorts; + + if (outGroupIt != m_appGroups.end()) { + auto mapIt = outGroupIt->second.outputPortMap.find( + static_cast(connectionId.outPortIndex)); + if (mapIt != outGroupIt->second.outputPortMap.end()) + outPorts = mapIt->second; + } else { + auto outIt = m_nodes.find(connectionId.outNodeId); + if (outIt != m_nodes.end()) { + auto idx = static_cast(connectionId.outPortIndex); + if (idx < outIt->second.outputPorts.size()) + outPorts.push_back(outIt->second.outputPorts[idx].id); + } } - auto outIdx = static_cast(connectionId.outPortIndex); - auto inIdx = static_cast(connectionId.inPortIndex); - if (outIdx >= outIt->second.outputPorts.size() || - inIdx >= inIt->second.inputPorts.size()) { - return; + if (inGroupIt != m_appGroups.end()) { + auto mapIt = inGroupIt->second.inputPortMap.find( + static_cast(connectionId.inPortIndex)); + if (mapIt != inGroupIt->second.inputPortMap.end()) + inPorts = mapIt->second; + } else { + auto inIt = m_nodes.find(connectionId.inNodeId); + if (inIt != m_nodes.end()) { + auto idx = static_cast(connectionId.inPortIndex); + if (idx < inIt->second.inputPorts.size()) + inPorts.push_back(inIt->second.inputPorts[idx].id); + } } - warppipe::PortId outPortId = outIt->second.outputPorts[outIdx].id; - warppipe::PortId inPortId = inIt->second.inputPorts[inIdx].id; - - auto result = m_client->CreateLink(outPortId, inPortId, warppipe::LinkOptions{.linger = true}); - if (!result.ok()) { + if (outPorts.empty() || inPorts.empty()) return; + + bool anyCreated = false; + for (const auto &outPortId : outPorts) { + for (const auto &inPortId : inPorts) { + auto result = m_client->CreateLink( + outPortId, inPortId, warppipe::LinkOptions{.linger = true}); + if (result.ok()) { + m_linkIdToConn.emplace(result.value.id.value, connectionId); + anyCreated = true; + } + } } - m_linkIdToConn.emplace(result.value.id.value, connectionId); + if (!anyCreated) + return; } m_connections.insert(connectionId); @@ -172,8 +201,16 @@ QVariant WarpGraphModel::nodeData(QtNodes::NodeId nodeId, const auto &data = it->second; switch (role) { - case QtNodes::NodeRole::Caption: - return captionForNode(data.info); + case QtNodes::NodeRole::Caption: { + QString caption = captionForNode(data.info); + auto groupIt = m_appGroups.find(nodeId); + if (groupIt != m_appGroups.end()) { + int count = static_cast(groupIt->second.memberPwIds.size()); + if (count > 1) + caption += QStringLiteral(" (%1 streams)").arg(count); + } + return caption; + } case QtNodes::NodeRole::CaptionVisible: return true; case QtNodes::NodeRole::Position: { @@ -294,13 +331,14 @@ bool WarpGraphModel::deleteConnection( } if (m_client && !m_refreshing) { - for (auto linkIt = m_linkIdToConn.begin(); linkIt != m_linkIdToConn.end(); - ++linkIt) { - if (linkIt->second == connectionId) { - m_client->RemoveLink(warppipe::LinkId{linkIt->first}); - m_linkIdToConn.erase(linkIt); - break; - } + std::vector linksToRemove; + for (const auto &[linkId, connId] : m_linkIdToConn) { + if (connId == connectionId) + linksToRemove.push_back(linkId); + } + for (uint32_t linkId : linksToRemove) { + m_client->RemoveLink(warppipe::LinkId{linkId}); + m_linkIdToConn.erase(linkId); } } @@ -324,6 +362,14 @@ bool WarpGraphModel::deleteNode(QtNodes::NodeId const nodeId) { deleteConnection(conn); } + auto groupIt = m_appGroups.find(nodeId); + if (groupIt != m_appGroups.end()) { + for (uint32_t memberPwId : groupIt->second.memberPwIds) + m_pwToGroupQt.erase(memberPwId); + m_groupKeyToQt.erase(groupIt->second.groupKey); + m_appGroups.erase(groupIt); + } + m_nodes.erase(nodeId); m_positions.erase(nodeId); m_sizes.erase(nodeId); @@ -347,6 +393,53 @@ QJsonObject WarpGraphModel::saveNode(QtNodes::NodeId const nodeId) const { void WarpGraphModel::loadNode(QJsonObject const &) {} +void WarpGraphModel::rebuildGroupPortMap(QtNodes::NodeId groupQtId) { + auto groupIt = m_appGroups.find(groupQtId); + if (groupIt == m_appGroups.end()) + return; + + auto &group = groupIt->second; + group.outputPortMap.clear(); + group.inputPortMap.clear(); + + auto nodeIt = m_nodes.find(groupQtId); + if (nodeIt == m_nodes.end()) + return; + + const auto &canonOut = nodeIt->second.outputPorts; + const auto &canonIn = nodeIt->second.inputPorts; + + for (uint32_t memberPwId : group.memberPwIds) { + auto memberPorts = m_client->ListPorts(warppipe::NodeId{memberPwId}); + if (!memberPorts.ok()) + continue; + + for (const auto &port : memberPorts.value) { + if (!port.is_input) { + for (size_t ci = 0; ci < canonOut.size(); ++ci) { + if (port.name == canonOut[ci].name) { + group.outputPortMap[static_cast(ci)].push_back(port.id); + m_portToGroupPort[port.id.value] = {groupQtId, + static_cast(ci), + false}; + break; + } + } + } else { + for (size_t ci = 0; ci < canonIn.size(); ++ci) { + if (port.name == canonIn[ci].name) { + group.inputPortMap[static_cast(ci)].push_back(port.id); + m_portToGroupPort[port.id.value] = {groupQtId, + static_cast(ci), + true}; + break; + } + } + } + } + } +} + void WarpGraphModel::refreshFromClient() { if (!m_client) { return; @@ -362,16 +455,28 @@ void WarpGraphModel::refreshFromClient() { std::unordered_set seenPwIds; + // Phase 1: Separate app streams (to be grouped) from other nodes. + std::unordered_map> appStreams; + std::vector nonAppNodes; + for (const auto &nodeInfo : nodesResult.value) { seenPwIds.insert(nodeInfo.id.value); WarpNodeType nodeType = classifyNode(nodeInfo); - bool isStream = nodeType == WarpNodeType::kApplication; - if (isStream && nodeInfo.name.empty() && - nodeInfo.application_name.empty()) { - continue; + if (nodeType == WarpNodeType::kApplication) { + if (nodeInfo.name.empty() && nodeInfo.application_name.empty()) + continue; + std::string key = appGroupKey(nodeInfo); + if (key.empty()) + key = nodeInfo.name; + appStreams[key].push_back(nodeInfo); + } else { + nonAppNodes.push_back(nodeInfo); } + } + // Phase 2: Process non-app nodes (unchanged logic). + for (const auto &nodeInfo : nonAppNodes) { auto existing = m_pwToQt.find(nodeInfo.id.value); if (existing != m_pwToQt.end()) { QtNodes::NodeId qtId = existing->second; @@ -415,9 +520,12 @@ void WarpGraphModel::refreshFromClient() { continue; } + // Ghost matching for non-app nodes (rare but possible). QtNodes::NodeId ghostMatch = 0; std::string nodeName = nodeInfo.name; for (const auto &ghostId : m_ghostNodes) { + if (m_appGroups.count(ghostId)) + continue; auto ghostIt = m_nodes.find(ghostId); if (ghostIt != m_nodes.end() && ghostIt->second.info.name == nodeName) { @@ -460,6 +568,8 @@ void WarpGraphModel::refreshFromClient() { continue; } + WarpNodeType nodeType = classifyNode(nodeInfo); + auto portsResult = m_client->ListPorts(nodeInfo.id); std::vector inputs; std::vector outputs; @@ -487,23 +597,19 @@ void WarpGraphModel::refreshFromClient() { m_pwToQt.emplace(nodeInfo.id.value, qtId); auto pendingIt = m_pendingPositions.find(nodeInfo.name); - if (pendingIt != m_pendingPositions.end()) { - m_positions.emplace(qtId, pendingIt->second); - m_pendingPositions.erase(pendingIt); + if (pendingIt != m_pendingPositions.end()) { + m_positions.emplace(qtId, pendingIt->second); + m_pendingPositions.erase(pendingIt); + } else { + auto savedIt = m_savedPositions.find(nodeInfo.name); + if (savedIt != m_savedPositions.end()) { + m_positions.emplace(qtId, savedIt->second); } else { - auto groupPos = findAppGroupPosition(nodeIt->second); - if (groupPos) { - m_positions.emplace(qtId, findNonOverlappingPosition(*groupPos, nodeIt->second)); - } else { - auto savedIt = m_savedPositions.find(nodeInfo.name); - if (savedIt != m_savedPositions.end()) { - m_positions.emplace(qtId, savedIt->second); - } else { - QPointF candidate = nextPosition(nodeIt->second); - m_positions.emplace(qtId, findNonOverlappingPosition(candidate, nodeIt->second)); - } - } + QPointF candidate = nextPosition(nodeIt->second); + m_positions.emplace(qtId, + findNonOverlappingPosition(candidate, nodeIt->second)); } + } if (nodeHasVolume(nodeType)) { auto *volumeWidget = new NodeVolumeWidget(); @@ -518,6 +624,232 @@ void WarpGraphModel::refreshFromClient() { Q_EMIT nodeCreated(qtId); } + // Phase 3: Process app-stream groups. + std::unordered_set seenGroupKeys; + for (auto &[key, members] : appStreams) { + seenGroupKeys.insert(key); + + auto existingGroup = m_groupKeyToQt.find(key); + if (existingGroup != m_groupKeyToQt.end()) { + // Group already exists — update membership. + QtNodes::NodeId groupQtId = existingGroup->second; + auto &group = m_appGroups[groupQtId]; + + // Clear old reverse mappings. + for (uint32_t oldPwId : group.memberPwIds) + m_pwToGroupQt.erase(oldPwId); + + group.memberPwIds.clear(); + for (const auto &m : members) { + group.memberPwIds.push_back(m.id.value); + m_pwToGroupQt[m.id.value] = groupQtId; + } + + // Derive canonical ports from first member if node has no ports yet. + auto &nodeData = m_nodes[groupQtId]; + nodeData.info = members.front(); + nodeData.info.id = warppipe::NodeId{0}; + + bool portsMissing = + nodeData.inputPorts.empty() && nodeData.outputPorts.empty(); + if (portsMissing && !members.empty()) { + auto portsResult = m_client->ListPorts(members.front().id); + if (portsResult.ok()) { + for (const auto &port : portsResult.value) { + warppipe::PortInfo canonical = port; + canonical.id = warppipe::PortId{0}; + canonical.node = warppipe::NodeId{0}; + if (port.is_input) + nodeData.inputPorts.push_back(canonical); + else + nodeData.outputPorts.push_back(canonical); + } + std::sort(nodeData.inputPorts.begin(), nodeData.inputPorts.end(), + [](const auto &a, const auto &b) { return a.name < b.name; }); + std::sort(nodeData.outputPorts.begin(), nodeData.outputPorts.end(), + [](const auto &a, const auto &b) { return a.name < b.name; }); + } + } + + // Un-ghost if it was ghosted. + if (m_ghostNodes.erase(groupQtId)) { + std::erase_if(m_ghostConnections, [&](const auto &gc) { + if (gc.outNodeId != groupQtId && gc.inNodeId != groupQtId) + return false; + m_connections.erase(gc); + Q_EMIT connectionDeleted(gc); + return true; + }); + Q_EMIT nodeUpdated(groupQtId); + } + + rebuildGroupPortMap(groupQtId); + Q_EMIT nodeUpdated(groupQtId); + continue; + } + + // Check if any member was previously an individual node — migrate it. + QtNodes::NodeId migratedQtId = 0; + QPointF migratedPos; + for (const auto &m : members) { + auto indvIt = m_pwToQt.find(m.id.value); + if (indvIt != m_pwToQt.end()) { + if (migratedQtId == 0) { + migratedQtId = indvIt->second; + auto posIt = m_positions.find(migratedQtId); + if (posIt != m_positions.end()) + migratedPos = posIt->second; + } + if (!sceneChanged) { + sceneChanged = true; + Q_EMIT beginBatchUpdate(); + } + QtNodes::NodeId oldQt = indvIt->second; + m_pwToQt.erase(indvIt); + deleteNode(oldQt); + } + } + + // Check for a ghost group match. + QtNodes::NodeId ghostMatch = 0; + std::string groupLayoutKey = "group:" + key; + for (const auto &ghostId : m_ghostNodes) { + if (m_appGroups.count(ghostId)) { + auto &gd = m_appGroups[ghostId]; + if (gd.groupKey == key) { + ghostMatch = ghostId; + break; + } + } + } + + if (ghostMatch != 0) { + m_ghostNodes.erase(ghostMatch); + std::erase_if(m_ghostConnections, [&](const auto &gc) { + if (gc.outNodeId != ghostMatch && gc.inNodeId != ghostMatch) + return false; + m_connections.erase(gc); + Q_EMIT connectionDeleted(gc); + return true; + }); + + auto &group = m_appGroups[ghostMatch]; + for (uint32_t oldPwId : group.memberPwIds) + m_pwToGroupQt.erase(oldPwId); + group.memberPwIds.clear(); + + for (const auto &m : members) { + group.memberPwIds.push_back(m.id.value); + m_pwToGroupQt[m.id.value] = ghostMatch; + } + + auto &nodeData = m_nodes[ghostMatch]; + nodeData.info = members.front(); + nodeData.info.id = warppipe::NodeId{0}; + + auto portsResult = m_client->ListPorts(members.front().id); + if (portsResult.ok()) { + nodeData.inputPorts.clear(); + nodeData.outputPorts.clear(); + for (const auto &port : portsResult.value) { + warppipe::PortInfo canonical = port; + canonical.id = warppipe::PortId{0}; + canonical.node = warppipe::NodeId{0}; + if (port.is_input) + nodeData.inputPorts.push_back(canonical); + else + nodeData.outputPorts.push_back(canonical); + } + std::sort(nodeData.inputPorts.begin(), nodeData.inputPorts.end(), + [](const auto &a, const auto &b) { return a.name < b.name; }); + std::sort(nodeData.outputPorts.begin(), nodeData.outputPorts.end(), + [](const auto &a, const auto &b) { return a.name < b.name; }); + } + + m_groupKeyToQt[key] = ghostMatch; + rebuildGroupPortMap(ghostMatch); + Q_EMIT nodeUpdated(ghostMatch); + continue; + } + + // Create new group visual node. + QtNodes::NodeId groupQtId = newNodeId(); + + warppipe::NodeInfo synth = members.front(); + synth.id = warppipe::NodeId{0}; + + WarpNodeData data; + data.info = synth; + + // Derive canonical ports from first member. + auto portsResult = m_client->ListPorts(members.front().id); + if (portsResult.ok()) { + for (const auto &port : portsResult.value) { + warppipe::PortInfo canonical = port; + canonical.id = warppipe::PortId{0}; + canonical.node = warppipe::NodeId{0}; + if (port.is_input) + data.inputPorts.push_back(canonical); + else + data.outputPorts.push_back(canonical); + } + std::sort(data.inputPorts.begin(), data.inputPorts.end(), + [](const auto &a, const auto &b) { return a.name < b.name; }); + std::sort(data.outputPorts.begin(), data.outputPorts.end(), + [](const auto &a, const auto &b) { return a.name < b.name; }); + } + + m_nodes.emplace(groupQtId, std::move(data)); + + AppGroupData group; + group.groupKey = key; + for (const auto &m : members) { + group.memberPwIds.push_back(m.id.value); + m_pwToGroupQt[m.id.value] = groupQtId; + } + m_appGroups[groupQtId] = std::move(group); + m_groupKeyToQt[key] = groupQtId; + + // Position: migrated, pending, saved, or auto. + if (migratedQtId != 0) { + m_positions.emplace(groupQtId, migratedPos); + } else { + auto pendingIt = m_pendingPositions.find(groupLayoutKey); + if (pendingIt != m_pendingPositions.end()) { + m_positions.emplace(groupQtId, pendingIt->second); + m_pendingPositions.erase(pendingIt); + } else { + auto savedIt = m_savedPositions.find(groupLayoutKey); + if (savedIt != m_savedPositions.end()) { + m_positions.emplace(groupQtId, savedIt->second); + } else { + auto savedByKey = m_savedPositions.find(key); + if (savedByKey != m_savedPositions.end()) { + m_positions.emplace(groupQtId, savedByKey->second); + } else { + QPointF candidate = nextPosition(m_nodes[groupQtId]); + m_positions.emplace( + groupQtId, + findNonOverlappingPosition(candidate, m_nodes[groupQtId])); + } + } + } + } + + auto *volumeWidget = new NodeVolumeWidget(); + m_volumeWidgets[groupQtId] = volumeWidget; + m_volumeStates[groupQtId] = {}; + + rebuildGroupPortMap(groupQtId); + + if (!sceneChanged) { + sceneChanged = true; + Q_EMIT beginBatchUpdate(); + } + Q_EMIT nodeCreated(groupQtId); + } + + // Phase 4a: Handle disappeared non-app PW IDs. std::vector disappearedPwIds; for (const auto &[pwId, qtId] : m_pwToQt) { if (seenPwIds.find(pwId) == seenPwIds.end()) { @@ -526,129 +858,208 @@ void WarpGraphModel::refreshFromClient() { } for (uint32_t pwId : disappearedPwIds) { auto it = m_pwToQt.find(pwId); - if (it == m_pwToQt.end()) { + if (it == m_pwToQt.end()) continue; - } if (!sceneChanged) { sceneChanged = true; Q_EMIT beginBatchUpdate(); } QtNodes::NodeId qtId = it->second; - auto nodeIt = m_nodes.find(qtId); - if (nodeIt == m_nodes.end()) { - continue; - } - WarpNodeType type = classifyNode(nodeIt->second.info); - if (type == WarpNodeType::kApplication) { - m_ghostNodes.insert(qtId); - m_pwToQt.erase(it); - Q_EMIT nodeUpdated(qtId); - } else { - m_pwToQt.erase(it); + m_pwToQt.erase(it); + if (m_nodes.count(qtId)) deleteNode(qtId); + } + + // Phase 4b: Handle disappeared group PW members. + std::vector disappearedGroupPwIds; + for (const auto &[pwId, groupQtId] : m_pwToGroupQt) { + if (seenPwIds.find(pwId) == seenPwIds.end()) + disappearedGroupPwIds.push_back(pwId); + } + for (uint32_t pwId : disappearedGroupPwIds) { + auto it = m_pwToGroupQt.find(pwId); + if (it == m_pwToGroupQt.end()) + continue; + QtNodes::NodeId groupQtId = it->second; + m_pwToGroupQt.erase(it); + + auto groupIt = m_appGroups.find(groupQtId); + if (groupIt != m_appGroups.end()) { + bool anyMemberAlive = false; + for (uint32_t mid : groupIt->second.memberPwIds) { + if (mid != pwId && m_pwToGroupQt.count(mid)) + anyMemberAlive = true; + } + + if (!anyMemberAlive) { + if (!sceneChanged) { + sceneChanged = true; + Q_EMIT beginBatchUpdate(); + } + m_ghostNodes.insert(groupQtId); + Q_EMIT nodeUpdated(groupQtId); + } else { + auto &memberIds = groupIt->second.memberPwIds; + memberIds.erase( + std::remove(memberIds.begin(), memberIds.end(), pwId), + memberIds.end()); + rebuildGroupPortMap(groupQtId); + Q_EMIT nodeUpdated(groupQtId); + } } } + // Phase 4c: Remove groups whose keys no longer appear. + std::vector staleGroupKeys; + for (const auto &[key, groupQtId] : m_groupKeyToQt) { + if (seenGroupKeys.find(key) == seenGroupKeys.end()) { + auto groupIt = m_appGroups.find(groupQtId); + if (groupIt != m_appGroups.end() && groupIt->second.memberPwIds.empty()) { + staleGroupKeys.push_back(key); + } + } + } + for (const auto &key : staleGroupKeys) { + auto it = m_groupKeyToQt.find(key); + if (it == m_groupKeyToQt.end()) + continue; + QtNodes::NodeId groupQtId = it->second; + bool alreadyGhost = m_ghostNodes.count(groupQtId) > 0; + if (!alreadyGhost) { + if (!sceneChanged) { + sceneChanged = true; + Q_EMIT beginBatchUpdate(); + } + m_ghostNodes.insert(groupQtId); + Q_EMIT nodeUpdated(groupQtId); + } + } + + // Phase 5: Sync links. auto linksResult = m_client->ListLinks(); if (linksResult.ok()) { std::unordered_set seenLinkIds; for (const auto &link : linksResult.value) { seenLinkIds.insert(link.id.value); - if (m_linkIdToConn.find(link.id.value) != m_linkIdToConn.end()) { + if (m_linkIdToConn.find(link.id.value) != m_linkIdToConn.end()) continue; - } - auto outNodeIt = m_pwToQt.end(); - auto inNodeIt = m_pwToQt.end(); + QtNodes::NodeId outQtId = 0; + QtNodes::NodeId inQtId = 0; QtNodes::PortIndex outPortIdx = 0; QtNodes::PortIndex inPortIdx = 0; - bool found = false; + bool outFound = false; + bool inFound = false; - bool outPortFound = false; - bool inPortFound = false; - - for (const auto &[qtId, nodeData] : m_nodes) { - if (!outPortFound) { - for (size_t i = 0; i < nodeData.outputPorts.size(); ++i) { - if (nodeData.outputPorts[i].id.value == - link.output_port.value) { - outNodeIt = m_pwToQt.find(nodeData.info.id.value); - outPortIdx = static_cast(i); - outPortFound = true; - break; - } - } - } - if (!inPortFound) { - for (size_t i = 0; i < nodeData.inputPorts.size(); ++i) { - if (nodeData.inputPorts[i].id.value == - link.input_port.value) { - inNodeIt = m_pwToQt.find(nodeData.info.id.value); - inPortIdx = static_cast(i); - inPortFound = true; - break; - } - } - } - if (outPortFound && inPortFound) - break; + // Check group port map first. + auto outGroupIt = m_portToGroupPort.find(link.output_port.value); + if (outGroupIt != m_portToGroupPort.end() && !outGroupIt->second.isInput) { + outQtId = outGroupIt->second.groupQtId; + outPortIdx = outGroupIt->second.portIndex; + outFound = true; + } + auto inGroupIt = m_portToGroupPort.find(link.input_port.value); + if (inGroupIt != m_portToGroupPort.end() && inGroupIt->second.isInput) { + inQtId = inGroupIt->second.groupQtId; + inPortIdx = inGroupIt->second.portIndex; + inFound = true; } - if (outPortFound && inPortFound && outNodeIt != m_pwToQt.end() && - inNodeIt != m_pwToQt.end()) { - found = true; + // Fall back to individual node port scan. + if (!outFound || !inFound) { + for (const auto &[qtId, nodeData] : m_nodes) { + if (m_appGroups.count(qtId)) + continue; + if (!outFound) { + for (size_t i = 0; i < nodeData.outputPorts.size(); ++i) { + if (nodeData.outputPorts[i].id.value == link.output_port.value) { + auto pwIt = m_pwToQt.find(nodeData.info.id.value); + if (pwIt != m_pwToQt.end()) { + outQtId = pwIt->second; + outPortIdx = static_cast(i); + outFound = true; + } + break; + } + } + } + if (!inFound) { + for (size_t i = 0; i < nodeData.inputPorts.size(); ++i) { + if (nodeData.inputPorts[i].id.value == link.input_port.value) { + auto pwIt = m_pwToQt.find(nodeData.info.id.value); + if (pwIt != m_pwToQt.end()) { + inQtId = pwIt->second; + inPortIdx = static_cast(i); + inFound = true; + } + break; + } + } + } + if (outFound && inFound) + break; + } } - if (found) { - QtNodes::ConnectionId connId{outNodeIt->second, outPortIdx, - inNodeIt->second, inPortIdx}; + if (outFound && inFound) { + QtNodes::ConnectionId connId{outQtId, outPortIdx, inQtId, inPortIdx}; if (m_connections.find(connId) == m_connections.end()) { if (!sceneChanged) { sceneChanged = true; Q_EMIT beginBatchUpdate(); } m_connections.insert(connId); - m_linkIdToConn.emplace(link.id.value, connId); Q_EMIT connectionCreated(connId); } + m_linkIdToConn.emplace(link.id.value, connId); } } std::vector staleLinkIds; for (const auto &[linkId, connId] : m_linkIdToConn) { - if (seenLinkIds.find(linkId) == seenLinkIds.end()) { + if (seenLinkIds.find(linkId) == seenLinkIds.end()) staleLinkIds.push_back(linkId); - } } for (uint32_t linkId : staleLinkIds) { auto it = m_linkIdToConn.find(linkId); - if (it != m_linkIdToConn.end()) { - QtNodes::ConnectionId connId = it->second; + if (it == m_linkIdToConn.end()) + continue; + + QtNodes::ConnectionId connId = it->second; + m_linkIdToConn.erase(it); + + // Only remove visual connection if no other PW links map to it. + bool otherLinkExists = false; + for (const auto &[otherId, otherConn] : m_linkIdToConn) { + if (otherConn == connId) { + otherLinkExists = true; + break; + } + } + + if (!otherLinkExists) { bool outIsGhost = m_ghostNodes.find(connId.outNodeId) != m_ghostNodes.end(); bool inIsGhost = m_ghostNodes.find(connId.inNodeId) != m_ghostNodes.end(); - - if (outIsGhost || inIsGhost) { + if (outIsGhost || inIsGhost) m_ghostConnections.insert(connId); - } - { - auto connIt = m_connections.find(connId); - if (connIt != m_connections.end()) { - if (!sceneChanged) { - sceneChanged = true; - Q_EMIT beginBatchUpdate(); - } - m_connections.erase(connIt); - Q_EMIT connectionDeleted(connId); + + auto connIt = m_connections.find(connId); + if (connIt != m_connections.end()) { + if (!sceneChanged) { + sceneChanged = true; + Q_EMIT beginBatchUpdate(); } + m_connections.erase(connIt); + Q_EMIT connectionDeleted(connId); } - m_linkIdToConn.erase(it); } } } + // Phase 6: Pending ghost connections. if (!m_pendingGhostConnections.empty()) { auto it = m_pendingGhostConnections.begin(); while (it != m_pendingGhostConnections.end()) { @@ -687,13 +1098,14 @@ void WarpGraphModel::refreshFromClient() { } QtNodes::ConnectionId connId{outQtId, outIdx, inQtId, inIdx}; - if (m_ghostConnections.find(connId) == m_ghostConnections.end()) { + if (m_ghostConnections.find(connId) == m_ghostConnections.end()) m_ghostConnections.insert(connId); - } it = m_pendingGhostConnections.erase(it); } } + // Phase 7: Volume sync. + // Non-app nodes. for (const auto &[pwId, qtId] : m_pwToQt) { auto volResult = m_client->GetNodeVolume(warppipe::NodeId{pwId}); if (!volResult.ok()) continue; @@ -726,6 +1138,47 @@ void WarpGraphModel::refreshFromClient() { Q_EMIT nodeVolumeChanged(qtId, previous, cached); } + // Group nodes: aggregate volume from first member. + for (const auto &[groupQtId, group] : m_appGroups) { + if (group.memberPwIds.empty()) + continue; + + auto volResult = + m_client->GetNodeVolume(warppipe::NodeId{group.memberPwIds.front()}); + if (!volResult.ok()) + continue; + + float vol = volResult.value.volume; + bool mute = volResult.value.mute; + int sliderVal = volumeToSlider(vol); + sliderVal = std::clamp(sliderVal, 0, 150); + + auto stateIt = m_volumeStates.find(groupQtId); + if (stateIt == m_volumeStates.end()) + continue; + + NodeVolumeState &cached = stateIt->second; + bool changed = + (std::abs(cached.volume - vol) > 1e-4f) || (cached.mute != mute); + if (!changed) + continue; + + NodeVolumeState previous = cached; + cached.volume = vol; + cached.mute = mute; + + auto wIt = m_volumeWidgets.find(groupQtId); + if (wIt != m_volumeWidgets.end() && wIt->second) { + auto *vw = static_cast(wIt->second.data()); + if (!vw->isSliderDown()) { + vw->setVolume(sliderVal); + vw->setMuted(mute); + } + } + + Q_EMIT nodeVolumeChanged(groupQtId, previous, cached); + } + recomputeConnectionChannels(); m_refreshing = false; @@ -745,9 +1198,11 @@ WarpGraphModel::warpNodeData(QtNodes::NodeId nodeId) const { QtNodes::NodeId WarpGraphModel::qtNodeIdForPw(uint32_t pwNodeId) const { auto it = m_pwToQt.find(pwNodeId); - if (it != m_pwToQt.end()) { + if (it != m_pwToQt.end()) return it->second; - } + auto groupIt = m_pwToGroupQt.find(pwNodeId); + if (groupIt != m_pwToGroupQt.end()) + return groupIt->second; return 0; } @@ -913,12 +1368,25 @@ WarpGraphModel::allGhostConnectionIds(QtNodes::NodeId nodeId) const { uint32_t WarpGraphModel::findPwNodeIdByName(const std::string &name) const { for (const auto &[qtId, data] : m_nodes) { if (data.info.name == name) { - return data.info.id.value; + if (data.info.id.value != 0) + return data.info.id.value; + auto groupIt = m_appGroups.find(qtId); + if (groupIt != m_appGroups.end() && !groupIt->second.memberPwIds.empty()) + return groupIt->second.memberPwIds.front(); } } return 0; } +bool WarpGraphModel::isGroupNode(QtNodes::NodeId nodeId) const { + return m_appGroups.find(nodeId) != m_appGroups.end(); +} + +const AppGroupData *WarpGraphModel::appGroupData(QtNodes::NodeId nodeId) const { + auto it = m_appGroups.find(nodeId); + return it != m_appGroups.end() ? &it->second : nullptr; +} + WarpNodeType WarpGraphModel::classifyNode(const warppipe::NodeInfo &info) { const std::string &mc = info.media_class; @@ -953,13 +1421,24 @@ void WarpGraphModel::setNodeVolumeState(QtNodes::NodeId nodeId, m_volumeStates[nodeId] = state; if (m_client) { - auto it = m_nodes.find(nodeId); - if (it != m_nodes.end() && it->second.info.id.value != 0) { + auto groupIt = m_appGroups.find(nodeId); + if (groupIt != m_appGroups.end()) { + for (uint32_t memberPwId : groupIt->second.memberPwIds) { #ifdef WARPPIPE_TESTING - m_client->Test_SetNodeVolume(it->second.info.id, state.volume, state.mute); + m_client->Test_SetNodeVolume(warppipe::NodeId{memberPwId}, state.volume, state.mute); #else - m_client->SetNodeVolume(it->second.info.id, state.volume, state.mute); + m_client->SetNodeVolume(warppipe::NodeId{memberPwId}, state.volume, state.mute); #endif + } + } else { + auto it = m_nodes.find(nodeId); + if (it != m_nodes.end() && it->second.info.id.value != 0) { +#ifdef WARPPIPE_TESTING + m_client->Test_SetNodeVolume(it->second.info.id, state.volume, state.mute); +#else + m_client->SetNodeVolume(it->second.info.id, state.volume, state.mute); +#endif + } } } @@ -1038,11 +1517,17 @@ void WarpGraphModel::saveLayout(const QString &path, QJsonArray nodesArray; for (const auto &[qtId, data] : m_nodes) { auto posIt = m_positions.find(qtId); - if (posIt == m_positions.end()) { + if (posIt == m_positions.end()) continue; - } + QJsonObject nodeObj; - nodeObj["name"] = QString::fromStdString(data.info.name); + auto groupIt = m_appGroups.find(qtId); + if (groupIt != m_appGroups.end()) { + nodeObj["name"] = + QString::fromStdString("group:" + groupIt->second.groupKey); + } else { + nodeObj["name"] = QString::fromStdString(data.info.name); + } nodeObj["x"] = posIt->second.x(); nodeObj["y"] = posIt->second.y(); nodesArray.append(nodeObj); @@ -1051,12 +1536,18 @@ void WarpGraphModel::saveLayout(const QString &path, QJsonArray ghostsArray; for (const auto &ghostId : m_ghostNodes) { auto nodeIt = m_nodes.find(ghostId); - if (nodeIt == m_nodes.end()) { + if (nodeIt == m_nodes.end()) continue; - } + const auto &data = nodeIt->second; QJsonObject ghostObj; ghostObj["name"] = QString::fromStdString(data.info.name); + auto ghostGroupIt = m_appGroups.find(ghostId); + if (ghostGroupIt != m_appGroups.end()) { + ghostObj["is_group"] = true; + ghostObj["group_key"] = + QString::fromStdString(ghostGroupIt->second.groupKey); + } ghostObj["description"] = QString::fromStdString(data.info.description); ghostObj["media_class"] = QString::fromStdString(data.info.media_class); ghostObj["application_name"] = @@ -1255,14 +1746,28 @@ bool WarpGraphModel::loadLayout(const QString &path) { m_nodes.emplace(qtId, std::move(data)); m_ghostNodes.insert(qtId); + if (obj.value("is_group").toBool()) { + std::string groupKey = + obj["group_key"].toString().toStdString(); + if (groupKey.empty()) + groupKey = appGroupKey(info); + AppGroupData group; + group.groupKey = groupKey; + m_appGroups[qtId] = std::move(group); + m_groupKeyToQt[groupKey] = qtId; + } + if (obj.contains("x") && obj.contains("y")) { m_positions.emplace(qtId, QPointF(obj["x"].toDouble(), obj["y"].toDouble())); } - m_savedPositions[name] = - m_positions.count(qtId) - ? m_positions.at(qtId) - : QPointF(0, 0); + + std::string posKey = name; + auto gIt = m_appGroups.find(qtId); + if (gIt != m_appGroups.end()) + posKey = "group:" + gIt->second.groupKey; + m_savedPositions[posKey] = + m_positions.count(qtId) ? m_positions.at(qtId) : QPointF(0, 0); if (nodeHasVolume(classifyNode(info))) { auto *volumeWidget = new NodeVolumeWidget(); diff --git a/gui/WarpGraphModel.h b/gui/WarpGraphModel.h index 37a1265..1142a53 100644 --- a/gui/WarpGraphModel.h +++ b/gui/WarpGraphModel.h @@ -43,6 +43,15 @@ struct WarpNodeData { std::vector outputPorts; }; +/// Data for an app-group node (multiple PipeWire streams collapsed into one visual node). +struct AppGroupData { + std::string groupKey; + std::vector memberPwIds; ///< PipeWire node IDs in this group. + /// canonical port index → list of actual member PortIds for fan-out. + std::unordered_map> outputPortMap; + std::unordered_map> inputPortMap; +}; + class WarpGraphModel : public QtNodes::AbstractGraphModel { Q_OBJECT @@ -87,6 +96,9 @@ public: uint32_t findPwNodeIdByName(const std::string &name) const; + bool isGroupNode(QtNodes::NodeId nodeId) const; + const AppGroupData *appGroupData(QtNodes::NodeId nodeId) const; + struct NodeVolumeState { float volume = 1.0f; bool mute = false; @@ -181,5 +193,18 @@ private: std::unordered_map m_peakLevels; std::unordered_map m_connectionChannels; + std::unordered_map m_appGroups; + std::unordered_map m_groupKeyToQt; + std::unordered_map m_pwToGroupQt; + + struct GroupPortRef { + QtNodes::NodeId groupQtId; + QtNodes::PortIndex portIndex; + bool isInput; + }; + std::unordered_map m_portToGroupPort; + + void rebuildGroupPortMap(QtNodes::NodeId groupQtId); + void recomputeConnectionChannels(); };