#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace warppipe { namespace { constexpr int kSyncWaitSeconds = 2; constexpr uint32_t kDefaultRate = 48000; constexpr uint32_t kDefaultChannels = 2; const char* SafeLookup(const spa_dict* dict, const char* key) { if (!dict || !key) { return nullptr; } return spa_dict_lookup(dict, key); } std::string LookupString(const spa_dict* dict, const char* key) { const char* value = SafeLookup(dict, key); return value ? std::string(value) : std::string(); } bool ParseUint32(const char* value, uint32_t* out) { if (!value || !out) { return false; } char* end = nullptr; unsigned long parsed = std::strtoul(value, &end, 10); if (!end || end == value) { return false; } *out = static_cast(parsed); return true; } bool IsNodeType(const char* type) { return type && spa_streq(type, PW_TYPE_INTERFACE_Node); } bool IsPortType(const char* type) { return type && spa_streq(type, PW_TYPE_INTERFACE_Port); } bool IsLinkType(const char* type) { return type && spa_streq(type, PW_TYPE_INTERFACE_Link); } struct PendingAutoLink { uint32_t source_node_id = 0; std::string target_node_name; uint32_t rule_id = 0; RuleDirection direction = RuleDirection::kPlayback; }; bool MatchesRule(const NodeInfo& node, const RuleMatch& match) { bool any_field = false; if (!match.application_name.empty()) { any_field = true; if (node.application_name != match.application_name) { return false; } } if (!match.process_binary.empty()) { any_field = true; if (node.process_binary != match.process_binary) { return false; } } if (!match.media_role.empty()) { any_field = true; if (node.media_role != match.media_role) { return false; } } return any_field; } struct StreamData { pw_stream* stream = nullptr; pw_impl_module* module = nullptr; pw_proxy* proxy = nullptr; spa_hook listener{}; pw_thread_loop* 
loop = nullptr; bool is_source = false; bool loopback = false; bool linger = false; std::string target_node; std::string name; bool ready = false; bool failed = false; std::string error; uint32_t node_id = SPA_ID_INVALID; uint32_t channels = kDefaultChannels; uint32_t rate = kDefaultRate; }; struct LinkProxy { pw_proxy* proxy = nullptr; spa_hook listener{}; pw_thread_loop* loop = nullptr; bool done = false; bool failed = false; std::string error; uint32_t id = SPA_ID_INVALID; uint32_t output_port = 0; uint32_t input_port = 0; }; void LinkProxyBound(void* data, uint32_t global_id) { auto* link = static_cast(data); if (!link) { return; } link->id = global_id; link->done = true; if (link->loop) { pw_thread_loop_signal(link->loop, false); } } void LinkProxyRemoved(void* data) { auto* link = static_cast(data); if (!link) { return; } link->done = true; if (link->loop) { pw_thread_loop_signal(link->loop, false); } } void LinkProxyError(void* data, int, int res, const char* message) { auto* link = static_cast(data); if (!link) { return; } link->failed = true; link->error = message ? message : spa_strerror(res); if (link->loop) { pw_thread_loop_signal(link->loop, false); } } static const pw_proxy_events kLinkProxyEvents = { .version = PW_VERSION_PROXY_EVENTS, .bound = LinkProxyBound, .removed = LinkProxyRemoved, .error = LinkProxyError, }; void NodeProxyBound(void* data, uint32_t global_id) { auto* sd = static_cast(data); if (!sd) return; sd->node_id = global_id; sd->ready = true; if (sd->loop) { pw_thread_loop_signal(sd->loop, false); } } void NodeProxyRemoved(void* data) { auto* sd = static_cast(data); if (!sd) return; sd->ready = true; if (sd->loop) { pw_thread_loop_signal(sd->loop, false); } } void NodeProxyError(void* data, int, int res, const char* message) { auto* sd = static_cast(data); if (!sd) return; sd->failed = true; sd->error = message ? 
message : spa_strerror(res); if (sd->loop) { pw_thread_loop_signal(sd->loop, false); } } static const pw_proxy_events kNodeProxyEvents = { .version = PW_VERSION_PROXY_EVENTS, .bound = NodeProxyBound, .removed = NodeProxyRemoved, .error = NodeProxyError, }; void StreamProcess(void* data) { auto* stream_data = static_cast(data); if (!stream_data || !stream_data->stream) { return; } if (!stream_data->is_source) { struct pw_buffer* buffer = nullptr; while ((buffer = pw_stream_dequeue_buffer(stream_data->stream)) != nullptr) { pw_stream_queue_buffer(stream_data->stream, buffer); } return; } struct pw_buffer* buffer = nullptr; while ((buffer = pw_stream_dequeue_buffer(stream_data->stream)) != nullptr) { struct spa_buffer* spa_buffer = buffer->buffer; if (!spa_buffer) { pw_stream_queue_buffer(stream_data->stream, buffer); continue; } const uint32_t stride = sizeof(float) * stream_data->channels; for (uint32_t i = 0; i < spa_buffer->n_datas; ++i) { struct spa_data* data_entry = &spa_buffer->datas[i]; if (!data_entry->data || !data_entry->chunk) { continue; } std::memset(data_entry->data, 0, data_entry->maxsize); uint32_t frames = stride > 0 ? 
data_entry->maxsize / stride : 0; if (buffer->requested > 0 && buffer->requested < frames) { frames = buffer->requested; } data_entry->chunk->offset = 0; data_entry->chunk->stride = stride; data_entry->chunk->size = frames * stride; } pw_stream_queue_buffer(stream_data->stream, buffer); } } void StreamStateChanged(void* data, enum pw_stream_state, enum pw_stream_state state, const char* error) { auto* stream_data = static_cast(data); if (!stream_data) { return; } if (state == PW_STREAM_STATE_ERROR) { stream_data->failed = true; if (error) { stream_data->error = error; } } if (stream_data->stream) { uint32_t node_id = pw_stream_get_node_id(stream_data->stream); if (node_id != SPA_ID_INVALID) { stream_data->node_id = node_id; stream_data->ready = true; } } if (stream_data->loop) { pw_thread_loop_signal(stream_data->loop, false); } } static const pw_stream_events kStreamEvents = { .version = PW_VERSION_STREAM_EVENTS, .state_changed = StreamStateChanged, .process = StreamProcess, }; struct MeterStreamData { uint32_t node_id = 0; std::string target_name; pw_stream* stream = nullptr; spa_hook listener{}; std::atomic peak_left{0.0f}; std::atomic peak_right{0.0f}; }; void NodeMeterProcess(void* data) { auto* meter = static_cast(data); if (!meter || !meter->stream) { return; } float left = 0.0f; float right = 0.0f; bool had_data = false; pw_buffer* buf = nullptr; while ((buf = pw_stream_dequeue_buffer(meter->stream)) != nullptr) { if (!buf->buffer || buf->buffer->n_datas == 0) { pw_stream_queue_buffer(meter->stream, buf); continue; } spa_data* d = &buf->buffer->datas[0]; if (!d->data || !d->chunk) { pw_stream_queue_buffer(meter->stream, buf); continue; } const float* samples = static_cast(d->data); uint32_t count = d->chunk->size / sizeof(float); left = 0.0f; right = 0.0f; for (uint32_t i = 0; i + 1 < count; i += 2) { float l = std::fabs(samples[i]); float r = std::fabs(samples[i + 1]); if (l > left) left = l; if (r > right) right = r; } had_data = true; 
pw_stream_queue_buffer(meter->stream, buf); } if (had_data) { meter->peak_left.store(left, std::memory_order_relaxed); meter->peak_right.store(right, std::memory_order_relaxed); } } static const pw_stream_events kNodeMeterEvents = { .version = PW_VERSION_STREAM_EVENTS, .process = NodeMeterProcess, }; struct NodeProxyData { pw_proxy* proxy = nullptr; spa_hook object_listener{}; uint32_t node_id = 0; void* impl_ptr = nullptr; bool params_subscribed = false; }; } // namespace Status Status::Ok() { return Status{}; } Status Status::Error(StatusCode code, std::string message) { Status status; status.code = code; status.message = std::move(message); return status; } bool Status::ok() const { return code == StatusCode::kOk; } struct Client::Impl { ConnectionOptions options; pw_thread_loop* thread_loop = nullptr; pw_context* context = nullptr; pw_core* core = nullptr; pw_registry* registry = nullptr; spa_hook core_listener{}; spa_hook registry_listener{}; bool core_listener_attached = false; bool registry_listener_attached = false; bool connected = false; uint32_t pending_sync = 0; uint32_t last_sync = 0; Status last_error = Status::Ok(); std::mutex cache_mutex; std::unordered_map nodes; std::unordered_map ports; std::unordered_map links; std::unordered_map> virtual_streams; std::unordered_map> link_proxies; std::unordered_map volume_states; std::unordered_map> node_proxies; std::unordered_map node_channel_counts; std::unordered_set loopback_internal_nodes; std::unordered_map meter_states; std::unordered_set metered_nodes; MeterState master_meter; std::unique_ptr master_meter_data; std::unordered_map> live_meters; uint32_t next_rule_id = 1; std::unordered_map route_rules; std::vector pending_auto_links; uint32_t policy_sync_seq = 0; bool policy_sync_pending = false; std::vector> auto_link_proxies; std::vector> auto_link_claimed_pairs; std::vector> saved_link_proxies; std::vector> pending_link_pairs; pw_proxy* metadata_proxy = nullptr; spa_hook metadata_listener{}; bool 
metadata_listener_attached = false; MetadataInfo defaults; bool loading_config = false; struct SavedLink { std::string out_node; std::string out_port; std::string in_node; std::string in_port; bool operator==(const SavedLink& o) const { return out_node == o.out_node && out_port == o.out_port && in_node == o.in_node && in_port == o.in_port; } }; std::vector saved_links; std::mutex change_cb_mutex; Client::ChangeCallback change_callback; void NotifyChange(); Status ConnectLocked(); void DisconnectLocked(); Status SyncLocked(); void ClearCache(); Status EnsureConnected(); Result CreateVirtualStreamLocked(std::string_view name, bool is_source, const VirtualNodeOptions& options); void CheckRulesForNode(const NodeInfo& node); void EnforceRulesForLink(uint32_t link_id, uint32_t out_port, uint32_t in_port); void SchedulePolicySync(); void ProcessPendingAutoLinks(); void CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port); void ProcessSavedLinks(); void CreateSavedLinkAsync(uint32_t output_port, uint32_t input_port); void AutoSave(); void SetupMasterMeter(); void TeardownMasterMeter(); void TeardownAllLiveMeters(); void BindNodeForParams(uint32_t id); void UnbindNodeForParams(uint32_t id); static void RegistryGlobal(void* data, uint32_t id, uint32_t permissions, const char* type, uint32_t version, const spa_dict* props); static void RegistryGlobalRemove(void* data, uint32_t id); static void CoreDone(void* data, uint32_t id, int seq); static void CoreError(void* data, uint32_t id, int seq, int res, const char* message); static int MetadataProperty(void* data, uint32_t subject, const char* key, const char* type, const char* value); static void NodeInfoChanged(void* data, const struct pw_node_info* info); static void NodeParamChanged(void* data, int seq, uint32_t id, uint32_t index, uint32_t next, const struct spa_pod* param); }; void Client::Impl::NodeInfoChanged(void* data, const struct pw_node_info* info) { auto* np = static_cast(data); if (!np || !info) return; 
auto* impl = static_cast(np->impl_ptr); bool notify = false; if (impl && info->props) { const char* virt_str = spa_dict_lookup(info->props, PW_KEY_NODE_VIRTUAL); if (virt_str) { std::lock_guard lock(impl->cache_mutex); auto it = impl->nodes.find(np->node_id); if (it != impl->nodes.end()) { bool new_virtual = spa_streq(virt_str, "true"); if (it->second.is_virtual != new_virtual) { it->second.is_virtual = new_virtual; notify = true; } } } } if (notify) { impl->NotifyChange(); } if (np->params_subscribed) return; for (uint32_t i = 0; i < info->n_params; ++i) { if (info->params[i].id == SPA_PARAM_Props && (info->params[i].flags & SPA_PARAM_INFO_READ)) { np->params_subscribed = true; uint32_t params[] = {SPA_PARAM_Props}; pw_node_subscribe_params( reinterpret_cast(np->proxy), params, 1); break; } } } void Client::Impl::NodeParamChanged(void* data, int, uint32_t id, uint32_t, uint32_t, const struct spa_pod* param) { auto* np = static_cast(data); if (!np || !np->impl_ptr || !param) return; if (id != SPA_PARAM_Props) return; if (!spa_pod_is_object(param)) return; auto* impl = static_cast(np->impl_ptr); float volume = -1.0f; float mon_volume = -1.0f; bool mute = false; bool mon_mute = false; bool found_mute = false; bool found_mon_mute = false; uint32_t n_channels = 0; const auto* obj = reinterpret_cast(param); const spa_pod_prop* prop; SPA_POD_OBJECT_FOREACH(obj, prop) { switch (prop->key) { case SPA_PROP_channelVolumes: { float vols[64]; uint32_t n = spa_pod_copy_array(&prop->value, SPA_TYPE_Float, vols, 64); if (n > 0) { volume = vols[0]; n_channels = n; } break; } case SPA_PROP_monitorVolumes: { float vols[64]; uint32_t n = spa_pod_copy_array(&prop->value, SPA_TYPE_Float, vols, 64); if (n > 0) { mon_volume = vols[0]; if (n_channels == 0) n_channels = n; } break; } case SPA_PROP_mute: { bool m = false; if (spa_pod_get_bool(&prop->value, &m) >= 0) { mute = m; found_mute = true; } break; } case SPA_PROP_monitorMute: { bool m = false; if (spa_pod_get_bool(&prop->value, &m) 
>= 0) { mon_mute = m; found_mon_mute = true; } break; } } } bool uses_monitor_volume = false; { std::lock_guard lock(impl->cache_mutex); auto node_it = impl->nodes.find(np->node_id); if (node_it != impl->nodes.end()) { uses_monitor_volume = node_it->second.is_virtual; } } float effective_vol = uses_monitor_volume && mon_volume >= 0.0f ? mon_volume : volume; bool effective_mute = uses_monitor_volume && found_mon_mute ? mon_mute : mute; bool effective_found_mute = uses_monitor_volume ? found_mon_mute : found_mute; bool changed = false; { std::lock_guard lock(impl->cache_mutex); auto& vs = impl->volume_states[np->node_id]; if (effective_vol >= 0.0f && std::abs(vs.volume - effective_vol) > 0.0001f) { vs.volume = effective_vol; changed = true; } if (effective_found_mute && vs.mute != effective_mute) { vs.mute = effective_mute; changed = true; } if (n_channels > 0) { impl->node_channel_counts[np->node_id] = n_channels; } } if (changed) { impl->NotifyChange(); } } void Client::Impl::BindNodeForParams(uint32_t id) { if (!registry) return; { std::lock_guard lock(cache_mutex); if (node_proxies.count(id)) return; } static const pw_node_events node_param_events = { .version = PW_VERSION_NODE_EVENTS, .info = NodeInfoChanged, .param = NodeParamChanged, }; auto np = std::make_unique(); np->proxy = static_cast( pw_registry_bind(registry, id, PW_TYPE_INTERFACE_Node, PW_VERSION_NODE, 0)); if (!np->proxy) return; np->node_id = id; np->impl_ptr = this; pw_proxy_add_object_listener(np->proxy, &np->object_listener, &node_param_events, np.get()); std::lock_guard lock(cache_mutex); node_proxies[id] = std::move(np); } void Client::Impl::UnbindNodeForParams(uint32_t id) { std::unique_ptr np; { std::lock_guard lock(cache_mutex); auto it = node_proxies.find(id); if (it != node_proxies.end()) { np = std::move(it->second); node_proxies.erase(it); } node_channel_counts.erase(id); } if (np) { spa_hook_remove(&np->object_listener); pw_proxy_destroy(np->proxy); } } void 
Client::Impl::RegistryGlobal(void* data, uint32_t id, uint32_t, const char* type, uint32_t, const spa_dict* props) { auto* impl = static_cast(data); if (!impl) { return; } bool notify = false; bool bind_node = false; { std::lock_guard lock(impl->cache_mutex); if (IsNodeType(type)) { NodeInfo info; info.id = NodeId{id}; info.name = LookupString(props, PW_KEY_NODE_NAME); info.description = LookupString(props, PW_KEY_NODE_DESCRIPTION); info.media_class = LookupString(props, PW_KEY_MEDIA_CLASS); info.application_name = LookupString(props, PW_KEY_APP_NAME); info.process_binary = LookupString(props, PW_KEY_APP_PROCESS_BINARY); info.media_role = LookupString(props, PW_KEY_MEDIA_ROLE); std::string virt_str = LookupString(props, PW_KEY_NODE_VIRTUAL); info.is_virtual = (virt_str == "true"); impl->nodes[id] = info; impl->CheckRulesForNode(info); notify = true; bind_node = true; } else if (IsPortType(type)) { PortInfo info; info.id = PortId{id}; info.name = LookupString(props, PW_KEY_PORT_NAME); info.is_input = false; uint32_t node_id = 0; if (ParseUint32(SafeLookup(props, PW_KEY_NODE_ID), &node_id)) { info.node = NodeId{node_id}; } const char* direction = SafeLookup(props, PW_KEY_PORT_DIRECTION); if (direction && spa_streq(direction, "in")) { info.is_input = true; } impl->ports[id] = info; if (!impl->pending_auto_links.empty() || !impl->saved_links.empty()) { impl->SchedulePolicySync(); } notify = true; } else if (IsLinkType(type)) { Link info; info.id = LinkId{id}; uint32_t out_port = 0; uint32_t in_port = 0; if (ParseUint32(SafeLookup(props, PW_KEY_LINK_OUTPUT_PORT), &out_port)) { info.output_port = PortId{out_port}; } if (ParseUint32(SafeLookup(props, PW_KEY_LINK_INPUT_PORT), &in_port)) { info.input_port = PortId{in_port}; } impl->links[id] = std::move(info); if (!impl->options.policy_only && out_port && in_port) { impl->EnforceRulesForLink(id, out_port, in_port); } notify = true; } } if (bind_node) { impl->BindNodeForParams(id); } if (notify) { impl->NotifyChange(); 
return; } std::lock_guard lock(impl->cache_mutex); if (type && spa_streq(type, PW_TYPE_INTERFACE_Metadata)) { const char* meta_name = SafeLookup(props, "metadata.name"); if (meta_name && spa_streq(meta_name, "default") && !impl->metadata_proxy) { impl->metadata_proxy = reinterpret_cast( pw_registry_bind(impl->registry, id, PW_TYPE_INTERFACE_Metadata, PW_VERSION_METADATA, 0)); if (impl->metadata_proxy) { static const pw_metadata_events metadata_events = { .version = PW_VERSION_METADATA_EVENTS, .property = MetadataProperty, }; pw_metadata_add_listener( reinterpret_cast(impl->metadata_proxy), &impl->metadata_listener, &metadata_events, impl); impl->metadata_listener_attached = true; } } } } void Client::Impl::RegistryGlobalRemove(void* data, uint32_t id) { auto* impl = static_cast(data); if (!impl) { return; } { std::lock_guard lock(impl->cache_mutex); impl->virtual_streams.erase(id); impl->link_proxies.erase(id); auto node_it = impl->nodes.find(id); if (node_it != impl->nodes.end()) { impl->nodes.erase(node_it); std::vector removed_ports; for (auto it = impl->ports.begin(); it != impl->ports.end();) { if (it->second.node.value == id) { removed_ports.push_back(it->first); it = impl->ports.erase(it); } else { ++it; } } for (auto it = impl->links.begin(); it != impl->links.end();) { bool remove_link = false; for (uint32_t port_id : removed_ports) { if (it->second.input_port.value == port_id || it->second.output_port.value == port_id) { remove_link = true; break; } } if (remove_link) { it = impl->links.erase(it); } else { ++it; } } } else if (impl->ports.erase(id) > 0) { for (auto it = impl->links.begin(); it != impl->links.end();) { if (it->second.input_port.value == id || it->second.output_port.value == id) { it = impl->links.erase(it); } else { ++it; } } } else { impl->links.erase(id); } } impl->UnbindNodeForParams(id); impl->NotifyChange(); } void Client::Impl::CoreDone(void* data, uint32_t, int seq) { auto* impl = static_cast(data); if (!impl || !impl->thread_loop) 
{ return; } if (seq >= static_cast(impl->pending_sync)) { impl->last_sync = static_cast(seq); pw_thread_loop_signal(impl->thread_loop, false); } if (impl->policy_sync_pending && seq >= static_cast(impl->policy_sync_seq)) { impl->policy_sync_pending = false; impl->ProcessPendingAutoLinks(); impl->ProcessSavedLinks(); } } void Client::Impl::CoreError(void* data, uint32_t id, int seq, int res, const char* message) { auto* impl = static_cast(data); if (!impl) { return; } if (id == PW_ID_CORE) { impl->connected = false; impl->last_error = Status::Error(StatusCode::kUnavailable, message ? message : spa_strerror(res)); if (impl->thread_loop) { pw_thread_loop_signal(impl->thread_loop, false); } } } Status Client::Impl::SyncLocked() { if (!core || !thread_loop) { return Status::Error(StatusCode::kUnavailable, "pipewire core not connected"); } pending_sync = pw_core_sync(core, PW_ID_CORE, 0); if (pending_sync == SPA_ID_INVALID) { return Status::Error(StatusCode::kInternal, "failed to sync with pipewire core"); } while (last_sync < pending_sync) { int wait_res = pw_thread_loop_timed_wait(thread_loop, kSyncWaitSeconds); if (wait_res == -ETIMEDOUT) { return Status::Error(StatusCode::kTimeout, "timeout waiting for pipewire sync"); } } return Status::Ok(); } void Client::Impl::ClearCache() { std::lock_guard lock(cache_mutex); nodes.clear(); ports.clear(); links.clear(); loopback_internal_nodes.clear(); pending_auto_links.clear(); auto_link_claimed_pairs.clear(); policy_sync_pending = false; } void Client::Impl::NotifyChange() { std::lock_guard lock(change_cb_mutex); if (change_callback) { change_callback(); } } Status Client::Impl::EnsureConnected() { if (connected) { return Status::Ok(); } if (!options.autoconnect) { return Status::Error(StatusCode::kUnavailable, "pipewire core disconnected"); } if (!thread_loop) { return Status::Error(StatusCode::kUnavailable, "pipewire thread loop not running"); } pw_thread_loop_lock(thread_loop); Status status = ConnectLocked(); 
pw_thread_loop_unlock(thread_loop); return status; } Result Client::Impl::CreateVirtualStreamLocked(std::string_view name, bool is_source, const VirtualNodeOptions& options) { if (!core || !thread_loop) { return {Status::Error(StatusCode::kUnavailable, "pipewire core not connected"), 0}; } if (options.format.rate == 0 || options.format.channels == 0) { return {Status::Error(StatusCode::kInvalidArgument, "invalid audio format"), 0}; } if (options.behavior == VirtualBehavior::kLoopback && !options.target_node) { return {Status::Error(StatusCode::kInvalidArgument, "loopback requires target node"), 0}; } if (options.media_class_override && options.media_class_override->empty()) { return {Status::Error(StatusCode::kInvalidArgument, "missing media class"), 0}; } std::string stream_name = name.empty() ? (is_source ? "warppipe-source" : "warppipe-sink") : std::string(name); const char* media_class = is_source ? "Audio/Source" : "Audio/Sink"; std::string media_class_value = options.media_class_override ? *options.media_class_override : std::string(media_class); if (media_class_value.empty()) { return {Status::Error(StatusCode::kInvalidArgument, "missing media class"), 0}; } const char* media_category = is_source ? "Capture" : "Playback"; std::string display_name = options.display_name.empty() ? stream_name : options.display_name; const char* node_group = options.group.empty() ? 
nullptr : options.group.c_str(); { std::lock_guard lock(cache_mutex); for (const auto& entry : virtual_streams) { if (entry.second && entry.second->name == stream_name) { return {Status::Error(StatusCode::kInvalidArgument, "duplicate node name"), 0}; } } for (const auto& entry : nodes) { if (entry.second.name == stream_name) { uint32_t existing_id = entry.first; auto adopted = std::make_unique(); adopted->linger = true; adopted->loop = thread_loop; adopted->is_source = is_source; adopted->name = stream_name; adopted->node_id = existing_id; adopted->ready = true; if (options.format.rate != 0) adopted->rate = options.format.rate; if (options.format.channels != 0) adopted->channels = options.format.channels; virtual_streams.emplace(existing_id, std::move(adopted)); return {Status::Ok(), existing_id}; } } if (options.behavior == VirtualBehavior::kLoopback && options.target_node) { bool found_target = false; for (const auto& entry : nodes) { if (entry.second.name == *options.target_node) { found_target = true; break; } } if (!found_target) { return {Status::Error(StatusCode::kNotFound, "target node not found"), 0}; } } } if (options.behavior == VirtualBehavior::kLoopback && options.target_node) { std::string args; args += "{ node.description = \"" + display_name + "\""; args += " node.name = \"" + stream_name + "\""; args += " audio.channels = " + std::to_string(options.format.channels); args += " audio.rate = " + std::to_string(options.format.rate); args += " capture.props = {"; args += " target.object = \"" + *options.target_node + "\""; args += " stream.capture.sink = true"; args += " node.passive = true"; args += " node.dont-reconnect = true"; args += " }"; args += " playback.props = {"; args += " node.name = \"" + stream_name + "\""; args += " node.description = \"" + display_name + "\""; args += " media.class = \"" + media_class_value + "\""; args += " node.virtual = true"; args += " }"; args += " }"; pw_impl_module* mod = pw_context_load_module(context, 
"libpipewire-module-loopback", args.c_str(), nullptr); if (!mod) { return {Status::Error(StatusCode::kUnavailable, "failed to load loopback module"), 0}; } auto sync_status = SyncLocked(); if (!sync_status.ok()) { pw_impl_module_destroy(mod); return {sync_status, 0}; } uint32_t node_id = SPA_ID_INVALID; { std::lock_guard lock(cache_mutex); for (const auto& entry : nodes) { if (entry.second.name == stream_name) { node_id = entry.first; break; } } } if (node_id == SPA_ID_INVALID) { int wait_attempts = 0; while (node_id == SPA_ID_INVALID && wait_attempts < 3) { int wait_res = pw_thread_loop_timed_wait(thread_loop, kSyncWaitSeconds); if (wait_res == -ETIMEDOUT) { break; } auto sync2 = SyncLocked(); (void)sync2; std::lock_guard lock(cache_mutex); for (const auto& entry : nodes) { if (entry.second.name == stream_name) { node_id = entry.first; break; } } ++wait_attempts; } } if (node_id == SPA_ID_INVALID) { pw_impl_module_destroy(mod); return {Status::Error(StatusCode::kTimeout, "loopback node did not appear in registry"), 0}; } std::string capture_name = "input." + stream_name; { std::lock_guard lock(cache_mutex); for (const auto& entry : nodes) { if (entry.second.name == capture_name) { loopback_internal_nodes.insert(entry.first); break; } } } auto stream_data = std::make_unique(); stream_data->module = mod; stream_data->loop = thread_loop; stream_data->is_source = is_source; stream_data->loopback = true; stream_data->target_node = *options.target_node; stream_data->name = stream_name; stream_data->rate = options.format.rate; stream_data->channels = options.format.channels; stream_data->node_id = node_id; stream_data->ready = true; { std::lock_guard lock(cache_mutex); virtual_streams.emplace(node_id, std::move(stream_data)); } return {Status::Ok(), node_id}; } // Build audio position string for channels (e.g. "FL,FR" for stereo). 
std::string audio_position; { static const char* const kChannelNames[] = { "FL", "FR", "FC", "LFE", "RL", "RR", "SL", "SR" }; uint32_t ch = options.format.channels; for (uint32_t i = 0; i < ch && i < 8; ++i) { if (i > 0) audio_position += ','; audio_position += kChannelNames[i]; } if (audio_position.empty()) audio_position = "FL,FR"; } pw_properties* props = pw_properties_new( "factory.name", "support.null-audio-sink", PW_KEY_NODE_NAME, stream_name.c_str(), PW_KEY_NODE_DESCRIPTION, display_name.c_str(), PW_KEY_MEDIA_CLASS, media_class_value.c_str(), PW_KEY_NODE_VIRTUAL, "true", "audio.position", audio_position.c_str(), nullptr); #ifndef WARPPIPE_TESTING pw_properties_set(props, PW_KEY_OBJECT_LINGER, "true"); #endif if (!props) { return {Status::Error(StatusCode::kInternal, "failed to allocate node properties"), 0}; } if (node_group) { pw_properties_set(props, PW_KEY_NODE_GROUP, node_group); } pw_proxy* proxy = reinterpret_cast( pw_core_create_object(core, "adapter", PW_TYPE_INTERFACE_Node, PW_VERSION_NODE, &props->dict, 0)); pw_properties_free(props); if (!proxy) { return {Status::Error(StatusCode::kUnavailable, "failed to create virtual node"), 0}; } auto stream_data = std::make_unique(); stream_data->proxy = proxy; #ifndef WARPPIPE_TESTING stream_data->linger = true; #endif stream_data->loop = thread_loop; stream_data->is_source = is_source; stream_data->loopback = false; if (options.target_node) { stream_data->target_node = *options.target_node; } stream_data->name = stream_name; if (options.format.rate != 0) { stream_data->rate = options.format.rate; } if (options.format.channels != 0) { stream_data->channels = options.format.channels; } pw_proxy_add_listener(proxy, &stream_data->listener, &kNodeProxyEvents, stream_data.get()); uint32_t node_id = SPA_ID_INVALID; int wait_attempts = 0; while (node_id == SPA_ID_INVALID && !stream_data->failed && !stream_data->ready && wait_attempts < 3) { int wait_res = pw_thread_loop_timed_wait(thread_loop, kSyncWaitSeconds); if 
(wait_res == -ETIMEDOUT) { break; } node_id = stream_data->node_id; ++wait_attempts; } if (stream_data->failed) { std::string error = stream_data->error.empty() ? "node creation failed" : stream_data->error; pw_proxy_destroy(proxy); return {Status::Error(StatusCode::kUnavailable, std::move(error)), 0}; } node_id = stream_data->node_id; if (node_id == SPA_ID_INVALID) { pw_proxy_destroy(proxy); return {Status::Error(StatusCode::kTimeout, "timed out waiting for virtual node id"), 0}; } stream_data->ready = true; { std::lock_guard lock(cache_mutex); virtual_streams.emplace(node_id, std::move(stream_data)); } return {Status::Ok(), node_id}; } Status Client::Impl::ConnectLocked() { if (connected) { return Status::Ok(); } if (!thread_loop) { return Status::Error(StatusCode::kInternal, "thread loop not initialized"); } pw_loop* loop = pw_thread_loop_get_loop(thread_loop); if (!context) { context = pw_context_new(loop, nullptr, 0); if (!context) { return Status::Error(StatusCode::kUnavailable, "failed to create pipewire context"); } } pw_properties* props = pw_properties_new(PW_KEY_APP_NAME, options.application_name.c_str(), nullptr); if (options.remote_name && !options.remote_name->empty()) { pw_properties_set(props, PW_KEY_REMOTE_NAME, options.remote_name->c_str()); } core = pw_context_connect(context, props, 0); if (!core) { return Status::Error(StatusCode::kUnavailable, "failed to connect to pipewire core"); } static const pw_core_events core_events = { .version = PW_VERSION_CORE_EVENTS, .done = CoreDone, .error = CoreError, }; pw_core_add_listener(core, &core_listener, &core_events, this); core_listener_attached = true; registry = pw_core_get_registry(core, PW_VERSION_REGISTRY, 0); if (!registry) { return Status::Error(StatusCode::kUnavailable, "failed to get pipewire registry"); } static const pw_registry_events registry_events = { .version = PW_VERSION_REGISTRY_EVENTS, .global = RegistryGlobal, .global_remove = RegistryGlobalRemove, }; 
pw_registry_add_listener(registry, ®istry_listener, ®istry_events, this); registry_listener_attached = true; connected = true; last_error = Status::Ok(); ClearCache(); Status sync_status = SyncLocked(); if (!sync_status.ok()) { return sync_status; } SetupMasterMeter(); return Status::Ok(); } void Client::Impl::DisconnectLocked() { TeardownMasterMeter(); TeardownAllLiveMeters(); std::unordered_map> links; std::unordered_map> streams; { std::lock_guard lock(cache_mutex); links.swap(link_proxies); streams.swap(virtual_streams); } for (auto& entry : links) { LinkProxy* link = entry.second.get(); if (link) { spa_hook_remove(&link->listener); link->proxy = nullptr; } } for (auto& entry : streams) { StreamData* stream_data = entry.second.get(); if (!stream_data) continue; if (stream_data->linger && stream_data->proxy) { spa_hook_remove(&stream_data->listener); stream_data->proxy = nullptr; } else if (stream_data->module) { pw_impl_module_destroy(stream_data->module); stream_data->module = nullptr; } else if (stream_data->proxy) { pw_proxy_destroy(stream_data->proxy); stream_data->proxy = nullptr; } else if (stream_data->stream) { pw_stream_disconnect(stream_data->stream); pw_stream_destroy(stream_data->stream); stream_data->stream = nullptr; } } for (auto& entry : auto_link_proxies) { if (entry) { spa_hook_remove(&entry->listener); entry->proxy = nullptr; } } auto_link_proxies.clear(); for (auto& entry : saved_link_proxies) { if (entry) { spa_hook_remove(&entry->listener); entry->proxy = nullptr; } } saved_link_proxies.clear(); for (auto& entry : node_proxies) { if (entry.second) { spa_hook_remove(&entry.second->object_listener); entry.second->proxy = nullptr; } } node_proxies.clear(); node_channel_counts.clear(); if (metadata_listener_attached) { spa_hook_remove(&metadata_listener); metadata_listener_attached = false; } if (metadata_proxy) { pw_proxy_destroy(metadata_proxy); metadata_proxy = nullptr; } if (registry_listener_attached) { spa_hook_remove(®istry_listener); 
registry_listener_attached = false; } if (core_listener_attached) { spa_hook_remove(&core_listener); core_listener_attached = false; } if (registry) { pw_proxy_destroy(reinterpret_cast(registry)); registry = nullptr; } if (core) { pw_core_disconnect(core); core = nullptr; } connected = false; ClearCache(); } void Client::Impl::CheckRulesForNode(const NodeInfo& node) { for (const auto& entry : route_rules) { if (!MatchesRule(node, entry.second.match)) continue; const RouteRule& rule = entry.second; PendingAutoLink pending; pending.rule_id = entry.first; pending.direction = rule.direction; if (rule.direction == RuleDirection::kPlayback) { pending.source_node_id = node.id.value; pending.target_node_name = rule.target_node; } else if (!rule.source_node.empty()) { // source_node_id = app (input side); target_node_name = hw source (output side) pending.source_node_id = node.id.value; pending.target_node_name = rule.source_node; } else { continue; } pending_auto_links.push_back(std::move(pending)); SchedulePolicySync(); } } void Client::Impl::EnforceRulesForLink(uint32_t link_id, uint32_t out_port, uint32_t in_port) { auto out_port_it = ports.find(out_port); auto in_port_it = ports.find(in_port); if (out_port_it == ports.end() || in_port_it == ports.end()) return; uint32_t src_node_id = out_port_it->second.node.value; uint32_t dest_node_id = in_port_it->second.node.value; auto src_node_it = nodes.find(src_node_id); auto dest_node_it = nodes.find(dest_node_id); bool should_destroy = false; // Playback enforcement: if source app matches a playback rule, link must go to rule target if (src_node_it != nodes.end()) { std::vector rule_target_ids; for (const auto& rule_entry : route_rules) { if (rule_entry.second.direction != RuleDirection::kPlayback) continue; if (!MatchesRule(src_node_it->second, rule_entry.second.match)) continue; for (const auto& n : nodes) { if (n.second.name == rule_entry.second.target_node) { rule_target_ids.push_back(n.first); break; } } } if 
(!rule_target_ids.empty()) { bool links_to_target = false; for (uint32_t tid : rule_target_ids) { if (dest_node_id == tid) { links_to_target = true; break; } } if (!links_to_target) should_destroy = true; } } // Capture enforcement: if dest app matches a capture rule, link must come from rule source if (!should_destroy && dest_node_it != nodes.end()) { std::vector rule_source_ids; for (const auto& rule_entry : route_rules) { if (rule_entry.second.direction != RuleDirection::kCapture) continue; if (!MatchesRule(dest_node_it->second, rule_entry.second.match)) continue; for (const auto& n : nodes) { if (n.second.name == rule_entry.second.source_node) { rule_source_ids.push_back(n.first); break; } } } if (!rule_source_ids.empty()) { bool links_from_source = false; for (uint32_t sid : rule_source_ids) { if (src_node_id == sid) { links_from_source = true; break; } } if (!links_from_source) should_destroy = true; } } if (!should_destroy) return; if (link_proxies.count(link_id)) return; for (const auto& proxy : auto_link_proxies) { if (proxy && proxy->output_port == out_port && proxy->input_port == in_port) return; } for (const auto& proxy : saved_link_proxies) { if (proxy && proxy->output_port == out_port && proxy->input_port == in_port) return; } for (const auto& pair : auto_link_claimed_pairs) { if (pair.first == out_port && pair.second == in_port) return; } for (const auto& pair : pending_link_pairs) { if (pair.first == out_port && pair.second == in_port) return; } pw_registry_destroy(registry, link_id); } void Client::Impl::SchedulePolicySync() { if (policy_sync_pending || !core) { return; } uint32_t seq = pw_core_sync(core, PW_ID_CORE, 0); if (seq != SPA_ID_INVALID) { policy_sync_seq = seq; policy_sync_pending = true; } } void Client::Impl::ProcessPendingAutoLinks() { if (options.policy_only) { std::lock_guard lock(cache_mutex); pending_auto_links.clear(); return; } struct LinkSpec { uint32_t output_port; uint32_t input_port; }; std::vector links_to_create; 
std::vector> batch_pairs; { std::lock_guard lock(cache_mutex); for (auto it = pending_auto_links.begin(); it != pending_auto_links.end();) { uint32_t target_node_id = 0; for (const auto& node_entry : nodes) { if (node_entry.second.name == it->target_node_name) { target_node_id = node_entry.first; break; } } if (target_node_id == 0) { ++it; continue; } struct PortEntry { uint32_t id; std::string name; }; std::vector src_ports; std::vector tgt_ports; if (it->direction == RuleDirection::kCapture) { // Capture: target_node_name is the hw source (outputs), source_node_id is the app (inputs) for (const auto& port_entry : ports) { const PortInfo& port = port_entry.second; if (port.node.value == target_node_id && !port.is_input) { src_ports.push_back({port_entry.first, port.name}); } if (port.node.value == it->source_node_id && port.is_input) { tgt_ports.push_back({port_entry.first, port.name}); } } } else { for (const auto& port_entry : ports) { const PortInfo& port = port_entry.second; if (port.node.value == it->source_node_id && !port.is_input) { src_ports.push_back({port_entry.first, port.name}); } if (port.node.value == target_node_id && port.is_input) { tgt_ports.push_back({port_entry.first, port.name}); } } } if (src_ports.empty() || tgt_ports.empty()) { ++it; continue; } auto cmp = [](const PortEntry& a, const PortEntry& b) { return a.name < b.name; }; std::sort(src_ports.begin(), src_ports.end(), cmp); std::sort(tgt_ports.begin(), tgt_ports.end(), cmp); size_t count = std::min(src_ports.size(), tgt_ports.size()); for (size_t i = 0; i < count; ++i) { bool exists = false; for (const auto& link_entry : links) { if (link_entry.second.output_port.value == src_ports[i].id && link_entry.second.input_port.value == tgt_ports[i].id) { exists = true; break; } } auto_link_claimed_pairs.emplace_back(src_ports[i].id, tgt_ports[i].id); batch_pairs.emplace_back(src_ports[i].id, tgt_ports[i].id); if (!exists) { links_to_create.push_back({src_ports[i].id, tgt_ports[i].id}); } } it 
= pending_auto_links.erase(it); } } if (batch_pairs.empty()) { for (const auto& spec : links_to_create) { CreateAutoLinkAsync(spec.output_port, spec.input_port); } return; } std::unordered_map> auto_port_map; for (const auto& pair : batch_pairs) { auto_port_map[pair.first].push_back(pair.second); } std::vector competing_ids; { std::lock_guard lock(cache_mutex); for (const auto& link_entry : links) { auto it = auto_port_map.find(link_entry.second.output_port.value); if (it == auto_port_map.end()) continue; uint32_t link_id = link_entry.first; uint32_t in_port = link_entry.second.input_port.value; bool is_ours = false; for (uint32_t target_in : it->second) { if (target_in == in_port) { is_ours = true; break; } } if (!is_ours) { if (link_proxies.count(link_id)) is_ours = true; } if (!is_ours) { uint32_t out_port = link_entry.second.output_port.value; for (const auto& proxy : auto_link_proxies) { if (proxy && proxy->output_port == out_port && proxy->input_port == in_port) { is_ours = true; break; } } } if (!is_ours) { uint32_t out_port = link_entry.second.output_port.value; for (const auto& pair : auto_link_claimed_pairs) { if (pair.first == out_port && pair.second == in_port) { is_ours = true; break; } } } if (!is_ours) { uint32_t out_port = link_entry.second.output_port.value; for (const auto& proxy : saved_link_proxies) { if (proxy && proxy->output_port == out_port && proxy->input_port == in_port) { is_ours = true; break; } } } if (!is_ours) { competing_ids.push_back(link_id); } } } for (uint32_t id : competing_ids) { pw_registry_destroy(registry, id); } for (const auto& spec : links_to_create) { CreateAutoLinkAsync(spec.output_port, spec.input_port); } } void Client::Impl::CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port) { if (!core) { return; } pw_properties* props = pw_properties_new(nullptr, nullptr); if (!props) { return; } pw_properties_setf(props, PW_KEY_LINK_OUTPUT_PORT, "%u", output_port); pw_properties_setf(props, PW_KEY_LINK_INPUT_PORT, 
"%u", input_port); pw_properties_set(props, PW_KEY_OBJECT_LINGER, "true"); pw_proxy* proxy = reinterpret_cast( pw_core_create_object(core, "link-factory", PW_TYPE_INTERFACE_Link, PW_VERSION_LINK, &props->dict, 0)); pw_properties_free(props); if (!proxy) { return; } auto link_data = std::make_unique(); link_data->proxy = proxy; link_data->loop = thread_loop; link_data->output_port = output_port; link_data->input_port = input_port; pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents, link_data.get()); std::lock_guard lock(cache_mutex); auto_link_proxies.push_back(std::move(link_data)); } void Client::Impl::ProcessSavedLinks() { struct LinkSpec { uint32_t output_port; uint32_t input_port; std::string label; }; std::vector to_create; { std::lock_guard lock(cache_mutex); for (auto it = saved_links.begin(); it != saved_links.end();) { bool covered_by_rule = false; for (const auto& node_entry : nodes) { if (node_entry.second.name != it->out_node) continue; for (const auto& rule_entry : route_rules) { if (MatchesRule(node_entry.second, rule_entry.second.match) && rule_entry.second.target_node == it->in_node) { covered_by_rule = true; break; } } if (covered_by_rule) break; } if (covered_by_rule) { it = saved_links.erase(it); continue; } uint32_t out_id = 0, in_id = 0; for (const auto& port_entry : ports) { const PortInfo& port = port_entry.second; auto node_it = nodes.find(port.node.value); if (node_it == nodes.end()) continue; if (!port.is_input && node_it->second.name == it->out_node && port.name == it->out_port) { out_id = port_entry.first; } if (port.is_input && node_it->second.name == it->in_node && port.name == it->in_port) { in_id = port_entry.first; } if (out_id && in_id) break; } if (!out_id || !in_id) { ++it; continue; } bool exists = false; for (const auto& link_entry : links) { if (link_entry.second.output_port.value == out_id && link_entry.second.input_port.value == in_id) { exists = true; break; } } if (exists) { it = saved_links.erase(it); 
continue; } std::string label = it->out_node + ":" + it->out_port + " -> " + it->in_node + ":" + it->in_port; to_create.push_back({out_id, in_id, std::move(label)}); it = saved_links.erase(it); } } if (to_create.empty()) return; std::unordered_map> saved_port_map; for (const auto& spec : to_create) { saved_port_map[spec.output_port].push_back(spec.input_port); } std::vector competing_link_ids; { std::lock_guard lock(cache_mutex); for (const auto& link_entry : links) { auto it = saved_port_map.find(link_entry.second.output_port.value); if (it == saved_port_map.end()) continue; uint32_t link_id = link_entry.first; uint32_t in_port = link_entry.second.input_port.value; bool is_ours = false; for (uint32_t saved_in : it->second) { if (saved_in == in_port) { is_ours = true; break; } } if (!is_ours) { if (link_proxies.count(link_id)) { is_ours = true; } } if (!is_ours) { uint32_t out_port = link_entry.second.output_port.value; for (const auto& proxy : auto_link_proxies) { if (proxy && proxy->output_port == out_port && proxy->input_port == in_port) { is_ours = true; break; } } } if (!is_ours) { uint32_t out_port = link_entry.second.output_port.value; for (const auto& pair : auto_link_claimed_pairs) { if (pair.first == out_port && pair.second == in_port) { is_ours = true; break; } } } if (!is_ours) { uint32_t out_port = link_entry.second.output_port.value; for (const auto& proxy : saved_link_proxies) { if (proxy && proxy->output_port == out_port && proxy->input_port == in_port) { is_ours = true; break; } } } if (!is_ours) { competing_link_ids.push_back(link_id); } } } for (uint32_t id : competing_link_ids) { pw_registry_destroy(registry, id); } for (const auto& spec : to_create) { CreateSavedLinkAsync(spec.output_port, spec.input_port); } }

