Fix segfault

This commit is contained in:
Joey Yakimowich-Payne 2026-01-30 15:49:05 -07:00
commit f3e40a7d91

View file

@ -347,6 +347,7 @@ struct Client::Impl {
std::vector<std::unique_ptr<LinkProxy>> auto_link_proxies; std::vector<std::unique_ptr<LinkProxy>> auto_link_proxies;
std::vector<std::pair<uint32_t, uint32_t>> auto_link_claimed_pairs; std::vector<std::pair<uint32_t, uint32_t>> auto_link_claimed_pairs;
std::vector<std::unique_ptr<LinkProxy>> saved_link_proxies; std::vector<std::unique_ptr<LinkProxy>> saved_link_proxies;
std::vector<std::pair<uint32_t, uint32_t>> pending_link_pairs;
pw_proxy* metadata_proxy = nullptr; pw_proxy* metadata_proxy = nullptr;
spa_hook metadata_listener{}; spa_hook metadata_listener{};
@ -562,8 +563,6 @@ void Client::Impl::CoreDone(void* data, uint32_t, int seq) {
if (impl->policy_sync_pending && if (impl->policy_sync_pending &&
seq >= static_cast<int>(impl->policy_sync_seq)) { seq >= static_cast<int>(impl->policy_sync_seq)) {
impl->policy_sync_pending = false; impl->policy_sync_pending = false;
fprintf(stderr, "[WP] CoreDone policy sync seq=%d, pending_auto=%zu saved=%zu\n",
seq, impl->pending_auto_links.size(), impl->saved_links.size());
impl->ProcessPendingAutoLinks(); impl->ProcessPendingAutoLinks();
impl->ProcessSavedLinks(); impl->ProcessSavedLinks();
} }
@ -954,11 +953,10 @@ void Client::Impl::EnforceRulesForLink(uint32_t link_id, uint32_t out_port,
for (const auto& pair : auto_link_claimed_pairs) { for (const auto& pair : auto_link_claimed_pairs) {
if (pair.first == out_port && pair.second == in_port) return; if (pair.first == out_port && pair.second == in_port) return;
} }
for (const auto& pair : pending_link_pairs) {
if (pair.first == out_port && pair.second == in_port) return;
}
fprintf(stderr, "[WP] EnforceRule: destroying link %u (%u->%u), "
"rule says %s -> %s\n", link_id, out_port, in_port,
node_it->second.name.c_str(),
rule_entry.second.target_node.c_str());
pw_registry_destroy(registry, link_id); pw_registry_destroy(registry, link_id);
return; return;
} }
@ -1001,8 +999,6 @@ void Client::Impl::ProcessPendingAutoLinks() {
} }
} }
if (target_node_id == 0) { if (target_node_id == 0) {
fprintf(stderr, "[WP] AutoLink: target '%s' not found for src node %u\n",
it->target_node_name.c_str(), it->source_node_id);
++it; ++it;
continue; continue;
} }
@ -1025,9 +1021,6 @@ void Client::Impl::ProcessPendingAutoLinks() {
} }
if (src_ports.empty() || tgt_ports.empty()) { if (src_ports.empty() || tgt_ports.empty()) {
fprintf(stderr, "[WP] AutoLink: src_ports=%zu tgt_ports=%zu for src=%u target=%u('%s')\n",
src_ports.size(), tgt_ports.size(), it->source_node_id,
target_node_id, it->target_node_name.c_str());
++it; ++it;
continue; continue;
} }
@ -1052,11 +1045,6 @@ void Client::Impl::ProcessPendingAutoLinks() {
batch_pairs.emplace_back(src_ports[i].id, tgt_ports[i].id); batch_pairs.emplace_back(src_ports[i].id, tgt_ports[i].id);
if (!exists) { if (!exists) {
links_to_create.push_back({src_ports[i].id, tgt_ports[i].id}); links_to_create.push_back({src_ports[i].id, tgt_ports[i].id});
fprintf(stderr, "[WP] AutoLink: will create %u->%u\n",
src_ports[i].id, tgt_ports[i].id);
} else {
fprintf(stderr, "[WP] AutoLink: already exists %u->%u (claimed)\n",
src_ports[i].id, tgt_ports[i].id);
} }
} }
@ -1099,8 +1087,6 @@ void Client::Impl::ProcessPendingAutoLinks() {
} }
} }
if (!is_ours) { if (!is_ours) {
fprintf(stderr, "[WP] AutoLink competing: link %u (%u->%u) will destroy\n",
link_id, link_entry.second.output_port.value, in_port);
competing_ids.push_back(link_id); competing_ids.push_back(link_id);
} }
} }
@ -1173,9 +1159,6 @@ void Client::Impl::ProcessSavedLinks() {
if (covered_by_rule) break; if (covered_by_rule) break;
} }
if (covered_by_rule) { if (covered_by_rule) {
fprintf(stderr, "[WP] SavedLink: covered_by_rule, skipping %s:%s -> %s:%s\n",
it->out_node.c_str(), it->out_port.c_str(),
it->in_node.c_str(), it->in_port.c_str());
it = saved_links.erase(it); it = saved_links.erase(it);
continue; continue;
} }
@ -1207,9 +1190,6 @@ void Client::Impl::ProcessSavedLinks() {
} }
} }
if (exists) { if (exists) {
fprintf(stderr, "[WP] SavedLink: already exists %s:%s -> %s:%s\n",
it->out_node.c_str(), it->out_port.c_str(),
it->in_node.c_str(), it->in_port.c_str());
it = saved_links.erase(it); it = saved_links.erase(it);
continue; continue;
} }
@ -1220,7 +1200,6 @@ void Client::Impl::ProcessSavedLinks() {
} }
} }
fprintf(stderr, "[WP] SavedLink: %zu links to create\n", to_create.size());
if (to_create.empty()) return; if (to_create.empty()) return;
std::unordered_map<uint32_t, std::vector<uint32_t>> saved_port_map; std::unordered_map<uint32_t, std::vector<uint32_t>> saved_port_map;
@ -1268,12 +1247,7 @@ void Client::Impl::ProcessSavedLinks() {
} }
} }
if (!is_ours) { if (!is_ours) {
fprintf(stderr, "[WP] Competing: link %u (%u->%u) has no owner, will destroy\n",
link_id, link_entry.second.output_port.value, in_port);
competing_link_ids.push_back(link_id); competing_link_ids.push_back(link_id);
} else {
fprintf(stderr, "[WP] Competing: link %u (%u->%u) is ours, keeping\n",
link_id, link_entry.second.output_port.value, in_port);
} }
} }
} }
@ -2014,6 +1988,11 @@ Result<Link> Client::CreateLink(PortId output, PortId input, const LinkOptions&
return {Status::Error(StatusCode::kUnavailable, "failed to create link"), {}}; return {Status::Error(StatusCode::kUnavailable, "failed to create link"), {}};
} }
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
impl_->pending_link_pairs.emplace_back(output.value, input.value);
}
auto link_proxy = std::make_unique<LinkProxy>(); auto link_proxy = std::make_unique<LinkProxy>();
link_proxy->proxy = proxy; link_proxy->proxy = proxy;
link_proxy->loop = impl_->thread_loop; link_proxy->loop = impl_->thread_loop;
@ -2030,13 +2009,22 @@ Result<Link> Client::CreateLink(PortId output, PortId input, const LinkOptions&
++wait_attempts; ++wait_attempts;
} }
auto remove_pending = [&] {
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
std::erase_if(impl_->pending_link_pairs, [&](const auto& p) {
return p.first == output.value && p.second == input.value;
});
};
if (link_proxy->failed) { if (link_proxy->failed) {
std::string error = link_proxy->error.empty() ? "link creation failed" : link_proxy->error; std::string error = link_proxy->error.empty() ? "link creation failed" : link_proxy->error;
remove_pending();
pw_proxy_destroy(proxy); pw_proxy_destroy(proxy);
pw_thread_loop_unlock(impl_->thread_loop); pw_thread_loop_unlock(impl_->thread_loop);
return {Status::Error(StatusCode::kUnavailable, std::move(error)), {}}; return {Status::Error(StatusCode::kUnavailable, std::move(error)), {}};
} }
if (link_proxy->id == SPA_ID_INVALID) { if (link_proxy->id == SPA_ID_INVALID) {
remove_pending();
pw_proxy_destroy(proxy); pw_proxy_destroy(proxy);
pw_thread_loop_unlock(impl_->thread_loop); pw_thread_loop_unlock(impl_->thread_loop);
return {Status::Error(StatusCode::kTimeout, "timed out waiting for link id"), {}}; return {Status::Error(StatusCode::kTimeout, "timed out waiting for link id"), {}};
@ -2050,6 +2038,9 @@ Result<Link> Client::CreateLink(PortId output, PortId input, const LinkOptions&
std::lock_guard<std::mutex> lock(impl_->cache_mutex); std::lock_guard<std::mutex> lock(impl_->cache_mutex);
impl_->link_proxies.emplace(link_proxy->id, std::move(link_proxy)); impl_->link_proxies.emplace(link_proxy->id, std::move(link_proxy));
impl_->links[link.id.value] = link; impl_->links[link.id.value] = link;
std::erase_if(impl_->pending_link_pairs, [&](const auto& p) {
return p.first == output.value && p.second == input.value;
});
} }
pw_thread_loop_unlock(impl_->thread_loop); pw_thread_loop_unlock(impl_->thread_loop);
@ -2135,7 +2126,15 @@ Status Client::RemoveLink(LinkId link) {
pw_proxy_destroy(it->second->proxy); pw_proxy_destroy(it->second->proxy);
} }
impl_->link_proxies.erase(it); impl_->link_proxies.erase(it);
impl_->links.erase(link.value); auto link_it2 = impl_->links.find(link.value);
if (link_it2 != impl_->links.end()) {
uint32_t op = link_it2->second.output_port.value;
uint32_t ip = link_it2->second.input_port.value;
impl_->links.erase(link_it2);
std::erase_if(impl_->auto_link_claimed_pairs, [&](const auto& pair) {
return pair.first == op && pair.second == ip;
});
}
removed = true; removed = true;
} }
} }
@ -2146,6 +2145,37 @@ Status Client::RemoveLink(LinkId link) {
pw_thread_loop_unlock(impl_->thread_loop); pw_thread_loop_unlock(impl_->thread_loop);
return Status::Error(StatusCode::kNotFound, "link not found"); return Status::Error(StatusCode::kNotFound, "link not found");
} }
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
auto link_it = impl_->links.find(link.value);
uint32_t out_port = 0, in_port = 0;
if (link_it != impl_->links.end()) {
out_port = link_it->second.output_port.value;
in_port = link_it->second.input_port.value;
impl_->links.erase(link_it);
}
if (out_port && in_port) {
for (auto& p : impl_->saved_link_proxies) {
if (p && p->output_port == out_port && p->input_port == in_port) {
spa_hook_remove(&p->listener);
}
}
std::erase_if(impl_->saved_link_proxies, [&](const auto& p) {
return p && p->output_port == out_port && p->input_port == in_port;
});
for (auto& p : impl_->auto_link_proxies) {
if (p && p->output_port == out_port && p->input_port == in_port) {
spa_hook_remove(&p->listener);
}
}
std::erase_if(impl_->auto_link_proxies, [&](const auto& p) {
return p && p->output_port == out_port && p->input_port == in_port;
});
std::erase_if(impl_->auto_link_claimed_pairs, [&](const auto& pair) {
return pair.first == out_port && pair.second == in_port;
});
}
}
removed = true; removed = true;
} }
@ -2201,12 +2231,17 @@ Result<RuleId> Client::AddRouteRule(const RouteRule& rule) {
} }
Status Client::RemoveRouteRule(RuleId id) { Status Client::RemoveRouteRule(RuleId id) {
RuleMatch match;
std::string target_node;
{ {
std::lock_guard<std::mutex> lock(impl_->cache_mutex); std::lock_guard<std::mutex> lock(impl_->cache_mutex);
auto it = impl_->route_rules.find(id.value); auto it = impl_->route_rules.find(id.value);
if (it == impl_->route_rules.end()) { if (it == impl_->route_rules.end()) {
return Status::Error(StatusCode::kNotFound, "route rule not found"); return Status::Error(StatusCode::kNotFound, "route rule not found");
} }
match = it->second.match;
target_node = it->second.target_node;
impl_->route_rules.erase(it); impl_->route_rules.erase(it);
auto pending_it = impl_->pending_auto_links.begin(); auto pending_it = impl_->pending_auto_links.begin();
@ -2220,6 +2255,59 @@ Status Client::RemoveRouteRule(RuleId id) {
impl_->auto_link_claimed_pairs.clear(); impl_->auto_link_claimed_pairs.clear();
} }
std::vector<uint32_t> links_to_destroy;
std::vector<std::pair<uint32_t, uint32_t>> pairs_to_remove;
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
uint32_t target_id = 0;
for (const auto& n : impl_->nodes) {
if (n.second.name == target_node) {
target_id = n.first;
break;
}
}
if (target_id) {
for (const auto& link_entry : impl_->links) {
uint32_t out_port = link_entry.second.output_port.value;
uint32_t in_port = link_entry.second.input_port.value;
auto in_port_it = impl_->ports.find(in_port);
if (in_port_it == impl_->ports.end() ||
in_port_it->second.node.value != target_id) continue;
auto out_port_it = impl_->ports.find(out_port);
if (out_port_it == impl_->ports.end()) continue;
auto src_node_it = impl_->nodes.find(out_port_it->second.node.value);
if (src_node_it == impl_->nodes.end()) continue;
if (MatchesRule(src_node_it->second, match)) {
links_to_destroy.push_back(link_entry.first);
pairs_to_remove.emplace_back(out_port, in_port);
}
}
}
}
if (!links_to_destroy.empty() && impl_->thread_loop) {
pw_thread_loop_lock(impl_->thread_loop);
for (uint32_t link_id : links_to_destroy) {
pw_registry_destroy(impl_->registry, link_id);
}
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
for (const auto& pair : pairs_to_remove) {
for (auto& p : impl_->auto_link_proxies) {
if (p && p->output_port == pair.first &&
p->input_port == pair.second) {
spa_hook_remove(&p->listener);
}
}
std::erase_if(impl_->auto_link_proxies, [&](const auto& p) {
return p && p->output_port == pair.first &&
p->input_port == pair.second;
});
}
}
pw_thread_loop_unlock(impl_->thread_loop);
}
impl_->AutoSave(); impl_->AutoSave();
return Status::Ok(); return Status::Ok();
} }
@ -2507,9 +2595,6 @@ Status Client::LoadConfig(std::string_view path) {
in_node.empty() || in_port.empty()) { in_node.empty() || in_port.empty()) {
continue; continue;
} }
fprintf(stderr, "[WP] Config: loaded saved link %s:%s -> %s:%s\n",
out_node.c_str(), out_port.c_str(),
in_node.c_str(), in_port.c_str());
impl_->saved_links.push_back({out_node, out_port, in_node, in_port}); impl_->saved_links.push_back({out_node, out_port, in_node, in_port});
} catch (...) { } catch (...) {
continue; continue;