Milestone 5

This commit is contained in:
Joey Yakimowich-Payne 2026-01-29 21:13:18 -07:00
commit 4e21039222
9 changed files with 877 additions and 22 deletions

18
PLAN.md
View file

@ -8,7 +8,7 @@
- [x] Tests to add (non-happy path/edge cases): instructions: create unit tests that fail on missing PipeWire daemon, missing libpipewire-module-link-factory, missing libpipewire-module-metadata, and misconfigured runtime properties (e.g., invalid PW_KEY_MEDIA_CLASS).
- [x] Performance tests: instructions: add a microbenchmark harness that measures connect->create->destroy of N no-op objects (no links) and asserts subsecond N=200 on a warm PipeWire connection.
- [ ] Milestone 1 - Core runtime and registry model
- [x] Milestone 1 - Core runtime and registry model
- [x] Implement a WarpContext (pw_main_loop/pw_thread_loop + pw_context + pw_core) with lifecycle and reconnect handling.
- [x] Implement registry cache for nodes/ports/links with event listeners, and a stable "object identity" resolver (node name, application properties).
- [x] Expose a query API to list nodes, ports, and identify sinks/sources.
@ -17,14 +17,14 @@
- [x] Tests to add (non-happy path/edge cases): instructions: simulate registry events where nodes/ports disappear mid-iteration; ensure safe iteration and cleanup; verify reconnection logic after daemon restart.
- [x] Performance tests: instructions: measure time to ingest registry snapshot of 1000 objects and process 100 add/remove events; assert latency per event and total under subsecond.
- [ ] Milestone 2 - Virtual sinks and sources
- [x] Milestone 2 - Virtual sinks and sources
- [x] Implement virtual sink/source creation via pw_stream_new with PW_KEY_MEDIA_CLASS set to Audio/Sink or Audio/Source and autoconnect flags as needed (see src/pipewire/stream.h).
- [x] Support "null" behavior (discard) and "loopback" behavior (sink that forwards to target) using stream properties and explicit links.
- [x] Provide a naming scheme and metadata tags for virtual nodes to ensure stable identification.
- [x] Tests to add (non-happy path/edge cases): instructions: create sink/source with missing media class and expect validation error; create duplicate node name; attempt to connect when target node is absent.
- [x] Performance tests: instructions: create/destroy 100 virtual sinks and 100 virtual sources in a tight loop; measure wall time and ensure it stays within the target budget.
- [ ] Milestone 3 - Link management API
- [x] Milestone 3 - Link management API
- [x] Implement link creation via link-factory (load libpipewire-module-link-factory and call pw_core_create_object with link.input.* and link.output.* props; see src/modules/module-link-factory.c, src/examples/internal.c, src/tools/pw-link.c).
- [x] Support linking by node+port names and by object IDs; add object.linger and link.passive options.
- [x] Add link deletion and link reconciliation (auto-remove stale links when endpoints vanish).
@ -41,12 +41,12 @@
- [x] Tests to add (non-happy path/edge cases): instructions: rule for app that disappears and reappears under a different PID; verify re-routing; conflicting rules (two matches) resolved deterministically; persistence file corrupted; metadata module not available.
- [x] Performance tests: instructions: simulate 200 ephemeral sources (connect/disconnect) and measure time to apply routing rules and create links; ensure rule lookup is O(1) or O(log n).
- [ ] Milestone 5 - Stability, compatibility, and tooling
- [ ] Provide a simple CLI (optional) to inspect nodes, create virtual nodes, link/unlink, and export/import config (useful for manual testing).
- [ ] Add documentation: API usage patterns, threading model, and performance notes.
- [ ] Validate behavior with PipeWire session manager present (WirePlumber) to avoid fighting policy; allow “policy-only” mode (observes and sets metadata without forcing links).
- [ ] Tests to add (non-happy path/edge cases): instructions: run with session manager disabled/enabled and verify no infinite re-link loops; ensure policy mode doesnt override user defaults unexpectedly.
- [ ] Performance tests: instructions: end-to-end scenario (virtual sink + app connect + reroute + disconnect + reconnect) repeated 200 times; measure median/95th percentile latency.
- [x] Milestone 5 - Stability, compatibility, and tooling
- [x] Provide a simple CLI (optional) to inspect nodes, create virtual nodes, link/unlink, and export/import config (useful for manual testing).
- [x] Add documentation: API usage patterns, threading model, and performance notes.
- [x] Validate behavior with PipeWire session manager present (WirePlumber) to avoid fighting policy; allow "policy-only" mode (observes and sets metadata without forcing links).
- [x] Tests to add (non-happy path/edge cases): instructions: run with session manager disabled/enabled and verify no infinite re-link loops; ensure policy mode doesn't override user defaults unexpectedly.
- [x] Performance tests: instructions: end-to-end scenario (virtual sink + app connect + reroute + disconnect + reconnect) repeated 200 times; measure median/95th percentile latency.
## Design notes grounded in PipeWire code/docs