/// Requests a lingering link between two ports from the "link-factory" and
/// tracks its proxy in `saved_link_proxies`. Does not wait for the link to be
/// bound; silently returns when there is no core or an allocation fails.
void Client::Impl::CreateSavedLinkAsync(uint32_t output_port,
                                        uint32_t input_port) {
  if (!core) {
    return;
  }

  pw_properties* props = pw_properties_new(nullptr, nullptr);
  if (!props) {
    return;
  }
  pw_properties_setf(props, PW_KEY_LINK_OUTPUT_PORT, "%u", output_port);
  pw_properties_setf(props, PW_KEY_LINK_INPUT_PORT, "%u", input_port);
  pw_properties_set(props, PW_KEY_OBJECT_LINGER, "true");

  pw_proxy* proxy = reinterpret_cast<pw_proxy*>(
      pw_core_create_object(core, "link-factory", PW_TYPE_INTERFACE_Link,
                            PW_VERSION_LINK, &props->dict, 0));
  pw_properties_free(props);
  if (!proxy) {
    return;
  }

  auto link_data = std::make_unique<LinkProxy>();
  link_data->proxy = proxy;
  link_data->loop = thread_loop;
  link_data->output_port = output_port;
  link_data->input_port = input_port;
  pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents,
                        link_data.get());

  std::lock_guard lock(cache_mutex);
  saved_link_proxies.push_back(std::move(link_data));
}

