diff --git a/src/warppipe.cpp b/src/warppipe.cpp index b669cfa..d07554a 100644 --- a/src/warppipe.cpp +++ b/src/warppipe.cpp @@ -347,6 +347,7 @@ struct Client::Impl { std::vector> auto_link_proxies; std::vector> auto_link_claimed_pairs; std::vector> saved_link_proxies; + std::vector> pending_link_pairs; pw_proxy* metadata_proxy = nullptr; spa_hook metadata_listener{}; @@ -562,8 +563,6 @@ void Client::Impl::CoreDone(void* data, uint32_t, int seq) { if (impl->policy_sync_pending && seq >= static_cast(impl->policy_sync_seq)) { impl->policy_sync_pending = false; - fprintf(stderr, "[WP] CoreDone policy sync seq=%d, pending_auto=%zu saved=%zu\n", - seq, impl->pending_auto_links.size(), impl->saved_links.size()); impl->ProcessPendingAutoLinks(); impl->ProcessSavedLinks(); } @@ -954,11 +953,10 @@ void Client::Impl::EnforceRulesForLink(uint32_t link_id, uint32_t out_port, for (const auto& pair : auto_link_claimed_pairs) { if (pair.first == out_port && pair.second == in_port) return; } + for (const auto& pair : pending_link_pairs) { + if (pair.first == out_port && pair.second == in_port) return; + } - fprintf(stderr, "[WP] EnforceRule: destroying link %u (%u->%u), " - "rule says %s -> %s\n", link_id, out_port, in_port, - node_it->second.name.c_str(), - rule_entry.second.target_node.c_str()); pw_registry_destroy(registry, link_id); return; } @@ -1001,8 +999,6 @@ void Client::Impl::ProcessPendingAutoLinks() { } } if (target_node_id == 0) { - fprintf(stderr, "[WP] AutoLink: target '%s' not found for src node %u\n", - it->target_node_name.c_str(), it->source_node_id); ++it; continue; } @@ -1025,9 +1021,6 @@ void Client::Impl::ProcessPendingAutoLinks() { } if (src_ports.empty() || tgt_ports.empty()) { - fprintf(stderr, "[WP] AutoLink: src_ports=%zu tgt_ports=%zu for src=%u target=%u('%s')\n", - src_ports.size(), tgt_ports.size(), it->source_node_id, - target_node_id, it->target_node_name.c_str()); ++it; continue; } @@ -1052,11 +1045,6 @@ void Client::Impl::ProcessPendingAutoLinks() { batch_pairs.emplace_back(src_ports[i].id, tgt_ports[i].id); if (!exists) { links_to_create.push_back({src_ports[i].id, tgt_ports[i].id}); - fprintf(stderr, "[WP] AutoLink: will create %u->%u\n", - src_ports[i].id, tgt_ports[i].id); - } else { - fprintf(stderr, "[WP] AutoLink: already exists %u->%u (claimed)\n", - src_ports[i].id, tgt_ports[i].id); } } @@ -1099,8 +1087,6 @@ void Client::Impl::ProcessPendingAutoLinks() { } } if (!is_ours) { - fprintf(stderr, "[WP] AutoLink competing: link %u (%u->%u) will destroy\n", - link_id, link_entry.second.output_port.value, in_port); competing_ids.push_back(link_id); } } @@ -1173,9 +1159,6 @@ void Client::Impl::ProcessSavedLinks() { if (covered_by_rule) break; } if (covered_by_rule) { - fprintf(stderr, "[WP] SavedLink: covered_by_rule, skipping %s:%s -> %s:%s\n", - it->out_node.c_str(), it->out_port.c_str(), - it->in_node.c_str(), it->in_port.c_str()); it = saved_links.erase(it); continue; } @@ -1207,9 +1190,6 @@ void Client::Impl::ProcessSavedLinks() { } } if (exists) { - fprintf(stderr, "[WP] SavedLink: already exists %s:%s -> %s:%s\n", - it->out_node.c_str(), it->out_port.c_str(), - it->in_node.c_str(), it->in_port.c_str()); it = saved_links.erase(it); continue; } @@ -1220,7 +1200,6 @@ void Client::Impl::ProcessSavedLinks() { } } - fprintf(stderr, "[WP] SavedLink: %zu links to create\n", to_create.size()); if (to_create.empty()) return; std::unordered_map> saved_port_map; @@ -1268,12 +1247,7 @@ void Client::Impl::ProcessSavedLinks() { } } if (!is_ours) { - fprintf(stderr, "[WP] Competing: link %u (%u->%u) has no owner, will destroy\n", - link_id, link_entry.second.output_port.value, in_port); competing_link_ids.push_back(link_id); - } else { - fprintf(stderr, "[WP] Competing: link %u (%u->%u) is ours, keeping\n", - link_id, link_entry.second.output_port.value, in_port); } } } @@ -2014,6 +1988,11 @@ Result Client::CreateLink(PortId output, PortId input, const LinkOptions& return {Status::Error(StatusCode::kUnavailable, "failed to create link"), {}}; } + { + std::lock_guard lock(impl_->cache_mutex); + impl_->pending_link_pairs.emplace_back(output.value, input.value); + } + auto link_proxy = std::make_unique(); link_proxy->proxy = proxy; link_proxy->loop = impl_->thread_loop; @@ -2030,13 +2009,22 @@ Result Client::CreateLink(PortId output, PortId input, const LinkOptions& ++wait_attempts; } + auto remove_pending = [&] { + std::lock_guard lock(impl_->cache_mutex); + std::erase_if(impl_->pending_link_pairs, [&](const auto& p) { + return p.first == output.value && p.second == input.value; + }); + }; + if (link_proxy->failed) { std::string error = link_proxy->error.empty() ? "link creation failed" : link_proxy->error; + remove_pending(); pw_proxy_destroy(proxy); pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kUnavailable, std::move(error)), {}}; } if (link_proxy->id == SPA_ID_INVALID) { + remove_pending(); pw_proxy_destroy(proxy); pw_thread_loop_unlock(impl_->thread_loop); return {Status::Error(StatusCode::kTimeout, "timed out waiting for link id"), {}}; @@ -2050,6 +2038,9 @@ Result Client::CreateLink(PortId output, PortId input, const LinkOptions& std::lock_guard lock(impl_->cache_mutex); impl_->link_proxies.emplace(link_proxy->id, std::move(link_proxy)); impl_->links[link.id.value] = link; + std::erase_if(impl_->pending_link_pairs, [&](const auto& p) { + return p.first == output.value && p.second == input.value; + }); } pw_thread_loop_unlock(impl_->thread_loop); @@ -2135,7 +2126,15 @@ Status Client::RemoveLink(LinkId link) { pw_proxy_destroy(it->second->proxy); } impl_->link_proxies.erase(it); - impl_->links.erase(link.value); + auto link_it2 = impl_->links.find(link.value); + if (link_it2 != impl_->links.end()) { + uint32_t op = link_it2->second.output_port.value; + uint32_t ip = link_it2->second.input_port.value; + impl_->links.erase(link_it2); + std::erase_if(impl_->auto_link_claimed_pairs, [&](const auto& pair) { + return pair.first == op && pair.second == ip; + }); + } removed = true; } } @@ -2146,6 +2145,37 @@ Status Client::RemoveLink(LinkId link) { pw_thread_loop_unlock(impl_->thread_loop); return Status::Error(StatusCode::kNotFound, "link not found"); } + { + std::lock_guard lock(impl_->cache_mutex); + auto link_it = impl_->links.find(link.value); + uint32_t out_port = 0, in_port = 0; + if (link_it != impl_->links.end()) { + out_port = link_it->second.output_port.value; + in_port = link_it->second.input_port.value; + impl_->links.erase(link_it); + } + if (out_port && in_port) { + for (auto& p : impl_->saved_link_proxies) { + if (p && p->output_port == out_port && p->input_port == in_port) { + spa_hook_remove(&p->listener); + } + } + std::erase_if(impl_->saved_link_proxies, [&](const auto& p) { + return p && p->output_port == out_port && p->input_port == in_port; + }); + for (auto& p : impl_->auto_link_proxies) { + if (p && p->output_port == out_port && p->input_port == in_port) { + spa_hook_remove(&p->listener); + } + } + std::erase_if(impl_->auto_link_proxies, [&](const auto& p) { + return p && p->output_port == out_port && p->input_port == in_port; + }); + std::erase_if(impl_->auto_link_claimed_pairs, [&](const auto& pair) { + return pair.first == out_port && pair.second == in_port; + }); + } + } removed = true; } @@ -2201,12 +2231,17 @@ Result Client::AddRouteRule(const RouteRule& rule) { } Status Client::RemoveRouteRule(RuleId id) { + RuleMatch match; + std::string target_node; + { std::lock_guard lock(impl_->cache_mutex); auto it = impl_->route_rules.find(id.value); if (it == impl_->route_rules.end()) { return Status::Error(StatusCode::kNotFound, "route rule not found"); } + match = it->second.match; + target_node = it->second.target_node; impl_->route_rules.erase(it); auto pending_it = impl_->pending_auto_links.begin(); @@ -2220,6 +2255,59 @@ Status Client::RemoveRouteRule(RuleId id) { impl_->auto_link_claimed_pairs.clear(); } + std::vector links_to_destroy; + std::vector> pairs_to_remove; + { + std::lock_guard lock(impl_->cache_mutex); + uint32_t target_id = 0; + for (const auto& n : impl_->nodes) { + if (n.second.name == target_node) { + target_id = n.first; + break; + } + } + if (target_id) { + for (const auto& link_entry : impl_->links) { + uint32_t out_port = link_entry.second.output_port.value; + uint32_t in_port = link_entry.second.input_port.value; + auto in_port_it = impl_->ports.find(in_port); + if (in_port_it == impl_->ports.end() || + in_port_it->second.node.value != target_id) continue; + auto out_port_it = impl_->ports.find(out_port); + if (out_port_it == impl_->ports.end()) continue; + auto src_node_it = impl_->nodes.find(out_port_it->second.node.value); + if (src_node_it == impl_->nodes.end()) continue; + if (MatchesRule(src_node_it->second, match)) { + links_to_destroy.push_back(link_entry.first); + pairs_to_remove.emplace_back(out_port, in_port); + } + } + } + } + + if (!links_to_destroy.empty() && impl_->thread_loop) { + pw_thread_loop_lock(impl_->thread_loop); + for (uint32_t link_id : links_to_destroy) { + pw_registry_destroy(impl_->registry, link_id); + } + { + std::lock_guard lock(impl_->cache_mutex); + for (const auto& pair : pairs_to_remove) { + for (auto& p : impl_->auto_link_proxies) { + if (p && p->output_port == pair.first && + p->input_port == pair.second) { + spa_hook_remove(&p->listener); + } + } + std::erase_if(impl_->auto_link_proxies, [&](const auto& p) { + return p && p->output_port == pair.first && + p->input_port == pair.second; + }); + } + } + pw_thread_loop_unlock(impl_->thread_loop); + } + impl_->AutoSave(); return Status::Ok(); } @@ -2507,9 +2595,6 @@ Status Client::LoadConfig(std::string_view path) { in_node.empty() || in_port.empty()) { continue; } - fprintf(stderr, "[WP] Config: loaded saved link %s:%s -> %s:%s\n", - out_node.c_str(), out_port.c_str(), - in_node.c_str(), in_port.c_str()); impl_->saved_links.push_back({out_node, out_port, in_node, in_port}); } catch (...) { continue;