Add persistent links

This commit is contained in:
Joey Yakimowich-Payne 2026-01-30 11:28:43 -07:00
commit e8d3f63f4d
4 changed files with 362 additions and 9 deletions

View file

@ -1035,7 +1035,7 @@ void GraphEditorWidget::tryResolvePendingLinks() {
} }
if (foundOut && foundIn) { if (foundOut && foundIn) {
m_client->CreateLink(outPortId, inPortId, warppipe::LinkOptions{}); m_client->CreateLink(outPortId, inPortId, warppipe::LinkOptions{.linger = true});
} else { } else {
remaining.push_back(pending); remaining.push_back(pending);
} }

View file

@ -186,7 +186,7 @@ bool PresetManager::loadPreset(const QString &path, warppipe::Client *client,
std::string inPort = route["in_port"].toString().toStdString(); std::string inPort = route["in_port"].toString().toStdString();
client->CreateLinkByName(outNode, outPort, inNode, inPort, client->CreateLinkByName(outNode, outPort, inNode, inPort,
warppipe::LinkOptions{}); warppipe::LinkOptions{.linger = true});
} }
model->refreshFromClient(); model->refreshFromClient();

View file

@ -124,7 +124,7 @@ void WarpGraphModel::addConnection(
warppipe::PortId outPortId = outIt->second.outputPorts[outIdx].id; warppipe::PortId outPortId = outIt->second.outputPorts[outIdx].id;
warppipe::PortId inPortId = inIt->second.inputPorts[inIdx].id; warppipe::PortId inPortId = inIt->second.inputPorts[inIdx].id;
auto result = m_client->CreateLink(outPortId, inPortId, warppipe::LinkOptions{}); auto result = m_client->CreateLink(outPortId, inPortId, warppipe::LinkOptions{.linger = true});
if (!result.ok()) { if (!result.ok()) {
return; return;
} }

View file

@ -343,11 +343,25 @@ struct Client::Impl {
uint32_t policy_sync_seq = 0; uint32_t policy_sync_seq = 0;
bool policy_sync_pending = false; bool policy_sync_pending = false;
std::vector<std::unique_ptr<LinkProxy>> auto_link_proxies; std::vector<std::unique_ptr<LinkProxy>> auto_link_proxies;
std::vector<std::unique_ptr<LinkProxy>> saved_link_proxies;
pw_proxy* metadata_proxy = nullptr; pw_proxy* metadata_proxy = nullptr;
spa_hook metadata_listener{}; spa_hook metadata_listener{};
bool metadata_listener_attached = false; bool metadata_listener_attached = false;
MetadataInfo defaults; MetadataInfo defaults;
bool loading_config = false;
struct SavedLink {
std::string out_node;
std::string out_port;
std::string in_node;
std::string in_port;
bool operator==(const SavedLink& o) const {
return out_node == o.out_node && out_port == o.out_port &&
in_node == o.in_node && in_port == o.in_port;
}
};
std::vector<SavedLink> saved_links;
Status ConnectLocked(); Status ConnectLocked();
void DisconnectLocked(); void DisconnectLocked();
@ -362,6 +376,8 @@ struct Client::Impl {
void SchedulePolicySync(); void SchedulePolicySync();
void ProcessPendingAutoLinks(); void ProcessPendingAutoLinks();
void CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port); void CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port);
void ProcessSavedLinks();
void CreateSavedLinkAsync(uint32_t output_port, uint32_t input_port);
void AutoSave(); void AutoSave();
void SetupMasterMeter(); void SetupMasterMeter();
void TeardownMasterMeter(); void TeardownMasterMeter();
@ -423,7 +439,7 @@ void Client::Impl::RegistryGlobal(void* data,
info.is_input = true; info.is_input = true;
} }
impl->ports[id] = info; impl->ports[id] = info;
if (!impl->pending_auto_links.empty()) { if (!impl->pending_auto_links.empty() || !impl->saved_links.empty()) {
impl->SchedulePolicySync(); impl->SchedulePolicySync();
} }
return; return;
@ -530,6 +546,7 @@ void Client::Impl::CoreDone(void* data, uint32_t, int seq) {
seq >= static_cast<int>(impl->policy_sync_seq)) { seq >= static_cast<int>(impl->policy_sync_seq)) {
impl->policy_sync_pending = false; impl->policy_sync_pending = false;
impl->ProcessPendingAutoLinks(); impl->ProcessPendingAutoLinks();
impl->ProcessSavedLinks();
} }
} }
@ -806,8 +823,8 @@ void Client::Impl::DisconnectLocked() {
} }
for (auto& entry : links) { for (auto& entry : links) {
LinkProxy* link = entry.second.get(); LinkProxy* link = entry.second.get();
if (link && link->proxy) { if (link) {
pw_proxy_destroy(link->proxy); spa_hook_remove(&link->listener);
link->proxy = nullptr; link->proxy = nullptr;
} }
} }
@ -820,12 +837,19 @@ void Client::Impl::DisconnectLocked() {
} }
} }
for (auto& entry : auto_link_proxies) { for (auto& entry : auto_link_proxies) {
if (entry && entry->proxy) { if (entry) {
pw_proxy_destroy(entry->proxy); spa_hook_remove(&entry->listener);
entry->proxy = nullptr; entry->proxy = nullptr;
} }
} }
auto_link_proxies.clear(); auto_link_proxies.clear();
for (auto& entry : saved_link_proxies) {
if (entry) {
spa_hook_remove(&entry->listener);
entry->proxy = nullptr;
}
}
saved_link_proxies.clear();
if (metadata_listener_attached) { if (metadata_listener_attached) {
spa_hook_remove(&metadata_listener); spa_hook_remove(&metadata_listener);
metadata_listener_attached = false; metadata_listener_attached = false;
@ -991,8 +1015,129 @@ void Client::Impl::CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port
auto_link_proxies.push_back(std::move(link_data)); auto_link_proxies.push_back(std::move(link_data));
} }
void Client::Impl::ProcessSavedLinks() {
struct LinkSpec {
uint32_t output_port;
uint32_t input_port;
std::string label;
};
std::vector<LinkSpec> to_create;
{
std::lock_guard<std::mutex> lock(cache_mutex);
fprintf(stderr, "[warppipe] ProcessSavedLinks: %zu pending, %zu nodes, %zu ports\n",
saved_links.size(), nodes.size(), ports.size());
for (auto it = saved_links.begin(); it != saved_links.end();) {
uint32_t out_id = 0, in_id = 0;
for (const auto& port_entry : ports) {
const PortInfo& port = port_entry.second;
auto node_it = nodes.find(port.node.value);
if (node_it == nodes.end()) continue;
if (!port.is_input && node_it->second.name == it->out_node &&
port.name == it->out_port) {
out_id = port_entry.first;
}
if (port.is_input && node_it->second.name == it->in_node &&
port.name == it->in_port) {
in_id = port_entry.first;
}
if (out_id && in_id) break;
}
if (!out_id || !in_id) {
fprintf(stderr, " deferred: %s:%s -> %s:%s (ports not found)\n",
it->out_node.c_str(), it->out_port.c_str(),
it->in_node.c_str(), it->in_port.c_str());
++it;
continue;
}
bool exists = false;
for (const auto& link_entry : links) {
if (link_entry.second.output_port.value == out_id &&
link_entry.second.input_port.value == in_id) {
exists = true;
break;
}
}
if (exists) {
fprintf(stderr, " already exists: %s:%s -> %s:%s\n",
it->out_node.c_str(), it->out_port.c_str(),
it->in_node.c_str(), it->in_port.c_str());
it = saved_links.erase(it);
continue;
}
std::string label = it->out_node + ":" + it->out_port + " -> " +
it->in_node + ":" + it->in_port;
to_create.push_back({out_id, in_id, std::move(label)});
it = saved_links.erase(it);
}
}
if (to_create.empty()) return;
std::unordered_map<uint32_t, std::vector<uint32_t>> saved_port_map;
for (const auto& spec : to_create) {
saved_port_map[spec.output_port].push_back(spec.input_port);
}
std::vector<uint32_t> competing_link_ids;
{
std::lock_guard<std::mutex> lock(cache_mutex);
for (const auto& link_entry : links) {
auto it = saved_port_map.find(link_entry.second.output_port.value);
if (it == saved_port_map.end()) continue;
uint32_t in_port = link_entry.second.input_port.value;
bool is_saved = false;
for (uint32_t saved_in : it->second) {
if (saved_in == in_port) { is_saved = true; break; }
}
if (!is_saved) {
competing_link_ids.push_back(link_entry.first);
}
}
}
for (uint32_t id : competing_link_ids) {
fprintf(stderr, " removing competing link %u\n", id);
pw_registry_destroy(registry, id);
}
for (const auto& spec : to_create) {
fprintf(stderr, " creating: %s (ports %u -> %u)\n",
spec.label.c_str(), spec.output_port, spec.input_port);
CreateSavedLinkAsync(spec.output_port, spec.input_port);
}
}
void Client::Impl::CreateSavedLinkAsync(uint32_t output_port,
uint32_t input_port) {
if (!core) return;
pw_properties* props = pw_properties_new(nullptr, nullptr);
if (!props) return;
pw_properties_setf(props, PW_KEY_LINK_OUTPUT_PORT, "%u", output_port);
pw_properties_setf(props, PW_KEY_LINK_INPUT_PORT, "%u", input_port);
pw_properties_set(props, PW_KEY_OBJECT_LINGER, "true");
pw_proxy* proxy = reinterpret_cast<pw_proxy*>(
pw_core_create_object(core, "link-factory",
PW_TYPE_INTERFACE_Link,
PW_VERSION_LINK,
&props->dict, 0));
pw_properties_free(props);
if (!proxy) return;
auto link_data = std::make_unique<LinkProxy>();
link_data->proxy = proxy;
link_data->loop = thread_loop;
pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents,
link_data.get());
std::lock_guard<std::mutex> lock(cache_mutex);
saved_link_proxies.push_back(std::move(link_data));
}
void Client::Impl::AutoSave() { void Client::Impl::AutoSave() {
if (!options.config_path || options.config_path->empty()) { if (!options.config_path || options.config_path->empty() || loading_config) {
return; return;
} }
nlohmann::json j; nlohmann::json j;
@ -1033,6 +1178,83 @@ void Client::Impl::AutoSave() {
} }
j["route_rules"] = std::move(rules_array); j["route_rules"] = std::move(rules_array);
nlohmann::json links_array = nlohmann::json::array();
{
std::lock_guard<std::mutex> lock(cache_mutex);
std::vector<SavedLink> live;
for (const auto& entry : link_proxies) {
if (!entry.second) {
continue;
}
auto link_it = links.find(entry.first);
if (link_it == links.end()) {
continue;
}
const Link& link = link_it->second;
auto out_port_it = ports.find(link.output_port.value);
auto in_port_it = ports.find(link.input_port.value);
if (out_port_it == ports.end() || in_port_it == ports.end()) {
continue;
}
auto out_node_it = nodes.find(out_port_it->second.node.value);
auto in_node_it = nodes.find(in_port_it->second.node.value);
if (out_node_it == nodes.end() || in_node_it == nodes.end()) {
continue;
}
SavedLink sl{out_node_it->second.name, out_port_it->second.name,
in_node_it->second.name, in_port_it->second.name};
live.push_back(sl);
nlohmann::json link_obj;
link_obj["out_node"] = sl.out_node;
link_obj["out_port"] = sl.out_port;
link_obj["in_node"] = sl.in_node;
link_obj["in_port"] = sl.in_port;
links_array.push_back(std::move(link_obj));
}
for (const auto& lp : saved_link_proxies) {
if (!lp || lp->id == SPA_ID_INVALID) continue;
auto link_it = links.find(lp->id);
if (link_it == links.end()) continue;
const Link& link = link_it->second;
auto out_port_it = ports.find(link.output_port.value);
auto in_port_it = ports.find(link.input_port.value);
if (out_port_it == ports.end() || in_port_it == ports.end()) continue;
auto out_node_it = nodes.find(out_port_it->second.node.value);
auto in_node_it = nodes.find(in_port_it->second.node.value);
if (out_node_it == nodes.end() || in_node_it == nodes.end()) continue;
SavedLink sl{out_node_it->second.name, out_port_it->second.name,
in_node_it->second.name, in_port_it->second.name};
bool dup = false;
for (const auto& l : live) {
if (l == sl) { dup = true; break; }
}
if (!dup) {
live.push_back(sl);
nlohmann::json link_obj;
link_obj["out_node"] = sl.out_node;
link_obj["out_port"] = sl.out_port;
link_obj["in_node"] = sl.in_node;
link_obj["in_port"] = sl.in_port;
links_array.push_back(std::move(link_obj));
}
}
for (const auto& sl : saved_links) {
bool already = false;
for (const auto& l : live) {
if (l == sl) { already = true; break; }
}
if (!already) {
nlohmann::json link_obj;
link_obj["out_node"] = sl.out_node;
link_obj["out_port"] = sl.out_port;
link_obj["in_node"] = sl.in_node;
link_obj["in_port"] = sl.in_port;
links_array.push_back(std::move(link_obj));
}
}
}
j["links"] = std::move(links_array);
std::string tmp_path = *options.config_path + ".tmp"; std::string tmp_path = *options.config_path + ".tmp";
std::ofstream file(tmp_path); std::ofstream file(tmp_path);
if (!file.is_open()) { if (!file.is_open()) {
@ -1648,6 +1870,7 @@ Result<Link> Client::CreateLink(PortId output, PortId input, const LinkOptions&
} }
pw_thread_loop_unlock(impl_->thread_loop); pw_thread_loop_unlock(impl_->thread_loop);
impl_->AutoSave();
return {Status::Ok(), link}; return {Status::Ok(), link};
} }
@ -1707,8 +1930,22 @@ Status Client::RemoveLink(LinkId link) {
pw_thread_loop_lock(impl_->thread_loop); pw_thread_loop_lock(impl_->thread_loop);
bool removed = false; bool removed = false;
Impl::SavedLink removed_link;
{ {
std::lock_guard<std::mutex> lock(impl_->cache_mutex); std::lock_guard<std::mutex> lock(impl_->cache_mutex);
auto link_it = impl_->links.find(link.value);
if (link_it != impl_->links.end()) {
auto op = impl_->ports.find(link_it->second.output_port.value);
auto ip = impl_->ports.find(link_it->second.input_port.value);
if (op != impl_->ports.end() && ip != impl_->ports.end()) {
auto on = impl_->nodes.find(op->second.node.value);
auto in_ = impl_->nodes.find(ip->second.node.value);
if (on != impl_->nodes.end() && in_ != impl_->nodes.end()) {
removed_link = {on->second.name, op->second.name,
in_->second.name, ip->second.name};
}
}
}
auto it = impl_->link_proxies.find(link.value); auto it = impl_->link_proxies.find(link.value);
if (it != impl_->link_proxies.end()) { if (it != impl_->link_proxies.end()) {
if (it->second && it->second->proxy) { if (it->second && it->second->proxy) {
@ -1730,6 +1967,14 @@ Status Client::RemoveLink(LinkId link) {
} }
pw_thread_loop_unlock(impl_->thread_loop); pw_thread_loop_unlock(impl_->thread_loop);
if (removed) {
if (!removed_link.out_node.empty()) {
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
auto& sl = impl_->saved_links;
sl.erase(std::remove(sl.begin(), sl.end(), removed_link), sl.end());
}
impl_->AutoSave();
}
return removed ? Status::Ok() : Status::Error(StatusCode::kNotFound, "link not found"); return removed ? Status::Ok() : Status::Error(StatusCode::kNotFound, "link not found");
} }
@ -1886,8 +2131,85 @@ Status Client::SaveConfig(std::string_view path) {
} }
} }
nlohmann::json links_array = nlohmann::json::array();
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
std::vector<Impl::SavedLink> live;
for (const auto& entry : impl_->link_proxies) {
if (!entry.second) {
continue;
}
auto link_it = impl_->links.find(entry.first);
if (link_it == impl_->links.end()) {
continue;
}
const Link& link = link_it->second;
auto out_port_it = impl_->ports.find(link.output_port.value);
auto in_port_it = impl_->ports.find(link.input_port.value);
if (out_port_it == impl_->ports.end() || in_port_it == impl_->ports.end()) {
continue;
}
auto out_node_it = impl_->nodes.find(out_port_it->second.node.value);
auto in_node_it = impl_->nodes.find(in_port_it->second.node.value);
if (out_node_it == impl_->nodes.end() || in_node_it == impl_->nodes.end()) {
continue;
}
Impl::SavedLink sl{out_node_it->second.name, out_port_it->second.name,
in_node_it->second.name, in_port_it->second.name};
live.push_back(sl);
nlohmann::json link_obj;
link_obj["out_node"] = sl.out_node;
link_obj["out_port"] = sl.out_port;
link_obj["in_node"] = sl.in_node;
link_obj["in_port"] = sl.in_port;
links_array.push_back(std::move(link_obj));
}
for (const auto& lp : impl_->saved_link_proxies) {
if (!lp || lp->id == SPA_ID_INVALID) continue;
auto link_it = impl_->links.find(lp->id);
if (link_it == impl_->links.end()) continue;
const Link& link = link_it->second;
auto out_port_it = impl_->ports.find(link.output_port.value);
auto in_port_it = impl_->ports.find(link.input_port.value);
if (out_port_it == impl_->ports.end() || in_port_it == impl_->ports.end()) continue;
auto out_node_it = impl_->nodes.find(out_port_it->second.node.value);
auto in_node_it = impl_->nodes.find(in_port_it->second.node.value);
if (out_node_it == impl_->nodes.end() || in_node_it == impl_->nodes.end()) continue;
Impl::SavedLink sl{out_node_it->second.name, out_port_it->second.name,
in_node_it->second.name, in_port_it->second.name};
bool dup = false;
for (const auto& l : live) {
if (l == sl) { dup = true; break; }
}
if (!dup) {
live.push_back(sl);
nlohmann::json link_obj;
link_obj["out_node"] = sl.out_node;
link_obj["out_port"] = sl.out_port;
link_obj["in_node"] = sl.in_node;
link_obj["in_port"] = sl.in_port;
links_array.push_back(std::move(link_obj));
}
}
for (const auto& sl : impl_->saved_links) {
bool already = false;
for (const auto& l : live) {
if (l == sl) { already = true; break; }
}
if (!already) {
nlohmann::json link_obj;
link_obj["out_node"] = sl.out_node;
link_obj["out_port"] = sl.out_port;
link_obj["in_node"] = sl.in_node;
link_obj["in_port"] = sl.in_port;
links_array.push_back(std::move(link_obj));
}
}
}
j["virtual_nodes"] = std::move(nodes_array); j["virtual_nodes"] = std::move(nodes_array);
j["route_rules"] = std::move(rules_array); j["route_rules"] = std::move(rules_array);
j["links"] = std::move(links_array);
std::string tmp_path = std::string(path) + ".tmp"; std::string tmp_path = std::string(path) + ".tmp";
std::ofstream file(tmp_path); std::ofstream file(tmp_path);
@ -1928,6 +2250,8 @@ Status Client::LoadConfig(std::string_view path) {
return Status::Error(StatusCode::kInvalidArgument, "config missing version"); return Status::Error(StatusCode::kInvalidArgument, "config missing version");
} }
impl_->loading_config = true;
if (j.contains("route_rules") && j["route_rules"].is_array()) { if (j.contains("route_rules") && j["route_rules"].is_array()) {
for (const auto& rule_obj : j["route_rules"]) { for (const auto& rule_obj : j["route_rules"]) {
try { try {
@ -1981,6 +2305,35 @@ Status Client::LoadConfig(std::string_view path) {
} }
} }
if (j.contains("links") && j["links"].is_array()) {
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
for (const auto& link_obj : j["links"]) {
try {
std::string out_node = link_obj.value("out_node", "");
std::string out_port = link_obj.value("out_port", "");
std::string in_node = link_obj.value("in_node", "");
std::string in_port = link_obj.value("in_port", "");
if (out_node.empty() || out_port.empty() ||
in_node.empty() || in_port.empty()) {
continue;
}
impl_->saved_links.push_back({out_node, out_port, in_node, in_port});
} catch (...) {
continue;
}
}
}
if (conn_status.ok()) {
pw_thread_loop_lock(impl_->thread_loop);
impl_->SchedulePolicySync();
pw_thread_loop_unlock(impl_->thread_loop);
}
}
impl_->loading_config = false;
impl_->AutoSave();
return Status::Ok(); return Status::Ok();
} }