/// Writes the virtual nodes, route rules and links to `options.config_path`
/// as JSON.
///
/// Does nothing when no config path is configured or while `loading_config`
/// is set. The document is written to "<config_path>.tmp" and then renamed
/// over the target; the temporary file is removed again if the write or the
/// rename fails.
void Client::Impl::AutoSave() {
  if (!options.config_path || options.config_path->empty() || loading_config) {
    return;
  }

  nlohmann::json nodes_array = nlohmann::json::array();
  nlohmann::json rules_array = nlohmann::json::array();
  nlohmann::json links_array = nlohmann::json::array();

  {
    // One lock for the whole snapshot so nodes, rules and links describe the
    // same state.
    std::lock_guard lock(cache_mutex);

    for (const auto& entry : virtual_streams) {
      if (!entry.second) {
        continue;
      }
      const StreamData& sd = *entry.second;
      nlohmann::json node_obj;
      node_obj["name"] = sd.name;
      node_obj["is_source"] = sd.is_source;
      node_obj["rate"] = sd.rate;
      node_obj["channels"] = sd.channels;
      node_obj["loopback"] = sd.loopback;
      node_obj["target_node"] = sd.target_node;
      const auto vol_it = volume_states.find(entry.first);
      if (vol_it != volume_states.end()) {
        node_obj["volume"] = vol_it->second.volume;
        node_obj["mute"] = vol_it->second.mute;
      }
      nodes_array.push_back(std::move(node_obj));
    }

    for (const auto& entry : route_rules) {
      const auto& rule = entry.second;
      nlohmann::json rule_obj;
      rule_obj["id"] = entry.first;
      rule_obj["match"]["application_name"] = rule.match.application_name;
      rule_obj["match"]["process_binary"] = rule.match.process_binary;
      rule_obj["match"]["media_role"] = rule.match.media_role;
      rule_obj["target_node"] = rule.target_node;
      rule_obj["direction"] =
          rule.direction == RuleDirection::kCapture ? "capture" : "playback";
      rule_obj["source_node"] = rule.source_node;
      rules_array.push_back(std::move(rule_obj));
    }

    // Links already written, described by node/port names.
    std::vector<SavedLink> live;

    const auto to_json = [](const SavedLink& sl) {
      nlohmann::json link_obj;
      link_obj["out_node"] = sl.out_node;
      link_obj["out_port"] = sl.out_port;
      link_obj["in_node"] = sl.in_node;
      link_obj["in_port"] = sl.in_port;
      return link_obj;
    };
    const auto is_live = [&live](const SavedLink& sl) {
      return std::find(live.begin(), live.end(), sl) != live.end();
    };
    // Resolves a link id to node/port names and records it. Links whose ports
    // or nodes are no longer in the cache are skipped.
    const auto record_link = [&](uint32_t link_id, bool skip_duplicates) {
      const auto link_it = links.find(link_id);
      if (link_it == links.end()) {
        return;
      }
      const Link& link = link_it->second;
      const auto out_port_it = ports.find(link.output_port.value);
      const auto in_port_it = ports.find(link.input_port.value);
      if (out_port_it == ports.end() || in_port_it == ports.end()) {
        return;
      }
      const auto out_node_it = nodes.find(out_port_it->second.node.value);
      const auto in_node_it = nodes.find(in_port_it->second.node.value);
      if (out_node_it == nodes.end() || in_node_it == nodes.end()) {
        return;
      }
      SavedLink sl{out_node_it->second.name, out_port_it->second.name,
                   in_node_it->second.name, in_port_it->second.name};
      if (skip_duplicates && is_live(sl)) {
        return;
      }
      links_array.push_back(to_json(sl));
      live.push_back(std::move(sl));
    };

    for (const auto& entry : link_proxies) {
      if (entry.second) {
        record_link(entry.first, /*skip_duplicates=*/false);
      }
    }
    for (const auto& lp : saved_link_proxies) {
      if (lp && lp->id != SPA_ID_INVALID) {
        record_link(lp->id, /*skip_duplicates=*/true);
      }
    }
    // Saved links that have not been turned into live links yet are kept so
    // they are not lost on the next load.
    for (const auto& sl : saved_links) {
      if (!is_live(sl)) {
        links_array.push_back(to_json(sl));
      }
    }
  }

  nlohmann::json j;
  j["version"] = 1;
  j["virtual_nodes"] = std::move(nodes_array);
  j["route_rules"] = std::move(rules_array);
  j["links"] = std::move(links_array);

  // Names come from the server and are not guaranteed to be valid UTF-8;
  // dump() would throw on such input, so substitute invalid sequences instead.
  const std::string serialized =
      j.dump(2, ' ', false, nlohmann::json::error_handler_t::replace);

  const std::string tmp_path = *options.config_path + ".tmp";
  std::ofstream file(tmp_path);
  if (!file.is_open()) {
    return;
  }
  file << serialized;
  file.close();

  const bool written = !file.fail();
  if (!written ||
      std::rename(tmp_path.c_str(), options.config_path->c_str()) != 0) {
    // Do not leave a partial or orphaned temp file next to the config.
    std::remove(tmp_path.c_str());
  }
}

