warp-pipe/tests/gui/warppipe_gui_tests.cpp

455 lines
14 KiB
C++

#include <warppipe/warppipe.hpp>
#include "../../gui/WarpGraphModel.h"
#include <QApplication>
#include <catch2/catch_test_macros.hpp>
namespace {
warppipe::ConnectionOptions TestOptions() {
warppipe::ConnectionOptions opts;
opts.threading = warppipe::ThreadingMode::kThreadLoop;
opts.autoconnect = true;
opts.application_name = "warppipe-gui-tests";
return opts;
}
struct TestClient {
std::unique_ptr<warppipe::Client> client;
bool available() const { return client != nullptr; }
static TestClient Create() {
auto result = warppipe::Client::Create(TestOptions());
if (!result.ok()) return {nullptr};
return {std::move(result.value)};
}
};
warppipe::NodeInfo MakeNode(uint32_t id, const std::string &name,
const std::string &media_class,
const std::string &app_name = {},
const std::string &desc = {}) {
warppipe::NodeInfo n;
n.id = warppipe::NodeId{id};
n.name = name;
n.media_class = media_class;
n.application_name = app_name;
n.description = desc;
return n;
}
warppipe::PortInfo MakePort(uint32_t id, uint32_t node_id,
const std::string &name, bool is_input) {
warppipe::PortInfo p;
p.id = warppipe::PortId{id};
p.node = warppipe::NodeId{node_id};
p.name = name;
p.is_input = is_input;
return p;
}
warppipe::Link MakeLink(uint32_t id, uint32_t out_port, uint32_t in_port) {
warppipe::Link l;
l.id = warppipe::LinkId{id};
l.output_port = warppipe::PortId{out_port};
l.input_port = warppipe::PortId{in_port};
return l;
}
int g_argc = 0;
char *g_argv[] = {nullptr};
struct AppGuard {
QApplication app{g_argc, g_argv};
};
AppGuard &ensureApp() {
static AppGuard guard;
return guard;
}
} // namespace
TEST_CASE("classifyNode identifies hardware sink") {
auto n = MakeNode(1, "alsa_output", "Audio/Sink");
REQUIRE(WarpGraphModel::classifyNode(n) == WarpNodeType::kHardwareSink);
}
TEST_CASE("classifyNode identifies hardware source") {
auto n = MakeNode(2, "alsa_input", "Audio/Source");
REQUIRE(WarpGraphModel::classifyNode(n) == WarpNodeType::kHardwareSource);
}
TEST_CASE("classifyNode identifies virtual sink") {
auto n = MakeNode(3, "warppipe-gaming-sink", "Audio/Sink");
REQUIRE(WarpGraphModel::classifyNode(n) == WarpNodeType::kVirtualSink);
}
TEST_CASE("classifyNode identifies virtual source") {
auto n = MakeNode(4, "warppipe-mic", "Audio/Source");
REQUIRE(WarpGraphModel::classifyNode(n) == WarpNodeType::kVirtualSource);
}
TEST_CASE("classifyNode identifies application stream output") {
auto n = MakeNode(5, "Firefox", "Stream/Output/Audio");
REQUIRE(WarpGraphModel::classifyNode(n) == WarpNodeType::kApplication);
}
TEST_CASE("classifyNode identifies application stream input") {
auto n = MakeNode(6, "Discord", "Stream/Input/Audio");
REQUIRE(WarpGraphModel::classifyNode(n) == WarpNodeType::kApplication);
}
TEST_CASE("classifyNode returns unknown for unrecognized media class") {
auto n = MakeNode(7, "midi-bridge", "Midi/Bridge");
REQUIRE(WarpGraphModel::classifyNode(n) == WarpNodeType::kUnknown);
}
TEST_CASE("classifyNode duplex treated as sink") {
auto n = MakeNode(8, "alsa_duplex", "Audio/Duplex");
REQUIRE(WarpGraphModel::classifyNode(n) == WarpNodeType::kHardwareSink);
}
TEST_CASE("refreshFromClient populates nodes from client") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100001, "test-sink", "Audio/Sink")).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100002, 100001, "FL", true)).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100003, 100001, "FR", true)).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto qtId = model.qtNodeIdForPw(100001);
REQUIRE(qtId != 0);
REQUIRE(model.nodeExists(qtId));
auto caption = model.nodeData(qtId, QtNodes::NodeRole::Caption).toString();
REQUIRE(caption == "test-sink");
auto inCount = model.nodeData(qtId, QtNodes::NodeRole::InPortCount).toUInt();
REQUIRE(inCount == 2);
auto outCount = model.nodeData(qtId, QtNodes::NodeRole::OutPortCount).toUInt();
REQUIRE(outCount == 0);
}
TEST_CASE("caption prefers description over name") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100010, "alsa_output.pci-0000_00_1f.3",
"Audio/Sink", "", "Speakers")).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto qtId = model.qtNodeIdForPw(100010);
auto caption = model.nodeData(qtId, QtNodes::NodeRole::Caption).toString();
REQUIRE(caption == "Speakers");
}
TEST_CASE("caption uses application_name for streams") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100020, "firefox-output", "Stream/Output/Audio", "Firefox")).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto qtId = model.qtNodeIdForPw(100020);
auto caption = model.nodeData(qtId, QtNodes::NodeRole::Caption).toString();
REQUIRE(caption == "Firefox");
}
TEST_CASE("port data returns correct captions and types") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100030, "port-test", "Audio/Sink")).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100031, 100030, "playback_FL", true)).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100032, 100030, "playback_FR", true)).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100033, 100030, "monitor_FL", false)).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto qtId = model.qtNodeIdForPw(100030);
auto inCaption = model.portData(qtId, QtNodes::PortType::In, 0,
QtNodes::PortRole::Caption).toString();
REQUIRE((inCaption == "playback_FL" || inCaption == "playback_FR"));
auto outCaption = model.portData(qtId, QtNodes::PortType::Out, 0,
QtNodes::PortRole::Caption).toString();
REQUIRE(outCaption == "monitor_FL");
auto dataType = model.portData(qtId, QtNodes::PortType::In, 0,
QtNodes::PortRole::DataType).toString();
REQUIRE(dataType == "audio");
}
TEST_CASE("node style varies by type") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100040, "hw-sink", "Audio/Sink")).ok());
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100041, "warppipe-vsink", "Audio/Sink")).ok());
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100042, "app-stream", "Stream/Output/Audio", "App")).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto hwStyle = model.nodeData(model.qtNodeIdForPw(100040),
QtNodes::NodeRole::Style);
auto vStyle = model.nodeData(model.qtNodeIdForPw(100041),
QtNodes::NodeRole::Style);
auto appStyle = model.nodeData(model.qtNodeIdForPw(100042),
QtNodes::NodeRole::Style);
REQUIRE(hwStyle.isValid());
REQUIRE(vStyle.isValid());
REQUIRE(appStyle.isValid());
REQUIRE(hwStyle != vStyle);
REQUIRE(hwStyle != appStyle);
REQUIRE(vStyle != appStyle);
}
TEST_CASE("ghost nodes tracked after disappearance") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100050, "ephemeral-app", "Stream/Output/Audio", "App")).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto qtId = model.qtNodeIdForPw(100050);
REQUIRE(qtId != 0);
REQUIRE_FALSE(model.isGhost(qtId));
REQUIRE(tc.client->Test_RemoveGlobal(100050).ok());
model.refreshFromClient();
REQUIRE(model.nodeExists(qtId));
REQUIRE(model.isGhost(qtId));
}
TEST_CASE("ghost node reappears and loses ghost status") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100060, "ephemeral-2", "Stream/Output/Audio", "App")).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto qtId = model.qtNodeIdForPw(100060);
REQUIRE(tc.client->Test_RemoveGlobal(100060).ok());
model.refreshFromClient();
REQUIRE(model.isGhost(qtId));
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100061, "ephemeral-2", "Stream/Output/Audio", "App")).ok());
model.refreshFromClient();
REQUIRE(model.nodeExists(qtId));
REQUIRE_FALSE(model.isGhost(qtId));
}
TEST_CASE("non-application nodes removed on disappearance") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100070, "hw-gone", "Audio/Sink")).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto qtId = model.qtNodeIdForPw(100070);
REQUIRE(qtId != 0);
REQUIRE(tc.client->Test_RemoveGlobal(100070).ok());
model.refreshFromClient();
REQUIRE_FALSE(model.nodeExists(qtId));
}
TEST_CASE("links from client appear as connections") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100100, "src-node", "Audio/Source")).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100101, 100100, "out_FL", false)).ok());
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100102, "sink-node", "Audio/Sink")).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100103, 100102, "in_FL", true)).ok());
REQUIRE(tc.client->Test_InsertLink(
MakeLink(100104, 100101, 100103)).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto srcQt = model.qtNodeIdForPw(100100);
auto sinkQt = model.qtNodeIdForPw(100102);
REQUIRE(srcQt != 0);
REQUIRE(sinkQt != 0);
auto conns = model.allConnectionIds(srcQt);
REQUIRE(conns.size() == 1);
QtNodes::ConnectionId connId = *conns.begin();
REQUIRE(connId.outNodeId == srcQt);
REQUIRE(connId.inNodeId == sinkQt);
REQUIRE(model.connectionExists(connId));
}
TEST_CASE("connectionPossible rejects invalid port indices") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100110, "conn-test", "Audio/Sink")).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100111, 100110, "in_FL", true)).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto qtId = model.qtNodeIdForPw(100110);
QtNodes::ConnectionId bad{qtId, 0, qtId, 99};
REQUIRE_FALSE(model.connectionPossible(bad));
}
TEST_CASE("connectionPossible rejects nonexistent nodes") {
ensureApp();
WarpGraphModel model(nullptr);
QtNodes::ConnectionId bad{999, 0, 998, 0};
REQUIRE_FALSE(model.connectionPossible(bad));
}
TEST_CASE("deleteConnection removes from model") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100120, "del-src", "Audio/Source")).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100121, 100120, "out_FL", false)).ok());
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100122, "del-sink", "Audio/Sink")).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100123, 100122, "in_FL", true)).ok());
REQUIRE(tc.client->Test_InsertLink(
MakeLink(100124, 100121, 100123)).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto srcQt = model.qtNodeIdForPw(100120);
auto conns = model.allConnectionIds(srcQt);
REQUIRE(conns.size() == 1);
QtNodes::ConnectionId connId = *conns.begin();
REQUIRE(model.deleteConnection(connId));
REQUIRE_FALSE(model.connectionExists(connId));
}
TEST_CASE("node deletion with connections does not crash") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100130, "crash-src", "Audio/Source")).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100131, 100130, "out_FL", false)).ok());
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100132, "crash-sink", "Audio/Sink")).ok());
REQUIRE(tc.client->Test_InsertPort(
MakePort(100133, 100132, "in_FL", true)).ok());
REQUIRE(tc.client->Test_InsertLink(
MakeLink(100134, 100131, 100133)).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto srcQt = model.qtNodeIdForPw(100130);
REQUIRE(model.deleteNode(srcQt));
REQUIRE_FALSE(model.nodeExists(srcQt));
}
TEST_CASE("saved positions restored for new nodes") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
WarpGraphModel model(tc.client.get());
model.setPendingPosition("pending-node", QPointF(100, 200));
REQUIRE(tc.client->Test_InsertNode(
MakeNode(100140, "pending-node", "Audio/Sink")).ok());
model.refreshFromClient();
auto qtId = model.qtNodeIdForPw(100140);
auto pos = model.nodeData(qtId, QtNodes::NodeRole::Position).toPointF();
REQUIRE(pos.x() == 100.0);
REQUIRE(pos.y() == 200.0);
}
TEST_CASE("volume meter streams are filtered") {
auto tc = TestClient::Create();
if (!tc.available()) { SUCCEED("PipeWire unavailable"); return; }
ensureApp();
auto n = MakeNode(100150, "", "Stream/Output/Audio");
REQUIRE(tc.client->Test_InsertNode(n).ok());
WarpGraphModel model(tc.client.get());
model.refreshFromClient();
auto qtId = model.qtNodeIdForPw(100150);
REQUIRE(qtId == 0);
}