143
README.md
View file

@ -1,6 +1,16 @@
# warppipe
A C++ libpipewire library for creating virtual sinks/sources and managing links.
A C++17 static library wrapping libpipewire for virtual audio node management, link routing, and per-app routing policy.
## Features
- **Virtual sinks/sources** — create Audio/Sink and Audio/Source nodes with configurable sample rate and channels
- **Link management** — connect and disconnect ports by node+port name or ID, with passive and linger options
- **Per-app routing rules** — match applications by name, process binary, or media role and auto-route to target sinks
- **Persistence** — JSON config with atomic writes, auto-save on change, auto-load on startup
- **Metadata integration** — read and set default audio sink/source via PipeWire metadata
- **Policy-only mode** — observe and set metadata without creating links (avoids fighting WirePlumber)
- **Thread-safe** — dedicated PipeWire thread loop; all public methods callable from any thread
## Build
@ -10,18 +20,131 @@ Requirements:
- libpipewire-0.3 development files
- C++17 compiler
Aurora / Universal Blue (Homebrew):
```
/home/linuxbrew/.linuxbrew/bin/brew install cmake pkg-config pipewire
```
Build:
```
```sh
cmake -S . -B build
cmake --build build
```
Example:
Targets: `warppipe` (library), `warppipe_example`, `warppipe_cli`, `warppipe_tests`, `warppipe_perf`
### Dependencies
- [libpipewire-0.3](https://pipewire.org/) — system package
- [nlohmann/json](https://github.com/nlohmann/json) — fetched automatically via CMake FetchContent
- [Catch2 v3](https://github.com/catchorg/Catch2) — fetched automatically via CMake FetchContent
## Quick Start
```cpp
#include <warppipe/warppipe.hpp>
warppipe::ConnectionOptions opts;
opts.application_name = "my-app";
opts.config_path = "/home/user/.config/warppipe/config.json";
auto result = warppipe::Client::Create(opts);
auto& client = result.value;
// Create a virtual sink
auto sink = client->CreateVirtualSink("my-sink");
// Route Firefox audio to it
warppipe::RouteRule rule;
rule.match.application_name = "Firefox";
rule.target_node = "my-sink";
client->AddRouteRule(rule);
// The policy engine auto-links Firefox when it starts playing
```
./build/warppipe_example
## CLI
`warppipe_cli` provides full access to the library for manual testing and scripting:
```
warppipe_cli list-nodes
warppipe_cli list-ports <node-id>
warppipe_cli list-links
warppipe_cli list-rules
warppipe_cli create-sink <name> [--rate N] [--channels N]
warppipe_cli create-source <name> [--rate N] [--channels N]
warppipe_cli link <out-node> <out-port> <in-node> <in-port> [--passive] [--linger]
warppipe_cli unlink <link-id>
warppipe_cli add-rule --app <name> --target <node> [--process <bin>] [--role <role>]
warppipe_cli remove-rule <rule-id>
warppipe_cli save-config <path>
warppipe_cli load-config <path>
warppipe_cli defaults
```
`create-sink` and `create-source` block until interrupted with Ctrl-C.
## Configuration
Config is JSON. Set `ConnectionOptions::config_path` to enable auto-save/load, or use `SaveConfig`/`LoadConfig` manually.
```json
{
"version": 1,
"virtual_nodes": [
{
"name": "warppipe-gaming-sink",
"is_source": false,
"rate": 48000,
"channels": 2,
"loopback": false,
"target_node": ""
}
],
"route_rules": [
{
"match": {
"application_name": "Firefox",
"process_binary": "",
"media_role": ""
},
"target_node": "warppipe-gaming-sink"
}
]
}
```
See [docs/config-schema.md](docs/config-schema.md) for full schema documentation.
## Policy-Only Mode
When running alongside WirePlumber, enable `policy_only` to prevent warppipe from creating links that conflict with the session manager:
```cpp
warppipe::ConnectionOptions opts;
opts.policy_only = true;
```
In this mode the policy engine still evaluates rules but does not create links. Metadata defaults can still be read and set.
## Documentation
- [API Reference](docs/api.md) — full API, threading model, error model, performance notes
- [Config Schema](docs/config-schema.md) — JSON configuration format and persistence behavior
## Tests
```sh
./build/warppipe_tests
```
31 test cases covering connection, registry, virtual nodes, links, route rules, policy engine, persistence, and metadata.
## Performance
```sh
./build/warppipe_perf --mode <create-destroy|registry|links|policy|e2e>
```
| Mode | Workload | Typical Result |
|------|----------|----------------|
| create-destroy | 200 sink create/destroy | < 1s |
| registry | 1000 object ingest + 100 events | < 1s |
| links | 200 link create/destroy | < 1s |
| policy | 200 ephemeral sources routed | ~370ms |
| e2e | 200 full iterations (sink + rule + source + teardown) | median ~3ms, p95 ~4ms |

208
docs/api.md Normal file
View file

@ -0,0 +1,208 @@
# Warppipe API
## Overview
Warppipe is a C++17 library wrapping libpipewire for virtual audio node management, link routing, and per-app routing policy. The entire public API is in `<warppipe/warppipe.hpp>`.
## Quick Start
```cpp
#include <warppipe/warppipe.hpp>
warppipe::ConnectionOptions opts;
opts.application_name = "my-app";
opts.config_path = "/home/user/.config/warppipe/config.json";
auto result = warppipe::Client::Create(opts);
if (!result.ok()) {
// handle error: result.status.code, result.status.message
}
auto& client = result.value;
// Create a virtual sink
auto sink = client->CreateVirtualSink("my-sink");
// Route Firefox audio to it
warppipe::RouteRule rule;
rule.match.application_name = "Firefox";
rule.target_node = "my-sink";
client->AddRouteRule(rule);
// The policy engine auto-links Firefox when it starts playing
```
## Threading Model
`Client` creates a dedicated PipeWire thread loop (`pw_thread_loop`) that runs PipeWire event dispatch. All callbacks (registry events, stream state, link proxy events, metadata changes) execute on this thread.
Public methods are thread-safe. They lock the PipeWire thread loop for operations that call into libpipewire, and use a separate `cache_mutex` for reading/writing the in-memory registry cache.
**Rules:**
- All public `Client` methods can be called from any thread.
- Do not call `Client` methods from PipeWire callbacks (deadlock).
- The policy engine runs on the PipeWire thread. Auto-links are created asynchronously without blocking.
## Error Model
Every fallible operation returns `Status` or `Result<T>`.
```cpp
struct Status {
StatusCode code;
std::string message;
bool ok() const;
};
template<typename T>
struct Result {
Status status;
T value;
bool ok() const;
};
```
`StatusCode` values:
- `kOk` — success
- `kInvalidArgument` — bad input (empty name, missing match criteria, corrupted config)
- `kNotFound` — object doesn't exist (port, rule, config file)
- `kUnavailable` — PipeWire daemon down, stream failed, metadata not available
- `kPermissionDenied` — access denied
- `kTimeout` — PipeWire sync timed out
- `kInternal` — allocation or I/O failure
- `kNotImplemented` — caller thread mode
## Connection Options
```cpp
struct ConnectionOptions {
ThreadingMode threading; // kThreadLoop (default) or kCallerThread (not impl)
bool autoconnect; // Reconnect on disconnect (default true)
bool policy_only; // Observe-only mode, no auto-links (default false)
std::string application_name; // PW_KEY_APP_NAME (default "warppipe")
std::optional<std::string> remote_name; // Connect to non-default PipeWire
std::optional<std::string> config_path; // Auto-load on start, auto-save on change
};
```
### Policy-Only Mode
When `policy_only = true`, the policy engine observes the registry and matches rules, but does not create links. This avoids conflicts with WirePlumber or other session managers.
Use this when:
- Running alongside WirePlumber and you want to set metadata defaults instead of forcing links.
- Building a monitoring/observation tool.
You can still create links manually via `CreateLink` / `CreateLinkByName`.
## Registry Queries
```cpp
Result<std::vector<NodeInfo>> ListNodes();
Result<std::vector<PortInfo>> ListPorts(NodeId node);
Result<std::vector<Link>> ListLinks();
```
`NodeInfo` includes stable identity fields used for rule matching:
- `name` — PW_KEY_NODE_NAME
- `media_class` — PW_KEY_MEDIA_CLASS
- `application_name` — PW_KEY_APP_NAME
- `process_binary` — PW_KEY_APP_PROCESS_BINARY
- `media_role` — PW_KEY_MEDIA_ROLE
## Virtual Nodes
```cpp
Result<VirtualSink> CreateVirtualSink(std::string_view name, const VirtualNodeOptions& opts);
Result<VirtualSource> CreateVirtualSource(std::string_view name, const VirtualNodeOptions& opts);
Status RemoveNode(NodeId node);
```
Virtual nodes are PipeWire streams with `PW_KEY_NODE_VIRTUAL = true`. They live as long as the `Client` (or until explicitly removed). Null behavior discards audio; loopback behavior forwards to a target node.
## Link Management
```cpp
Result<Link> CreateLink(PortId output, PortId input, const LinkOptions& opts);
Result<Link> CreateLinkByName(std::string_view out_node, std::string_view out_port,
std::string_view in_node, std::string_view in_port,
const LinkOptions& opts);
Status RemoveLink(LinkId link);
```
Links are created via `link-factory` (`pw_core_create_object`). Use `linger = true` so links persist after the creating client disconnects. Use `passive = true` for monitoring/side-chain links that don't affect the session manager's routing decisions.
## Route Rules (Policy Engine)
```cpp
Result<RuleId> AddRouteRule(const RouteRule& rule);
Status RemoveRouteRule(RuleId id);
Result<std::vector<RouteRule>> ListRouteRules();
```
Rules match newly-appearing nodes by application metadata and auto-link them to target sinks.
Match criteria (all non-empty fields must match):
- `application_name` — PW_KEY_APP_NAME
- `process_binary` — PW_KEY_APP_PROCESS_BINARY
- `media_role` — PW_KEY_MEDIA_ROLE
When a matching node appears, the engine waits for ports to register (via PipeWire sync), then creates links pairwise by port name order.
Adding a rule also scans existing nodes for matches.
## Metadata
```cpp
Result<MetadataInfo> GetDefaults();
Status SetDefaultSink(std::string_view node_name);
Status SetDefaultSource(std::string_view node_name);
```
Metadata is read from PipeWire's "default" metadata object (created by the session manager). `SetDefaultSink`/`SetDefaultSource` set `default.configured.audio.sink`/`source`, which persists across sessions.
Returns `kUnavailable` if no metadata object is found (e.g., no session manager running).
## Persistence
```cpp
Status SaveConfig(std::string_view path);
Status LoadConfig(std::string_view path);
```
JSON format. See `docs/config-schema.md` for the schema.
When `config_path` is set in `ConnectionOptions`:
- Config is loaded automatically after connection.
- Config is saved automatically after mutations (add/remove rules, create/remove virtual nodes).
## Performance
Measured on PipeWire 1.4.10, Fedora, warm connection:
| Operation | Count | Time | Budget |
|-----------|-------|------|--------|
| Create/destroy virtual sinks | 200 | ~390ms | <1000ms |
| Registry snapshot + 100 events | 200 nodes | ~218ms | <1000ms |
| Policy: 200 ephemeral sources | 200 | ~370ms | <1000ms |
Rule lookup is O(n) over rules per node appearance, O(n) over ports for link matching. Typical rule sets (<100 rules) complete in microseconds.
## CLI
The `warppipe_cli` binary provides manual access to all operations:
```
warppipe_cli list-nodes
warppipe_cli list-ports <node-id>
warppipe_cli list-links
warppipe_cli list-rules
warppipe_cli create-sink <name> [--rate N] [--channels N]
warppipe_cli create-source <name> [--rate N] [--channels N]
warppipe_cli link <out-node> <out-port> <in-node> <in-port> [--passive] [--linger]
warppipe_cli unlink <link-id>
warppipe_cli add-rule --app <name> --target <node> [--process <bin>] [--role <role>]
warppipe_cli remove-rule <rule-id>
warppipe_cli save-config <path>
warppipe_cli load-config <path>
warppipe_cli defaults
```

97
docs/config-schema.md Normal file
View file

@ -0,0 +1,97 @@
# Warppipe Configuration Schema
Warppipe uses JSON for configuration persistence. The config file stores virtual nodes and routing rules using stable identifiers (names, not serial IDs).
## Schema Version 1
```json
{
"version": 1,
"virtual_nodes": [
{
"name": "warppipe-gaming-sink",
"is_source": false,
"rate": 48000,
"channels": 2,
"loopback": false,
"target_node": ""
},
{
"name": "warppipe-mic-source",
"is_source": true,
"rate": 48000,
"channels": 2,
"loopback": false,
"target_node": ""
},
{
"name": "warppipe-loopback",
"is_source": false,
"rate": 48000,
"channels": 2,
"loopback": true,
"target_node": "alsa_output.pci-0000_00_1f.3.analog-stereo"
}
],
"route_rules": [
{
"match": {
"application_name": "Firefox",
"process_binary": "firefox",
"media_role": ""
},
"target_node": "warppipe-gaming-sink"
},
{
"match": {
"application_name": "discord",
"process_binary": "",
"media_role": "Communication"
},
"target_node": "alsa_output.usb-headset"
}
]
}
```
## Field Descriptions
### virtual_nodes
- `name` (string, required): Unique node name (PW_KEY_NODE_NAME)
- `is_source` (boolean, required): true for Audio/Source, false for Audio/Sink
- `rate` (integer, default 48000): Sample rate in Hz
- `channels` (integer, default 2): Channel count
- `loopback` (boolean, default false): Whether node forwards to a target
- `target_node` (string, optional): Required when loopback is true
### route_rules
Rules match ephemeral audio sources to target sinks by stable application metadata.
- `match.application_name` (string): Match PW_KEY_APP_NAME
- `match.process_binary` (string): Match PW_KEY_APP_PROCESS_BINARY
- `match.media_role` (string): Match PW_KEY_MEDIA_ROLE
- `target_node` (string, required): Destination node name
All non-empty match fields must match (AND logic). At least one match field must be non-empty.
## Persistence Behavior
- **Auto-save**: When `ConnectionOptions::config_path` is set, config is saved after:
- Virtual node created/removed
- Routing rule added/removed
- **Load on startup**: When `config_path` is set and the file exists, it is loaded during `Client::Create()` after connection is established.
- **Manual**: `SaveConfig(path)` and `LoadConfig(path)` work independently of auto-save.
- **Atomic writes**: Saves write to a `.tmp` file first, then rename for crash safety.
## Error Handling
- Missing config file on auto-load: Silently ignored (fresh start)
- Invalid JSON syntax: `StatusCode::kInvalidArgument` with parse error details
- Missing version field: `StatusCode::kInvalidArgument`
- Empty path: `StatusCode::kInvalidArgument`
- File not found on manual load: `StatusCode::kNotFound`

View file

@ -1,3 +1,4 @@
#include <csignal>
#include <cstdint>
#include <cstdlib>
#include <iostream>
@ -7,6 +8,12 @@
namespace {
volatile sig_atomic_t g_running = 1;
void SignalHandler(int) {
g_running = 0;
}
uint32_t ParseId(const char* value) {
if (!value) {
return 0;
@ -19,11 +26,29 @@ uint32_t ParseId(const char* value) {
return static_cast<uint32_t>(parsed);
}
std::string ArgValue(int argc, char* argv[], int i, const char* flag) {
if (i + 1 < argc) {
return argv[i + 1];
}
std::cerr << "warppipe: missing value for " << flag << "\n";
return "";
}
int Usage() {
std::cerr << "Usage:\n"
<< " warppipe_cli list-nodes\n"
<< " warppipe_cli list-ports <node-id>\n"
<< " warppipe_cli list-links\n";
<< " warppipe_cli list-links\n"
<< " warppipe_cli list-rules\n"
<< " warppipe_cli create-sink <name> [--rate N] [--channels N]\n"
<< " warppipe_cli create-source <name> [--rate N] [--channels N]\n"
<< " warppipe_cli link <out-node> <out-port> <in-node> <in-port> [--passive] [--linger]\n"
<< " warppipe_cli unlink <link-id>\n"
<< " warppipe_cli add-rule --app <name> --target <node> [--process <bin>] [--role <role>]\n"
<< " warppipe_cli remove-rule <rule-id>\n"
<< " warppipe_cli save-config <path>\n"
<< " warppipe_cli load-config <path>\n"
<< " warppipe_cli defaults\n";
return 2;
}
@ -91,5 +116,189 @@ int main(int argc, char* argv[]) {
return 0;
}
if (command == "list-rules") {
auto rules = client_result.value->ListRouteRules();
if (!rules.ok()) {
std::cerr << "warppipe: list-rules failed: " << rules.status.message << "\n";
return 1;
}
for (const auto& rule : rules.value) {
std::cout << rule.id.value << "\t"
<< "app=" << rule.match.application_name << "\t"
<< "proc=" << rule.match.process_binary << "\t"
<< "role=" << rule.match.media_role << "\t"
<< "-> " << rule.target_node << "\n";
}
return 0;
}
if (command == "create-sink" || command == "create-source") {
if (argc < 3) {
return Usage();
}
std::string name = argv[2];
warppipe::VirtualNodeOptions node_options;
for (int i = 3; i < argc; ++i) {
std::string arg = argv[i];
if (arg == "--rate" && i + 1 < argc) {
node_options.format.rate = ParseId(argv[++i]);
} else if (arg == "--channels" && i + 1 < argc) {
node_options.format.channels = ParseId(argv[++i]);
}
}
if (command == "create-sink") {
auto result = client_result.value->CreateVirtualSink(name, node_options);
if (!result.ok()) {
std::cerr << "warppipe: create-sink failed: " << result.status.message << "\n";
return 1;
}
std::cout << "created sink " << result.value.name
<< " (node " << result.value.node.value << ")\n";
} else {
auto result = client_result.value->CreateVirtualSource(name, node_options);
if (!result.ok()) {
std::cerr << "warppipe: create-source failed: " << result.status.message << "\n";
return 1;
}
std::cout << "created source " << result.value.name
<< " (node " << result.value.node.value << ")\n";
}
std::cout << "press Ctrl+C to stop\n";
std::signal(SIGINT, SignalHandler);
std::signal(SIGTERM, SignalHandler);
while (g_running) {
usleep(100000);
}
std::cout << "\nshutting down\n";
return 0;
}
if (command == "link") {
if (argc < 6) {
std::cerr << "warppipe: link requires <out-node> <out-port> <in-node> <in-port>\n";
return Usage();
}
std::string out_node = argv[2];
std::string out_port = argv[3];
std::string in_node = argv[4];
std::string in_port = argv[5];
warppipe::LinkOptions link_opts;
for (int i = 6; i < argc; ++i) {
std::string arg = argv[i];
if (arg == "--passive") {
link_opts.passive = true;
} else if (arg == "--linger") {
link_opts.linger = true;
}
}
auto result = client_result.value->CreateLinkByName(out_node, out_port, in_node, in_port, link_opts);
if (!result.ok()) {
std::cerr << "warppipe: link failed: " << result.status.message << "\n";
return 1;
}
std::cout << "created link " << result.value.id.value << "\n";
return 0;
}
if (command == "unlink") {
if (argc < 3) {
return Usage();
}
uint32_t link_id = ParseId(argv[2]);
if (link_id == 0) {
std::cerr << "warppipe: invalid link id\n";
return 1;
}
auto status = client_result.value->RemoveLink(warppipe::LinkId{link_id});
if (!status.ok()) {
std::cerr << "warppipe: unlink failed: " << status.message << "\n";
return 1;
}
std::cout << "removed link " << link_id << "\n";
return 0;
}
if (command == "add-rule") {
warppipe::RouteRule rule;
for (int i = 2; i < argc; ++i) {
std::string arg = argv[i];
if (arg == "--app" && i + 1 < argc) {
rule.match.application_name = argv[++i];
} else if (arg == "--process" && i + 1 < argc) {
rule.match.process_binary = argv[++i];
} else if (arg == "--role" && i + 1 < argc) {
rule.match.media_role = argv[++i];
} else if (arg == "--target" && i + 1 < argc) {
rule.target_node = argv[++i];
}
}
auto result = client_result.value->AddRouteRule(rule);
if (!result.ok()) {
std::cerr << "warppipe: add-rule failed: " << result.status.message << "\n";
return 1;
}
std::cout << "added rule " << result.value.value << "\n";
return 0;
}
if (command == "remove-rule") {
if (argc < 3) {
return Usage();
}
uint32_t rule_id = ParseId(argv[2]);
if (rule_id == 0) {
std::cerr << "warppipe: invalid rule id\n";
return 1;
}
auto status = client_result.value->RemoveRouteRule(warppipe::RuleId{rule_id});
if (!status.ok()) {
std::cerr << "warppipe: remove-rule failed: " << status.message << "\n";
return 1;
}
std::cout << "removed rule " << rule_id << "\n";
return 0;
}
if (command == "save-config") {
if (argc < 3) {
return Usage();
}
auto status = client_result.value->SaveConfig(argv[2]);
if (!status.ok()) {
std::cerr << "warppipe: save-config failed: " << status.message << "\n";
return 1;
}
std::cout << "saved config to " << argv[2] << "\n";
return 0;
}
if (command == "load-config") {
if (argc < 3) {
return Usage();
}
auto status = client_result.value->LoadConfig(argv[2]);
if (!status.ok()) {
std::cerr << "warppipe: load-config failed: " << status.message << "\n";
return 1;
}
std::cout << "loaded config from " << argv[2] << "\n";
return 0;
}
if (command == "defaults") {
auto defaults = client_result.value->GetDefaults();
if (!defaults.ok()) {
std::cerr << "warppipe: defaults failed: " << defaults.status.message << "\n";
return 1;
}
std::cout << "default_sink\t" << defaults.value.default_sink_name << "\n"
<< "default_source\t" << defaults.value.default_source_name << "\n"
<< "configured_sink\t" << defaults.value.configured_sink_name << "\n"
<< "configured_source\t" << defaults.value.configured_source_name << "\n";
return 0;
}
return Usage();
}

View file

@ -45,6 +45,7 @@ enum class ThreadingMode : uint8_t {
struct ConnectionOptions {
ThreadingMode threading = ThreadingMode::kThreadLoop;
bool autoconnect = true;
bool policy_only = false;
std::string application_name = "warppipe";
std::optional<std::string> remote_name;
std::optional<std::string> config_path;

View file

@ -41,7 +41,7 @@ bool ParseUInt(const char* value, uint32_t* out) {
void PrintUsage() {
std::cout << "warppipe_perf usage:\n"
<< " --mode create-destroy|registry|links|policy\n"
<< " --mode create-destroy|registry|links|policy|e2e\n"
<< " --type sink|source|both\n"
<< " --count N (default 200, per-type when --type both)\n"
<< " --events N (registry mode, default 100)\n"
@ -95,7 +95,8 @@ bool ParseArgs(int argc, char* argv[], Options* options) {
return false;
}
if (options->mode != "create-destroy" && options->mode != "registry" &&
options->mode != "links" && options->mode != "policy") {
options->mode != "links" && options->mode != "policy" &&
options->mode != "e2e") {
return false;
}
return true;
@ -402,6 +403,77 @@ int main(int argc, char* argv[]) {
return 0;
}
if (options.mode == "e2e") {
std::vector<double> latencies;
latencies.reserve(options.count);
for (uint32_t i = 0; i < options.count; ++i) {
auto iter_start = std::chrono::steady_clock::now();
std::string sink_name = prefix + "-e2e-sink-" + std::to_string(i);
auto sink = client.value->CreateVirtualSink(sink_name, node_options);
if (!sink.ok()) {
std::cerr << "e2e: create sink failed at " << i << ": "
<< sink.status.message << "\n";
break;
}
warppipe::RouteRule rule;
rule.match.application_name = "warppipe-perf";
rule.target_node = sink_name;
auto rule_result = client.value->AddRouteRule(rule);
if (!rule_result.ok()) {
client.value->RemoveNode(sink.value.node);
std::cerr << "e2e: add rule failed at " << i << ": "
<< rule_result.status.message << "\n";
break;
}
std::string source_name = prefix + "-e2e-src-" + std::to_string(i);
auto source = client.value->CreateVirtualSource(source_name, node_options);
if (!source.ok()) {
client.value->RemoveRouteRule(rule_result.value);
client.value->RemoveNode(sink.value.node);
std::cerr << "e2e: create source failed at " << i << ": "
<< source.status.message << "\n";
break;
}
client.value->RemoveNode(source.value.node);
client.value->RemoveRouteRule(rule_result.value);
client.value->RemoveNode(sink.value.node);
auto iter_end = std::chrono::steady_clock::now();
latencies.push_back(ToMillis(iter_end - iter_start));
}
if (latencies.empty()) {
std::cerr << "e2e: no iterations completed\n";
return 1;
}
std::sort(latencies.begin(), latencies.end());
size_t n = latencies.size();
size_t p50 = n / 2;
size_t p95 = static_cast<size_t>(static_cast<double>(n) * 0.95);
if (p95 >= n) {
p95 = n - 1;
}
double total = 0.0;
for (double v : latencies) {
total += v;
}
std::cout << "e2e_iterations=" << n << "\n"
<< "e2e_total_ms=" << std::fixed << std::setprecision(2) << total << "\n"
<< "e2e_median_ms=" << latencies[p50] << "\n"
<< "e2e_p95_ms=" << latencies[p95] << "\n"
<< "e2e_min_ms=" << latencies[0] << "\n"
<< "e2e_max_ms=" << latencies[n - 1] << "\n";
return 0;
}
PrintUsage();
return 2;
}

View file

@ -809,6 +809,12 @@ void Client::Impl::SchedulePolicySync() {
}
void Client::Impl::ProcessPendingAutoLinks() {
if (options.policy_only) {
std::lock_guard<std::mutex> lock(cache_mutex);
pending_auto_links.clear();
return;
}
struct LinkSpec {
uint32_t output_port;
uint32_t input_port;

View file

@ -653,3 +653,142 @@ TEST_CASE("NodeInfo captures application properties") {
}
FAIL("inserted node not found");
}
TEST_CASE("policy-only mode does not create auto-links") {
warppipe::ConnectionOptions opts = DefaultOptions();
opts.policy_only = true;
auto result = warppipe::Client::Create(opts);
if (!result.ok()) {
SUCCEED("PipeWire unavailable");
return;
}
warppipe::RouteRule rule;
rule.match.application_name = "policy-only-app";
rule.target_node = "policy-sink";
REQUIRE(result.value->AddRouteRule(rule).ok());
warppipe::NodeInfo sink;
sink.id = warppipe::NodeId{900001};
sink.name = "policy-sink";
sink.media_class = "Audio/Sink";
REQUIRE(result.value->Test_InsertNode(sink).ok());
warppipe::PortInfo sink_port;
sink_port.id = warppipe::PortId{900002};
sink_port.node = sink.id;
sink_port.name = "playback_FL";
sink_port.is_input = true;
REQUIRE(result.value->Test_InsertPort(sink_port).ok());
warppipe::NodeInfo source;
source.id = warppipe::NodeId{900003};
source.name = "policy-only-source";
source.media_class = "Stream/Output/Audio";
source.application_name = "policy-only-app";
REQUIRE(result.value->Test_InsertNode(source).ok());
warppipe::PortInfo src_port;
src_port.id = warppipe::PortId{900004};
src_port.node = source.id;
src_port.name = "capture_FL";
src_port.is_input = false;
REQUIRE(result.value->Test_InsertPort(src_port).ok());
REQUIRE(result.value->Test_TriggerPolicyCheck().ok());
REQUIRE(result.value->Test_GetPendingAutoLinkCount() == 0);
auto links = result.value->ListLinks();
REQUIRE(links.ok());
bool found_auto_link = false;
for (const auto& link : links.value) {
if (link.output_port.value == 900004 && link.input_port.value == 900002) {
found_auto_link = true;
}
}
REQUIRE_FALSE(found_auto_link);
}
TEST_CASE("policy engine does not re-create existing links") {
auto result = warppipe::Client::Create(DefaultOptions());
if (!result.ok()) {
SUCCEED("PipeWire unavailable");
return;
}
warppipe::NodeInfo sink;
sink.id = warppipe::NodeId{910001};
sink.name = "relink-sink";
sink.media_class = "Audio/Sink";
REQUIRE(result.value->Test_InsertNode(sink).ok());
warppipe::PortInfo sink_port;
sink_port.id = warppipe::PortId{910002};
sink_port.node = sink.id;
sink_port.name = "playback_FL";
sink_port.is_input = true;
REQUIRE(result.value->Test_InsertPort(sink_port).ok());
warppipe::NodeInfo source;
source.id = warppipe::NodeId{910003};
source.name = "relink-source";
source.media_class = "Stream/Output/Audio";
source.application_name = "relink-app";
REQUIRE(result.value->Test_InsertNode(source).ok());
warppipe::PortInfo src_port;
src_port.id = warppipe::PortId{910004};
src_port.node = source.id;
src_port.name = "capture_FL";
src_port.is_input = false;
REQUIRE(result.value->Test_InsertPort(src_port).ok());
warppipe::Link existing_link;
existing_link.id = warppipe::LinkId{910005};
existing_link.output_port = warppipe::PortId{910004};
existing_link.input_port = warppipe::PortId{910002};
REQUIRE(result.value->Test_InsertLink(existing_link).ok());
warppipe::RouteRule rule;
rule.match.application_name = "relink-app";
rule.target_node = "relink-sink";
REQUIRE(result.value->AddRouteRule(rule).ok());
REQUIRE(result.value->Test_TriggerPolicyCheck().ok());
auto links = result.value->ListLinks();
REQUIRE(links.ok());
int matching_links = 0;
for (const auto& link : links.value) {
if (link.output_port.value == 910004 && link.input_port.value == 910002) {
++matching_links;
}
}
REQUIRE(matching_links == 1);
}
TEST_CASE("policy mode does not override user defaults") {
warppipe::ConnectionOptions opts = DefaultOptions();
opts.policy_only = true;
auto result = warppipe::Client::Create(opts);
if (!result.ok()) {
SUCCEED("PipeWire unavailable");
return;
}
auto defaults = result.value->GetDefaults();
REQUIRE(defaults.ok());
warppipe::RouteRule rule;
rule.match.application_name = "some-app";
rule.target_node = "custom-sink";
REQUIRE(result.value->AddRouteRule(rule).ok());
auto defaults2 = result.value->GetDefaults();
REQUIRE(defaults2.ok());
REQUIRE(defaults2.value.default_sink_name == defaults.value.default_sink_name);
REQUIRE(defaults2.value.default_source_name == defaults.value.default_source_name);
REQUIRE(defaults2.value.configured_sink_name == defaults.value.configured_sink_name);
REQUIRE(defaults2.value.configured_source_name == defaults.value.configured_source_name);
}