void Client::Impl::SetupMasterMeter() { if (!thread_loop || !core || master_meter_data) { return; } auto meter = std::make_unique(); pw_properties* props = pw_properties_new( PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Capture", PW_KEY_MEDIA_CLASS, "Stream/Input/Audio", PW_KEY_STREAM_CAPTURE_SINK, "true", PW_KEY_STREAM_MONITOR, "true", PW_KEY_NODE_NAME, "", nullptr); meter->stream = pw_stream_new_simple( pw_thread_loop_get_loop(thread_loop), "warppipe-meter", props, &kNodeMeterEvents, meter.get()); if (!meter->stream) { return; } uint8_t buffer[512]; spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); spa_audio_info_raw info{}; info.format = SPA_AUDIO_FORMAT_F32; info.rate = 48000; info.channels = 2; info.position[0] = SPA_AUDIO_CHANNEL_FL; info.position[1] = SPA_AUDIO_CHANNEL_FR; const spa_pod* params[1]; params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &info); int res = pw_stream_connect( meter->stream, PW_DIRECTION_INPUT, PW_ID_ANY, static_cast( PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS |
PW_STREAM_FLAG_RT_PROCESS), params, 1); if (res != 0) { pw_stream_destroy(meter->stream); return; } master_meter_data = std::move(meter); } void Client::Impl::TeardownMasterMeter() { if (!master_meter_data) { return; } if (master_meter_data->stream) { pw_stream_destroy(master_meter_data->stream); } master_meter_data.reset(); } void Client::Impl::TeardownAllLiveMeters() { std::unordered_map> meters; { std::lock_guard lock(cache_mutex); meters.swap(live_meters); } for (auto& entry : meters) { if (entry.second && entry.second->stream) { pw_stream_destroy(entry.second->stream); entry.second->stream = nullptr; } } } int Client::Impl::MetadataProperty(void* data, uint32_t subject, const char* key, const char* type, const char* value) { auto* impl = static_cast(data); if (!impl || subject != 0 || !key) { return 0; } std::string name; if (value && value[0] != '\0') { try { auto j = nlohmann::json::parse(value); if (j.contains("name") && j["name"].is_string()) { name = j["name"].get(); } } catch (...) 
{ name = value; } } std::lock_guard lock(impl->cache_mutex); if (spa_streq(key, "default.audio.sink")) { impl->defaults.default_sink_name = name; } else if (spa_streq(key, "default.audio.source")) { impl->defaults.default_source_name = name; } else if (spa_streq(key, "default.configured.audio.sink")) { impl->defaults.configured_sink_name = name; } else if (spa_streq(key, "default.configured.audio.source")) { impl->defaults.configured_source_name = name; } return 0; } Client::Client(std::unique_ptr impl) : impl_(std::move(impl)) {} Client::Client(Client&&) noexcept = default; Client& Client::operator=(Client&&) noexcept = default; Client::~Client() { if (impl_) { Shutdown(); } } Result> Client::Create(const ConnectionOptions& options) { pw_init(nullptr, nullptr); if (options.threading == ThreadingMode::kCallerThread) { return {Status::Error(StatusCode::kNotImplemented, "caller thread mode not implemented"), {}}; } auto impl = std::make_unique(); impl->options = options; impl->thread_loop = pw_thread_loop_new("warppipe", nullptr); if (!impl->thread_loop) { return {Status::Error(StatusCode::kUnavailable, "failed to create pipewire thread loop"), {}}; } if (pw_thread_loop_start(impl->thread_loop) != 0) { pw_thread_loop_destroy(impl->thread_loop); impl->thread_loop = nullptr; return {Status::Error(StatusCode::kUnavailable, "failed to start pipewire thread loop"), {}}; } pw_thread_loop_lock(impl->thread_loop); Status status = impl->ConnectLocked(); pw_thread_loop_unlock(impl->thread_loop); if (!status.ok()) { pw_thread_loop_lock(impl->thread_loop); impl->DisconnectLocked(); if (impl->context) { pw_context_destroy(impl->context); impl->context = nullptr; } pw_thread_loop_unlock(impl->thread_loop); pw_thread_loop_stop(impl->thread_loop); pw_thread_loop_destroy(impl->thread_loop); impl->thread_loop = nullptr; return {status, {}}; } auto client = std::unique_ptr(new Client(std::move(impl))); if (options.config_path && !options.config_path->empty()) { std::ifstream 
test_file(*options.config_path); if (test_file.good()) { client->LoadConfig(*options.config_path); } } return {Status::Ok(), std::move(client)}; } Status Client::Shutdown() { if (!impl_) { return Status::Ok(); } if (impl_->thread_loop) { pw_thread_loop_lock(impl_->thread_loop); impl_->DisconnectLocked(); if (impl_->context) { pw_context_destroy(impl_->context); impl_->context = nullptr; } pw_thread_loop_unlock(impl_->thread_loop); pw_thread_loop_stop(impl_->thread_loop); pw_thread_loop_destroy(impl_->thread_loop); impl_->thread_loop = nullptr; } pw_deinit(); return Status::Ok(); } Result> Client::ListNodes() { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } std::lock_guard lock(impl_->cache_mutex); std::vector items; items.reserve(impl_->nodes.size()); for (const auto& entry : impl_->nodes) { if (impl_->loopback_internal_nodes.count(entry.first)) { continue; } NodeInfo info = entry.second; if (!info.is_virtual && impl_->virtual_streams.find(entry.first) != impl_->virtual_streams.end()) { info.is_virtual = true; } items.push_back(std::move(info)); } return {Status::Ok(), std::move(items)}; } Result> Client::ListPorts(NodeId node) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } std::lock_guard lock(impl_->cache_mutex); std::vector items; for (const auto& entry : impl_->ports) { if (entry.second.node.value == node.value) { items.push_back(entry.second); } } return {Status::Ok(), std::move(items)}; } Result> Client::ListLinks() { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } std::lock_guard lock(impl_->cache_mutex); std::vector items; items.reserve(impl_->links.size()); for (const auto& entry : impl_->links) { items.push_back(entry.second); } return {Status::Ok(), std::move(items)}; } Result Client::CreateVirtualSink(std::string_view name, const VirtualNodeOptions& options) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, 
{}}; } std::string name_value = name.empty() ? std::string() : std::string(name); pw_thread_loop_lock(impl_->thread_loop); auto result = impl_->CreateVirtualStreamLocked(name_value, false, options); pw_thread_loop_unlock(impl_->thread_loop); if (!result.ok()) { return {result.status, {}}; } VirtualSink sink; sink.node = NodeId{result.value}; sink.name = name_value.empty() ? "warppipe-sink" : name_value; impl_->AutoSave(); return {Status::Ok(), std::move(sink)}; } Result Client::CreateVirtualSource(std::string_view name, const VirtualNodeOptions& options) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } std::string name_value = name.empty() ? std::string() : std::string(name); pw_thread_loop_lock(impl_->thread_loop); auto result = impl_->CreateVirtualStreamLocked(name_value, true, options); pw_thread_loop_unlock(impl_->thread_loop); if (!result.ok()) { return {result.status, {}}; } VirtualSource source; source.node = NodeId{result.value}; source.name = name_value.empty() ? "warppipe-source" : name_value; impl_->AutoSave(); return {Status::Ok(), std::move(source)}; } Status Client::RemoveNode(NodeId node) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return status; } pw_thread_loop_lock(impl_->thread_loop); std::unique_ptr owned_stream; { std::lock_guard lock(impl_->cache_mutex); auto it = impl_->virtual_streams.find(node.value); if (it == impl_->virtual_streams.end()) { pw_thread_loop_unlock(impl_->thread_loop); return Status::Error(StatusCode::kNotFound, "node not managed by warppipe"); } owned_stream = std::move(it->second); impl_->virtual_streams.erase(it); } if (owned_stream) { if (owned_stream->module) { std::string capture_name = "input." 
+ owned_stream->name; { std::lock_guard lock(impl_->cache_mutex); for (auto it = impl_->loopback_internal_nodes.begin(); it != impl_->loopback_internal_nodes.end(); ++it) { auto node_it = impl_->nodes.find(*it); if (node_it != impl_->nodes.end() && node_it->second.name == capture_name) { impl_->loopback_internal_nodes.erase(it); break; } } } pw_impl_module_destroy(owned_stream->module); owned_stream->module = nullptr; } else if (owned_stream->proxy) { pw_proxy_destroy(owned_stream->proxy); owned_stream->proxy = nullptr; } else if (owned_stream->stream) { pw_stream_disconnect(owned_stream->stream); pw_stream_destroy(owned_stream->stream); owned_stream->stream = nullptr; } } pw_thread_loop_unlock(impl_->thread_loop); impl_->AutoSave(); return Status::Ok(); } Status Client::SetNodeVolume(NodeId node, float volume, bool mute) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return status; } if (node.value == 0) { return Status::Error(StatusCode::kInvalidArgument, "invalid node id"); } volume = std::clamp(volume, 0.0f, 1.5f); pw_thread_loop_lock(impl_->thread_loop); bool is_virtual = false; { std::lock_guard lock(impl_->cache_mutex); auto node_it = impl_->nodes.find(node.value); if (node_it == impl_->nodes.end()) { pw_thread_loop_unlock(impl_->thread_loop); return Status::Error(StatusCode::kNotFound, "node not found"); } is_virtual = node_it->second.is_virtual; } uint32_t n_channels; pw_stream* own_stream = nullptr; pw_proxy* proxy = nullptr; { std::lock_guard lock(impl_->cache_mutex); auto streamIt = impl_->virtual_streams.find(node.value); if (streamIt != impl_->virtual_streams.end() && streamIt->second->stream) { own_stream = streamIt->second->stream; n_channels = streamIt->second->channels; } if (!own_stream) { auto proxyIt = impl_->node_proxies.find(node.value); if (proxyIt == impl_->node_proxies.end() || !proxyIt->second->proxy) { pw_thread_loop_unlock(impl_->thread_loop); return Status::Error(StatusCode::kNotFound, "no proxy bound for node"); } 
proxy = proxyIt->second->proxy; auto chIt = impl_->node_channel_counts.find(node.value); n_channels = (chIt != impl_->node_channel_counts.end()) ? chIt->second : 2; } } if (n_channels == 0) n_channels = 2; uint8_t buffer[512]; spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); uint32_t vol_prop = is_virtual ? SPA_PROP_monitorVolumes : SPA_PROP_channelVolumes; uint32_t mute_prop = is_virtual ? SPA_PROP_monitorMute : SPA_PROP_mute; spa_pod_frame obj_frame; spa_pod_builder_push_object(&builder, &obj_frame, SPA_TYPE_OBJECT_Props, SPA_PARAM_Props); spa_pod_builder_prop(&builder, vol_prop, 0); spa_pod_frame arr_frame; spa_pod_builder_push_array(&builder, &arr_frame); for (uint32_t ch = 0; ch < n_channels; ++ch) { spa_pod_builder_float(&builder, volume); } spa_pod_builder_pop(&builder, &arr_frame); spa_pod_builder_prop(&builder, mute_prop, 0); spa_pod_builder_bool(&builder, mute); auto* param = static_cast( spa_pod_builder_pop(&builder, &obj_frame)); if (own_stream) { pw_stream_set_param(own_stream, SPA_PARAM_Props, param); } else { pw_node_set_param(reinterpret_cast(proxy), SPA_PARAM_Props, 0, param); } { std::lock_guard lock(impl_->cache_mutex); impl_->volume_states[node.value] = VolumeState{volume, mute}; } pw_thread_loop_unlock(impl_->thread_loop); impl_->AutoSave(); return Status::Ok(); } Result Client::GetNodeVolume(NodeId node) const { std::lock_guard lock(impl_->cache_mutex); auto it = impl_->volume_states.find(node.value); if (it == impl_->volume_states.end()) { return {Status::Ok(), VolumeState{}}; } return {Status::Ok(), it->second}; } Status Client::EnsureNodeMeter(NodeId node) { if (node.value == 0) { return Status::Error(StatusCode::kInvalidArgument, "invalid node id"); } std::string target_name; bool capture_sink = false; { std::lock_guard lock(impl_->cache_mutex); auto node_it = impl_->nodes.find(node.value); if (node_it == impl_->nodes.end()) { return Status::Error(StatusCode::kNotFound, "node not found"); } 
impl_->metered_nodes.insert(node.value); if (impl_->meter_states.find(node.value) == impl_->meter_states.end()) { impl_->meter_states[node.value] = MeterState{}; } if (impl_->live_meters.find(node.value) != impl_->live_meters.end()) { return Status::Ok(); } target_name = node_it->second.name; const auto& mc = node_it->second.media_class; capture_sink = (mc.find("Sink") != std::string::npos || mc.find("Duplex") != std::string::npos); } if (!impl_->thread_loop || !impl_->core) { return Status::Ok(); } pw_thread_loop_lock(impl_->thread_loop); auto meter = std::make_unique(); meter->node_id = node.value; meter->target_name = target_name; pw_properties* props = pw_properties_new( PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Capture", PW_KEY_MEDIA_CLASS, "Stream/Input/Audio", PW_KEY_TARGET_OBJECT, target_name.c_str(), PW_KEY_STREAM_MONITOR, "true", PW_KEY_NODE_NAME, "", nullptr); if (capture_sink) { pw_properties_set(props, PW_KEY_STREAM_CAPTURE_SINK, "true"); } meter->stream = pw_stream_new_simple( pw_thread_loop_get_loop(impl_->thread_loop), "warppipe-node-meter", props, &kNodeMeterEvents, meter.get()); if (!meter->stream) { pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } uint8_t buffer[512]; spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); spa_audio_info_raw info{}; info.format = SPA_AUDIO_FORMAT_F32; info.rate = 48000; info.channels = 2; info.position[0] = SPA_AUDIO_CHANNEL_FL; info.position[1] = SPA_AUDIO_CHANNEL_FR; const spa_pod* params[1]; params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &info); int res = pw_stream_connect( meter->stream, PW_DIRECTION_INPUT, PW_ID_ANY, static_cast( PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS), params, 1); if (res != 0) { pw_stream_destroy(meter->stream); pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } { std::lock_guard lock(impl_->cache_mutex); impl_->live_meters[node.value] = std::move(meter); } 
pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } Status Client::DisableNodeMeter(NodeId node) { std::unique_ptr meter; { std::lock_guard lock(impl_->cache_mutex); impl_->metered_nodes.erase(node.value); impl_->meter_states.erase(node.value); auto it = impl_->live_meters.find(node.value); if (it != impl_->live_meters.end()) { meter = std::move(it->second); impl_->live_meters.erase(it); } } if (meter && meter->stream && impl_->thread_loop) { pw_thread_loop_lock(impl_->thread_loop); pw_stream_destroy(meter->stream); meter->stream = nullptr; pw_thread_loop_unlock(impl_->thread_loop); } impl_->AutoSave(); return Status::Ok(); } Result Client::GetVirtualNodeInfo(NodeId node) const { std::lock_guard lock(impl_->cache_mutex); auto it = impl_->virtual_streams.find(node.value); if (it == impl_->virtual_streams.end()) return {Status::Error(StatusCode::kNotFound, "not a virtual node"), {}}; const auto &sd = *it->second; VirtualNodeInfo info; info.node = node; info.name = sd.name; info.is_source = sd.is_source; info.loopback = sd.loopback; info.target_node = sd.target_node; info.rate = sd.rate; info.channels = sd.channels; return {Status::Ok(), std::move(info)}; } Result Client::NodeMeterPeak(NodeId node) const { std::lock_guard lock(impl_->cache_mutex); auto live_it = impl_->live_meters.find(node.value); if (live_it != impl_->live_meters.end() && live_it->second) { MeterState state; state.peak_left = live_it->second->peak_left.load(std::memory_order_relaxed); state.peak_right = live_it->second->peak_right.load(std::memory_order_relaxed); return {Status::Ok(), state}; } auto it = impl_->meter_states.find(node.value); if (it == impl_->meter_states.end()) { return {Status::Error(StatusCode::kNotFound, "node not metered"), {}}; } return {Status::Ok(), it->second}; } Result Client::MeterPeak() const { std::lock_guard lock(impl_->cache_mutex); if (impl_->master_meter_data) { MeterState state; state.peak_left = 
impl_->master_meter_data->peak_left.load(std::memory_order_relaxed); state.peak_right = impl_->master_meter_data->peak_right.load(std::memory_order_relaxed); return {Status::Ok(), state}; } return {Status::Ok(), impl_->master_meter}; } Result Client::CreateLink(PortId output, PortId input, const LinkOptions& options) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } if (output.value == 0 || input.value == 0) { return {Status::Error(StatusCode::kInvalidArgument, "invalid port id"), {}}; } pw_thread_loop_lock(impl_->thread_loop); PortInfo out_port; PortInfo in_port; { std::lock_guard lock(impl_->cache_mutex); auto out_it = impl_->ports.find(output.value); if (out_it == impl_->ports.end()) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kNotFound, "output port not found"), {}}; } auto in_it = impl_->ports.find(input.value); if (in_it == impl_->ports.end()) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kNotFound, "input port not found"), {}}; } out_port = out_it->second; in_port = in_it->second; for (const auto& entry : impl_->links) { const Link& link = entry.second; if (link.output_port.value == output.value && link.input_port.value == input.value) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kInvalidArgument, "link already exists"), {}}; } } } if (out_port.is_input || !in_port.is_input) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kInvalidArgument, "port directions do not match"), {}}; } pw_properties* props = pw_properties_new(nullptr, nullptr); if (!props) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kInternal, "failed to allocate link properties"), {}}; } pw_properties_setf(props, PW_KEY_LINK_OUTPUT_PORT, "%u", output.value); pw_properties_setf(props, PW_KEY_LINK_INPUT_PORT, "%u", input.value); if (options.passive) { pw_properties_set(props, PW_KEY_LINK_PASSIVE, 
"true"); } if (options.linger) { pw_properties_set(props, PW_KEY_OBJECT_LINGER, "true"); } pw_proxy* proxy = reinterpret_cast( pw_core_create_object(impl_->core, "link-factory", PW_TYPE_INTERFACE_Link, PW_VERSION_LINK, &props->dict, 0)); pw_properties_free(props); if (!proxy) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kUnavailable, "failed to create link"), {}}; } { std::lock_guard lock(impl_->cache_mutex); impl_->pending_link_pairs.emplace_back(output.value, input.value); } auto link_proxy = std::make_unique(); link_proxy->proxy = proxy; link_proxy->loop = impl_->thread_loop; link_proxy->output_port = output.value; link_proxy->input_port = input.value; pw_proxy_add_listener(proxy, &link_proxy->listener, &kLinkProxyEvents, link_proxy.get()); int wait_attempts = 0; while (link_proxy->id == SPA_ID_INVALID && !link_proxy->failed && wait_attempts < 3) { int wait_res = pw_thread_loop_timed_wait(impl_->thread_loop, kSyncWaitSeconds); if (wait_res == -ETIMEDOUT) { break; } ++wait_attempts; } auto remove_pending = [&] { std::lock_guard lock(impl_->cache_mutex); std::erase_if(impl_->pending_link_pairs, [&](const auto& p) { return p.first == output.value && p.second == input.value; }); }; if (link_proxy->failed) { std::string error = link_proxy->error.empty() ? 
"link creation failed" : link_proxy->error; remove_pending(); pw_proxy_destroy(proxy); pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kUnavailable, std::move(error)), {}}; } if (link_proxy->id == SPA_ID_INVALID) { remove_pending(); pw_proxy_destroy(proxy); pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kTimeout, "timed out waiting for link id"), {}}; } Link link; link.id = LinkId{link_proxy->id}; link.output_port = output; link.input_port = input; { std::lock_guard lock(impl_->cache_mutex); impl_->link_proxies.emplace(link_proxy->id, std::move(link_proxy)); impl_->links[link.id.value] = link; std::erase_if(impl_->pending_link_pairs, [&](const auto& p) { return p.first == output.value && p.second == input.value; }); } pw_thread_loop_unlock(impl_->thread_loop); impl_->AutoSave(); return {Status::Ok(), link}; } Result Client::CreateLinkByName(std::string_view output_node, std::string_view output_port, std::string_view input_node, std::string_view input_port, const LinkOptions& options) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } if (output_node.empty() || output_port.empty() || input_node.empty() || input_port.empty()) { return {Status::Error(StatusCode::kInvalidArgument, "node or port name missing"), {}}; } std::optional out_id; std::optional in_id; { std::lock_guard lock(impl_->cache_mutex); for (const auto& entry : impl_->ports) { const PortInfo& port = entry.second; auto node_it = impl_->nodes.find(port.node.value); if (node_it == impl_->nodes.end()) { continue; } const NodeInfo& node = node_it->second; if (port.is_input) { if (node.name == input_node && port.name == input_port) { in_id = port.id; } } else { if (node.name == output_node && port.name == output_port) { out_id = port.id; } } if (out_id && in_id) { break; } } } if (!out_id || !in_id) { return {Status::Error(StatusCode::kNotFound, "matching ports not found"), {}}; } return CreateLink(*out_id, *in_id, 
options);
}

