#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace warppipe { namespace { constexpr int kSyncWaitSeconds = 2; constexpr uint32_t kDefaultRate = 48000; constexpr uint32_t kDefaultChannels = 2; const char* SafeLookup(const spa_dict* dict, const char* key) { if (!dict || !key) { return nullptr; } return spa_dict_lookup(dict, key); } std::string LookupString(const spa_dict* dict, const char* key) { const char* value = SafeLookup(dict, key); return value ? std::string(value) : std::string(); } bool ParseUint32(const char* value, uint32_t* out) { if (!value || !out) { return false; } char* end = nullptr; unsigned long parsed = std::strtoul(value, &end, 10); if (!end || end == value) { return false; } *out = static_cast(parsed); return true; } bool IsNodeType(const char* type) { return type && spa_streq(type, PW_TYPE_INTERFACE_Node); } bool IsPortType(const char* type) { return type && spa_streq(type, PW_TYPE_INTERFACE_Port); } bool IsLinkType(const char* type) { return type && spa_streq(type, PW_TYPE_INTERFACE_Link); } struct PendingAutoLink { uint32_t source_node_id = 0; std::string target_node_name; uint32_t rule_id = 0; }; bool MatchesRule(const NodeInfo& node, const RuleMatch& match) { bool any_field = false; if (!match.application_name.empty()) { any_field = true; if (node.application_name != match.application_name) { return false; } } if (!match.process_binary.empty()) { any_field = true; if (node.process_binary != match.process_binary) { return false; } } if (!match.media_role.empty()) { any_field = true; if (node.media_role != match.media_role) { return false; } } return any_field; } struct StreamData { pw_stream* stream = nullptr; spa_hook listener{}; pw_thread_loop* loop = nullptr; bool is_source = false; bool loopback = false; std::string target_node; std::string name; bool ready = false; bool failed = false; std::string error; uint32_t node_id = SPA_ID_INVALID; uint32_t channels = kDefaultChannels; uint32_t rate = kDefaultRate; }; struct LinkProxy { pw_proxy* proxy = nullptr; spa_hook listener{}; pw_thread_loop* loop = nullptr; bool done = false; bool failed = false; std::string error; uint32_t id = SPA_ID_INVALID; }; void LinkProxyBound(void* data, uint32_t global_id) { auto* link = static_cast(data); if (!link) { return; } link->id = global_id; link->done = true; if (link->loop) { pw_thread_loop_signal(link->loop, false); } } void LinkProxyRemoved(void* data) { auto* link = static_cast(data); if (!link) { return; } link->done = true; if (link->loop) { pw_thread_loop_signal(link->loop, false); } } void LinkProxyError(void* data, int, int res, const char* message) { auto* link = static_cast(data); if (!link) { return; } link->failed = true; link->error = message ? message : spa_strerror(res); if (link->loop) { pw_thread_loop_signal(link->loop, false); } } static const pw_proxy_events kLinkProxyEvents = { .version = PW_VERSION_PROXY_EVENTS, .bound = LinkProxyBound, .removed = LinkProxyRemoved, .error = LinkProxyError, }; void StreamProcess(void* data) { auto* stream_data = static_cast(data); if (!stream_data || !stream_data->stream) { return; } if (!stream_data->is_source) { struct pw_buffer* buffer = nullptr; while ((buffer = pw_stream_dequeue_buffer(stream_data->stream)) != nullptr) { pw_stream_queue_buffer(stream_data->stream, buffer); } return; } struct pw_buffer* buffer = nullptr; while ((buffer = pw_stream_dequeue_buffer(stream_data->stream)) != nullptr) { struct spa_buffer* spa_buffer = buffer->buffer; if (!spa_buffer) { pw_stream_queue_buffer(stream_data->stream, buffer); continue; } const uint32_t stride = sizeof(float) * stream_data->channels; for (uint32_t i = 0; i < spa_buffer->n_datas; ++i) { struct spa_data* data_entry = &spa_buffer->datas[i]; if (!data_entry->data || !data_entry->chunk) { continue; } std::memset(data_entry->data, 0, data_entry->maxsize); uint32_t frames = stride > 0 ? data_entry->maxsize / stride : 0; if (buffer->requested > 0 && buffer->requested < frames) { frames = buffer->requested; } data_entry->chunk->offset = 0; data_entry->chunk->stride = stride; data_entry->chunk->size = frames * stride; } pw_stream_queue_buffer(stream_data->stream, buffer); } } void StreamStateChanged(void* data, enum pw_stream_state, enum pw_stream_state state, const char* error) { auto* stream_data = static_cast(data); if (!stream_data) { return; } if (state == PW_STREAM_STATE_ERROR) { stream_data->failed = true; if (error) { stream_data->error = error; } } if (stream_data->stream) { uint32_t node_id = pw_stream_get_node_id(stream_data->stream); if (node_id != SPA_ID_INVALID) { stream_data->node_id = node_id; stream_data->ready = true; } } if (stream_data->loop) { pw_thread_loop_signal(stream_data->loop, false); } } static const pw_stream_events kStreamEvents = { .version = PW_VERSION_STREAM_EVENTS, .state_changed = StreamStateChanged, .process = StreamProcess, }; struct MeterStreamData { uint32_t node_id = 0; std::string target_name; pw_stream* stream = nullptr; spa_hook listener{}; std::atomic peak_left{0.0f}; std::atomic peak_right{0.0f}; }; void NodeMeterProcess(void* data) { auto* meter = static_cast(data); if (!meter || !meter->stream) { return; } pw_buffer* buf = pw_stream_dequeue_buffer(meter->stream); if (!buf || !buf->buffer || buf->buffer->n_datas == 0) { if (buf) { pw_stream_queue_buffer(meter->stream, buf); } return; } spa_data* d = &buf->buffer->datas[0]; if (!d->data || !d->chunk) { pw_stream_queue_buffer(meter->stream, buf); return; } const float* samples = static_cast(d->data); uint32_t count = d->chunk->size / sizeof(float); float left = 0.0f; float right = 0.0f; for (uint32_t i = 0; i + 1 < count; i += 2) { float l = std::fabs(samples[i]); float r = std::fabs(samples[i + 1]); if (l > left) left = l; if (r > right) right = r; } meter->peak_left.store(left, std::memory_order_relaxed); meter->peak_right.store(right, std::memory_order_relaxed); pw_stream_queue_buffer(meter->stream, buf); } static const pw_stream_events kNodeMeterEvents = { .version = PW_VERSION_STREAM_EVENTS, .process = NodeMeterProcess, }; } // namespace Status Status::Ok() { return Status{}; } Status Status::Error(StatusCode code, std::string message) { Status status; status.code = code; status.message = std::move(message); return status; } bool Status::ok() const { return code == StatusCode::kOk; } struct Client::Impl { ConnectionOptions options; pw_thread_loop* thread_loop = nullptr; pw_context* context = nullptr; pw_core* core = nullptr; pw_registry* registry = nullptr; spa_hook core_listener{}; spa_hook registry_listener{}; bool core_listener_attached = false; bool registry_listener_attached = false; bool connected = false; uint32_t pending_sync = 0; uint32_t last_sync = 0; Status last_error = Status::Ok(); std::mutex cache_mutex; std::unordered_map nodes; std::unordered_map ports; std::unordered_map links; std::unordered_map> virtual_streams; std::unordered_map> link_proxies; std::unordered_map volume_states; std::unordered_map meter_states; std::unordered_set metered_nodes; MeterState master_meter; std::unique_ptr master_meter_data; std::unordered_map> live_meters; uint32_t next_rule_id = 1; std::unordered_map route_rules; std::vector pending_auto_links; uint32_t policy_sync_seq = 0; bool policy_sync_pending = false; std::vector> auto_link_proxies; std::vector> saved_link_proxies; pw_proxy* metadata_proxy = nullptr; spa_hook metadata_listener{}; bool metadata_listener_attached = false; MetadataInfo defaults; bool loading_config = false; struct SavedLink { std::string out_node; std::string out_port; std::string in_node; std::string in_port; bool operator==(const SavedLink& o) const { return out_node == o.out_node && out_port == o.out_port && in_node == o.in_node && in_port == o.in_port; } }; std::vector saved_links; std::mutex change_cb_mutex; Client::ChangeCallback change_callback; void NotifyChange(); Status ConnectLocked(); void DisconnectLocked(); Status SyncLocked(); void ClearCache(); Status EnsureConnected(); Result CreateVirtualStreamLocked(std::string_view name, bool is_source, const VirtualNodeOptions& options); void CheckRulesForNode(const NodeInfo& node); void SchedulePolicySync(); void ProcessPendingAutoLinks(); void CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port); void ProcessSavedLinks(); void CreateSavedLinkAsync(uint32_t output_port, uint32_t input_port); void AutoSave(); void SetupMasterMeter(); void TeardownMasterMeter(); void TeardownAllLiveMeters(); static void RegistryGlobal(void* data, uint32_t id, uint32_t permissions, const char* type, uint32_t version, const spa_dict* props); static void RegistryGlobalRemove(void* data, uint32_t id); static void CoreDone(void* data, uint32_t id, int seq); static void CoreError(void* data, uint32_t id, int seq, int res, const char* message); static int MetadataProperty(void* data, uint32_t subject, const char* key, const char* type, const char* value); }; void Client::Impl::RegistryGlobal(void* data, uint32_t id, uint32_t, const char* type, uint32_t, const spa_dict* props) { auto* impl = static_cast(data); if (!impl) { return; } bool notify = false; { std::lock_guard lock(impl->cache_mutex); if (IsNodeType(type)) { NodeInfo info; info.id = NodeId{id}; info.name = LookupString(props, PW_KEY_NODE_NAME); info.description = LookupString(props, PW_KEY_NODE_DESCRIPTION); info.media_class = LookupString(props, PW_KEY_MEDIA_CLASS); info.application_name = LookupString(props, PW_KEY_APP_NAME); info.process_binary = LookupString(props, PW_KEY_APP_PROCESS_BINARY); info.media_role = LookupString(props, PW_KEY_MEDIA_ROLE); std::string virt_str = LookupString(props, PW_KEY_NODE_VIRTUAL); info.is_virtual = (virt_str == "true"); impl->nodes[id] = info; impl->CheckRulesForNode(info); notify = true; } else if (IsPortType(type)) { PortInfo info; info.id = PortId{id}; info.name = LookupString(props, PW_KEY_PORT_NAME); info.is_input = false; uint32_t node_id = 0; if (ParseUint32(SafeLookup(props, PW_KEY_NODE_ID), &node_id)) { info.node = NodeId{node_id}; } const char* direction = SafeLookup(props, PW_KEY_PORT_DIRECTION); if (direction && spa_streq(direction, "in")) { info.is_input = true; } impl->ports[id] = info; if (!impl->pending_auto_links.empty() || !impl->saved_links.empty()) { impl->SchedulePolicySync(); } notify = true; } else if (IsLinkType(type)) { Link info; info.id = LinkId{id}; uint32_t out_port = 0; uint32_t in_port = 0; if (ParseUint32(SafeLookup(props, PW_KEY_LINK_OUTPUT_PORT), &out_port)) { info.output_port = PortId{out_port}; } if (ParseUint32(SafeLookup(props, PW_KEY_LINK_INPUT_PORT), &in_port)) { info.input_port = PortId{in_port}; } impl->links[id] = std::move(info); notify = true; } } if (notify) { impl->NotifyChange(); return; } std::lock_guard lock(impl->cache_mutex); if (type && spa_streq(type, PW_TYPE_INTERFACE_Metadata)) { const char* meta_name = SafeLookup(props, "metadata.name"); if (meta_name && spa_streq(meta_name, "default") && !impl->metadata_proxy) { impl->metadata_proxy = reinterpret_cast( pw_registry_bind(impl->registry, id, PW_TYPE_INTERFACE_Metadata, PW_VERSION_METADATA, 0)); if (impl->metadata_proxy) { static const pw_metadata_events metadata_events = { .version = PW_VERSION_METADATA_EVENTS, .property = MetadataProperty, }; pw_metadata_add_listener( reinterpret_cast(impl->metadata_proxy), &impl->metadata_listener, &metadata_events, impl); impl->metadata_listener_attached = true; } } } } void Client::Impl::RegistryGlobalRemove(void* data, uint32_t id) { auto* impl = static_cast(data); if (!impl) { return; } { std::lock_guard lock(impl->cache_mutex); impl->virtual_streams.erase(id); impl->link_proxies.erase(id); auto node_it = impl->nodes.find(id); if (node_it != impl->nodes.end()) { impl->nodes.erase(node_it); std::vector removed_ports; for (auto it = impl->ports.begin(); it != impl->ports.end();) { if (it->second.node.value == id) { removed_ports.push_back(it->first); it = impl->ports.erase(it); } else { ++it; } } for (auto it = impl->links.begin(); it != impl->links.end();) { bool remove_link = false; for (uint32_t port_id : removed_ports) { if (it->second.input_port.value == port_id || it->second.output_port.value == port_id) { remove_link = true; break; } } if (remove_link) { it = impl->links.erase(it); } else { ++it; } } } else if (impl->ports.erase(id) > 0) { for (auto it = impl->links.begin(); it != impl->links.end();) { if (it->second.input_port.value == id || it->second.output_port.value == id) { it = impl->links.erase(it); } else { ++it; } } } else { impl->links.erase(id); } } impl->NotifyChange(); } void Client::Impl::CoreDone(void* data, uint32_t, int seq) { auto* impl = static_cast(data); if (!impl || !impl->thread_loop) { return; } if (seq >= static_cast(impl->pending_sync)) { impl->last_sync = static_cast(seq); pw_thread_loop_signal(impl->thread_loop, false); } if (impl->policy_sync_pending && seq >= static_cast(impl->policy_sync_seq)) { impl->policy_sync_pending = false; impl->ProcessPendingAutoLinks(); impl->ProcessSavedLinks(); } } void Client::Impl::CoreError(void* data, uint32_t, int, int res, const char* message) { auto* impl = static_cast(data); if (!impl) { return; } impl->connected = false; impl->last_error = Status::Error(StatusCode::kUnavailable, message ? message : spa_strerror(res)); if (impl->thread_loop) { pw_thread_loop_signal(impl->thread_loop, false); } } Status Client::Impl::SyncLocked() { if (!core || !thread_loop) { return Status::Error(StatusCode::kUnavailable, "pipewire core not connected"); } pending_sync = pw_core_sync(core, PW_ID_CORE, 0); if (pending_sync == SPA_ID_INVALID) { return Status::Error(StatusCode::kInternal, "failed to sync with pipewire core"); } while (last_sync < pending_sync) { int wait_res = pw_thread_loop_timed_wait(thread_loop, kSyncWaitSeconds); if (wait_res == -ETIMEDOUT) { return Status::Error(StatusCode::kTimeout, "timeout waiting for pipewire sync"); } } return Status::Ok(); } void Client::Impl::ClearCache() { std::lock_guard lock(cache_mutex); nodes.clear(); ports.clear(); links.clear(); pending_auto_links.clear(); policy_sync_pending = false; } void Client::Impl::NotifyChange() { std::lock_guard lock(change_cb_mutex); if (change_callback) { change_callback(); } } Status Client::Impl::EnsureConnected() { if (connected) { return Status::Ok(); } if (!options.autoconnect) { return Status::Error(StatusCode::kUnavailable, "pipewire core disconnected"); } if (!thread_loop) { return Status::Error(StatusCode::kUnavailable, "pipewire thread loop not running"); } pw_thread_loop_lock(thread_loop); Status status = ConnectLocked(); pw_thread_loop_unlock(thread_loop); return status; } Result Client::Impl::CreateVirtualStreamLocked(std::string_view name, bool is_source, const VirtualNodeOptions& options) { if (!core || !thread_loop) { return {Status::Error(StatusCode::kUnavailable, "pipewire core not connected"), 0}; } if (options.format.rate == 0 || options.format.channels == 0) { return {Status::Error(StatusCode::kInvalidArgument, "invalid audio format"), 0}; } if (options.behavior == VirtualBehavior::kLoopback && !options.target_node) { return {Status::Error(StatusCode::kInvalidArgument, "loopback requires target node"), 0}; } if (options.media_class_override && options.media_class_override->empty()) { return {Status::Error(StatusCode::kInvalidArgument, "missing media class"), 0}; } std::string stream_name = name.empty() ? (is_source ? "warppipe-source" : "warppipe-sink") : std::string(name); const char* media_class = is_source ? "Audio/Source" : "Audio/Sink"; std::string media_class_value = options.media_class_override ? *options.media_class_override : std::string(media_class); if (media_class_value.empty()) { return {Status::Error(StatusCode::kInvalidArgument, "missing media class"), 0}; } const char* media_category = is_source ? "Capture" : "Playback"; std::string display_name = options.display_name.empty() ? stream_name : options.display_name; const char* node_group = options.group.empty() ? nullptr : options.group.c_str(); { std::lock_guard lock(cache_mutex); for (const auto& entry : nodes) { if (entry.second.name == stream_name) { return {Status::Error(StatusCode::kInvalidArgument, "duplicate node name"), 0}; } } for (const auto& entry : virtual_streams) { if (entry.second && entry.second->name == stream_name) { return {Status::Error(StatusCode::kInvalidArgument, "duplicate node name"), 0}; } } if (options.behavior == VirtualBehavior::kLoopback && options.target_node) { bool found_target = false; for (const auto& entry : nodes) { if (entry.second.name == *options.target_node) { found_target = true; break; } } if (!found_target) { return {Status::Error(StatusCode::kNotFound, "target node not found"), 0}; } } } pw_properties* props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, media_category, PW_KEY_MEDIA_ROLE, "Music", PW_KEY_MEDIA_CLASS, media_class_value.c_str(), PW_KEY_NODE_NAME, stream_name.c_str(), PW_KEY_MEDIA_NAME, display_name.c_str(), PW_KEY_NODE_DESCRIPTION, display_name.c_str(), PW_KEY_NODE_VIRTUAL, "true", nullptr); if (!props) { return {Status::Error(StatusCode::kInternal, "failed to allocate stream properties"), 0}; } if (node_group) { pw_properties_set(props, PW_KEY_NODE_GROUP, node_group); } if (options.behavior == VirtualBehavior::kLoopback && options.target_node) { pw_properties_set(props, PW_KEY_TARGET_OBJECT, options.target_node->c_str()); } pw_stream* stream = pw_stream_new(core, stream_name.c_str(), props); if (!stream) { return {Status::Error(StatusCode::kUnavailable, "failed to create pipewire stream"), 0}; } auto stream_data = std::make_unique(); stream_data->stream = stream; stream_data->loop = thread_loop; stream_data->is_source = is_source; stream_data->loopback = options.behavior == VirtualBehavior::kLoopback; if (options.target_node) { stream_data->target_node = *options.target_node; } stream_data->name = stream_name; if (options.format.rate != 0) { stream_data->rate = options.format.rate; } if (options.format.channels != 0) { stream_data->channels = options.format.channels; } pw_stream_add_listener(stream, &stream_data->listener, &kStreamEvents, stream_data.get()); const struct spa_pod* params[1]; uint8_t buffer[1024]; spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); spa_audio_info_raw audio_info{}; audio_info.format = SPA_AUDIO_FORMAT_F32; audio_info.rate = stream_data->rate; audio_info.channels = stream_data->channels; params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &audio_info); enum pw_direction direction = is_source ? PW_DIRECTION_OUTPUT : PW_DIRECTION_INPUT; enum pw_stream_flags flags = PW_STREAM_FLAG_MAP_BUFFERS; if (options.behavior == VirtualBehavior::kLoopback && options.target_node) { flags = static_cast(flags | PW_STREAM_FLAG_AUTOCONNECT); } int res = pw_stream_connect(stream, direction, PW_ID_ANY, flags, params, 1); if (res < 0) { pw_stream_destroy(stream); return {Status::Error(StatusCode::kUnavailable, "failed to connect pipewire stream"), 0}; } uint32_t node_id = pw_stream_get_node_id(stream); int wait_attempts = 0; while (node_id == SPA_ID_INVALID && !stream_data->failed && wait_attempts < 3) { int wait_res = pw_thread_loop_timed_wait(thread_loop, kSyncWaitSeconds); if (wait_res == -ETIMEDOUT) { break; } node_id = stream_data->node_id; ++wait_attempts; } if (stream_data->failed) { std::string error = stream_data->error.empty() ? "stream entered error state" : stream_data->error; pw_stream_destroy(stream); return {Status::Error(StatusCode::kUnavailable, std::move(error)), 0}; } if (node_id == SPA_ID_INVALID) { pw_stream_destroy(stream); return {Status::Error(StatusCode::kTimeout, "timed out waiting for stream node id"), 0}; } stream_data->node_id = node_id; stream_data->ready = true; { std::lock_guard lock(cache_mutex); virtual_streams.emplace(node_id, std::move(stream_data)); } return {Status::Ok(), node_id}; } Status Client::Impl::ConnectLocked() { if (connected) { return Status::Ok(); } if (!thread_loop) { return Status::Error(StatusCode::kInternal, "thread loop not initialized"); } pw_loop* loop = pw_thread_loop_get_loop(thread_loop); if (!context) { context = pw_context_new(loop, nullptr, 0); if (!context) { return Status::Error(StatusCode::kUnavailable, "failed to create pipewire context"); } } pw_properties* props = pw_properties_new(PW_KEY_APP_NAME, options.application_name.c_str(), nullptr); if (options.remote_name && !options.remote_name->empty()) { pw_properties_set(props, PW_KEY_REMOTE_NAME, options.remote_name->c_str()); } core = pw_context_connect(context, props, 0); if (!core) { return Status::Error(StatusCode::kUnavailable, "failed to connect to pipewire core"); } static const pw_core_events core_events = { .version = PW_VERSION_CORE_EVENTS, .done = CoreDone, .error = CoreError, }; pw_core_add_listener(core, &core_listener, &core_events, this); core_listener_attached = true; registry = pw_core_get_registry(core, PW_VERSION_REGISTRY, 0); if (!registry) { return Status::Error(StatusCode::kUnavailable, "failed to get pipewire registry"); } static const pw_registry_events registry_events = { .version = PW_VERSION_REGISTRY_EVENTS, .global = RegistryGlobal, .global_remove = RegistryGlobalRemove, }; pw_registry_add_listener(registry, ®istry_listener, ®istry_events, this); registry_listener_attached = true; connected = true; last_error = Status::Ok(); ClearCache(); Status sync_status = SyncLocked(); if (!sync_status.ok()) { return sync_status; } SetupMasterMeter(); return Status::Ok(); } void Client::Impl::DisconnectLocked() { TeardownMasterMeter(); TeardownAllLiveMeters(); std::unordered_map> links; std::unordered_map> streams; { std::lock_guard lock(cache_mutex); links.swap(link_proxies); streams.swap(virtual_streams); } for (auto& entry : links) { LinkProxy* link = entry.second.get(); if (link) { spa_hook_remove(&link->listener); link->proxy = nullptr; } } for (auto& entry : streams) { StreamData* stream_data = entry.second.get(); if (stream_data && stream_data->stream) { pw_stream_disconnect(stream_data->stream); pw_stream_destroy(stream_data->stream); stream_data->stream = nullptr; } } for (auto& entry : auto_link_proxies) { if (entry) { spa_hook_remove(&entry->listener); entry->proxy = nullptr; } } auto_link_proxies.clear(); for (auto& entry : saved_link_proxies) { if (entry) { spa_hook_remove(&entry->listener); entry->proxy = nullptr; } } saved_link_proxies.clear(); if (metadata_listener_attached) { spa_hook_remove(&metadata_listener); metadata_listener_attached = false; } if (metadata_proxy) { pw_proxy_destroy(metadata_proxy); metadata_proxy = nullptr; } if (registry_listener_attached) { spa_hook_remove(®istry_listener); registry_listener_attached = false; } if (core_listener_attached) { spa_hook_remove(&core_listener); core_listener_attached = false; } if (registry) { pw_proxy_destroy(reinterpret_cast(registry)); registry = nullptr; } if (core) { pw_core_disconnect(core); core = nullptr; } connected = false; ClearCache(); } void Client::Impl::CheckRulesForNode(const NodeInfo& node) { for (const auto& entry : route_rules) { if (MatchesRule(node, entry.second.match)) { PendingAutoLink pending; pending.source_node_id = node.id.value; pending.target_node_name = entry.second.target_node; pending.rule_id = entry.first; pending_auto_links.push_back(std::move(pending)); SchedulePolicySync(); } } } void Client::Impl::SchedulePolicySync() { if (policy_sync_pending || !core) { return; } uint32_t seq = pw_core_sync(core, PW_ID_CORE, 0); if (seq != SPA_ID_INVALID) { policy_sync_seq = seq; policy_sync_pending = true; } } void Client::Impl::ProcessPendingAutoLinks() { if (options.policy_only) { std::lock_guard lock(cache_mutex); pending_auto_links.clear(); return; } struct LinkSpec { uint32_t output_port; uint32_t input_port; }; std::vector links_to_create; { std::lock_guard lock(cache_mutex); for (auto it = pending_auto_links.begin(); it != pending_auto_links.end();) { uint32_t target_node_id = 0; for (const auto& node_entry : nodes) { if (node_entry.second.name == it->target_node_name) { target_node_id = node_entry.first; break; } } if (target_node_id == 0) { ++it; continue; } struct PortEntry { uint32_t id; std::string name; }; std::vector src_ports; std::vector tgt_ports; for (const auto& port_entry : ports) { const PortInfo& port = port_entry.second; if (port.node.value == it->source_node_id && !port.is_input) { src_ports.push_back({port_entry.first, port.name}); } if (port.node.value == target_node_id && port.is_input) { tgt_ports.push_back({port_entry.first, port.name}); } } if (src_ports.empty() || tgt_ports.empty()) { ++it; continue; } auto cmp = [](const PortEntry& a, const PortEntry& b) { return a.name < b.name; }; std::sort(src_ports.begin(), src_ports.end(), cmp); std::sort(tgt_ports.begin(), tgt_ports.end(), cmp); size_t count = std::min(src_ports.size(), tgt_ports.size()); for (size_t i = 0; i < count; ++i) { bool exists = false; for (const auto& link_entry : links) { if (link_entry.second.output_port.value == src_ports[i].id && link_entry.second.input_port.value == tgt_ports[i].id) { exists = true; break; } } if (!exists) { links_to_create.push_back({src_ports[i].id, tgt_ports[i].id}); } } it = pending_auto_links.erase(it); } } for (const auto& spec : links_to_create) { CreateAutoLinkAsync(spec.output_port, spec.input_port); } } void Client::Impl::CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port) { if (!core) { return; } pw_properties* props = pw_properties_new(nullptr, nullptr); if (!props) { return; } pw_properties_setf(props, PW_KEY_LINK_OUTPUT_PORT, "%u", output_port); pw_properties_setf(props, PW_KEY_LINK_INPUT_PORT, "%u", input_port); pw_properties_set(props, PW_KEY_OBJECT_LINGER, "true"); pw_proxy* proxy = reinterpret_cast( pw_core_create_object(core, "link-factory", PW_TYPE_INTERFACE_Link, PW_VERSION_LINK, &props->dict, 0)); pw_properties_free(props); if (!proxy) { return; } auto link_data = std::make_unique(); link_data->proxy = proxy; link_data->loop = thread_loop; pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents, link_data.get()); std::lock_guard lock(cache_mutex); auto_link_proxies.push_back(std::move(link_data)); } void Client::Impl::ProcessSavedLinks() { struct LinkSpec { uint32_t output_port; uint32_t input_port; std::string label; }; std::vector to_create; { std::lock_guard lock(cache_mutex); for (auto it = saved_links.begin(); it != saved_links.end();) { bool covered_by_rule = false; for (const auto& node_entry : nodes) { if (node_entry.second.name != it->out_node) continue; for (const auto& rule_entry : route_rules) { if (MatchesRule(node_entry.second, rule_entry.second.match) && rule_entry.second.target_node == it->in_node) { covered_by_rule = true; break; } } if (covered_by_rule) break; } if (covered_by_rule) { it = saved_links.erase(it); continue; } uint32_t out_id = 0, in_id = 0; for (const auto& port_entry : ports) { const PortInfo& port = port_entry.second; auto node_it = nodes.find(port.node.value); if (node_it == nodes.end()) continue; if (!port.is_input && node_it->second.name == it->out_node && port.name == it->out_port) { out_id = port_entry.first; } if (port.is_input && node_it->second.name == it->in_node && port.name == it->in_port) { in_id = port_entry.first; } if (out_id && in_id) break; } if (!out_id || !in_id) { ++it; continue; } bool exists = false; for (const auto& link_entry : links) { if (link_entry.second.output_port.value == out_id && link_entry.second.input_port.value == in_id) { exists = true; break; } } if (exists) { it = saved_links.erase(it); continue; } std::string label = it->out_node + ":" + it->out_port + " -> " + it->in_node + ":" + it->in_port; to_create.push_back({out_id, in_id, std::move(label)}); it = saved_links.erase(it); } } if (to_create.empty()) return; std::unordered_map> saved_port_map; for (const auto& spec : to_create) { saved_port_map[spec.output_port].push_back(spec.input_port); } std::vector competing_link_ids; { std::lock_guard lock(cache_mutex); for (const auto& link_entry : links) { auto it = saved_port_map.find(link_entry.second.output_port.value); if (it == saved_port_map.end()) continue; uint32_t link_id = link_entry.first; uint32_t in_port = link_entry.second.input_port.value; bool is_ours = false; for (uint32_t saved_in : it->second) { if (saved_in == in_port) { is_ours = true; break; } } if (!is_ours) { if (link_proxies.count(link_id)) { is_ours = true; } } if (!is_ours) { for (const auto& proxy : auto_link_proxies) { if (proxy && proxy->id == link_id) { is_ours = true; break; } } } if (!is_ours) { for (const auto& proxy : saved_link_proxies) { if (proxy && proxy->id == link_id) { is_ours = true; break; } } } if (!is_ours) { competing_link_ids.push_back(link_id); } } } for (uint32_t id : competing_link_ids) { pw_registry_destroy(registry, id); } for (const auto& spec : to_create) { CreateSavedLinkAsync(spec.output_port, spec.input_port); } } void Client::Impl::CreateSavedLinkAsync(uint32_t output_port, uint32_t input_port) { if (!core) return; pw_properties* props = pw_properties_new(nullptr, nullptr); if (!props) return; pw_properties_setf(props, PW_KEY_LINK_OUTPUT_PORT, "%u", output_port); pw_properties_setf(props, PW_KEY_LINK_INPUT_PORT, "%u", input_port); pw_properties_set(props, PW_KEY_OBJECT_LINGER, "true"); pw_proxy* proxy = reinterpret_cast( pw_core_create_object(core, "link-factory", PW_TYPE_INTERFACE_Link, PW_VERSION_LINK, &props->dict, 0)); pw_properties_free(props); if (!proxy) return; auto link_data = std::make_unique(); link_data->proxy = proxy; link_data->loop = thread_loop; pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents, link_data.get()); std::lock_guard lock(cache_mutex); saved_link_proxies.push_back(std::move(link_data)); } void Client::Impl::AutoSave() { if (!options.config_path || options.config_path->empty() || loading_config) { return; } nlohmann::json j; j["version"] = 1; nlohmann::json nodes_array = nlohmann::json::array(); { std::lock_guard lock(cache_mutex); for (const auto& entry : virtual_streams) { if (!entry.second) { continue; } const StreamData& sd = *entry.second; nlohmann::json node_obj; node_obj["name"] = sd.name; node_obj["is_source"] = sd.is_source; node_obj["rate"] = sd.rate; node_obj["channels"] = sd.channels; node_obj["loopback"] = sd.loopback; node_obj["target_node"] = sd.target_node; nodes_array.push_back(std::move(node_obj)); } } j["virtual_nodes"] = std::move(nodes_array); nlohmann::json rules_array = nlohmann::json::array(); { std::lock_guard lock(cache_mutex); for (const auto& entry : route_rules) { nlohmann::json rule_obj; rule_obj["id"] = entry.first; rule_obj["match"]["application_name"] = entry.second.match.application_name; rule_obj["match"]["process_binary"] = entry.second.match.process_binary; rule_obj["match"]["media_role"] = entry.second.match.media_role; rule_obj["target_node"] = entry.second.target_node; rules_array.push_back(std::move(rule_obj)); } } j["route_rules"] = std::move(rules_array); nlohmann::json links_array = nlohmann::json::array(); { std::lock_guard lock(cache_mutex); std::vector live; for (const auto& entry : link_proxies) { if (!entry.second) { continue; } auto link_it = links.find(entry.first); if (link_it == links.end()) { continue; } const Link& link = link_it->second; auto out_port_it = ports.find(link.output_port.value); auto in_port_it = ports.find(link.input_port.value); if (out_port_it == ports.end() || in_port_it == ports.end()) { continue; } auto out_node_it = nodes.find(out_port_it->second.node.value); auto in_node_it = nodes.find(in_port_it->second.node.value); if (out_node_it == nodes.end() || in_node_it == nodes.end()) { continue; } SavedLink sl{out_node_it->second.name, out_port_it->second.name, in_node_it->second.name, in_port_it->second.name}; live.push_back(sl); nlohmann::json link_obj; link_obj["out_node"] = sl.out_node; link_obj["out_port"] = sl.out_port; link_obj["in_node"] = sl.in_node; link_obj["in_port"] = sl.in_port; links_array.push_back(std::move(link_obj)); } for (const auto& lp : saved_link_proxies) { if (!lp || lp->id == SPA_ID_INVALID) continue; auto link_it = links.find(lp->id); if (link_it == links.end()) continue; const Link& link = link_it->second; auto out_port_it = ports.find(link.output_port.value); auto in_port_it = ports.find(link.input_port.value); if (out_port_it == ports.end() || in_port_it == ports.end()) continue; auto out_node_it = nodes.find(out_port_it->second.node.value); auto in_node_it = nodes.find(in_port_it->second.node.value); if (out_node_it == nodes.end() || in_node_it == nodes.end()) continue; SavedLink sl{out_node_it->second.name, out_port_it->second.name, in_node_it->second.name, in_port_it->second.name}; bool dup = false; for (const auto& l : live) { if (l == sl) { dup = true; break; } } if (!dup) { live.push_back(sl); nlohmann::json link_obj; link_obj["out_node"] = sl.out_node; link_obj["out_port"] = sl.out_port; link_obj["in_node"] = sl.in_node; link_obj["in_port"] = sl.in_port; links_array.push_back(std::move(link_obj)); } } for (const auto& sl : saved_links) { bool already = false; for (const auto& l : live) { if (l == sl) { already = true; break; } } if (!already) { nlohmann::json link_obj; link_obj["out_node"] = sl.out_node; link_obj["out_port"] = sl.out_port; link_obj["in_node"] = sl.in_node; link_obj["in_port"] = sl.in_port; links_array.push_back(std::move(link_obj)); } } } j["links"] = std::move(links_array); std::string tmp_path = *options.config_path + ".tmp"; std::ofstream file(tmp_path); if (!file.is_open()) { return; } file << j.dump(2); file.close(); if (!file.fail()) { std::rename(tmp_path.c_str(), options.config_path->c_str()); } } void Client::Impl::SetupMasterMeter() { if (!thread_loop || !core || master_meter_data) { return; } auto meter = std::make_unique(); pw_properties* props = pw_properties_new( PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Capture", PW_KEY_MEDIA_CLASS, "Stream/Input/Audio", PW_KEY_STREAM_CAPTURE_SINK, "true", PW_KEY_STREAM_MONITOR, "true", PW_KEY_NODE_NAME, "", nullptr); meter->stream = pw_stream_new_simple( pw_thread_loop_get_loop(thread_loop), "warppipe-meter", props, &kNodeMeterEvents, meter.get()); if (!meter->stream) { return; } uint8_t buffer[512]; spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); spa_audio_info_raw info{}; info.format = SPA_AUDIO_FORMAT_F32; info.rate = 48000; info.channels = 2; info.position[0] = SPA_AUDIO_CHANNEL_FL; info.position[1] = SPA_AUDIO_CHANNEL_FR; const spa_pod* params[1]; params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &info); int res = pw_stream_connect( meter->stream, PW_DIRECTION_INPUT, PW_ID_ANY, static_cast( PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS), params, 1); if (res != 0) { pw_stream_destroy(meter->stream); return; } master_meter_data = std::move(meter); } void Client::Impl::TeardownMasterMeter() { if (!master_meter_data) { return; } if (master_meter_data->stream) { pw_stream_destroy(master_meter_data->stream); } master_meter_data.reset(); } void Client::Impl::TeardownAllLiveMeters() { std::unordered_map> meters; { std::lock_guard lock(cache_mutex); meters.swap(live_meters); } for (auto& entry : meters) { if (entry.second && entry.second->stream) { pw_stream_destroy(entry.second->stream); entry.second->stream = nullptr; } } } int Client::Impl::MetadataProperty(void* data, uint32_t subject, const char* key, const char* type, const char* value) { auto* impl = static_cast(data); if (!impl || subject != 0 || !key) { return 0; } std::string name; if (value && value[0] != '\0') { try { auto j = nlohmann::json::parse(value); if (j.contains("name") && j["name"].is_string()) { name = j["name"].get(); } } catch (...) { name = value; } } std::lock_guard lock(impl->cache_mutex); if (spa_streq(key, "default.audio.sink")) { impl->defaults.default_sink_name = name; } else if (spa_streq(key, "default.audio.source")) { impl->defaults.default_source_name = name; } else if (spa_streq(key, "default.configured.audio.sink")) { impl->defaults.configured_sink_name = name; } else if (spa_streq(key, "default.configured.audio.source")) { impl->defaults.configured_source_name = name; } return 0; } Client::Client(std::unique_ptr impl) : impl_(std::move(impl)) {} Client::Client(Client&&) noexcept = default; Client& Client::operator=(Client&&) noexcept = default; Client::~Client() { if (impl_) { Shutdown(); } } Result> Client::Create(const ConnectionOptions& options) { pw_init(nullptr, nullptr); if (options.threading == ThreadingMode::kCallerThread) { return {Status::Error(StatusCode::kNotImplemented, "caller thread mode not implemented"), {}}; } auto impl = std::make_unique(); impl->options = options; impl->thread_loop = pw_thread_loop_new("warppipe", nullptr); if (!impl->thread_loop) { return {Status::Error(StatusCode::kUnavailable, "failed to create pipewire thread loop"), {}}; } if (pw_thread_loop_start(impl->thread_loop) != 0) { pw_thread_loop_destroy(impl->thread_loop); impl->thread_loop = nullptr; return {Status::Error(StatusCode::kUnavailable, "failed to start pipewire thread loop"), {}}; } pw_thread_loop_lock(impl->thread_loop); Status status = impl->ConnectLocked(); pw_thread_loop_unlock(impl->thread_loop); if (!status.ok()) { pw_thread_loop_lock(impl->thread_loop); impl->DisconnectLocked(); if (impl->context) { pw_context_destroy(impl->context); impl->context = nullptr; } pw_thread_loop_unlock(impl->thread_loop); pw_thread_loop_stop(impl->thread_loop); pw_thread_loop_destroy(impl->thread_loop); impl->thread_loop = nullptr; return {status, {}}; } auto client = std::unique_ptr(new Client(std::move(impl))); if (options.config_path && !options.config_path->empty()) { std::ifstream test_file(*options.config_path); if (test_file.good()) { client->LoadConfig(*options.config_path); } } return {Status::Ok(), std::move(client)}; } Status Client::Shutdown() { if (!impl_) { return Status::Ok(); } if (impl_->thread_loop) { pw_thread_loop_lock(impl_->thread_loop); impl_->DisconnectLocked(); if (impl_->context) { pw_context_destroy(impl_->context); impl_->context = nullptr; } pw_thread_loop_unlock(impl_->thread_loop); pw_thread_loop_stop(impl_->thread_loop); pw_thread_loop_destroy(impl_->thread_loop); impl_->thread_loop = nullptr; } pw_deinit(); return Status::Ok(); } Result> Client::ListNodes() { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } std::lock_guard lock(impl_->cache_mutex); std::vector items; items.reserve(impl_->nodes.size()); for (const auto& entry : impl_->nodes) { NodeInfo info = entry.second; if (!info.is_virtual && impl_->virtual_streams.find(entry.first) != impl_->virtual_streams.end()) { info.is_virtual = true; } items.push_back(std::move(info)); } return {Status::Ok(), std::move(items)}; } Result> Client::ListPorts(NodeId node) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } std::lock_guard lock(impl_->cache_mutex); std::vector items; for (const auto& entry : impl_->ports) { if (entry.second.node.value == node.value) { items.push_back(entry.second); } } return {Status::Ok(), std::move(items)}; } Result> Client::ListLinks() { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } std::lock_guard lock(impl_->cache_mutex); std::vector items; items.reserve(impl_->links.size()); for (const auto& entry : impl_->links) { items.push_back(entry.second); } return {Status::Ok(), std::move(items)}; } Result Client::CreateVirtualSink(std::string_view name, const VirtualNodeOptions& options) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } std::string name_value = name.empty() ? std::string() : std::string(name); pw_thread_loop_lock(impl_->thread_loop); auto result = impl_->CreateVirtualStreamLocked(name_value, false, options); pw_thread_loop_unlock(impl_->thread_loop); if (!result.ok()) { return {result.status, {}}; } VirtualSink sink; sink.node = NodeId{result.value}; sink.name = name_value.empty() ? "warppipe-sink" : name_value; impl_->AutoSave(); return {Status::Ok(), std::move(sink)}; } Result Client::CreateVirtualSource(std::string_view name, const VirtualNodeOptions& options) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } std::string name_value = name.empty() ? std::string() : std::string(name); pw_thread_loop_lock(impl_->thread_loop); auto result = impl_->CreateVirtualStreamLocked(name_value, true, options); pw_thread_loop_unlock(impl_->thread_loop); if (!result.ok()) { return {result.status, {}}; } VirtualSource source; source.node = NodeId{result.value}; source.name = name_value.empty() ? "warppipe-source" : name_value; impl_->AutoSave(); return {Status::Ok(), std::move(source)}; } Status Client::RemoveNode(NodeId node) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return status; } pw_thread_loop_lock(impl_->thread_loop); std::unique_ptr owned_stream; { std::lock_guard lock(impl_->cache_mutex); auto it = impl_->virtual_streams.find(node.value); if (it == impl_->virtual_streams.end()) { pw_thread_loop_unlock(impl_->thread_loop); return Status::Error(StatusCode::kNotFound, "node not managed by warppipe"); } owned_stream = std::move(it->second); impl_->virtual_streams.erase(it); } if (owned_stream && owned_stream->stream) { pw_stream_disconnect(owned_stream->stream); pw_stream_destroy(owned_stream->stream); owned_stream->stream = nullptr; } pw_thread_loop_unlock(impl_->thread_loop); impl_->AutoSave(); return Status::Ok(); } Status Client::SetNodeVolume(NodeId node, float volume, bool mute) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return status; } if (node.value == 0) { return Status::Error(StatusCode::kInvalidArgument, "invalid node id"); } volume = std::clamp(volume, 0.0f, 1.5f); pw_thread_loop_lock(impl_->thread_loop); { std::lock_guard lock(impl_->cache_mutex); if (impl_->nodes.find(node.value) == impl_->nodes.end()) { pw_thread_loop_unlock(impl_->thread_loop); return Status::Error(StatusCode::kNotFound, "node not found"); } } auto* proxy = static_cast( pw_registry_bind(impl_->registry, node.value, PW_TYPE_INTERFACE_Node, PW_VERSION_NODE, 0)); if (!proxy) { pw_thread_loop_unlock(impl_->thread_loop); return Status::Error(StatusCode::kInternal, "failed to bind node proxy"); } uint8_t buffer[128]; spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); auto* param = reinterpret_cast(spa_pod_builder_add_object( &builder, SPA_TYPE_OBJECT_Props, SPA_PARAM_Props, SPA_PROP_volume, SPA_POD_Float(volume), SPA_PROP_mute, SPA_POD_Bool(mute))); pw_node_set_param(proxy, SPA_PARAM_Props, 0, param); pw_proxy_destroy(reinterpret_cast(proxy)); { std::lock_guard lock(impl_->cache_mutex); impl_->volume_states[node.value] = VolumeState{volume, mute}; } pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } Result Client::GetNodeVolume(NodeId node) const { std::lock_guard lock(impl_->cache_mutex); auto it = impl_->volume_states.find(node.value); if (it == impl_->volume_states.end()) { return {Status::Ok(), VolumeState{}}; } return {Status::Ok(), it->second}; } Status Client::EnsureNodeMeter(NodeId node) { if (node.value == 0) { return Status::Error(StatusCode::kInvalidArgument, "invalid node id"); } std::string target_name; bool capture_sink = false; { std::lock_guard lock(impl_->cache_mutex); auto node_it = impl_->nodes.find(node.value); if (node_it == impl_->nodes.end()) { return Status::Error(StatusCode::kNotFound, "node not found"); } impl_->metered_nodes.insert(node.value); if (impl_->meter_states.find(node.value) == impl_->meter_states.end()) { impl_->meter_states[node.value] = MeterState{}; } if (impl_->live_meters.find(node.value) != impl_->live_meters.end()) { return Status::Ok(); } target_name = node_it->second.name; const auto& mc = node_it->second.media_class; capture_sink = (mc.find("Sink") != std::string::npos || mc.find("Duplex") != std::string::npos); } if (!impl_->thread_loop || !impl_->core) { return Status::Ok(); } pw_thread_loop_lock(impl_->thread_loop); auto meter = std::make_unique(); meter->node_id = node.value; meter->target_name = target_name; pw_properties* props = pw_properties_new( PW_KEY_MEDIA_TYPE, "Audio", PW_KEY_MEDIA_CATEGORY, "Capture", PW_KEY_MEDIA_CLASS, "Stream/Input/Audio", PW_KEY_TARGET_OBJECT, target_name.c_str(), PW_KEY_STREAM_MONITOR, "true", PW_KEY_NODE_NAME, "", nullptr); if (capture_sink) { pw_properties_set(props, PW_KEY_STREAM_CAPTURE_SINK, "true"); } meter->stream = pw_stream_new_simple( pw_thread_loop_get_loop(impl_->thread_loop), "warppipe-node-meter", props, &kNodeMeterEvents, meter.get()); if (!meter->stream) { pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } uint8_t buffer[512]; spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); spa_audio_info_raw info{}; info.format = SPA_AUDIO_FORMAT_F32; info.rate = 48000; info.channels = 2; info.position[0] = SPA_AUDIO_CHANNEL_FL; info.position[1] = SPA_AUDIO_CHANNEL_FR; const spa_pod* params[1]; params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &info); int res = pw_stream_connect( meter->stream, PW_DIRECTION_INPUT, PW_ID_ANY, static_cast( PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS), params, 1); if (res != 0) { pw_stream_destroy(meter->stream); pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } { std::lock_guard lock(impl_->cache_mutex); impl_->live_meters[node.value] = std::move(meter); } pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } Status Client::DisableNodeMeter(NodeId node) { std::unique_ptr meter; { std::lock_guard lock(impl_->cache_mutex); impl_->metered_nodes.erase(node.value); impl_->meter_states.erase(node.value); auto it = impl_->live_meters.find(node.value); if (it != impl_->live_meters.end()) { meter = std::move(it->second); impl_->live_meters.erase(it); } } if (meter && meter->stream && impl_->thread_loop) { pw_thread_loop_lock(impl_->thread_loop); pw_stream_destroy(meter->stream); meter->stream = nullptr; pw_thread_loop_unlock(impl_->thread_loop); } return Status::Ok(); } Result Client::NodeMeterPeak(NodeId node) const { std::lock_guard lock(impl_->cache_mutex); auto live_it = impl_->live_meters.find(node.value); if (live_it != impl_->live_meters.end() && live_it->second) { MeterState state; state.peak_left = live_it->second->peak_left.load(std::memory_order_relaxed); state.peak_right = live_it->second->peak_right.load(std::memory_order_relaxed); return {Status::Ok(), state}; } auto it = impl_->meter_states.find(node.value); if (it == impl_->meter_states.end()) { return {Status::Error(StatusCode::kNotFound, "node not metered"), {}}; } return {Status::Ok(), it->second}; } Result Client::MeterPeak() const { std::lock_guard lock(impl_->cache_mutex); if (impl_->master_meter_data) { MeterState state; state.peak_left = impl_->master_meter_data->peak_left.load(std::memory_order_relaxed); state.peak_right = impl_->master_meter_data->peak_right.load(std::memory_order_relaxed); return {Status::Ok(), state}; } return {Status::Ok(), impl_->master_meter}; } Result Client::CreateLink(PortId output, PortId input, const LinkOptions& options) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } if (output.value == 0 || input.value == 0) { return {Status::Error(StatusCode::kInvalidArgument, "invalid port id"), {}}; } pw_thread_loop_lock(impl_->thread_loop); PortInfo out_port; PortInfo in_port; { std::lock_guard lock(impl_->cache_mutex); auto out_it = impl_->ports.find(output.value); if (out_it == impl_->ports.end()) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kNotFound, "output port not found"), {}}; } auto in_it = impl_->ports.find(input.value); if (in_it == impl_->ports.end()) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kNotFound, "input port not found"), {}}; } out_port = out_it->second; in_port = in_it->second; for (const auto& entry : impl_->links) { const Link& link = entry.second; if (link.output_port.value == output.value && link.input_port.value == input.value) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kInvalidArgument, "link already exists"), {}}; } } } if (out_port.is_input || !in_port.is_input) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kInvalidArgument, "port directions do not match"), {}}; } pw_properties* props = pw_properties_new(nullptr, nullptr); if (!props) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kInternal, "failed to allocate link properties"), {}}; } pw_properties_setf(props, PW_KEY_LINK_OUTPUT_PORT, "%u", output.value); pw_properties_setf(props, PW_KEY_LINK_INPUT_PORT, "%u", input.value); if (options.passive) { pw_properties_set(props, PW_KEY_LINK_PASSIVE, "true"); } if (options.linger) { pw_properties_set(props, PW_KEY_OBJECT_LINGER, "true"); } pw_proxy* proxy = reinterpret_cast( pw_core_create_object(impl_->core, "link-factory", PW_TYPE_INTERFACE_Link, PW_VERSION_LINK, &props->dict, 0)); pw_properties_free(props); if (!proxy) { pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kUnavailable, "failed to create link"), {}}; } auto link_proxy = std::make_unique(); link_proxy->proxy = proxy; link_proxy->loop = impl_->thread_loop; pw_proxy_add_listener(proxy, &link_proxy->listener, &kLinkProxyEvents, link_proxy.get()); int wait_attempts = 0; while (link_proxy->id == SPA_ID_INVALID && !link_proxy->failed && wait_attempts < 3) { int wait_res = pw_thread_loop_timed_wait(impl_->thread_loop, kSyncWaitSeconds); if (wait_res == -ETIMEDOUT) { break; } ++wait_attempts; } if (link_proxy->failed) { std::string error = link_proxy->error.empty() ? "link creation failed" : link_proxy->error; pw_proxy_destroy(proxy); pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kUnavailable, std::move(error)), {}}; } if (link_proxy->id == SPA_ID_INVALID) { pw_proxy_destroy(proxy); pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kTimeout, "timed out waiting for link id"), {}}; } Link link; link.id = LinkId{link_proxy->id}; link.output_port = output; link.input_port = input; { std::lock_guard lock(impl_->cache_mutex); impl_->link_proxies.emplace(link_proxy->id, std::move(link_proxy)); impl_->links[link.id.value] = link; } pw_thread_loop_unlock(impl_->thread_loop); impl_->AutoSave(); return {Status::Ok(), link}; } Result Client::CreateLinkByName(std::string_view output_node, std::string_view output_port, std::string_view input_node, std::string_view input_port, const LinkOptions& options) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return {status, {}}; } if (output_node.empty() || output_port.empty() || input_node.empty() || input_port.empty()) { return {Status::Error(StatusCode::kInvalidArgument, "node or port name missing"), {}}; } std::optional out_id; std::optional in_id; { std::lock_guard lock(impl_->cache_mutex); for (const auto& entry : impl_->ports) { const PortInfo& port = entry.second; auto node_it = impl_->nodes.find(port.node.value); if (node_it == impl_->nodes.end()) { continue; } const NodeInfo& node = node_it->second; if (port.is_input) { if (node.name == input_node && port.name == input_port) { in_id = port.id; } } else { if (node.name == output_node && port.name == output_port) { out_id = port.id; } } if (out_id && in_id) { break; } } } if (!out_id || !in_id) { return {Status::Error(StatusCode::kNotFound, "matching ports not found"), {}}; } return CreateLink(*out_id, *in_id, options); } Status Client::RemoveLink(LinkId link) { Status status = impl_->EnsureConnected(); if (!status.ok()) { return status; } pw_thread_loop_lock(impl_->thread_loop); bool removed = false; Impl::SavedLink removed_link; { std::lock_guard lock(impl_->cache_mutex); auto link_it = impl_->links.find(link.value); if (link_it != impl_->links.end()) { auto op = impl_->ports.find(link_it->second.output_port.value); auto ip = impl_->ports.find(link_it->second.input_port.value); if (op != impl_->ports.end() && ip != impl_->ports.end()) { auto on = impl_->nodes.find(op->second.node.value); auto in_ = impl_->nodes.find(ip->second.node.value); if (on != impl_->nodes.end() && in_ != impl_->nodes.end()) { removed_link = {on->second.name, op->second.name, in_->second.name, ip->second.name}; } } } auto it = impl_->link_proxies.find(link.value); if (it != impl_->link_proxies.end()) { if (it->second && it->second->proxy) { pw_proxy_destroy(it->second->proxy); } impl_->link_proxies.erase(it); impl_->links.erase(link.value); removed = true; } } if (!removed && impl_->registry) { int res = pw_registry_destroy(impl_->registry, link.value); if (res < 0) { pw_thread_loop_unlock(impl_->thread_loop); return Status::Error(StatusCode::kNotFound, "link not found"); } removed = true; } pw_thread_loop_unlock(impl_->thread_loop); if (removed) { if (!removed_link.out_node.empty()) { std::lock_guard lock(impl_->cache_mutex); auto& sl = impl_->saved_links; sl.erase(std::remove(sl.begin(), sl.end(), removed_link), sl.end()); } impl_->AutoSave(); } return removed ? Status::Ok() : Status::Error(StatusCode::kNotFound, "link not found"); } Result Client::AddRouteRule(const RouteRule& rule) { if (rule.match.application_name.empty() && rule.match.process_binary.empty() && rule.match.media_role.empty()) { return {Status::Error(StatusCode::kInvalidArgument, "rule match has no criteria"), {}}; } if (rule.target_node.empty()) { return {Status::Error(StatusCode::kInvalidArgument, "rule target node is empty"), {}}; } uint32_t id = 0; { std::lock_guard lock(impl_->cache_mutex); id = impl_->next_rule_id++; RouteRule stored = rule; stored.id = RuleId{id}; impl_->route_rules[id] = std::move(stored); for (const auto& node_entry : impl_->nodes) { if (MatchesRule(node_entry.second, rule.match)) { PendingAutoLink pending; pending.source_node_id = node_entry.first; pending.target_node_name = rule.target_node; pending.rule_id = id; impl_->pending_auto_links.push_back(std::move(pending)); } } } if (!impl_->pending_auto_links.empty() && impl_->thread_loop) { pw_thread_loop_lock(impl_->thread_loop); impl_->SchedulePolicySync(); pw_thread_loop_unlock(impl_->thread_loop); } impl_->AutoSave(); return {Status::Ok(), RuleId{id}}; } Status Client::RemoveRouteRule(RuleId id) { { std::lock_guard lock(impl_->cache_mutex); auto it = impl_->route_rules.find(id.value); if (it == impl_->route_rules.end()) { return Status::Error(StatusCode::kNotFound, "route rule not found"); } impl_->route_rules.erase(it); auto pending_it = impl_->pending_auto_links.begin(); while (pending_it != impl_->pending_auto_links.end()) { if (pending_it->rule_id == id.value) { pending_it = impl_->pending_auto_links.erase(pending_it); } else { ++pending_it; } } } impl_->AutoSave(); return Status::Ok(); } Result> Client::ListRouteRules() { std::lock_guard lock(impl_->cache_mutex); std::vector items; items.reserve(impl_->route_rules.size()); for (const auto& entry : impl_->route_rules) { items.push_back(entry.second); } return {Status::Ok(), std::move(items)}; } Result Client::GetDefaults() { std::lock_guard lock(impl_->cache_mutex); return {Status::Ok(), impl_->defaults}; } Status Client::SetDefaultSink(std::string_view node_name) { if (!impl_->metadata_proxy) { return Status::Error(StatusCode::kUnavailable, "metadata not available"); } if (node_name.empty()) { return Status::Error(StatusCode::kInvalidArgument, "node name is empty"); } std::string json_value = "{\"name\":\"" + std::string(node_name) + "\"}"; pw_thread_loop_lock(impl_->thread_loop); pw_metadata_set_property( reinterpret_cast(impl_->metadata_proxy), 0, "default.configured.audio.sink", "Spa:String:JSON", json_value.c_str()); pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } Status Client::SetDefaultSource(std::string_view node_name) { if (!impl_->metadata_proxy) { return Status::Error(StatusCode::kUnavailable, "metadata not available"); } if (node_name.empty()) { return Status::Error(StatusCode::kInvalidArgument, "node name is empty"); } std::string json_value = "{\"name\":\"" + std::string(node_name) + "\"}"; pw_thread_loop_lock(impl_->thread_loop); pw_metadata_set_property( reinterpret_cast(impl_->metadata_proxy), 0, "default.configured.audio.source", "Spa:String:JSON", json_value.c_str()); pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } Status Client::SaveConfig(std::string_view path) { if (path.empty()) { return Status::Error(StatusCode::kInvalidArgument, "path is empty"); } nlohmann::json j; j["version"] = 1; nlohmann::json nodes_array = nlohmann::json::array(); nlohmann::json rules_array = nlohmann::json::array(); { std::lock_guard lock(impl_->cache_mutex); for (const auto& entry : impl_->virtual_streams) { if (!entry.second) { continue; } const StreamData& sd = *entry.second; nlohmann::json node_obj; node_obj["name"] = sd.name; node_obj["is_source"] = sd.is_source; node_obj["rate"] = sd.rate; node_obj["channels"] = sd.channels; node_obj["loopback"] = sd.loopback; node_obj["target_node"] = sd.target_node; nodes_array.push_back(std::move(node_obj)); } for (const auto& entry : impl_->route_rules) { nlohmann::json rule_obj; rule_obj["match"]["application_name"] = entry.second.match.application_name; rule_obj["match"]["process_binary"] = entry.second.match.process_binary; rule_obj["match"]["media_role"] = entry.second.match.media_role; rule_obj["target_node"] = entry.second.target_node; rules_array.push_back(std::move(rule_obj)); } } nlohmann::json links_array = nlohmann::json::array(); { std::lock_guard lock(impl_->cache_mutex); std::vector live; for (const auto& entry : impl_->link_proxies) { if (!entry.second) { continue; } auto link_it = impl_->links.find(entry.first); if (link_it == impl_->links.end()) { continue; } const Link& link = link_it->second; auto out_port_it = impl_->ports.find(link.output_port.value); auto in_port_it = impl_->ports.find(link.input_port.value); if (out_port_it == impl_->ports.end() || in_port_it == impl_->ports.end()) { continue; } auto out_node_it = impl_->nodes.find(out_port_it->second.node.value); auto in_node_it = impl_->nodes.find(in_port_it->second.node.value); if (out_node_it == impl_->nodes.end() || in_node_it == impl_->nodes.end()) { continue; } Impl::SavedLink sl{out_node_it->second.name, out_port_it->second.name, in_node_it->second.name, in_port_it->second.name}; live.push_back(sl); nlohmann::json link_obj; link_obj["out_node"] = sl.out_node; link_obj["out_port"] = sl.out_port; link_obj["in_node"] = sl.in_node; link_obj["in_port"] = sl.in_port; links_array.push_back(std::move(link_obj)); } for (const auto& lp : impl_->saved_link_proxies) { if (!lp || lp->id == SPA_ID_INVALID) continue; auto link_it = impl_->links.find(lp->id); if (link_it == impl_->links.end()) continue; const Link& link = link_it->second; auto out_port_it = impl_->ports.find(link.output_port.value); auto in_port_it = impl_->ports.find(link.input_port.value); if (out_port_it == impl_->ports.end() || in_port_it == impl_->ports.end()) continue; auto out_node_it = impl_->nodes.find(out_port_it->second.node.value); auto in_node_it = impl_->nodes.find(in_port_it->second.node.value); if (out_node_it == impl_->nodes.end() || in_node_it == impl_->nodes.end()) continue; Impl::SavedLink sl{out_node_it->second.name, out_port_it->second.name, in_node_it->second.name, in_port_it->second.name}; bool dup = false; for (const auto& l : live) { if (l == sl) { dup = true; break; } } if (!dup) { live.push_back(sl); nlohmann::json link_obj; link_obj["out_node"] = sl.out_node; link_obj["out_port"] = sl.out_port; link_obj["in_node"] = sl.in_node; link_obj["in_port"] = sl.in_port; links_array.push_back(std::move(link_obj)); } } for (const auto& sl : impl_->saved_links) { bool already = false; for (const auto& l : live) { if (l == sl) { already = true; break; } } if (!already) { nlohmann::json link_obj; link_obj["out_node"] = sl.out_node; link_obj["out_port"] = sl.out_port; link_obj["in_node"] = sl.in_node; link_obj["in_port"] = sl.in_port; links_array.push_back(std::move(link_obj)); } } } j["virtual_nodes"] = std::move(nodes_array); j["route_rules"] = std::move(rules_array); j["links"] = std::move(links_array); std::string tmp_path = std::string(path) + ".tmp"; std::ofstream file(tmp_path); if (!file.is_open()) { return Status::Error(StatusCode::kInternal, "failed to open config file for writing"); } file << j.dump(2); file.close(); if (file.fail()) { return Status::Error(StatusCode::kInternal, "failed to write config file"); } if (std::rename(tmp_path.c_str(), std::string(path).c_str()) != 0) { return Status::Error(StatusCode::kInternal, "failed to rename config file"); } return Status::Ok(); } void Client::SetChangeCallback(ChangeCallback callback) { std::lock_guard lock(impl_->change_cb_mutex); impl_->change_callback = std::move(callback); } Status Client::LoadConfig(std::string_view path) { if (path.empty()) { return Status::Error(StatusCode::kInvalidArgument, "path is empty"); } std::string path_str(path); std::ifstream file(path_str); if (!file.is_open()) { return Status::Error(StatusCode::kNotFound, "config file not found"); } nlohmann::json j; try { j = nlohmann::json::parse(file); } catch (const nlohmann::json::parse_error& e) { return Status::Error(StatusCode::kInvalidArgument, std::string("config parse error: ") + e.what()); } if (!j.contains("version") || !j["version"].is_number_integer()) { return Status::Error(StatusCode::kInvalidArgument, "config missing version"); } impl_->loading_config = true; if (j.contains("route_rules") && j["route_rules"].is_array()) { for (const auto& rule_obj : j["route_rules"]) { try { RouteRule rule; if (rule_obj.contains("match") && rule_obj["match"].is_object()) { const auto& m = rule_obj["match"]; rule.match.application_name = m.value("application_name", ""); rule.match.process_binary = m.value("process_binary", ""); rule.match.media_role = m.value("media_role", ""); } rule.target_node = rule_obj.value("target_node", ""); if (!rule.target_node.empty() && (!rule.match.application_name.empty() || !rule.match.process_binary.empty() || !rule.match.media_role.empty())) { AddRouteRule(rule); } } catch (...) { continue; } } } Status conn_status = impl_->EnsureConnected(); if (conn_status.ok() && j.contains("virtual_nodes") && j["virtual_nodes"].is_array()) { for (const auto& node_obj : j["virtual_nodes"]) { try { std::string name = node_obj.value("name", ""); if (name.empty()) { continue; } bool is_source = node_obj.value("is_source", false); VirtualNodeOptions opts; opts.format.rate = node_obj.value("rate", 48000u); opts.format.channels = node_obj.value("channels", 2u); bool loopback = node_obj.value("loopback", false); std::string target = node_obj.value("target_node", ""); if (loopback && !target.empty()) { opts.behavior = VirtualBehavior::kLoopback; opts.target_node = target; } if (is_source) { CreateVirtualSource(name, opts); } else { CreateVirtualSink(name, opts); } } catch (...) { continue; } } } if (j.contains("links") && j["links"].is_array()) { { std::lock_guard lock(impl_->cache_mutex); for (const auto& link_obj : j["links"]) { try { std::string out_node = link_obj.value("out_node", ""); std::string out_port = link_obj.value("out_port", ""); std::string in_node = link_obj.value("in_node", ""); std::string in_port = link_obj.value("in_port", ""); if (out_node.empty() || out_port.empty() || in_node.empty() || in_port.empty()) { continue; } impl_->saved_links.push_back({out_node, out_port, in_node, in_port}); } catch (...) { continue; } } } if (conn_status.ok()) { pw_thread_loop_lock(impl_->thread_loop); impl_->SchedulePolicySync(); pw_thread_loop_unlock(impl_->thread_loop); } } impl_->loading_config = false; impl_->AutoSave(); return Status::Ok(); } #ifdef WARPPIPE_TESTING Status Client::Test_InsertNode(const NodeInfo& node) { if (!impl_) { return Status::Error(StatusCode::kInternal, "client not initialized"); } { std::lock_guard lock(impl_->cache_mutex); impl_->nodes[node.id.value] = node; impl_->CheckRulesForNode(node); } impl_->NotifyChange(); return Status::Ok(); } Status Client::Test_InsertPort(const PortInfo& port) { if (!impl_) { return Status::Error(StatusCode::kInternal, "client not initialized"); } { std::lock_guard lock(impl_->cache_mutex); impl_->ports[port.id.value] = port; } impl_->NotifyChange(); return Status::Ok(); } Status Client::Test_InsertLink(const Link& link) { if (!impl_) { return Status::Error(StatusCode::kInternal, "client not initialized"); } { std::lock_guard lock(impl_->cache_mutex); impl_->links[link.id.value] = link; } impl_->NotifyChange(); return Status::Ok(); } Status Client::Test_RemoveGlobal(uint32_t id) { if (!impl_) { return Status::Error(StatusCode::kInternal, "client not initialized"); } Client::Impl::RegistryGlobalRemove(impl_.get(), id); return Status::Ok(); } Status Client::Test_ForceDisconnect() { if (!impl_ || !impl_->thread_loop) { return Status::Error(StatusCode::kInternal, "thread loop not initialized"); } pw_thread_loop_lock(impl_->thread_loop); impl_->DisconnectLocked(); pw_thread_loop_unlock(impl_->thread_loop); return Status::Ok(); } Status Client::Test_TriggerPolicyCheck() { if (!impl_) { return Status::Error(StatusCode::kInternal, "client not initialized"); } impl_->ProcessPendingAutoLinks(); return Status::Ok(); } size_t Client::Test_GetPendingAutoLinkCount() const { if (!impl_) { return 0; } std::lock_guard lock(impl_->cache_mutex); return impl_->pending_auto_links.size(); } Status Client::Test_SetNodeVolume(NodeId node, float volume, bool mute) { if (!impl_) { return Status::Error(StatusCode::kUnavailable, "no impl"); } std::lock_guard lock(impl_->cache_mutex); if (impl_->nodes.find(node.value) == impl_->nodes.end()) { return Status::Error(StatusCode::kNotFound, "node not found"); } impl_->volume_states[node.value] = VolumeState{std::clamp(volume, 0.0f, 1.5f), mute}; return Status::Ok(); } Result Client::Test_GetNodeVolume(NodeId node) const { if (!impl_) { return {Status::Error(StatusCode::kUnavailable, "no impl"), {}}; } std::lock_guard lock(impl_->cache_mutex); auto it = impl_->volume_states.find(node.value); if (it == impl_->volume_states.end()) { return {Status::Ok(), VolumeState{}}; } return {Status::Ok(), it->second}; } Status Client::Test_SetNodeMeterPeak(NodeId node, float left, float right) { if (!impl_) { return Status::Error(StatusCode::kUnavailable, "no impl"); } std::lock_guard lock(impl_->cache_mutex); float cl = std::clamp(left, 0.0f, 1.0f); float cr = std::clamp(right, 0.0f, 1.0f); impl_->meter_states[node.value] = MeterState{cl, cr}; impl_->metered_nodes.insert(node.value); auto it = impl_->live_meters.find(node.value); if (it != impl_->live_meters.end() && it->second) { it->second->peak_left.store(cl, std::memory_order_relaxed); it->second->peak_right.store(cr, std::memory_order_relaxed); } return Status::Ok(); } Status Client::Test_SetMasterMeterPeak(float left, float right) { if (!impl_) { return Status::Error(StatusCode::kUnavailable, "no impl"); } std::lock_guard lock(impl_->cache_mutex); float cl = std::clamp(left, 0.0f, 1.0f); float cr = std::clamp(right, 0.0f, 1.0f); impl_->master_meter = MeterState{cl, cr}; if (impl_->master_meter_data) { impl_->master_meter_data->peak_left.store(cl, std::memory_order_relaxed); impl_->master_meter_data->peak_right.store(cr, std::memory_order_relaxed); } return Status::Ok(); } #endif } // namespace warppipe