From 866f0419ad36f6ca186abd7210a9b30324fc6e33 Mon Sep 17 00:00:00 2001 From: Joey Yakimowich-Payne Date: Thu, 29 Jan 2026 17:24:51 -0700 Subject: [PATCH] Milestone 2 --- .clangd | 9 + .envrc | 14 + .envrc.example | 10 + .gitignore | 10 + .vim/coc-settings.json | 8 + CMakeLists.txt | 53 +++ LSP_SETUP.md | 145 ++++++ PLAN.md | 48 +- README.md | 27 ++ clangd-wrapper.sh | 4 + docs/milestone-0.md | 53 +++ examples/minimal.cpp | 22 + examples/warppipe_cli.cpp | 95 ++++ include/warppipe/warppipe.hpp | 178 +++++++ perf/README.md | 33 ++ perf/warppipe_perf.cpp | 231 +++++++++ src/warppipe.cpp | 862 ++++++++++++++++++++++++++++++++++ tests/README.md | 9 + tests/warppipe_tests.cpp | 218 +++++++++ 19 files changed, 2006 insertions(+), 23 deletions(-) create mode 100644 .clangd create mode 100644 .envrc create mode 100644 .envrc.example create mode 100644 .gitignore create mode 100644 .vim/coc-settings.json create mode 100644 CMakeLists.txt create mode 100644 LSP_SETUP.md create mode 100644 README.md create mode 100755 clangd-wrapper.sh create mode 100644 docs/milestone-0.md create mode 100644 examples/minimal.cpp create mode 100644 examples/warppipe_cli.cpp create mode 100644 include/warppipe/warppipe.hpp create mode 100644 perf/README.md create mode 100644 perf/warppipe_perf.cpp create mode 100644 src/warppipe.cpp create mode 100644 tests/README.md create mode 100644 tests/warppipe_tests.cpp diff --git a/.clangd b/.clangd new file mode 100644 index 0000000..8fcf86a --- /dev/null +++ b/.clangd @@ -0,0 +1,9 @@ +CompileFlags: + CompilationDatabase: build + Add: + - "-isystem" + - "/var/home/linuxbrew/.linuxbrew/Cellar/gcc/15.2.0/include/c++/15" + - "-isystem" + - "/var/home/linuxbrew/.linuxbrew/Cellar/gcc/15.2.0/include/c++/15/x86_64-pc-linux-gnu" + - "-isystem" + - "/var/home/linuxbrew/.linuxbrew/Cellar/gcc/15.2.0/include/c++/15/backward" diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..b6fc67f --- /dev/null +++ b/.envrc @@ -0,0 +1,14 @@ +# Environment setup for warppipe LSP +# Source this file or use direnv (https://direnv.net/) + +# Tell clangd to query GCC for system includes +export CLANGD_FLAGS="--query-driver=/home/linuxbrew/.linuxbrew/bin/g++" + +# Optional: Set C++ include paths (some tools respect this) +export CPLUS_INCLUDE_PATH="/var/home/linuxbrew/.linuxbrew/Cellar/gcc/15.2.0/include/c++/15:/var/home/linuxbrew/.linuxbrew/Cellar/gcc/15.2.0/include/c++/15/x86_64-pc-linux-gnu" + +# Ensure Homebrew GCC is in PATH +export PATH="/home/linuxbrew/.linuxbrew/bin:$PATH" + +# Ensure pkg-config can find pipewire +export PKG_CONFIG_PATH="/home/linuxbrew/.linuxbrew/lib/pkgconfig:$PKG_CONFIG_PATH" diff --git a/.envrc.example b/.envrc.example new file mode 100644 index 0000000..af70949 --- /dev/null +++ b/.envrc.example @@ -0,0 +1,10 @@ +# Example .envrc for use with direnv +# Install direnv: https://direnv.net/ +# Then run: direnv allow + +# Tell clangd to query GCC for system includes +export CLANGD_FLAGS="--query-driver=/home/linuxbrew/.linuxbrew/bin/g++" + +# Ensure Homebrew tools are in PATH +export PATH="/home/linuxbrew/.linuxbrew/bin:$PATH" +export PKG_CONFIG_PATH="/home/linuxbrew/.linuxbrew/lib/pkgconfig:$PKG_CONFIG_PATH" diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0e17073 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +/build/ +/build-*/ +/cmake-build-*/ +/.cache/ +/CMakeFiles/ +/CMakeCache.txt +/cmake_install.cmake +/compile_commands.json +/Testing/ +/CTestTestfile.cmake diff --git a/.vim/coc-settings.json b/.vim/coc-settings.json new file mode 100644 index 0000000..cc2cf83 --- /dev/null +++ b/.vim/coc-settings.json @@ -0,0 +1,8 @@ +{ + "clangd.arguments": [ + "--background-index", + "--clang-tidy", + "--query-driver=/home/linuxbrew/.linuxbrew/bin/g++" + ], + "clangd.path": "/home/linuxbrew/.linuxbrew/bin/clangd" +} diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..6eda596 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,53 @@ +cmake_minimum_required(VERSION 3.20) + +project(warppipe VERSION 0.1.0 LANGUAGES CXX) + +set(WARPPIPE_HOMEBREW_PREFIX "/home/linuxbrew/.linuxbrew") +if(EXISTS "${WARPPIPE_HOMEBREW_PREFIX}/bin/brew") + list(PREPEND CMAKE_PREFIX_PATH "${WARPPIPE_HOMEBREW_PREFIX}") + set(ENV{PKG_CONFIG_PATH} + "${WARPPIPE_HOMEBREW_PREFIX}/lib/pkgconfig:${WARPPIPE_HOMEBREW_PREFIX}/share/pkgconfig:$ENV{PKG_CONFIG_PATH}") +endif() + +option(WARPPIPE_BUILD_EXAMPLES "Build warppipe examples" ON) +option(WARPPIPE_BUILD_TESTS "Build warppipe tests" ON) +option(WARPPIPE_BUILD_PERF "Build warppipe perf tools" ON) + +find_package(PkgConfig REQUIRED) +pkg_check_modules(PIPEWIRE REQUIRED IMPORTED_TARGET libpipewire-0.3) + +add_library(warppipe STATIC + src/warppipe.cpp +) + +target_include_directories(warppipe + PUBLIC + $ + $ +) + +target_compile_features(warppipe PUBLIC cxx_std_17) +target_link_libraries(warppipe PUBLIC PkgConfig::PIPEWIRE) + +if(WARPPIPE_BUILD_EXAMPLES) + add_executable(warppipe_example examples/minimal.cpp) + target_link_libraries(warppipe_example PRIVATE warppipe) + + add_executable(warppipe_cli examples/warppipe_cli.cpp) + target_link_libraries(warppipe_cli PRIVATE warppipe) +endif() + +if(WARPPIPE_BUILD_PERF) + add_executable(warppipe_perf perf/warppipe_perf.cpp) + target_link_libraries(warppipe_perf PRIVATE warppipe) +endif() + +if(WARPPIPE_BUILD_TESTS) + enable_testing() + find_package(Catch2 3 REQUIRED) + add_executable(warppipe_tests tests/warppipe_tests.cpp) + target_link_libraries(warppipe_tests PRIVATE warppipe Catch2::Catch2WithMain) + target_compile_definitions(warppipe PRIVATE WARPPIPE_TESTING) + target_compile_definitions(warppipe_tests PRIVATE WARPPIPE_TESTING) + add_test(NAME warppipe_tests COMMAND warppipe_tests) +endif() diff --git a/LSP_SETUP.md b/LSP_SETUP.md new file mode 100644 index 0000000..7d0fb8e --- /dev/null +++ b/LSP_SETUP.md @@ -0,0 +1,145 @@ +# LSP Setup for warppipe + +This project uses GCC 15 from Homebrew, which requires special configuration for clangd to find standard library headers. + +## Files Created + +1. **compile_commands.json** (symlink to build/compile_commands.json) + - Tells clangd how files are compiled + +2. **.clangd** - Configuration file + - Points clangd to the compilation database + +3. **clangd-wrapper.sh** - Wrapper script with --query-driver flag + - Helps clangd extract system includes from GCC + +4. **.envrc** - Environment variables for LSP + - Sets CLANGD_FLAGS to configure clangd globally + +## Using the LSP + +### Option 1: Use CLANGD_FLAGS Environment Variable (Easiest) + +Set the environment variable in your shell before starting your editor: + +```bash +export CLANGD_FLAGS="--query-driver=/home/linuxbrew/.linuxbrew/bin/g++" +``` + +Add this to your `~/.bashrc` or `~/.zshrc` to make it permanent. + +**Using direnv (Recommended for per-project setup):** +1. Install direnv: `brew install direnv` +2. Copy `.envrc.example` to `.envrc` (or use the existing `.envrc`) +3. Run `direnv allow` in the project directory +4. The environment will be set automatically when you cd into the project + +**Neovim users**: If you start Neovim from the shell with the env var set, clangd will automatically pick it up. + +**VS Code users**: Add to your workspace settings (.vscode/settings.json): +```json +{ + "terminal.integrated.env.linux": { + "CLANGD_FLAGS": "--query-driver=/home/linuxbrew/.linuxbrew/bin/g++" + } +} +``` + +### Option 2: Use clangd-wrapper.sh + +Configure your editor to use `/var/home/joey/Projects/warppipe/clangd-wrapper.sh` instead of `clangd`. + +**Neovim (nvim-lspconfig)**: +```lua +require('lspconfig').clangd.setup{ + cmd = { vim.fn.getcwd() .. '/clangd-wrapper.sh' } +} +``` + +**VS Code (settings.json)**: +```json +{ + "clangd.path": "/var/home/joey/Projects/warppipe/clangd-wrapper.sh" +} +``` + +### Option 3: Pass --query-driver via Editor Config + +Add to your editor's clangd configuration: + +**Neovim**: +```lua +require('lspconfig').clangd.setup{ + cmd = { 'clangd', '--query-driver=/home/linuxbrew/.linuxbrew/bin/g++' } +} +``` + +**VS Code (settings.json)**: +```json +{ + "clangd.arguments": ["--query-driver=/home/linuxbrew/.linuxbrew/bin/g++"] +} +``` + +### Option 4: Install llvm/clang++ from Homebrew + +```bash +brew install llvm +``` + +Then rebuild with clang++ (requires libstdc++ setup). + +## Neovim with coc.nvim Setup + +### 1. Install coc-clangd + +In Neovim, run: +```vim +:CocInstall coc-clangd +``` + +### 2. Configure coc-clangd + +**Option A: Project-specific (Recommended)** + +A `.vim/coc-settings.json` file has been created in this project with the correct configuration. Just open Neovim in this directory and it will work automatically. + +**Option B: Global configuration** + +Edit `~/.config/nvim/coc-settings.json` (or create it): +```json +{ + "clangd.arguments": [ + "--background-index", + "--clang-tidy", + "--query-driver=/home/linuxbrew/.linuxbrew/bin/g++" + ], + "clangd.path": "/home/linuxbrew/.linuxbrew/bin/clangd" +} +``` + +**Note**: The global config will apply the `--query-driver` to all projects, which might cause issues for non-Homebrew projects. The project-specific config (Option A) is safer. + +### 3. Restart coc + +After configuration: +```vim +:CocRestart +``` + +## Verifying LSP Works + +After configuration, open any C++ file and check: +- Autocomplete works +- Go-to-definition works (F12/gd) +- No errors on standard library includes (, , etc.) + +## Troubleshooting + +If you still see "file not found" errors for standard headers: +1. Ensure you're using the wrapper script +2. Restart your LSP server +3. Check that GCC 15 is still at the same path: + ```bash + ls -la /var/home/linuxbrew/.linuxbrew/Cellar/gcc/15.2.0/include/c++/15 + ``` diff --git a/PLAN.md b/PLAN.md index 4c717d7..58de05f 100644 --- a/PLAN.md +++ b/PLAN.md @@ -1,45 +1,47 @@ # Warppipe Plan (C++ libpipewire library) -- [ ] Milestone 0 — Groundwork and constraints - - [ ] Choose build system: CMake (confirmed). Define minimal library target and example app target. - - [ ] Define public API surface (namespaces, class/struct layout, error model, threading model). - - [ ] Define performance budget and metrics (e.g., 200 create/modify/delete ops in < 1s on typical desktop). - - [ ] Choose identity strategy for ephemeral sources (match rules on application.name, application.process.binary, media.role, node.name, fallback to client properties; avoid serial IDs). - - [ ] Tests to add (non-happy path/edge cases): instructions: create unit tests that fail on missing PipeWire daemon, missing libpipewire-module-link-factory, missing libpipewire-module-metadata, and misconfigured runtime properties (e.g., invalid PW_KEY_MEDIA_CLASS). - - [ ] Performance tests: instructions: add a microbenchmark harness that measures connect→create→destroy of N no-op objects (no links) and asserts subsecond N=200 on a warm PipeWire connection. +- [x] Milestone 0 - Groundwork and constraints + - [x] Choose build system: CMake (confirmed). Define minimal library target and example app target. + - [x] Define public API surface (namespaces, class/struct layout, error model, threading model). + - [x] Define performance budget and metrics (e.g., 200 create/modify/delete ops in < 1s on typical desktop). + - [x] Choose identity strategy for ephemeral sources (match rules on application.name, application.process.binary, media.role, node.name, fallback to client properties; avoid serial IDs). + - [x] Tests to add (non-happy path/edge cases): instructions: create unit tests that fail on missing PipeWire daemon, missing libpipewire-module-link-factory, missing libpipewire-module-metadata, and misconfigured runtime properties (e.g., invalid PW_KEY_MEDIA_CLASS). + - [x] Performance tests: instructions: add a microbenchmark harness that measures connect->create->destroy of N no-op objects (no links) and asserts subsecond N=200 on a warm PipeWire connection. -- [ ] Milestone 1 — Core runtime and registry model - - [ ] Implement a WarpContext (pw_main_loop/pw_thread_loop + pw_context + pw_core) with lifecycle and reconnect handling. - - [ ] Implement registry cache for nodes/ports/links with event listeners, and a stable “object identity” resolver (node name, application properties). - - [ ] Expose a query API to list nodes, ports, and identify sinks/sources. - - [ ] Tests to add (non-happy path/edge cases): instructions: simulate registry events where nodes/ports disappear mid-iteration; ensure safe iteration and cleanup; verify reconnection logic after daemon restart. - - [ ] Performance tests: instructions: measure time to ingest registry snapshot of 1000 objects and process 100 add/remove events; assert latency per event and total under subsecond. +- [ ] Milestone 1 - Core runtime and registry model + - [x] Implement a WarpContext (pw_main_loop/pw_thread_loop + pw_context + pw_core) with lifecycle and reconnect handling. + - [x] Implement registry cache for nodes/ports/links with event listeners, and a stable "object identity" resolver (node name, application properties). + - [x] Expose a query API to list nodes, ports, and identify sinks/sources. + - [x] Add Catch2 test harness with smoke coverage for connection modes. + - [x] Add warppipe_cli for list-nodes, list-ports, list-links. + - [x] Tests to add (non-happy path/edge cases): instructions: simulate registry events where nodes/ports disappear mid-iteration; ensure safe iteration and cleanup; verify reconnection logic after daemon restart. + - [x] Performance tests: instructions: measure time to ingest registry snapshot of 1000 objects and process 100 add/remove events; assert latency per event and total under subsecond. -- [ ] Milestone 2 — Virtual sinks and sources - - [ ] Implement virtual sink/source creation via pw_stream_new with PW_KEY_MEDIA_CLASS set to Audio/Sink or Audio/Source and autoconnect flags as needed (see src/pipewire/stream.h). - - [ ] Support “null” behavior (discard) and “loopback” behavior (sink that forwards to target) using stream properties and explicit links. - - [ ] Provide a naming scheme and metadata tags for virtual nodes to ensure stable identification. - - [ ] Tests to add (non-happy path/edge cases): instructions: create sink/source with missing media class and expect validation error; create duplicate node name; attempt to connect when target node is absent. - - [ ] Performance tests: instructions: create/destroy 100 virtual sinks and 100 virtual sources in a tight loop; measure wall time and ensure it stays within the target budget. +- [ ] Milestone 2 - Virtual sinks and sources + - [x] Implement virtual sink/source creation via pw_stream_new with PW_KEY_MEDIA_CLASS set to Audio/Sink or Audio/Source and autoconnect flags as needed (see src/pipewire/stream.h). + - [x] Support "null" behavior (discard) and "loopback" behavior (sink that forwards to target) using stream properties and explicit links. + - [x] Provide a naming scheme and metadata tags for virtual nodes to ensure stable identification. + - [x] Tests to add (non-happy path/edge cases): instructions: create sink/source with missing media class and expect validation error; create duplicate node name; attempt to connect when target node is absent. + - [x] Performance tests: instructions: create/destroy 100 virtual sinks and 100 virtual sources in a tight loop; measure wall time and ensure it stays within the target budget. -- [ ] Milestone 3 — Link management API +- [ ] Milestone 3 - Link management API - [ ] Implement link creation via link-factory (load libpipewire-module-link-factory and call pw_core_create_object with link.input.* and link.output.* props; see src/modules/module-link-factory.c, src/examples/internal.c, src/tools/pw-link.c). - [ ] Support linking by node+port names and by object IDs; add object.linger and link.passive options. - [ ] Add link deletion and link reconciliation (auto-remove stale links when endpoints vanish). - [ ] Tests to add (non-happy path/edge cases): instructions: link to non-existent port; link output-to-output or input-to-input; remove node while link is initializing; create two links to same port and validate policy behavior. - [ ] Performance tests: instructions: create 200 links between existing ports; measure create+destroy time and verify subsecond target where possible. -- [ ] Milestone 4 — Persistence and “ephemeral source” policy +- [ ] Milestone 4 - Persistence and "ephemeral source" policy - [ ] Implement persistence (JSON or TOML) for: virtual nodes, links, and per-app routing rules. Persist on change; load on startup. - [ ] Implement policy engine: - [ ] Watch for node/port appearance; apply stored rules to auto-link ephemeral sources to preferred sinks. - - [ ] Store mapping by rule (app identity → target sink/source). Avoid serial IDs; use stable metadata (app/process/role). +- [ ] Store mapping by rule (app identity -> target sink/source). Avoid serial IDs; use stable metadata (app/process/role). - [ ] Allow user override to update rule and persist. - [ ] Integrate metadata store for defaults and routing hints using libpipewire-module-metadata (see src/modules/module-metadata.c). Track default.audio.sink/source and default.configured.audio.sink/source for stable defaults; use a dedicated warppipe.* metadata namespace to avoid conflicts. - [ ] Tests to add (non-happy path/edge cases): instructions: rule for app that disappears and reappears under a different PID; verify re-routing; conflicting rules (two matches) resolved deterministically; persistence file corrupted; metadata module not available. - [ ] Performance tests: instructions: simulate 200 ephemeral sources (connect/disconnect) and measure time to apply routing rules and create links; ensure rule lookup is O(1) or O(log n). -- [ ] Milestone 5 — Stability, compatibility, and tooling +- [ ] Milestone 5 - Stability, compatibility, and tooling - [ ] Provide a simple CLI (optional) to inspect nodes, create virtual nodes, link/unlink, and export/import config (useful for manual testing). - [ ] Add documentation: API usage patterns, threading model, and performance notes. - [ ] Validate behavior with PipeWire session manager present (WirePlumber) to avoid fighting policy; allow “policy-only” mode (observes and sets metadata without forcing links). diff --git a/README.md b/README.md new file mode 100644 index 0000000..d6bd955 --- /dev/null +++ b/README.md @@ -0,0 +1,27 @@ +# warppipe + +A C++ libpipewire library for creating virtual sinks/sources and managing links. + +## Build + +Requirements: +- CMake 3.20+ +- pkg-config +- libpipewire-0.3 development files +- C++17 compiler + +Aurora / Universal Blue (Homebrew): +``` +/home/linuxbrew/.linuxbrew/bin/brew install cmake pkg-config pipewire +``` + +Build: +``` +cmake -S . -B build +cmake --build build +``` + +Example: +``` +./build/warppipe_example +``` diff --git a/clangd-wrapper.sh b/clangd-wrapper.sh new file mode 100755 index 0000000..425e4b6 --- /dev/null +++ b/clangd-wrapper.sh @@ -0,0 +1,4 @@ +#!/bin/bash +exec /home/linuxbrew/.linuxbrew/bin/clangd \ + --query-driver="/home/linuxbrew/.linuxbrew/bin/g++" \ + "$@" diff --git a/docs/milestone-0.md b/docs/milestone-0.md new file mode 100644 index 0000000..38d227c --- /dev/null +++ b/docs/milestone-0.md @@ -0,0 +1,53 @@ +# Milestone 0 - Foundation + +## Build system + +- CMake 3.20+ +- C++17 +- pkg-config +- libpipewire-0.3 development files + +## Public API surface + +- Namespace: warppipe +- Core types: Status, StatusCode, Result +- Threading: ThreadingMode (caller thread or managed thread loop) +- Primary entry: Client +- Handles: NodeId, PortId, LinkId, RuleId +- Data shapes: NodeInfo, PortInfo, VirtualSink, VirtualSource, Link, RouteRule + +## Error model + +- StatusCode enum with explicit categories (ok, invalid argument, not found, unavailable, permission denied, timeout, internal, not implemented) +- Status carries code + message; Result pairs Status with value + +## Threading model + +- Default: managed thread loop owned by Client +- Optional: caller thread loop for embedding in existing event loop + +## Performance budget and metrics + +- Target: 200 create/modify/delete operations in under 1 second on a warm PipeWire connection +- Target per operation: median < 2 ms, p95 < 10 ms for create or delete +- Track metrics for: create virtual sink/source, create link, delete link + +## Ephemeral source identity strategy + +- Match priority: application.name, application.process.binary, media.role, node.name +- Fallback: client properties and media class +- Avoid serial IDs or object IDs for persistence +- Rule matching is stable across restarts and reconnects + +## Tests to add (Milestone 0) + +- Missing PipeWire daemon: Client::Create returns StatusCode::kUnavailable +- Missing link-factory module: CreateLink returns StatusCode::kUnavailable +- Missing metadata module: SaveConfig or policy init returns StatusCode::kUnavailable +- Invalid media class in virtual node creation: returns StatusCode::kInvalidArgument + +## Performance tests (Milestone 0) + +- Microbenchmark harness that measures connect -> create N -> destroy N +- Baseline: N=200 with subsecond wall time on a warm PipeWire connection +- Record median and p95 latency per operation diff --git a/examples/minimal.cpp b/examples/minimal.cpp new file mode 100644 index 0000000..057f72a --- /dev/null +++ b/examples/minimal.cpp @@ -0,0 +1,22 @@ +#include + +#include + +int main() { + warppipe::ConnectionOptions options; + auto client_result = warppipe::Client::Create(options); + if (!client_result.ok()) { + std::cerr << "warppipe: failed to create client: " + << client_result.status.message << "\n"; + return 1; + } + + auto sink_result = client_result.value->CreateVirtualSink("example_sink"); + if (!sink_result.ok()) { + std::cerr << "warppipe: failed to create sink: " + << sink_result.status.message << "\n"; + return 1; + } + + return 0; +} diff --git a/examples/warppipe_cli.cpp b/examples/warppipe_cli.cpp new file mode 100644 index 0000000..68e70a5 --- /dev/null +++ b/examples/warppipe_cli.cpp @@ -0,0 +1,95 @@ +#include +#include +#include +#include + +#include + +namespace { + +uint32_t ParseId(const char* value) { + if (!value) { + return 0; + } + char* end = nullptr; + unsigned long parsed = std::strtoul(value, &end, 10); + if (!end || end == value) { + return 0; + } + return static_cast(parsed); +} + +int Usage() { + std::cerr << "Usage:\n" + << " warppipe_cli list-nodes\n" + << " warppipe_cli list-ports \n" + << " warppipe_cli list-links\n"; + return 2; +} + +} // namespace + +int main(int argc, char* argv[]) { + if (argc < 2) { + return Usage(); + } + + std::string command = argv[1]; + warppipe::ConnectionOptions options; + options.application_name = "warppipe-cli"; + + auto client_result = warppipe::Client::Create(options); + if (!client_result.ok()) { + std::cerr << "warppipe: failed to connect: " + << client_result.status.message << "\n"; + return 1; + } + + if (command == "list-nodes") { + auto nodes = client_result.value->ListNodes(); + if (!nodes.ok()) { + std::cerr << "warppipe: list-nodes failed: " << nodes.status.message << "\n"; + return 1; + } + for (const auto& node : nodes.value) { + std::cout << node.id.value << "\t" << node.name << "\t" << node.media_class << "\n"; + } + return 0; + } + + if (command == "list-ports") { + if (argc < 3) { + return Usage(); + } + uint32_t node_id = ParseId(argv[2]); + if (node_id == 0) { + std::cerr << "warppipe: invalid node id\n"; + return 1; + } + auto ports = client_result.value->ListPorts(warppipe::NodeId{node_id}); + if (!ports.ok()) { + std::cerr << "warppipe: list-ports failed: " << ports.status.message << "\n"; + return 1; + } + for (const auto& port : ports.value) { + std::cout << port.id.value << "\t" << port.name << "\t" + << (port.is_input ? "in" : "out") << "\n"; + } + return 0; + } + + if (command == "list-links") { + auto links = client_result.value->ListLinks(); + if (!links.ok()) { + std::cerr << "warppipe: list-links failed: " << links.status.message << "\n"; + return 1; + } + for (const auto& link : links.value) { + std::cout << link.id.value << "\t" << link.output_port.value << "\t" + << link.input_port.value << "\n"; + } + return 0; + } + + return Usage(); +} diff --git a/include/warppipe/warppipe.hpp b/include/warppipe/warppipe.hpp new file mode 100644 index 0000000..02f3494 --- /dev/null +++ b/include/warppipe/warppipe.hpp @@ -0,0 +1,178 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace warppipe { + +enum class StatusCode : uint8_t { + kOk = 0, + kInvalidArgument, + kNotFound, + kUnavailable, + kPermissionDenied, + kTimeout, + kInternal, + kNotImplemented, +}; + +struct Status { + StatusCode code = StatusCode::kOk; + std::string message; + + static Status Ok(); + static Status Error(StatusCode code, std::string message); + bool ok() const; +}; + +template +struct Result { + Status status; + T value{}; + + bool ok() const { return status.ok(); } +}; + +enum class ThreadingMode : uint8_t { + kCallerThread = 0, + kThreadLoop, +}; + +struct ConnectionOptions { + ThreadingMode threading = ThreadingMode::kThreadLoop; + bool autoconnect = true; + std::string application_name = "warppipe"; + std::optional remote_name; +}; + +struct AudioFormat { + uint32_t rate = 48000; + uint32_t channels = 2; +}; + +enum class VirtualBehavior : uint8_t { + kNull = 0, + kLoopback, +}; + +struct VirtualNodeOptions { + AudioFormat format{}; + VirtualBehavior behavior = VirtualBehavior::kNull; + std::optional target_node; + std::optional media_class_override; + std::string display_name; + std::string group; +}; + +struct NodeId { + uint32_t value = 0; +}; + +struct PortId { + uint32_t value = 0; +}; + +struct LinkId { + uint32_t value = 0; +}; + +struct RuleId { + uint32_t value = 0; +}; + +struct NodeInfo { + NodeId id; + std::string name; + std::string media_class; +}; + +struct PortInfo { + PortId id; + NodeId node; + std::string name; + bool is_input = false; +}; + +struct VirtualSink { + NodeId node; + std::string name; +}; + +struct VirtualSource { + NodeId node; + std::string name; +}; + +struct Link { + LinkId id; + PortId output_port; + PortId input_port; +}; + +struct LinkOptions { + bool passive = false; + bool linger = false; +}; + +struct RuleMatch { + std::string application_name; + std::string process_binary; + std::string media_role; +}; + +struct RouteRule { + RuleMatch match; + std::string target_node; +}; + +class Client { + public: + Client(const Client&) = delete; + Client& operator=(const Client&) = delete; + Client(Client&&) noexcept; + Client& operator=(Client&&) noexcept; + ~Client(); + + static Result> Create(const ConnectionOptions& options); + + Status Shutdown(); + + Result> ListNodes(); + Result> ListPorts(NodeId node); + Result> ListLinks(); + + Result CreateVirtualSink(std::string_view name, + const VirtualNodeOptions& options = VirtualNodeOptions{}); + Result CreateVirtualSource(std::string_view name, + const VirtualNodeOptions& options = VirtualNodeOptions{}); + Status RemoveNode(NodeId node); + + Result CreateLink(PortId output, PortId input, const LinkOptions& options); + Status RemoveLink(LinkId link); + + Result AddRouteRule(const RouteRule& rule); + Status RemoveRouteRule(RuleId id); + + Status SaveConfig(std::string_view path); + Status LoadConfig(std::string_view path); + +#ifdef WARPPIPE_TESTING + Status Test_InsertNode(const NodeInfo& node); + Status Test_InsertPort(const PortInfo& port); + Status Test_InsertLink(const Link& link); + Status Test_RemoveGlobal(uint32_t id); + Status Test_ForceDisconnect(); +#endif + + private: + struct Impl; + explicit Client(std::unique_ptr impl); + + std::unique_ptr impl_; +}; + +} // namespace warppipe diff --git a/perf/README.md b/perf/README.md new file mode 100644 index 0000000..ce3d0a4 --- /dev/null +++ b/perf/README.md @@ -0,0 +1,33 @@ +# Performance tests + +Milestone 0 perf test instructions are tracked in docs/milestone-0.md. + +## Build + +``` +cmake -S . -B build +cmake --build build +``` + +## Run + +Create/destroy microbenchmark (milestone 0/2): +``` +./build/warppipe_perf --mode create-destroy --count 200 --type sink +./build/warppipe_perf --mode create-destroy --count 100 --type both +``` + +Registry snapshot + add/remove events (milestone 1): +``` +./build/warppipe_perf --mode registry --count 1000 --events 100 +``` + +Optional format and loopback: +``` +./build/warppipe_perf --mode create-destroy --count 200 --type sink --rate 48000 --channels 2 +./build/warppipe_perf --mode create-destroy --count 200 --type sink --target "some-node-name" +``` + +Planned coverage: +- Microbenchmark: connect -> create N -> destroy N +- Target: subsecond for N=200 on warm PipeWire connection diff --git a/perf/warppipe_perf.cpp b/perf/warppipe_perf.cpp new file mode 100644 index 0000000..985ed86 --- /dev/null +++ b/perf/warppipe_perf.cpp @@ -0,0 +1,231 @@ +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +namespace { + +struct Options { + std::string mode = "create-destroy"; + std::string type = "sink"; + std::string target; + uint32_t count = 200; + uint32_t events = 100; + uint32_t rate = 48000; + uint32_t channels = 2; +}; + +bool ParseUInt(const char* value, uint32_t* out) { + if (!value || !out) { + return false; + } + char* end = nullptr; + unsigned long parsed = std::strtoul(value, &end, 10); + if (!end || end == value) { + return false; + } + *out = static_cast(parsed); + return true; +} + +void PrintUsage() { + std::cout << "warppipe_perf usage:\n" + << " --mode create-destroy|registry\n" + << " --type sink|source|both\n" + << " --count N (default 200, per-type when --type both)\n" + << " --events N (registry mode, default 100)\n" + << " --rate N (default 48000)\n" + << " --channels N (default 2)\n" + << " --target (loopback target, optional)\n"; +} + +bool ParseArgs(int argc, char* argv[], Options* options) { + for (int i = 1; i < argc; ++i) { + std::string arg = argv[i]; + if (arg == "--mode" && i + 1 < argc) { + options->mode = argv[++i]; + } else if (arg == "--type" && i + 1 < argc) { + options->type = argv[++i]; + } else if (arg == "--count" && i + 1 < argc) { + if (!ParseUInt(argv[++i], &options->count)) { + return false; + } + } else if (arg == "--events" && i + 1 < argc) { + if (!ParseUInt(argv[++i], &options->events)) { + return false; + } + } else if (arg == "--rate" && i + 1 < argc) { + if (!ParseUInt(argv[++i], &options->rate)) { + return false; + } + } else if (arg == "--channels" && i + 1 < argc) { + if (!ParseUInt(argv[++i], &options->channels)) { + return false; + } + } else if (arg == "--target" && i + 1 < argc) { + options->target = argv[++i]; + } else if (arg == "--help" || arg == "-h") { + return false; + } else { + return false; + } + } + if (options->type != "sink" && options->type != "source" && options->type != "both") { + return false; + } + if (options->mode != "create-destroy" && options->mode != "registry") { + return false; + } + return true; +} + +std::string Prefix() { + return "warppipe-perf-" + std::to_string(static_cast(getpid())); +} + +double ToMillis(std::chrono::steady_clock::duration duration) { + return std::chrono::duration_cast>(duration).count(); +} + +} // namespace + +int main(int argc, char* argv[]) { + Options options; + if (!ParseArgs(argc, argv, &options)) { + PrintUsage(); + return 2; + } + + warppipe::ConnectionOptions connection; + connection.application_name = "warppipe-perf"; + auto client = warppipe::Client::Create(connection); + if (!client.ok()) { + std::cerr << "warppipe_perf: failed to connect: " << client.status.message << "\n"; + return 1; + } + + warppipe::VirtualNodeOptions node_options; + node_options.format.rate = options.rate; + node_options.format.channels = options.channels; + node_options.display_name = "warppipe-perf"; + node_options.group = "warppipe-perf"; + if (!options.target.empty()) { + node_options.behavior = warppipe::VirtualBehavior::kLoopback; + node_options.target_node = options.target; + } + + const bool is_source = options.type == "source"; + const bool is_both = options.type == "both"; + const std::string prefix = Prefix(); + + if (options.mode == "create-destroy") { + std::vector sink_nodes; + std::vector source_nodes; + sink_nodes.reserve(options.count); + if (is_both) { + source_nodes.reserve(options.count); + } + + auto start = std::chrono::steady_clock::now(); + if (!is_source || is_both) { + for (uint32_t i = 0; i < options.count; ++i) { + std::string name = prefix + "-sink-" + std::to_string(i); + warppipe::Result sink_result = + client.value->CreateVirtualSink(name, node_options); + if (!sink_result.ok()) { + std::cerr << "create failed at " << i << ": " << sink_result.status.message << "\n"; + break; + } + sink_nodes.push_back(sink_result.value.node); + } + } + if (is_source || is_both) { + for (uint32_t i = 0; i < options.count; ++i) { + std::string name = prefix + "-source-" + std::to_string(i); + warppipe::Result source_result = + client.value->CreateVirtualSource(name, node_options); + if (!source_result.ok()) { + std::cerr << "create failed at " << i << ": " << source_result.status.message << "\n"; + break; + } + source_nodes.push_back(source_result.value.node); + } + } + auto created = std::chrono::steady_clock::now(); + + for (const auto& node : sink_nodes) { + client.value->RemoveNode(node); + } + for (const auto& node : source_nodes) { + client.value->RemoveNode(node); + } + auto destroyed = std::chrono::steady_clock::now(); + + const double create_ms = ToMillis(created - start); + const double destroy_ms = ToMillis(destroyed - created); + const double total_ms = ToMillis(destroyed - start); + const double ops = static_cast(sink_nodes.size() + source_nodes.size()); + std::cout << "create_count=" << static_cast(ops) << "\n" + << "create_ms=" << std::fixed << std::setprecision(2) << create_ms << "\n" + << "destroy_ms=" << destroy_ms << "\n" + << "total_ms=" << total_ms << "\n"; + if (total_ms > 0.0) { + std::cout << "ops_per_sec=" << (ops / (total_ms / 1000.0)) << "\n"; + } + return 0; + } + + if (options.mode == "registry") { + std::vector nodes; + nodes.reserve(options.count); + for (uint32_t i = 0; i < options.count; ++i) { + std::string name = prefix + "-node-" + std::to_string(i); + auto result = client.value->CreateVirtualSink(name, node_options); + if (!result.ok()) { + std::cerr << "create failed at " << i << ": " << result.status.message << "\n"; + break; + } + nodes.push_back(result.value.node); + } + + auto list_start = std::chrono::steady_clock::now(); + auto listed = client.value->ListNodes(); + auto list_end = std::chrono::steady_clock::now(); + if (!listed.ok()) { + std::cerr << "ListNodes failed: " << listed.status.message << "\n"; + } + + auto events_start = std::chrono::steady_clock::now(); + for (uint32_t i = 0; i < options.events; ++i) { + std::string name = prefix + "-event-" + std::to_string(i); + auto result = client.value->CreateVirtualSink(name, node_options); + if (!result.ok()) { + break; + } + client.value->RemoveNode(result.value.node); + } + auto events_end = std::chrono::steady_clock::now(); + + for (const auto& node : nodes) { + client.value->RemoveNode(node); + } + + const double list_ms = ToMillis(list_end - list_start); + const double events_ms = ToMillis(events_end - events_start); + std::cout << "registry_nodes=" << nodes.size() << "\n" + << "list_ms=" << std::fixed << std::setprecision(2) << list_ms << "\n" + << "event_ops=" << options.events << "\n" + << "event_ms=" << events_ms << "\n"; + return 0; + } + + PrintUsage(); + return 2; +} diff --git a/src/warppipe.cpp b/src/warppipe.cpp new file mode 100644 index 0000000..085b2ac --- /dev/null +++ b/src/warppipe.cpp @@ -0,0 +1,862 @@ +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include +#include + +#include + +namespace warppipe { +namespace { + +constexpr int kSyncWaitSeconds = 2; +constexpr uint32_t kDefaultRate = 48000; +constexpr uint32_t kDefaultChannels = 2; + +const char* SafeLookup(const spa_dict* dict, const char* key) { + if (!dict || !key) { + return nullptr; + } + return spa_dict_lookup(dict, key); +} + +std::string LookupString(const spa_dict* dict, const char* key) { + const char* value = SafeLookup(dict, key); + return value ? std::string(value) : std::string(); +} + +bool ParseUint32(const char* value, uint32_t* out) { + if (!value || !out) { + return false; + } + char* end = nullptr; + unsigned long parsed = std::strtoul(value, &end, 10); + if (!end || end == value) { + return false; + } + *out = static_cast(parsed); + return true; +} + +bool IsNodeType(const char* type) { + return type && spa_streq(type, PW_TYPE_INTERFACE_Node); +} + +bool IsPortType(const char* type) { + return type && spa_streq(type, PW_TYPE_INTERFACE_Port); +} + +bool IsLinkType(const char* type) { + return type && spa_streq(type, PW_TYPE_INTERFACE_Link); +} + +struct StreamData { + pw_stream* stream = nullptr; + spa_hook listener{}; + pw_thread_loop* loop = nullptr; + bool is_source = false; + bool loopback = false; + std::string target_node; + std::string name; + bool ready = false; + bool failed = false; + std::string error; + uint32_t node_id = SPA_ID_INVALID; + uint32_t channels = kDefaultChannels; + uint32_t rate = kDefaultRate; +}; + +void StreamProcess(void* data) { + auto* stream_data = static_cast(data); + if (!stream_data || !stream_data->stream) { + return; + } + + if (!stream_data->is_source) { + struct pw_buffer* buffer = nullptr; + while ((buffer = pw_stream_dequeue_buffer(stream_data->stream)) != nullptr) { + pw_stream_queue_buffer(stream_data->stream, buffer); + } + return; + } + + struct pw_buffer* buffer = nullptr; + while ((buffer = pw_stream_dequeue_buffer(stream_data->stream)) != nullptr) { + struct spa_buffer* spa_buffer = buffer->buffer; + if (!spa_buffer) { + pw_stream_queue_buffer(stream_data->stream, buffer); + continue; + } + const uint32_t stride = sizeof(float) * stream_data->channels; + for (uint32_t i = 0; i < spa_buffer->n_datas; ++i) { + struct spa_data* data_entry = &spa_buffer->datas[i]; + if (!data_entry->data || !data_entry->chunk) { + continue; + } + std::memset(data_entry->data, 0, data_entry->maxsize); + uint32_t frames = stride > 0 ? data_entry->maxsize / stride : 0; + if (buffer->requested > 0 && buffer->requested < frames) { + frames = buffer->requested; + } + data_entry->chunk->offset = 0; + data_entry->chunk->stride = stride; + data_entry->chunk->size = frames * stride; + } + pw_stream_queue_buffer(stream_data->stream, buffer); + } +} + +void StreamStateChanged(void* data, + enum pw_stream_state, + enum pw_stream_state state, + const char* error) { + auto* stream_data = static_cast(data); + if (!stream_data) { + return; + } + + if (state == PW_STREAM_STATE_ERROR) { + stream_data->failed = true; + if (error) { + stream_data->error = error; + } + } + + if (stream_data->stream) { + uint32_t node_id = pw_stream_get_node_id(stream_data->stream); + if (node_id != SPA_ID_INVALID) { + stream_data->node_id = node_id; + stream_data->ready = true; + } + } + + if (stream_data->loop) { + pw_thread_loop_signal(stream_data->loop, false); + } +} + +static const pw_stream_events kStreamEvents = { + PW_VERSION_STREAM_EVENTS, + .state_changed = StreamStateChanged, + .process = StreamProcess, +}; + +} // namespace + +Status Status::Ok() { + return Status{}; +} + +Status Status::Error(StatusCode code, std::string message) { + Status status; + status.code = code; + status.message = std::move(message); + return status; +} + +bool Status::ok() const { + return code == StatusCode::kOk; +} + +struct Client::Impl { + ConnectionOptions options; + pw_thread_loop* thread_loop = nullptr; + pw_context* context = nullptr; + pw_core* core = nullptr; + pw_registry* registry = nullptr; + spa_hook core_listener{}; + spa_hook registry_listener{}; + bool core_listener_attached = false; + bool registry_listener_attached = false; + bool connected = false; + uint32_t pending_sync = 0; + uint32_t last_sync = 0; + Status last_error = Status::Ok(); + std::mutex cache_mutex; + std::unordered_map nodes; + std::unordered_map ports; + std::unordered_map links; + std::unordered_map> virtual_streams; + + Status ConnectLocked(); + void DisconnectLocked(); + Status SyncLocked(); + void ClearCache(); + Status EnsureConnected(); + Result CreateVirtualStreamLocked(std::string_view name, + bool is_source, + const VirtualNodeOptions& options); + + static void RegistryGlobal(void* data, + uint32_t id, + uint32_t permissions, + const char* type, + uint32_t version, + const spa_dict* props); + static void RegistryGlobalRemove(void* data, uint32_t id); + static void CoreDone(void* data, uint32_t id, int seq); + static void CoreError(void* data, uint32_t id, int seq, int res, const char* message); +}; + +void Client::Impl::RegistryGlobal(void* data, + uint32_t id, + uint32_t, + const char* type, + uint32_t, + const spa_dict* props) { + auto* impl = static_cast(data); + if (!impl) { + return; + } + + std::lock_guard lock(impl->cache_mutex); + + if (IsNodeType(type)) { + NodeInfo info; + info.id = NodeId{id}; + info.name = LookupString(props, PW_KEY_NODE_NAME); + info.media_class = LookupString(props, PW_KEY_MEDIA_CLASS); + impl->nodes[id] = std::move(info); + return; + } + + if (IsPortType(type)) { + PortInfo info; + info.id = PortId{id}; + info.name = LookupString(props, PW_KEY_PORT_NAME); + info.is_input = false; + uint32_t node_id = 0; + if (ParseUint32(SafeLookup(props, PW_KEY_NODE_ID), &node_id)) { + info.node = NodeId{node_id}; + } + const char* direction = SafeLookup(props, PW_KEY_PORT_DIRECTION); + if (direction && spa_streq(direction, "in")) { + info.is_input = true; + } + impl->ports[id] = std::move(info); + return; + } + + if (IsLinkType(type)) { + Link info; + info.id = LinkId{id}; + uint32_t out_port = 0; + uint32_t in_port = 0; + if (ParseUint32(SafeLookup(props, PW_KEY_LINK_OUTPUT_PORT), &out_port)) { + info.output_port = PortId{out_port}; + } + if (ParseUint32(SafeLookup(props, PW_KEY_LINK_INPUT_PORT), &in_port)) { + info.input_port = PortId{in_port}; + } + impl->links[id] = std::move(info); + } +} + +void Client::Impl::RegistryGlobalRemove(void* data, uint32_t id) { + auto* impl = static_cast(data); + if (!impl) { + return; + } + + std::lock_guard lock(impl->cache_mutex); + impl->virtual_streams.erase(id); + auto node_it = impl->nodes.find(id); + if (node_it != impl->nodes.end()) { + impl->nodes.erase(node_it); + std::vector removed_ports; + for (auto it = impl->ports.begin(); it != impl->ports.end();) { + if (it->second.node.value == id) { + removed_ports.push_back(it->first); + it = impl->ports.erase(it); + } else { + ++it; + } + } + for (auto it = impl->links.begin(); it != impl->links.end();) { + bool remove_link = false; + for (uint32_t port_id : removed_ports) { + if (it->second.input_port.value == port_id || it->second.output_port.value == port_id) { + remove_link = true; + break; + } + } + if (remove_link) { + it = impl->links.erase(it); + } else { + ++it; + } + } + return; + } + + if (impl->ports.erase(id) > 0) { + for (auto it = impl->links.begin(); it != impl->links.end();) { + if (it->second.input_port.value == id || it->second.output_port.value == id) { + it = impl->links.erase(it); + } else { + ++it; + } + } + return; + } + + impl->links.erase(id); +} + +void Client::Impl::CoreDone(void* data, uint32_t, int seq) { + auto* impl = static_cast(data); + if (!impl || !impl->thread_loop) { + return; + } + if (seq >= static_cast(impl->pending_sync)) { + impl->last_sync = static_cast(seq); + pw_thread_loop_signal(impl->thread_loop, false); + } +} + +void Client::Impl::CoreError(void* data, uint32_t, int, int res, const char* message) { + auto* impl = static_cast(data); + if (!impl) { + return; + } + impl->connected = false; + impl->last_error = Status::Error(StatusCode::kUnavailable, + message ? message : spa_strerror(res)); + if (impl->thread_loop) { + pw_thread_loop_signal(impl->thread_loop, false); + } +} + +Status Client::Impl::SyncLocked() { + if (!core || !thread_loop) { + return Status::Error(StatusCode::kUnavailable, "pipewire core not connected"); + } + pending_sync = pw_core_sync(core, PW_ID_CORE, 0); + if (pending_sync == SPA_ID_INVALID) { + return Status::Error(StatusCode::kInternal, "failed to sync with pipewire core"); + } + while (last_sync < pending_sync) { + int wait_res = pw_thread_loop_timed_wait(thread_loop, kSyncWaitSeconds); + if (wait_res == -ETIMEDOUT) { + return Status::Error(StatusCode::kTimeout, "timeout waiting for pipewire sync"); + } + } + return Status::Ok(); +} + +void Client::Impl::ClearCache() { + std::lock_guard lock(cache_mutex); + nodes.clear(); + ports.clear(); + links.clear(); +} + +Status Client::Impl::EnsureConnected() { + if (connected) { + return Status::Ok(); + } + if (!options.autoconnect) { + return Status::Error(StatusCode::kUnavailable, "pipewire core disconnected"); + } + if (!thread_loop) { + return Status::Error(StatusCode::kUnavailable, "pipewire thread loop not running"); + } + pw_thread_loop_lock(thread_loop); + Status status = ConnectLocked(); + pw_thread_loop_unlock(thread_loop); + return status; +} + +Result Client::Impl::CreateVirtualStreamLocked(std::string_view name, + bool is_source, + const VirtualNodeOptions& options) { + if (!core || !thread_loop) { + return {Status::Error(StatusCode::kUnavailable, "pipewire core not connected"), 0}; + } + + if (options.format.rate == 0 || options.format.channels == 0) { + return {Status::Error(StatusCode::kInvalidArgument, "invalid audio format"), 0}; + } + if (options.behavior == VirtualBehavior::kLoopback && !options.target_node) { + return {Status::Error(StatusCode::kInvalidArgument, "loopback requires target node"), 0}; + } + if (options.media_class_override && options.media_class_override->empty()) { + return {Status::Error(StatusCode::kInvalidArgument, "missing media class"), 0}; + } + + std::string stream_name = name.empty() ? (is_source ? "warppipe-source" : "warppipe-sink") + : std::string(name); + const char* media_class = is_source ? "Audio/Source" : "Audio/Sink"; + std::string media_class_value = options.media_class_override ? *options.media_class_override + : std::string(media_class); + if (media_class_value.empty()) { + return {Status::Error(StatusCode::kInvalidArgument, "missing media class"), 0}; + } + const char* media_category = is_source ? "Capture" : "Playback"; + + std::string display_name = options.display_name.empty() ? stream_name : options.display_name; + const char* node_group = options.group.empty() ? nullptr : options.group.c_str(); + + { + std::lock_guard lock(cache_mutex); + for (const auto& entry : nodes) { + if (entry.second.name == stream_name) { + return {Status::Error(StatusCode::kInvalidArgument, "duplicate node name"), 0}; + } + } + for (const auto& entry : virtual_streams) { + if (entry.second && entry.second->name == stream_name) { + return {Status::Error(StatusCode::kInvalidArgument, "duplicate node name"), 0}; + } + } + if (options.behavior == VirtualBehavior::kLoopback && options.target_node) { + bool found_target = false; + for (const auto& entry : nodes) { + if (entry.second.name == *options.target_node) { + found_target = true; + break; + } + } + if (!found_target) { + return {Status::Error(StatusCode::kNotFound, "target node not found"), 0}; + } + } + } + + pw_properties* props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", + PW_KEY_MEDIA_CATEGORY, media_category, + PW_KEY_MEDIA_ROLE, "Music", + PW_KEY_MEDIA_CLASS, media_class_value.c_str(), + PW_KEY_NODE_NAME, stream_name.c_str(), + PW_KEY_MEDIA_NAME, display_name.c_str(), + PW_KEY_NODE_DESCRIPTION, display_name.c_str(), + PW_KEY_NODE_VIRTUAL, "true", + nullptr); + if (!props) { + return {Status::Error(StatusCode::kInternal, "failed to allocate stream properties"), 0}; + } + if (node_group) { + pw_properties_set(props, PW_KEY_NODE_GROUP, node_group); + } + if (options.behavior == VirtualBehavior::kLoopback && options.target_node) { + pw_properties_set(props, PW_KEY_TARGET_OBJECT, options.target_node->c_str()); + } + + pw_stream* stream = pw_stream_new(core, stream_name.c_str(), props); + if (!stream) { + return {Status::Error(StatusCode::kUnavailable, "failed to create pipewire stream"), 0}; + } + + auto stream_data = std::make_unique(); + stream_data->stream = stream; + stream_data->loop = thread_loop; + stream_data->is_source = is_source; + stream_data->loopback = options.behavior == VirtualBehavior::kLoopback; + if (options.target_node) { + stream_data->target_node = *options.target_node; + } + stream_data->name = stream_name; + if (options.format.rate != 0) { + stream_data->rate = options.format.rate; + } + if (options.format.channels != 0) { + stream_data->channels = options.format.channels; + } + + pw_stream_add_listener(stream, &stream_data->listener, &kStreamEvents, stream_data.get()); + + const struct spa_pod* params[1]; + uint8_t buffer[1024]; + spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + spa_audio_info_raw audio_info{}; + audio_info.format = SPA_AUDIO_FORMAT_F32; + audio_info.rate = stream_data->rate; + audio_info.channels = stream_data->channels; + params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &audio_info); + + enum pw_direction direction = is_source ? PW_DIRECTION_OUTPUT : PW_DIRECTION_INPUT; + enum pw_stream_flags flags = PW_STREAM_FLAG_MAP_BUFFERS; + if (options.behavior == VirtualBehavior::kLoopback && options.target_node) { + flags = static_cast(flags | PW_STREAM_FLAG_AUTOCONNECT); + } + int res = pw_stream_connect(stream, direction, PW_ID_ANY, flags, params, 1); + if (res < 0) { + pw_stream_destroy(stream); + return {Status::Error(StatusCode::kUnavailable, "failed to connect pipewire stream"), 0}; + } + + uint32_t node_id = pw_stream_get_node_id(stream); + int wait_attempts = 0; + while (node_id == SPA_ID_INVALID && !stream_data->failed && wait_attempts < 3) { + int wait_res = pw_thread_loop_timed_wait(thread_loop, kSyncWaitSeconds); + if (wait_res == -ETIMEDOUT) { + break; + } + node_id = stream_data->node_id; + ++wait_attempts; + } + + if (stream_data->failed) { + std::string error = stream_data->error.empty() ? "stream entered error state" : stream_data->error; + pw_stream_destroy(stream); + return {Status::Error(StatusCode::kUnavailable, std::move(error)), 0}; + } + + if (node_id == SPA_ID_INVALID) { + pw_stream_destroy(stream); + return {Status::Error(StatusCode::kTimeout, "timed out waiting for stream node id"), 0}; + } + + stream_data->node_id = node_id; + stream_data->ready = true; + virtual_streams.emplace(node_id, std::move(stream_data)); + return {Status::Ok(), node_id}; +} + +Status Client::Impl::ConnectLocked() { + if (connected) { + return Status::Ok(); + } + if (!thread_loop) { + return Status::Error(StatusCode::kInternal, "thread loop not initialized"); + } + pw_loop* loop = pw_thread_loop_get_loop(thread_loop); + if (!context) { + context = pw_context_new(loop, nullptr, 0); + if (!context) { + return Status::Error(StatusCode::kUnavailable, "failed to create pipewire context"); + } + } + + pw_properties* props = pw_properties_new(PW_KEY_APP_NAME, options.application_name.c_str(), nullptr); + if (options.remote_name && !options.remote_name->empty()) { + pw_properties_set(props, PW_KEY_REMOTE_NAME, options.remote_name->c_str()); + } + core = pw_context_connect(context, props, 0); + + if (!core) { + return Status::Error(StatusCode::kUnavailable, "failed to connect to pipewire core"); + } + + static const pw_core_events core_events = { + PW_VERSION_CORE_EVENTS, + .done = CoreDone, + .error = CoreError, + }; + pw_core_add_listener(core, &core_listener, &core_events, this); + core_listener_attached = true; + + registry = pw_core_get_registry(core, PW_VERSION_REGISTRY, 0); + if (!registry) { + return Status::Error(StatusCode::kUnavailable, "failed to get pipewire registry"); + } + + static const pw_registry_events registry_events = { + PW_VERSION_REGISTRY_EVENTS, + .global = RegistryGlobal, + .global_remove = RegistryGlobalRemove, + }; + pw_registry_add_listener(registry, ®istry_listener, ®istry_events, this); + registry_listener_attached = true; + connected = true; + last_error = Status::Ok(); + ClearCache(); + return SyncLocked(); +} + +void Client::Impl::DisconnectLocked() { + for (auto& entry : virtual_streams) { + StreamData* stream_data = entry.second.get(); + if (stream_data && stream_data->stream) { + pw_stream_disconnect(stream_data->stream); + pw_stream_destroy(stream_data->stream); + stream_data->stream = nullptr; + } + } + virtual_streams.clear(); + if (registry_listener_attached) { + spa_hook_remove(®istry_listener); + registry_listener_attached = false; + } + if (core_listener_attached) { + spa_hook_remove(&core_listener); + core_listener_attached = false; + } + if (registry) { + pw_proxy_destroy(reinterpret_cast(registry)); + registry = nullptr; + } + if (core) { + pw_core_disconnect(core); + core = nullptr; + } + connected = false; + ClearCache(); +} + +Client::Client(std::unique_ptr impl) : impl_(std::move(impl)) {} + +Client::Client(Client&&) noexcept = default; +Client& Client::operator=(Client&&) noexcept = default; + +Client::~Client() { + if (impl_) { + Shutdown(); + } +} + +Result> Client::Create(const ConnectionOptions& options) { + pw_init(nullptr, nullptr); + + if (options.threading == ThreadingMode::kCallerThread) { + return {Status::Error(StatusCode::kNotImplemented, "caller thread mode not implemented"), {}}; + } + + auto impl = std::make_unique(); + impl->options = options; + impl->thread_loop = pw_thread_loop_new("warppipe", nullptr); + if (!impl->thread_loop) { + return {Status::Error(StatusCode::kUnavailable, "failed to create pipewire thread loop"), {}}; + } + + if (pw_thread_loop_start(impl->thread_loop) != 0) { + pw_thread_loop_destroy(impl->thread_loop); + impl->thread_loop = nullptr; + return {Status::Error(StatusCode::kUnavailable, "failed to start pipewire thread loop"), {}}; + } + + pw_thread_loop_lock(impl->thread_loop); + Status status = impl->ConnectLocked(); + pw_thread_loop_unlock(impl->thread_loop); + if (!status.ok()) { + pw_thread_loop_lock(impl->thread_loop); + impl->DisconnectLocked(); + if (impl->context) { + pw_context_destroy(impl->context); + impl->context = nullptr; + } + pw_thread_loop_unlock(impl->thread_loop); + pw_thread_loop_stop(impl->thread_loop); + pw_thread_loop_destroy(impl->thread_loop); + impl->thread_loop = nullptr; + return {status, {}}; + } + + return {Status::Ok(), std::unique_ptr(new Client(std::move(impl)))}; +} + +Status Client::Shutdown() { + if (!impl_) { + return Status::Ok(); + } + if (impl_->thread_loop) { + pw_thread_loop_lock(impl_->thread_loop); + impl_->DisconnectLocked(); + if (impl_->context) { + pw_context_destroy(impl_->context); + impl_->context = nullptr; + } + pw_thread_loop_unlock(impl_->thread_loop); + pw_thread_loop_stop(impl_->thread_loop); + pw_thread_loop_destroy(impl_->thread_loop); + impl_->thread_loop = nullptr; + } + pw_deinit(); + return Status::Ok(); +} + +Result> Client::ListNodes() { + Status status = impl_->EnsureConnected(); + if (!status.ok()) { + return {status, {}}; + } + + std::lock_guard lock(impl_->cache_mutex); + std::vector items; + items.reserve(impl_->nodes.size()); + for (const auto& entry : impl_->nodes) { + items.push_back(entry.second); + } + return {Status::Ok(), std::move(items)}; +} + +Result> Client::ListPorts(NodeId node) { + Status status = impl_->EnsureConnected(); + if (!status.ok()) { + return {status, {}}; + } + + std::lock_guard lock(impl_->cache_mutex); + std::vector items; + for (const auto& entry : impl_->ports) { + if (entry.second.node.value == node.value) { + items.push_back(entry.second); + } + } + return {Status::Ok(), std::move(items)}; +} + +Result> Client::ListLinks() { + Status status = impl_->EnsureConnected(); + if (!status.ok()) { + return {status, {}}; + } + + std::lock_guard lock(impl_->cache_mutex); + std::vector items; + items.reserve(impl_->links.size()); + for (const auto& entry : impl_->links) { + items.push_back(entry.second); + } + return {Status::Ok(), std::move(items)}; +} + +Result Client::CreateVirtualSink(std::string_view name, + const VirtualNodeOptions& options) { + Status status = impl_->EnsureConnected(); + if (!status.ok()) { + return {status, {}}; + } + + std::string name_value = name.empty() ? std::string() : std::string(name); + + pw_thread_loop_lock(impl_->thread_loop); + auto result = impl_->CreateVirtualStreamLocked(name_value, false, options); + pw_thread_loop_unlock(impl_->thread_loop); + if (!result.ok()) { + return {result.status, {}}; + } + + VirtualSink sink; + sink.node = NodeId{result.value}; + sink.name = name_value.empty() ? "warppipe-sink" : name_value; + return {Status::Ok(), std::move(sink)}; +} + +Result Client::CreateVirtualSource(std::string_view name, + const VirtualNodeOptions& options) { + Status status = impl_->EnsureConnected(); + if (!status.ok()) { + return {status, {}}; + } + + std::string name_value = name.empty() ? std::string() : std::string(name); + + pw_thread_loop_lock(impl_->thread_loop); + auto result = impl_->CreateVirtualStreamLocked(name_value, true, options); + pw_thread_loop_unlock(impl_->thread_loop); + if (!result.ok()) { + return {result.status, {}}; + } + + VirtualSource source; + source.node = NodeId{result.value}; + source.name = name_value.empty() ? "warppipe-source" : name_value; + return {Status::Ok(), std::move(source)}; +} + +Status Client::RemoveNode(NodeId node) { + Status status = impl_->EnsureConnected(); + if (!status.ok()) { + return status; + } + + pw_thread_loop_lock(impl_->thread_loop); + auto it = impl_->virtual_streams.find(node.value); + if (it == impl_->virtual_streams.end()) { + pw_thread_loop_unlock(impl_->thread_loop); + return Status::Error(StatusCode::kNotFound, "node not managed by warppipe"); + } + StreamData* stream_data = it->second.get(); + if (stream_data && stream_data->stream) { + pw_stream_disconnect(stream_data->stream); + pw_stream_destroy(stream_data->stream); + stream_data->stream = nullptr; + } + impl_->virtual_streams.erase(it); + pw_thread_loop_unlock(impl_->thread_loop); + return Status::Ok(); +} + +Result Client::CreateLink(PortId, PortId, const LinkOptions&) { + return {Status::Error(StatusCode::kNotImplemented, "create link not implemented"), {}}; +} + +Status Client::RemoveLink(LinkId) { + return Status::Error(StatusCode::kNotImplemented, "remove link not implemented"); +} + +Result Client::AddRouteRule(const RouteRule&) { + return {Status::Error(StatusCode::kNotImplemented, "add route rule not implemented"), {}}; +} + +Status Client::RemoveRouteRule(RuleId) { + return Status::Error(StatusCode::kNotImplemented, "remove route rule not implemented"); +} + +Status Client::SaveConfig(std::string_view) { + return Status::Error(StatusCode::kNotImplemented, "save config not implemented"); +} + +Status Client::LoadConfig(std::string_view) { + return Status::Error(StatusCode::kNotImplemented, "load config not implemented"); +} + +#ifdef WARPPIPE_TESTING +Status Client::Test_InsertNode(const NodeInfo& node) { + if (!impl_) { + return Status::Error(StatusCode::kInternal, "client not initialized"); + } + std::lock_guard lock(impl_->cache_mutex); + impl_->nodes[node.id.value] = node; + return Status::Ok(); +} + +Status Client::Test_InsertPort(const PortInfo& port) { + if (!impl_) { + return Status::Error(StatusCode::kInternal, "client not initialized"); + } + std::lock_guard lock(impl_->cache_mutex); + impl_->ports[port.id.value] = port; + return Status::Ok(); +} + +Status Client::Test_InsertLink(const Link& link) { + if (!impl_) { + return Status::Error(StatusCode::kInternal, "client not initialized"); + } + std::lock_guard lock(impl_->cache_mutex); + impl_->links[link.id.value] = link; + return Status::Ok(); +} + +Status Client::Test_RemoveGlobal(uint32_t id) { + if (!impl_) { + return Status::Error(StatusCode::kInternal, "client not initialized"); + } + Client::Impl::RegistryGlobalRemove(impl_.get(), id); + return Status::Ok(); +} + +Status Client::Test_ForceDisconnect() { + if (!impl_ || !impl_->thread_loop) { + return Status::Error(StatusCode::kInternal, "thread loop not initialized"); + } + pw_thread_loop_lock(impl_->thread_loop); + impl_->DisconnectLocked(); + pw_thread_loop_unlock(impl_->thread_loop); + return Status::Ok(); +} +#endif + +} // namespace warppipe diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..9e23053 --- /dev/null +++ b/tests/README.md @@ -0,0 +1,9 @@ +# Tests + +Milestone 0 test instructions are tracked in docs/milestone-0.md. + +Planned coverage: +- Missing PipeWire daemon +- Missing link-factory module +- Missing metadata module +- Invalid media class in virtual node creation diff --git a/tests/warppipe_tests.cpp b/tests/warppipe_tests.cpp new file mode 100644 index 0000000..e7cec81 --- /dev/null +++ b/tests/warppipe_tests.cpp @@ -0,0 +1,218 @@ +#include + +#include + +namespace { + +warppipe::ConnectionOptions DefaultOptions() { + warppipe::ConnectionOptions options; + options.threading = warppipe::ThreadingMode::kThreadLoop; + options.autoconnect = true; + options.application_name = "warppipe-tests"; + return options; +} + +bool ContainsNode(const std::vector& nodes, uint32_t id) { + for (const auto& node : nodes) { + if (node.id.value == id) { + return true; + } + } + return false; +} + +bool ContainsLink(const std::vector& links, uint32_t id) { + for (const auto& link : links) { + if (link.id.value == id) { + return true; + } + } + return false; +} + +} // namespace + +TEST_CASE("caller thread mode is not implemented") { + warppipe::ConnectionOptions options = DefaultOptions(); + options.threading = warppipe::ThreadingMode::kCallerThread; + auto result = warppipe::Client::Create(options); + REQUIRE_FALSE(result.ok()); + REQUIRE(result.status.code == warppipe::StatusCode::kNotImplemented); +} + +TEST_CASE("connects or reports unavailable") { + warppipe::ConnectionOptions connection_options = DefaultOptions(); + auto result = warppipe::Client::Create(connection_options); + if (!result.ok()) { + REQUIRE(result.status.code == warppipe::StatusCode::kUnavailable); + return; + } + + auto nodes = result.value->ListNodes(); + REQUIRE(nodes.ok()); +} + +TEST_CASE("invalid remote name fails") { + warppipe::ConnectionOptions options = DefaultOptions(); + options.remote_name = "warppipe-test-missing-remote"; + auto result = warppipe::Client::Create(options); + REQUIRE_FALSE(result.ok()); + REQUIRE(result.status.code == warppipe::StatusCode::kUnavailable); +} + +TEST_CASE("create and remove virtual sink/source when available") { + warppipe::ConnectionOptions options = DefaultOptions(); + auto result = warppipe::Client::Create(options); + if (!result.ok()) { + SUCCEED("PipeWire unavailable"); + return; + } + + warppipe::VirtualNodeOptions node_options; + node_options.display_name = "warppipe-test-virtual"; + node_options.group = "warppipe-test"; + node_options.format.rate = 44100; + node_options.format.channels = 2; + auto sink = result.value->CreateVirtualSink("warppipe-test-sink", node_options); + if (!sink.ok()) { + if (sink.status.code == warppipe::StatusCode::kUnavailable) { + SUCCEED("PipeWire unavailable"); + return; + } + REQUIRE(sink.ok()); + } + + auto source = result.value->CreateVirtualSource("warppipe-test-source", node_options); + if (!source.ok()) { + if (source.status.code == warppipe::StatusCode::kUnavailable) { + SUCCEED("PipeWire unavailable"); + return; + } + REQUIRE(source.ok()); + } + + REQUIRE(result.value->RemoveNode(sink.value.node).ok()); + REQUIRE(result.value->RemoveNode(source.value.node).ok()); +} + +TEST_CASE("missing media class returns invalid argument") { + auto result = warppipe::Client::Create(DefaultOptions()); + if (!result.ok()) { + SUCCEED("PipeWire unavailable"); + return; + } + + warppipe::VirtualNodeOptions options; + options.media_class_override = ""; + auto sink = result.value->CreateVirtualSink("warppipe-test-missing-class", options); + REQUIRE_FALSE(sink.ok()); + REQUIRE(sink.status.code == warppipe::StatusCode::kInvalidArgument); +} + +TEST_CASE("duplicate node name returns invalid argument") { + auto result = warppipe::Client::Create(DefaultOptions()); + if (!result.ok()) { + SUCCEED("PipeWire unavailable"); + return; + } + + auto first = result.value->CreateVirtualSink("warppipe-dup", warppipe::VirtualNodeOptions{}); + if (!first.ok()) { + if (first.status.code == warppipe::StatusCode::kUnavailable) { + SUCCEED("PipeWire unavailable"); + return; + } + REQUIRE(first.ok()); + } + + auto second = result.value->CreateVirtualSink("warppipe-dup", warppipe::VirtualNodeOptions{}); + REQUIRE_FALSE(second.ok()); + REQUIRE(second.status.code == warppipe::StatusCode::kInvalidArgument); + + REQUIRE(result.value->RemoveNode(first.value.node).ok()); +} + +TEST_CASE("loopback target missing returns not found") { + auto result = warppipe::Client::Create(DefaultOptions()); + if (!result.ok()) { + SUCCEED("PipeWire unavailable"); + return; + } + + warppipe::VirtualNodeOptions options; + options.behavior = warppipe::VirtualBehavior::kLoopback; + options.target_node = "warppipe-missing-target"; + + auto sink = result.value->CreateVirtualSink("warppipe-loopback", options); + REQUIRE_FALSE(sink.ok()); + REQUIRE(sink.status.code == warppipe::StatusCode::kNotFound); +} + +TEST_CASE("registry removal cleans up ports and links") { + auto result = warppipe::Client::Create(DefaultOptions()); + if (!result.ok()) { + SUCCEED("PipeWire unavailable"); + return; + } + + const uint32_t node_id = 500001; + const uint32_t out_port_id = 500002; + const uint32_t in_port_id = 500003; + const uint32_t link_id = 500004; + + warppipe::NodeInfo node; + node.id = warppipe::NodeId{node_id}; + node.name = "warppipe-test-node"; + node.media_class = "Audio/Sink"; + REQUIRE(result.value->Test_InsertNode(node).ok()); + + warppipe::PortInfo out_port; + out_port.id = warppipe::PortId{out_port_id}; + out_port.node = warppipe::NodeId{node_id}; + out_port.name = "output"; + out_port.is_input = false; + REQUIRE(result.value->Test_InsertPort(out_port).ok()); + + warppipe::PortInfo in_port; + in_port.id = warppipe::PortId{in_port_id}; + in_port.node = warppipe::NodeId{node_id}; + in_port.name = "input"; + in_port.is_input = true; + REQUIRE(result.value->Test_InsertPort(in_port).ok()); + + warppipe::Link link; + link.id = warppipe::LinkId{link_id}; + link.output_port = warppipe::PortId{out_port_id}; + link.input_port = warppipe::PortId{in_port_id}; + REQUIRE(result.value->Test_InsertLink(link).ok()); + + auto snapshot = result.value->ListNodes(); + REQUIRE(snapshot.ok()); + REQUIRE(ContainsNode(snapshot.value, node_id)); + + REQUIRE(result.value->Test_RemoveGlobal(node_id).ok()); + + auto ports = result.value->ListPorts(warppipe::NodeId{node_id}); + REQUIRE(ports.ok()); + REQUIRE(ports.value.empty()); + + auto links = result.value->ListLinks(); + REQUIRE(links.ok()); + REQUIRE_FALSE(ContainsLink(links.value, link_id)); +} + +TEST_CASE("autoconnect reconnects after forced disconnect") { + auto result = warppipe::Client::Create(DefaultOptions()); + if (!result.ok()) { + SUCCEED("PipeWire unavailable"); + return; + } + + auto nodes = result.value->ListNodes(); + REQUIRE(nodes.ok()); + + REQUIRE(result.value->Test_ForceDisconnect().ok()); + + auto nodes_after = result.value->ListNodes(); + REQUIRE(nodes_after.ok()); +}