// Removes the link with global id `link`.
//
// Two paths exist:
//  * the id is in link_proxies: its listener is unhooked, the proxy destroyed,
//    the global destroyed through the registry (when one exists) and the
//    cached link and claimed auto-link pair are dropped;
//  * otherwise the global is destroyed through the registry and any matching
//    saved/auto link proxies for the same port pair are torn down.
// On success the link's node/port names (when they could be resolved) are
// removed from saved_links and AutoSave() is called.
// Returns kNotFound ("link not found") when nothing was removed.
Status Client::RemoveLink(LinkId link) {
  Status status = impl_->EnsureConnected();
  if (!status.ok()) {
    return status;
  }
  pw_thread_loop_lock(impl_->thread_loop);
  bool removed = false;
  Impl::SavedLink removed_link;
  {
    std::lock_guard lock(impl_->cache_mutex);
    // Resolve the endpoint names while the cache entries still exist; they are
    // needed after the removal to prune saved_links.
    auto link_it = impl_->links.find(link.value);
    if (link_it != impl_->links.end()) {
      auto op = impl_->ports.find(link_it->second.output_port.value);
      auto ip = impl_->ports.find(link_it->second.input_port.value);
      if (op != impl_->ports.end() && ip != impl_->ports.end()) {
        auto on = impl_->nodes.find(op->second.node.value);
        auto in_ = impl_->nodes.find(ip->second.node.value);
        if (on != impl_->nodes.end() && in_ != impl_->nodes.end()) {
          removed_link = {on->second.name, op->second.name, in_->second.name, ip->second.name};
        }
      }
    }
    auto it = impl_->link_proxies.find(link.value);
    if (it != impl_->link_proxies.end()) {
      if (it->second && it->second->proxy) {
        spa_hook_remove(&it->second->listener);
        pw_proxy_destroy(it->second->proxy);
      }
      if (impl_->registry) {
        pw_registry_destroy(impl_->registry, link.value);
      }
      impl_->link_proxies.erase(it);
      auto link_it2 = impl_->links.find(link.value);
      if (link_it2 != impl_->links.end()) {
        uint32_t op = link_it2->second.output_port.value;
        uint32_t ip = link_it2->second.input_port.value;
        impl_->links.erase(link_it2);
        std::erase_if(impl_->auto_link_claimed_pairs, [&](const auto& pair) {
          return pair.first == op && pair.second == ip;
        });
      }
      removed = true;
    }
  }
  if (!removed && impl_->registry) {
    int res = pw_registry_destroy(impl_->registry, link.value);
    if (res < 0) {
      pw_thread_loop_unlock(impl_->thread_loop);
      return Status::Error(StatusCode::kNotFound, "link not found");
    }
    {
      std::lock_guard lock(impl_->cache_mutex);
      auto link_it = impl_->links.find(link.value);
      uint32_t out_port = 0, in_port = 0;
      if (link_it != impl_->links.end()) {
        out_port = link_it->second.output_port.value;
        in_port = link_it->second.input_port.value;
        impl_->links.erase(link_it);
      }
      if (out_port && in_port) {
        for (auto& p : impl_->saved_link_proxies) {
          if (p && p->output_port == out_port && p->input_port == in_port) {
            spa_hook_remove(&p->listener);
            if (p->proxy) pw_proxy_destroy(p->proxy);
            p->proxy = nullptr;
          }
        }
        std::erase_if(impl_->saved_link_proxies, [&](const auto& p) {
          return p && p->output_port == out_port && p->input_port == in_port;
        });
        for (auto& p : impl_->auto_link_proxies) {
          if (p && p->output_port == out_port && p->input_port == in_port) {
            spa_hook_remove(&p->listener);
            if (p->proxy) pw_proxy_destroy(p->proxy);
            p->proxy = nullptr;
          }
        }
        std::erase_if(impl_->auto_link_proxies, [&](const auto& p) {
          return p && p->output_port == out_port && p->input_port == in_port;
        });
        std::erase_if(impl_->auto_link_claimed_pairs, [&](const auto& pair) {
          return pair.first == out_port && pair.second == in_port;
        });
      }
    }
    removed = true;
  }
  pw_thread_loop_unlock(impl_->thread_loop);
  if (removed) {
    if (!removed_link.out_node.empty()) {
      std::lock_guard lock(impl_->cache_mutex);
      auto& sl = impl_->saved_links;
      sl.erase(std::remove(sl.begin(), sl.end(), removed_link), sl.end());
    }
    impl_->AutoSave();
  }
  return removed ? Status::Ok() : Status::Error(StatusCode::kNotFound, "link not found");
}

