From 08db414fa9ed4668d477b175b0624795771ad143 Mon Sep 17 00:00:00 2001 From: Joey Yakimowich-Payne Date: Fri, 30 Jan 2026 14:30:17 -0700 Subject: [PATCH] Rules work better --- src/warppipe.cpp | 98 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/src/warppipe.cpp b/src/warppipe.cpp index 2e74367..b669cfa 100644 --- a/src/warppipe.cpp +++ b/src/warppipe.cpp @@ -380,6 +380,7 @@ struct Client::Impl { const VirtualNodeOptions& options); void CheckRulesForNode(const NodeInfo& node); + void EnforceRulesForLink(uint32_t link_id, uint32_t out_port, uint32_t in_port); void SchedulePolicySync(); void ProcessPendingAutoLinks(); void CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port); @@ -463,6 +464,9 @@ void Client::Impl::RegistryGlobal(void* data, info.input_port = PortId{in_port}; } impl->links[id] = std::move(info); + if (!impl->options.policy_only && out_port && in_port) { + impl->EnforceRulesForLink(id, out_port, in_port); + } notify = true; } } @@ -914,6 +918,52 @@ void Client::Impl::CheckRulesForNode(const NodeInfo& node) { } } +void Client::Impl::EnforceRulesForLink(uint32_t link_id, uint32_t out_port, + uint32_t in_port) { + auto port_it = ports.find(out_port); + if (port_it == ports.end()) return; + uint32_t src_node_id = port_it->second.node.value; + auto node_it = nodes.find(src_node_id); + if (node_it == nodes.end()) return; + + for (const auto& rule_entry : route_rules) { + if (!MatchesRule(node_it->second, rule_entry.second.match)) continue; + + uint32_t target_node_id = 0; + for (const auto& n : nodes) { + if (n.second.name == rule_entry.second.target_node) { + target_node_id = n.first; + break; + } + } + if (target_node_id == 0) return; + + auto in_port_it = ports.find(in_port); + if (in_port_it == ports.end()) return; + if (in_port_it->second.node.value == target_node_id) return; + + if (link_proxies.count(link_id)) return; + for (const auto& proxy : auto_link_proxies) { + if (proxy && proxy->output_port == out_port && + proxy->input_port == in_port) return; + } + for (const auto& proxy : saved_link_proxies) { + if (proxy && proxy->output_port == out_port && + proxy->input_port == in_port) return; + } + for (const auto& pair : auto_link_claimed_pairs) { + if (pair.first == out_port && pair.second == in_port) return; + } + + fprintf(stderr, "[WP] EnforceRule: destroying link %u (%u->%u), " + "rule says %s -> %s\n", link_id, out_port, in_port, + node_it->second.name.c_str(), + rule_entry.second.target_node.c_str()); + pw_registry_destroy(registry, link_id); + return; + } +} + void Client::Impl::SchedulePolicySync() { if (policy_sync_pending || !core) { return; @@ -937,6 +987,7 @@ void Client::Impl::ProcessPendingAutoLinks() { uint32_t input_port; }; std::vector links_to_create; + std::vector> batch_pairs; { std::lock_guard lock(cache_mutex); @@ -998,6 +1049,7 @@ void Client::Impl::ProcessPendingAutoLinks() { } } auto_link_claimed_pairs.emplace_back(src_ports[i].id, tgt_ports[i].id); + batch_pairs.emplace_back(src_ports[i].id, tgt_ports[i].id); if (!exists) { links_to_create.push_back({src_ports[i].id, tgt_ports[i].id}); fprintf(stderr, "[WP] AutoLink: will create %u->%u\n", @@ -1012,6 +1064,52 @@ void Client::Impl::ProcessPendingAutoLinks() { } } + if (batch_pairs.empty()) { + for (const auto& spec : links_to_create) { + CreateAutoLinkAsync(spec.output_port, spec.input_port); + } + return; + } + + std::unordered_map> auto_port_map; + for (const auto& pair : batch_pairs) { + auto_port_map[pair.first].push_back(pair.second); + } + + std::vector competing_ids; + { + std::lock_guard lock(cache_mutex); + for (const auto& link_entry : links) { + auto it = auto_port_map.find(link_entry.second.output_port.value); + if (it == auto_port_map.end()) continue; + uint32_t link_id = link_entry.first; + uint32_t in_port = link_entry.second.input_port.value; + bool is_ours = false; + for (uint32_t target_in : it->second) { + if (target_in == in_port) { is_ours = true; break; } + } + if (!is_ours) { + if (link_proxies.count(link_id)) is_ours = true; + } + if (!is_ours) { + uint32_t out_port = link_entry.second.output_port.value; + for (const auto& proxy : saved_link_proxies) { + if (proxy && proxy->output_port == out_port && + proxy->input_port == in_port) { is_ours = true; break; } + } + } + if (!is_ours) { + fprintf(stderr, "[WP] AutoLink competing: link %u (%u->%u) will destroy\n", + link_id, link_entry.second.output_port.value, in_port); + competing_ids.push_back(link_id); + } + } + } + + for (uint32_t id : competing_ids) { + pw_registry_destroy(registry, id); + } + for (const auto& spec : links_to_create) { CreateAutoLinkAsync(spec.output_port, spec.input_port); }