From 5962c03b965e7945046068c672e34ef984b3230a Mon Sep 17 00:00:00 2001 From: Joey Yakimowich-Payne Date: Thu, 12 Feb 2026 14:19:48 -0700 Subject: [PATCH] Fix crashes --- src/warppipe.cpp | 119 +++++++++++++++++++---------------------------- 1 file changed, 47 insertions(+), 72 deletions(-) diff --git a/src/warppipe.cpp b/src/warppipe.cpp index 15930a7..6ee2aab 100644 --- a/src/warppipe.cpp +++ b/src/warppipe.cpp @@ -124,6 +124,7 @@ struct StreamData { struct LinkProxy { pw_proxy* proxy = nullptr; spa_hook listener{}; + bool listener_attached = false; pw_thread_loop* loop = nullptr; bool done = false; bool failed = false; @@ -133,6 +134,20 @@ struct LinkProxy { uint32_t input_port = 0; }; +void DetachLinkProxy(LinkProxy* link, bool destroy_proxy) { + if (!link) { + return; + } + if (link->listener_attached) { + spa_hook_remove(&link->listener); + link->listener_attached = false; + } + if (destroy_proxy && link->proxy) { + pw_proxy_destroy(link->proxy); + } + link->proxy = nullptr; +} + void LinkProxyBound(void* data, uint32_t global_id) { auto* link = static_cast(data); if (!link) { @@ -150,6 +165,8 @@ void LinkProxyRemoved(void* data) { if (!link) { return; } + link->listener_attached = false; + link->proxy = nullptr; link->done = true; if (link->loop) { pw_thread_loop_signal(link->loop, false); @@ -161,6 +178,8 @@ void LinkProxyError(void* data, int, int res, const char* message) { if (!link) { return; } + link->listener_attached = false; + link->proxy = nullptr; link->failed = true; link->error = message ? message : spa_strerror(res); if (link->loop) { @@ -697,14 +716,6 @@ void Client::Impl::RegistryGlobalRemove(void* data, uint32_t id) { { std::lock_guard lock(impl->cache_mutex); impl->virtual_streams.erase(id); - auto link_it = impl->link_proxies.find(id); - if (link_it != impl->link_proxies.end()) { - if (link_it->second && link_it->second->proxy) { - spa_hook_remove(&link_it->second->listener); - link_it->second->proxy = nullptr; - } - impl->link_proxies.erase(link_it); - } auto node_it = impl->nodes.find(id); if (node_it != impl->nodes.end()) { impl->nodes.erase(node_it); @@ -1138,11 +1149,7 @@ void Client::Impl::DisconnectLocked() { streams.swap(virtual_streams); } for (auto& entry : links) { - LinkProxy* link = entry.second.get(); - if (link) { - spa_hook_remove(&link->listener); - link->proxy = nullptr; - } + DetachLinkProxy(entry.second.get(), false); } for (auto& entry : streams) { StreamData* stream_data = entry.second.get(); @@ -1157,17 +1164,11 @@ void Client::Impl::DisconnectLocked() { } } for (auto& entry : auto_link_proxies) { - if (entry) { - spa_hook_remove(&entry->listener); - entry->proxy = nullptr; - } + DetachLinkProxy(entry.get(), false); } auto_link_proxies.clear(); for (auto& entry : saved_link_proxies) { - if (entry) { - spa_hook_remove(&entry->listener); - entry->proxy = nullptr; - } + DetachLinkProxy(entry.get(), false); } saved_link_proxies.clear(); for (auto& entry : node_proxies) { @@ -1290,13 +1291,17 @@ void Client::Impl::EnforceRulesForLink(uint32_t link_id, uint32_t out_port, if (!should_destroy) return; - if (link_proxies.count(link_id)) return; + auto active_link_proxy = link_proxies.find(link_id); + if (active_link_proxy != link_proxies.end() && + active_link_proxy->second && active_link_proxy->second->proxy) { + return; + } for (const auto& proxy : auto_link_proxies) { - if (proxy && proxy->output_port == out_port && + if (proxy && proxy->proxy && proxy->output_port == out_port && proxy->input_port == in_port) return; } for (const auto& proxy : saved_link_proxies) { - if (proxy && proxy->output_port == out_port && + if (proxy && proxy->proxy && proxy->output_port == out_port && proxy->input_port == in_port) return; } for (const auto& pair : auto_link_claimed_pairs) { @@ -1437,12 +1442,14 @@ void Client::Impl::ProcessPendingAutoLinks() { if (target_in == in_port) { is_ours = true; break; } } if (!is_ours) { - if (link_proxies.count(link_id)) is_ours = true; + auto owned = link_proxies.find(link_id); + if (owned != link_proxies.end() && owned->second && owned->second->proxy) + is_ours = true; } if (!is_ours) { uint32_t out_port = link_entry.second.output_port.value; for (const auto& proxy : auto_link_proxies) { - if (proxy && proxy->output_port == out_port && + if (proxy && proxy->proxy && proxy->output_port == out_port && proxy->input_port == in_port) { is_ours = true; break; } } } @@ -1457,7 +1464,7 @@ void Client::Impl::ProcessPendingAutoLinks() { if (!is_ours) { uint32_t out_port = link_entry.second.output_port.value; for (const auto& proxy : saved_link_proxies) { - if (proxy && proxy->output_port == out_port && + if (proxy && proxy->proxy && proxy->output_port == out_port && proxy->input_port == in_port) { is_ours = true; break; } } } @@ -1505,6 +1512,7 @@ void Client::Impl::CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port link_data->output_port = output_port; link_data->input_port = input_port; pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents, link_data.get()); + link_data->listener_attached = true; std::lock_guard lock(cache_mutex); auto_link_proxies.push_back(std::move(link_data)); @@ -1595,14 +1603,15 @@ void Client::Impl::ProcessSavedLinks() { if (saved_in == in_port) { is_ours = true; break; } } if (!is_ours) { - if (link_proxies.count(link_id)) { + auto owned = link_proxies.find(link_id); + if (owned != link_proxies.end() && owned->second && owned->second->proxy) { is_ours = true; } } if (!is_ours) { uint32_t out_port = link_entry.second.output_port.value; for (const auto& proxy : auto_link_proxies) { - if (proxy && proxy->output_port == out_port && + if (proxy && proxy->proxy && proxy->output_port == out_port && proxy->input_port == in_port) { is_ours = true; break; } } } @@ -1617,7 +1626,7 @@ void Client::Impl::ProcessSavedLinks() { if (!is_ours) { uint32_t out_port = link_entry.second.output_port.value; for (const auto& proxy : saved_link_proxies) { - if (proxy && proxy->output_port == out_port && + if (proxy && proxy->proxy && proxy->output_port == out_port && proxy->input_port == in_port) { is_ours = true; break; } } } @@ -1661,6 +1670,7 @@ void Client::Impl::CreateSavedLinkAsync(uint32_t output_port, link_data->input_port = input_port; pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents, link_data.get()); + link_data->listener_attached = true; std::lock_guard lock(cache_mutex); saved_link_proxies.push_back(std::move(link_data)); @@ -1720,7 +1730,7 @@ void Client::Impl::AutoSave() { std::lock_guard lock(cache_mutex); std::vector live; for (const auto& entry : link_proxies) { - if (!entry.second) { + if (!entry.second || !entry.second->proxy) { continue; } auto link_it = links.find(entry.first); @@ -1749,7 +1759,7 @@ void Client::Impl::AutoSave() { links_array.push_back(std::move(link_obj)); } for (const auto& lp : saved_link_proxies) { - if (!lp || lp->id == SPA_ID_INVALID) continue; + if (!lp || !lp->proxy || lp->id == SPA_ID_INVALID) continue; auto link_it = links.find(lp->id); if (link_it == links.end()) continue; const Link& link = link_it->second; @@ -2456,6 +2466,7 @@ Result Client::CreateLink(PortId output, PortId input, const LinkOptions& link_proxy->output_port = output.value; link_proxy->input_port = input.value; pw_proxy_add_listener(proxy, &link_proxy->listener, &kLinkProxyEvents, link_proxy.get()); + link_proxy->listener_attached = true; int wait_attempts = 0; while (link_proxy->id == SPA_ID_INVALID && !link_proxy->failed && wait_attempts < 3) { @@ -2476,13 +2487,13 @@ Result Client::CreateLink(PortId output, PortId input, const LinkOptions& if (link_proxy->failed) { std::string error = link_proxy->error.empty() ? "link creation failed" : link_proxy->error; remove_pending(); - pw_proxy_destroy(proxy); + DetachLinkProxy(link_proxy.get(), true); pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kUnavailable, std::move(error)), {}}; } if (link_proxy->id == SPA_ID_INVALID) { remove_pending(); - pw_proxy_destroy(proxy); + DetachLinkProxy(link_proxy.get(), true); pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kTimeout, "timed out waiting for link id"), {}}; } @@ -2493,7 +2504,7 @@ Result Client::CreateLink(PortId output, PortId input, const LinkOptions& link.input_port = input; { std::lock_guard lock(impl_->cache_mutex); - impl_->link_proxies.emplace(link_proxy->id, std::move(link_proxy)); + impl_->link_proxies[link_proxy->id] = std::move(link_proxy); impl_->links[link.id.value] = link; std::erase_if(impl_->pending_link_pairs, [&](const auto& p) { return p.first == output.value && p.second == input.value; @@ -2579,14 +2590,9 @@ Status Client::RemoveLink(LinkId link) { } auto it = impl_->link_proxies.find(link.value); if (it != impl_->link_proxies.end()) { - if (it->second && it->second->proxy) { - spa_hook_remove(&it->second->listener); - pw_proxy_destroy(it->second->proxy); - } if (impl_->registry) { pw_registry_destroy(impl_->registry, link.value); } - impl_->link_proxies.erase(it); auto link_it2 = impl_->links.find(link.value); if (link_it2 != impl_->links.end()) { uint32_t op = link_it2->second.output_port.value; @@ -2616,26 +2622,6 @@ Status Client::RemoveLink(LinkId link) { impl_->links.erase(link_it); } if (out_port && in_port) { - for (auto& p : impl_->saved_link_proxies) { - if (p && p->output_port == out_port && p->input_port == in_port) { - spa_hook_remove(&p->listener); - if (p->proxy) pw_proxy_destroy(p->proxy); - p->proxy = nullptr; - } - } - std::erase_if(impl_->saved_link_proxies, [&](const auto& p) { - return p && p->output_port == out_port && p->input_port == in_port; - }); - for (auto& p : impl_->auto_link_proxies) { - if (p && p->output_port == out_port && p->input_port == in_port) { - spa_hook_remove(&p->listener); - if (p->proxy) pw_proxy_destroy(p->proxy); - p->proxy = nullptr; - } - } - std::erase_if(impl_->auto_link_proxies, [&](const auto& p) { - return p && p->output_port == out_port && p->input_port == in_port; - }); std::erase_if(impl_->auto_link_claimed_pairs, [&](const auto& pair) { return pair.first == out_port && pair.second == in_port; }); @@ -2799,18 +2785,7 @@ Status Client::RemoveRouteRule(RuleId id) { } { std::lock_guard lock(impl_->cache_mutex); - for (const auto& pair : pairs_to_remove) { - for (auto& p : impl_->auto_link_proxies) { - if (p && p->output_port == pair.first && - p->input_port == pair.second) { - spa_hook_remove(&p->listener); - } - } - std::erase_if(impl_->auto_link_proxies, [&](const auto& p) { - return p && p->output_port == pair.first && - p->input_port == pair.second; - }); - } + (void)pairs_to_remove; } pw_thread_loop_unlock(impl_->thread_loop); }