// Registers a routing rule and queues auto-links for every cached node that
// already matches it.
//
// Returns kInvalidArgument when the match has no criteria, when a playback
// rule has no target node, or when a capture rule has no source node.
// On success returns the newly assigned RuleId; a policy sync is scheduled if
// any auto-links are pending, and AutoSave() is called.
Result Client::AddRouteRule(const RouteRule& rule) {
  if (rule.match.application_name.empty() && rule.match.process_binary.empty() &&
      rule.match.media_role.empty()) {
    return {Status::Error(StatusCode::kInvalidArgument, "rule match has no criteria"), {}};
  }
  if (rule.direction == RuleDirection::kPlayback && rule.target_node.empty()) {
    return {Status::Error(StatusCode::kInvalidArgument, "rule target node is empty"), {}};
  }
  if (rule.direction == RuleDirection::kCapture && rule.source_node.empty()) {
    return {Status::Error(StatusCode::kInvalidArgument, "rule source node is empty"), {}};
  }

  uint32_t id = 0;
  bool has_pending = false;
  {
    std::lock_guard lock(impl_->cache_mutex);
    id = impl_->next_rule_id++;
    RouteRule stored = rule;
    stored.id = RuleId{id};
    impl_->route_rules[id] = std::move(stored);

    for (const auto& node_entry : impl_->nodes) {
      if (!MatchesRule(node_entry.second, rule.match)) {
        continue;
      }
      PendingAutoLink pending;
      pending.rule_id = id;
      pending.direction = rule.direction;
      // The matched node id is stored in source_node_id for both directions;
      // only the peer name differs.
      pending.source_node_id = node_entry.first;
      pending.target_node_name =
          rule.direction == RuleDirection::kPlayback ? rule.target_node : rule.source_node;
      impl_->pending_auto_links.push_back(std::move(pending));
    }
    // Sample the queue state while cache_mutex is still held; reading it after
    // the lock is released would be an unsynchronised access.
    has_pending = !impl_->pending_auto_links.empty();
  }

  if (has_pending && impl_->thread_loop) {
    pw_thread_loop_lock(impl_->thread_loop);
    impl_->SchedulePolicySync();
    pw_thread_loop_unlock(impl_->thread_loop);
  }
  impl_->AutoSave();
  return {Status::Ok(), RuleId{id}};
}

Status Client::RemoveRouteRule(RuleId id) {
  RuleMatch match;
  std::string target_node;
  std::string source_node;
  RuleDirection direction = RuleDirection::kPlayback;
  {
    std::lock_guard lock(impl_->cache_mutex);
    auto it = impl_->route_rules.find(id.value);
    if (it == impl_->route_rules.end()) {
      return Status::Error(StatusCode::kNotFound, "route rule not found");
    }
    match = it->second.match;
    target_node = it->second.target_node;
    source_node =
it->second.source_node; direction = it->second.direction; impl_->route_rules.erase(it); auto pending_it = impl_->pending_auto_links.begin(); while (pending_it != impl_->pending_auto_links.end()) { if (pending_it->rule_id == id.value) { pending_it = impl_->pending_auto_links.erase(pending_it); } else { ++pending_it; } } impl_->auto_link_claimed_pairs.clear(); } std::vector links_to_destroy; std::vector> pairs_to_remove; { std::lock_guard lock(impl_->cache_mutex); if (direction == RuleDirection::kPlayback) { uint32_t target_id = 0; for (const auto& n : impl_->nodes) { if (n.second.name == target_node) { target_id = n.first; break; } } if (target_id) { for (const auto& link_entry : impl_->links) { uint32_t out_port = link_entry.second.output_port.value; uint32_t in_port = link_entry.second.input_port.value; auto in_port_it = impl_->ports.find(in_port); if (in_port_it == impl_->ports.end() || in_port_it->second.node.value != target_id) continue; auto out_port_it = impl_->ports.find(out_port); if (out_port_it == impl_->ports.end()) continue; auto src_node_it = impl_->nodes.find(out_port_it->second.node.value); if (src_node_it == impl_->nodes.end()) continue; if (MatchesRule(src_node_it->second, match)) { links_to_destroy.push_back(link_entry.first); pairs_to_remove.emplace_back(out_port, in_port); } } } } else { uint32_t source_id = 0; for (const auto& n : impl_->nodes) { if (n.second.name == source_node) { source_id = n.first; break; } } if (source_id) { for (const auto& link_entry : impl_->links) { uint32_t out_port = link_entry.second.output_port.value; uint32_t in_port = link_entry.second.input_port.value; auto out_port_it = impl_->ports.find(out_port); if (out_port_it == impl_->ports.end() || out_port_it->second.node.value != source_id) continue; auto in_port_it = impl_->ports.find(in_port); if (in_port_it == impl_->ports.end()) continue; auto dest_node_it = impl_->nodes.find(in_port_it->second.node.value); if (dest_node_it == impl_->nodes.end()) continue; if 
(MatchesRule(dest_node_it->second, match)) { links_to_destroy.push_back(link_entry.first); pairs_to_remove.emplace_back(out_port, in_port); } } } } } if (!links_to_destroy.empty() && impl_->thread_loop) { pw_thread_loop_lock(impl_->thread_loop); for (uint32_t link_id : links_to_destroy) { pw_registry_destroy(impl_->registry, link_id); } { std::lock_guard lock(impl_->cache_mutex); for (const auto& pair : pairs_to_remove) { for (auto& p : impl_->auto_link_proxies) { if (p && p->output_port == pair.first && p->input_port == pair.second) { spa_hook_remove(&p->listener); } } std::erase_if(impl_->auto_link_proxies, [&](const auto& p) { return p && p->output_port == pair.first && p->input_port == pair.second; }); } } pw_thread_loop_unlock(impl_->thread_loop); } impl_->AutoSave(); return Status::Ok(); } Result> Client::ListRouteRules() { std::lock_guard lock(impl_->cache_mutex); std::vector items; items.reserve(impl_->route_rules.size()); for (const auto& entry : impl_->route_rules) { items.push_back(entry.second); } return {Status::Ok(), std::move(items)}; } Result Client::GetDefaults() { std::lock_guard lock(impl_->cache_mutex); return {Status::Ok(), impl_->defaults}; } Status Client::SetDefaultSink(std::string_view node_name) { if (!impl_->metadata_proxy) { return Status::Error(StatusCode::kUnavailable, "metadata not available"); } if (node_name.empty()) { return Status::Error(StatusCode::kInvalidArgument, "node name is empty"); } std::string json_value = "{\"name\":\"" + std::string(node_name) + "\"}"; pw_thread_loop_lock(impl_->thread_loop); pw_metadata_set_property( reinterpret_cast(impl_->metadata_proxy), 0, "default.configured.audio.sink", "Spa:String:JSON", json_value.c_str()); pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } Status Client::SetDefaultSource(std::string_view node_name) { if (!impl_->metadata_proxy) { return Status::Error(StatusCode::kUnavailable, "metadata not available"); } if (node_name.empty()) { return 
Status::Error(StatusCode::kInvalidArgument, "node name is empty"); } std::string json_value = "{\"name\":\"" + std::string(node_name) + "\"}"; pw_thread_loop_lock(impl_->thread_loop); pw_metadata_set_property( reinterpret_cast(impl_->metadata_proxy), 0, "default.configured.audio.source", "Spa:String:JSON", json_value.c_str()); pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } Status Client::SaveConfig(std::string_view path) { if (path.empty()) { return Status::Error(StatusCode::kInvalidArgument, "path is empty"); } nlohmann::json j; j["version"] = 1; nlohmann::json nodes_array = nlohmann::json::array(); nlohmann::json rules_array = nlohmann::json::array(); { std::lock_guard lock(impl_->cache_mutex); for (const auto& entry : impl_->virtual_streams) { if (!entry.second) { continue; } const StreamData& sd = *entry.second; nlohmann::json node_obj; node_obj["name"] = sd.name; node_obj["is_source"] = sd.is_source; node_obj["rate"] = sd.rate; node_obj["channels"] = sd.channels; node_obj["loopback"] = sd.loopback; node_obj["target_node"] = sd.target_node; auto volIt = impl_->volume_states.find(entry.first); if (volIt != impl_->volume_states.end()) { node_obj["volume"] = volIt->second.volume; node_obj["mute"] = volIt->second.mute; } nodes_array.push_back(std::move(node_obj)); } for (const auto& entry : impl_->route_rules) { nlohmann::json rule_obj; rule_obj["match"]["application_name"] = entry.second.match.application_name; rule_obj["match"]["process_binary"] = entry.second.match.process_binary; rule_obj["match"]["media_role"] = entry.second.match.media_role; rule_obj["target_node"] = entry.second.target_node; rule_obj["direction"] = entry.second.direction == RuleDirection::kCapture ? 
"capture" : "playback"; rule_obj["source_node"] = entry.second.source_node; rules_array.push_back(std::move(rule_obj)); } } nlohmann::json links_array = nlohmann::json::array(); { std::lock_guard lock(impl_->cache_mutex); std::vector live; for (const auto& entry : impl_->link_proxies) { if (!entry.second) { continue; } auto link_it = impl_->links.find(entry.first); if (link_it == impl_->links.end()) { continue; } const Link& link = link_it->second; auto out_port_it = impl_->ports.find(link.output_port.value); auto in_port_it = impl_->ports.find(link.input_port.value); if (out_port_it == impl_->ports.end() || in_port_it == impl_->ports.end()) { continue; } auto out_node_it = impl_->nodes.find(out_port_it->second.node.value); auto in_node_it = impl_->nodes.find(in_port_it->second.node.value); if (out_node_it == impl_->nodes.end() || in_node_it == impl_->nodes.end()) { continue; } Impl::SavedLink sl{out_node_it->second.name, out_port_it->second.name, in_node_it->second.name, in_port_it->second.name}; live.push_back(sl); nlohmann::json link_obj; link_obj["out_node"] = sl.out_node; link_obj["out_port"] = sl.out_port; link_obj["in_node"] = sl.in_node; link_obj["in_port"] = sl.in_port; links_array.push_back(std::move(link_obj)); } for (const auto& lp : impl_->saved_link_proxies) { if (!lp || lp->id == SPA_ID_INVALID) continue; auto link_it = impl_->links.find(lp->id); if (link_it == impl_->links.end()) continue; const Link& link = link_it->second; auto out_port_it = impl_->ports.find(link.output_port.value); auto in_port_it = impl_->ports.find(link.input_port.value); if (out_port_it == impl_->ports.end() || in_port_it == impl_->ports.end()) continue; auto out_node_it = impl_->nodes.find(out_port_it->second.node.value); auto in_node_it = impl_->nodes.find(in_port_it->second.node.value); if (out_node_it == impl_->nodes.end() || in_node_it == impl_->nodes.end()) continue; Impl::SavedLink sl{out_node_it->second.name, out_port_it->second.name, in_node_it->second.name, 
in_port_it->second.name}; bool dup = false; for (const auto& l : live) { if (l == sl) { dup = true; break; } } if (!dup) { live.push_back(sl); nlohmann::json link_obj; link_obj["out_node"] = sl.out_node; link_obj["out_port"] = sl.out_port; link_obj["in_node"] = sl.in_node; link_obj["in_port"] = sl.in_port; links_array.push_back(std::move(link_obj)); } } for (const auto& sl : impl_->saved_links) { bool already = false; for (const auto& l : live) { if (l == sl) { already = true; break; } } if (!already) { nlohmann::json link_obj; link_obj["out_node"] = sl.out_node; link_obj["out_port"] = sl.out_port; link_obj["in_node"] = sl.in_node; link_obj["in_port"] = sl.in_port; links_array.push_back(std::move(link_obj)); } } } j["virtual_nodes"] = std::move(nodes_array); j["route_rules"] = std::move(rules_array); j["links"] = std::move(links_array); std::string tmp_path = std::string(path) + ".tmp"; std::ofstream file(tmp_path); if (!file.is_open()) { return Status::Error(StatusCode::kInternal, "failed to open config file for writing"); } file << j.dump(2); file.close(); if (file.fail()) { return Status::Error(StatusCode::kInternal, "failed to write config file"); } if (std::rename(tmp_path.c_str(), std::string(path).c_str()) != 0) { return Status::Error(StatusCode::kInternal, "failed to rename config file"); } return Status::Ok(); } void Client::SetChangeCallback(ChangeCallback callback) { std::lock_guard lock(impl_->change_cb_mutex); impl_->change_callback = std::move(callback); } Status Client::LoadConfig(std::string_view path) { if (path.empty()) { return Status::Error(StatusCode::kInvalidArgument, "path is empty"); } std::string path_str(path); std::ifstream file(path_str); if (!file.is_open()) { return Status::Error(StatusCode::kNotFound, "config file not found"); } nlohmann::json j; try { j = nlohmann::json::parse(file); } catch (const nlohmann::json::parse_error& e) { return Status::Error(StatusCode::kInvalidArgument, std::string("config parse error: ") + e.what()); } 
if (!j.contains("version") || !j["version"].is_number_integer()) { return Status::Error(StatusCode::kInvalidArgument, "config missing version"); } impl_->loading_config = true; if (j.contains("route_rules") && j["route_rules"].is_array()) { for (const auto& rule_obj : j["route_rules"]) { try { RouteRule rule; if (rule_obj.contains("match") && rule_obj["match"].is_object()) { const auto& m = rule_obj["match"]; rule.match.application_name = m.value("application_name", ""); rule.match.process_binary = m.value("process_binary", ""); rule.match.media_role = m.value("media_role", ""); } rule.target_node = rule_obj.value("target_node", ""); rule.source_node = rule_obj.value("source_node", ""); std::string dir_str = rule_obj.value("direction", "playback"); rule.direction = (dir_str == "capture") ? RuleDirection::kCapture : RuleDirection::kPlayback; bool has_match = !rule.match.application_name.empty() || !rule.match.process_binary.empty() || !rule.match.media_role.empty(); bool has_target = (rule.direction == RuleDirection::kPlayback && !rule.target_node.empty()) || (rule.direction == RuleDirection::kCapture && !rule.source_node.empty()); if (has_match && has_target) { AddRouteRule(rule); } } catch (...) 
{ continue; } } } Status conn_status = impl_->EnsureConnected(); if (conn_status.ok() && j.contains("virtual_nodes") && j["virtual_nodes"].is_array()) { for (const auto& node_obj : j["virtual_nodes"]) { try { std::string name = node_obj.value("name", ""); if (name.empty()) { continue; } bool is_source = node_obj.value("is_source", false); VirtualNodeOptions opts; opts.format.rate = node_obj.value("rate", 48000u); opts.format.channels = node_obj.value("channels", 2u); bool loopback = node_obj.value("loopback", false); std::string target = node_obj.value("target_node", ""); if (loopback && !target.empty()) { opts.behavior = VirtualBehavior::kLoopback; opts.target_node = target; } Result result; if (is_source) { auto r = CreateVirtualSource(name, opts); result = {r.status, r.ok() ? r.value.node.value : 0}; } else { auto r = CreateVirtualSink(name, opts); result = {r.status, r.ok() ? r.value.node.value : 0}; } if (result.ok() && result.value != 0) { float vol = node_obj.value("volume", 1.0f); bool muted = node_obj.value("mute", false); if (vol != 1.0f || muted) { SetNodeVolume(NodeId{result.value}, vol, muted); } } } catch (...) { continue; } } } if (j.contains("links") && j["links"].is_array()) { { std::lock_guard lock(impl_->cache_mutex); for (const auto& link_obj : j["links"]) { try { std::string out_node = link_obj.value("out_node", ""); std::string out_port = link_obj.value("out_port", ""); std::string in_node = link_obj.value("in_node", ""); std::string in_port = link_obj.value("in_port", ""); if (out_node.empty() || out_port.empty() || in_node.empty() || in_port.empty()) { continue; } impl_->saved_links.push_back({out_node, out_port, in_node, in_port}); } catch (...) 
{ continue; } } } if (conn_status.ok()) { pw_thread_loop_lock(impl_->thread_loop); impl_->SchedulePolicySync(); pw_thread_loop_unlock(impl_->thread_loop); } } impl_->loading_config = false; impl_->AutoSave(); return Status::Ok(); } #ifdef WARPPIPE_TESTING Status Client::Test_InsertNode(const NodeInfo& node) { if (!impl_) { return Status::Error(StatusCode::kInternal, "client not initialized"); } { std::lock_guard lock(impl_->cache_mutex); impl_->nodes[node.id.value] = node; impl_->CheckRulesForNode(node); } impl_->NotifyChange(); return Status::Ok(); } Status Client::Test_InsertPort(const PortInfo& port) { if (!impl_) { return Status::Error(StatusCode::kInternal, "client not initialized"); } { std::lock_guard lock(impl_->cache_mutex); impl_->ports[port.id.value] = port; } impl_->NotifyChange(); return Status::Ok(); } Status Client::Test_InsertLink(const Link& link) { if (!impl_) { return Status::Error(StatusCode::kInternal, "client not initialized"); } { std::lock_guard lock(impl_->cache_mutex); impl_->links[link.id.value] = link; } impl_->NotifyChange(); return Status::Ok(); } Status Client::Test_RemoveGlobal(uint32_t id) { if (!impl_) { return Status::Error(StatusCode::kInternal, "client not initialized"); } Client::Impl::RegistryGlobalRemove(impl_.get(), id); return Status::Ok(); } Status Client::Test_ForceDisconnect() { if (!impl_ || !impl_->thread_loop) { return Status::Error(StatusCode::kInternal, "thread loop not initialized"); } pw_thread_loop_lock(impl_->thread_loop); impl_->DisconnectLocked(); pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } Status Client::Test_TriggerPolicyCheck() { if (!impl_) { return Status::Error(StatusCode::kInternal, "client not initialized"); } impl_->ProcessPendingAutoLinks(); return Status::Ok(); } size_t Client::Test_GetPendingAutoLinkCount() const { if (!impl_) { return 0; } std::lock_guard lock(impl_->cache_mutex); return impl_->pending_auto_links.size(); } Status Client::Test_SetNodeVolume(NodeId node, float 
volume, bool mute) { if (!impl_) { return Status::Error(StatusCode::kUnavailable, "no impl"); } std::lock_guard lock(impl_->cache_mutex); if (impl_->nodes.find(node.value) == impl_->nodes.end()) { return Status::Error(StatusCode::kNotFound, "node not found"); } impl_->volume_states[node.value] = VolumeState{std::clamp(volume, 0.0f, 1.5f), mute}; return Status::Ok(); } Result Client::Test_GetNodeVolume(NodeId node) const { if (!impl_) { return {Status::Error(StatusCode::kUnavailable, "no impl"), {}}; } std::lock_guard lock(impl_->cache_mutex); auto it = impl_->volume_states.find(node.value); if (it == impl_->volume_states.end()) { return {Status::Ok(), VolumeState{}}; } return {Status::Ok(), it->second}; } Status Client::Test_SetNodeMeterPeak(NodeId node, float left, float right) { if (!impl_) { return Status::Error(StatusCode::kUnavailable, "no impl"); } std::lock_guard lock(impl_->cache_mutex); float cl = std::clamp(left, 0.0f, 1.0f); float cr = std::clamp(right, 0.0f, 1.0f); impl_->meter_states[node.value] = MeterState{cl, cr}; impl_->metered_nodes.insert(node.value); auto it = impl_->live_meters.find(node.value); if (it != impl_->live_meters.end() && it->second) { it->second->peak_left.store(cl, std::memory_order_relaxed); it->second->peak_right.store(cr, std::memory_order_relaxed); } return Status::Ok(); } Status Client::Test_SetMasterMeterPeak(float left, float right) { if (!impl_) { return Status::Error(StatusCode::kUnavailable, "no impl"); } std::lock_guard lock(impl_->cache_mutex); float cl = std::clamp(left, 0.0f, 1.0f); float cr = std::clamp(right, 0.0f, 1.0f); impl_->master_meter = MeterState{cl, cr}; if (impl_->master_meter_data) { impl_->master_meter_data->peak_left.store(cl, std::memory_order_relaxed); impl_->master_meter_data->peak_right.store(cr, std::memory_order_relaxed); } return Status::Ok(); } #endif } // namespace warppipe