diff --git a/gui/GraphEditorWidget.cpp b/gui/GraphEditorWidget.cpp index bf4cc30..878fe95 100644 --- a/gui/GraphEditorWidget.cpp +++ b/gui/GraphEditorWidget.cpp @@ -1035,7 +1035,7 @@ void GraphEditorWidget::tryResolvePendingLinks() { } if (foundOut && foundIn) { - m_client->CreateLink(outPortId, inPortId, warppipe::LinkOptions{}); + m_client->CreateLink(outPortId, inPortId, warppipe::LinkOptions{.linger = true}); } else { remaining.push_back(pending); } diff --git a/gui/PresetManager.cpp b/gui/PresetManager.cpp index 4322839..ba51577 100644 --- a/gui/PresetManager.cpp +++ b/gui/PresetManager.cpp @@ -186,7 +186,7 @@ bool PresetManager::loadPreset(const QString &path, warppipe::Client *client, std::string inPort = route["in_port"].toString().toStdString(); client->CreateLinkByName(outNode, outPort, inNode, inPort, - warppipe::LinkOptions{}); + warppipe::LinkOptions{.linger = true}); } model->refreshFromClient(); diff --git a/gui/WarpGraphModel.cpp b/gui/WarpGraphModel.cpp index c32f039..a62a63a 100644 --- a/gui/WarpGraphModel.cpp +++ b/gui/WarpGraphModel.cpp @@ -124,7 +124,7 @@ void WarpGraphModel::addConnection( warppipe::PortId outPortId = outIt->second.outputPorts[outIdx].id; warppipe::PortId inPortId = inIt->second.inputPorts[inIdx].id; - auto result = m_client->CreateLink(outPortId, inPortId, warppipe::LinkOptions{}); + auto result = m_client->CreateLink(outPortId, inPortId, warppipe::LinkOptions{.linger = true}); if (!result.ok()) { return; } diff --git a/src/warppipe.cpp b/src/warppipe.cpp index a361b4a..3ddbb7c 100644 --- a/src/warppipe.cpp +++ b/src/warppipe.cpp @@ -343,11 +343,25 @@ struct Client::Impl { uint32_t policy_sync_seq = 0; bool policy_sync_pending = false; std::vector> auto_link_proxies; + std::vector> saved_link_proxies; pw_proxy* metadata_proxy = nullptr; spa_hook metadata_listener{}; bool metadata_listener_attached = false; MetadataInfo defaults; + bool loading_config = false; + + struct SavedLink { + std::string out_node; + std::string out_port; + std::string in_node; + std::string in_port; + bool operator==(const SavedLink& o) const { + return out_node == o.out_node && out_port == o.out_port && + in_node == o.in_node && in_port == o.in_port; + } + }; + std::vector saved_links; Status ConnectLocked(); void DisconnectLocked(); @@ -362,6 +376,8 @@ struct Client::Impl { void SchedulePolicySync(); void ProcessPendingAutoLinks(); void CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port); + void ProcessSavedLinks(); + void CreateSavedLinkAsync(uint32_t output_port, uint32_t input_port); void AutoSave(); void SetupMasterMeter(); void TeardownMasterMeter(); @@ -423,7 +439,7 @@ void Client::Impl::RegistryGlobal(void* data, info.is_input = true; } impl->ports[id] = info; - if (!impl->pending_auto_links.empty()) { + if (!impl->pending_auto_links.empty() || !impl->saved_links.empty()) { impl->SchedulePolicySync(); } return; @@ -530,6 +546,7 @@ void Client::Impl::CoreDone(void* data, uint32_t, int seq) { seq >= static_cast(impl->policy_sync_seq)) { impl->policy_sync_pending = false; impl->ProcessPendingAutoLinks(); + impl->ProcessSavedLinks(); } } @@ -806,8 +823,8 @@ void Client::Impl::DisconnectLocked() { } for (auto& entry : links) { LinkProxy* link = entry.second.get(); - if (link && link->proxy) { - pw_proxy_destroy(link->proxy); + if (link) { + spa_hook_remove(&link->listener); link->proxy = nullptr; } } @@ -820,12 +837,19 @@ void Client::Impl::DisconnectLocked() { } } for (auto& entry : auto_link_proxies) { - if (entry && entry->proxy) { - pw_proxy_destroy(entry->proxy); + if (entry) { + spa_hook_remove(&entry->listener); entry->proxy = nullptr; } } auto_link_proxies.clear(); + for (auto& entry : saved_link_proxies) { + if (entry) { + spa_hook_remove(&entry->listener); + entry->proxy = nullptr; + } + } + saved_link_proxies.clear(); if (metadata_listener_attached) { spa_hook_remove(&metadata_listener); metadata_listener_attached = false; @@ -991,8 +1015,129 @@ void Client::Impl::CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port auto_link_proxies.push_back(std::move(link_data)); } +void Client::Impl::ProcessSavedLinks() { + struct LinkSpec { + uint32_t output_port; + uint32_t input_port; + std::string label; + }; + std::vector to_create; + + { + std::lock_guard lock(cache_mutex); + fprintf(stderr, "[warppipe] ProcessSavedLinks: %zu pending, %zu nodes, %zu ports\n", + saved_links.size(), nodes.size(), ports.size()); + for (auto it = saved_links.begin(); it != saved_links.end();) { + uint32_t out_id = 0, in_id = 0; + for (const auto& port_entry : ports) { + const PortInfo& port = port_entry.second; + auto node_it = nodes.find(port.node.value); + if (node_it == nodes.end()) continue; + if (!port.is_input && node_it->second.name == it->out_node && + port.name == it->out_port) { + out_id = port_entry.first; + } + if (port.is_input && node_it->second.name == it->in_node && + port.name == it->in_port) { + in_id = port_entry.first; + } + if (out_id && in_id) break; + } + if (!out_id || !in_id) { + fprintf(stderr, " deferred: %s:%s -> %s:%s (ports not found)\n", + it->out_node.c_str(), it->out_port.c_str(), + it->in_node.c_str(), it->in_port.c_str()); + ++it; + continue; + } + bool exists = false; + for (const auto& link_entry : links) { + if (link_entry.second.output_port.value == out_id && + link_entry.second.input_port.value == in_id) { + exists = true; + break; + } + } + if (exists) { + fprintf(stderr, " already exists: %s:%s -> %s:%s\n", + it->out_node.c_str(), it->out_port.c_str(), + it->in_node.c_str(), it->in_port.c_str()); + it = saved_links.erase(it); + continue; + } + std::string label = it->out_node + ":" + it->out_port + " -> " + + it->in_node + ":" + it->in_port; + to_create.push_back({out_id, in_id, std::move(label)}); + it = saved_links.erase(it); + } + } + + if (to_create.empty()) return; + + std::unordered_map> saved_port_map; + for (const auto& spec : to_create) { + saved_port_map[spec.output_port].push_back(spec.input_port); + } + + std::vector competing_link_ids; + { + std::lock_guard lock(cache_mutex); + for (const auto& link_entry : links) { + auto it = saved_port_map.find(link_entry.second.output_port.value); + if (it == saved_port_map.end()) continue; + uint32_t in_port = link_entry.second.input_port.value; + bool is_saved = false; + for (uint32_t saved_in : it->second) { + if (saved_in == in_port) { is_saved = true; break; } + } + if (!is_saved) { + competing_link_ids.push_back(link_entry.first); + } + } + } + + for (uint32_t id : competing_link_ids) { + fprintf(stderr, " removing competing link %u\n", id); + pw_registry_destroy(registry, id); + } + + for (const auto& spec : to_create) { + fprintf(stderr, " creating: %s (ports %u -> %u)\n", + spec.label.c_str(), spec.output_port, spec.input_port); + CreateSavedLinkAsync(spec.output_port, spec.input_port); + } +} + +void Client::Impl::CreateSavedLinkAsync(uint32_t output_port, + uint32_t input_port) { + if (!core) return; + + pw_properties* props = pw_properties_new(nullptr, nullptr); + if (!props) return; + pw_properties_setf(props, PW_KEY_LINK_OUTPUT_PORT, "%u", output_port); + pw_properties_setf(props, PW_KEY_LINK_INPUT_PORT, "%u", input_port); + pw_properties_set(props, PW_KEY_OBJECT_LINGER, "true"); + + pw_proxy* proxy = reinterpret_cast( + pw_core_create_object(core, "link-factory", + PW_TYPE_INTERFACE_Link, + PW_VERSION_LINK, + &props->dict, 0)); + pw_properties_free(props); + if (!proxy) return; + + auto link_data = std::make_unique(); + link_data->proxy = proxy; + link_data->loop = thread_loop; + pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents, + link_data.get()); + + std::lock_guard lock(cache_mutex); + saved_link_proxies.push_back(std::move(link_data)); +} + void Client::Impl::AutoSave() { - if (!options.config_path || options.config_path->empty()) { + if (!options.config_path || options.config_path->empty() || loading_config) { return; } nlohmann::json j; @@ -1033,6 +1178,83 @@ void Client::Impl::AutoSave() { } j["route_rules"] = std::move(rules_array); + nlohmann::json links_array = nlohmann::json::array(); + { + std::lock_guard lock(cache_mutex); + std::vector live; + for (const auto& entry : link_proxies) { + if (!entry.second) { + continue; + } + auto link_it = links.find(entry.first); + if (link_it == links.end()) { + continue; + } + const Link& link = link_it->second; + auto out_port_it = ports.find(link.output_port.value); + auto in_port_it = ports.find(link.input_port.value); + if (out_port_it == ports.end() || in_port_it == ports.end()) { + continue; + } + auto out_node_it = nodes.find(out_port_it->second.node.value); + auto in_node_it = nodes.find(in_port_it->second.node.value); + if (out_node_it == nodes.end() || in_node_it == nodes.end()) { + continue; + } + SavedLink sl{out_node_it->second.name, out_port_it->second.name, + in_node_it->second.name, in_port_it->second.name}; + live.push_back(sl); + nlohmann::json link_obj; + link_obj["out_node"] = sl.out_node; + link_obj["out_port"] = sl.out_port; + link_obj["in_node"] = sl.in_node; + link_obj["in_port"] = sl.in_port; + links_array.push_back(std::move(link_obj)); + } + for (const auto& lp : saved_link_proxies) { + if (!lp || lp->id == SPA_ID_INVALID) continue; + auto link_it = links.find(lp->id); + if (link_it == links.end()) continue; + const Link& link = link_it->second; + auto out_port_it = ports.find(link.output_port.value); + auto in_port_it = ports.find(link.input_port.value); + if (out_port_it == ports.end() || in_port_it == ports.end()) continue; + auto out_node_it = nodes.find(out_port_it->second.node.value); + auto in_node_it = nodes.find(in_port_it->second.node.value); + if (out_node_it == nodes.end() || in_node_it == nodes.end()) continue; + SavedLink sl{out_node_it->second.name, out_port_it->second.name, + in_node_it->second.name, in_port_it->second.name}; + bool dup = false; + for (const auto& l : live) { + if (l == sl) { dup = true; break; } + } + if (!dup) { + live.push_back(sl); + nlohmann::json link_obj; + link_obj["out_node"] = sl.out_node; + link_obj["out_port"] = sl.out_port; + link_obj["in_node"] = sl.in_node; + link_obj["in_port"] = sl.in_port; + links_array.push_back(std::move(link_obj)); + } + } + for (const auto& sl : saved_links) { + bool already = false; + for (const auto& l : live) { + if (l == sl) { already = true; break; } + } + if (!already) { + nlohmann::json link_obj; + link_obj["out_node"] = sl.out_node; + link_obj["out_port"] = sl.out_port; + link_obj["in_node"] = sl.in_node; + link_obj["in_port"] = sl.in_port; + links_array.push_back(std::move(link_obj)); + } + } + } + j["links"] = std::move(links_array); + std::string tmp_path = *options.config_path + ".tmp"; std::ofstream file(tmp_path); if (!file.is_open()) { @@ -1648,6 +1870,7 @@ Result Client::CreateLink(PortId output, PortId input, const LinkOptions& } pw_thread_loop_unlock(impl_->thread_loop); + impl_->AutoSave(); return {Status::Ok(), link}; } @@ -1707,8 +1930,22 @@ Status Client::RemoveLink(LinkId link) { pw_thread_loop_lock(impl_->thread_loop); bool removed = false; + Impl::SavedLink removed_link; { std::lock_guard lock(impl_->cache_mutex); + auto link_it = impl_->links.find(link.value); + if (link_it != impl_->links.end()) { + auto op = impl_->ports.find(link_it->second.output_port.value); + auto ip = impl_->ports.find(link_it->second.input_port.value); + if (op != impl_->ports.end() && ip != impl_->ports.end()) { + auto on = impl_->nodes.find(op->second.node.value); + auto in_ = impl_->nodes.find(ip->second.node.value); + if (on != impl_->nodes.end() && in_ != impl_->nodes.end()) { + removed_link = {on->second.name, op->second.name, + in_->second.name, ip->second.name}; + } + } + } auto it = impl_->link_proxies.find(link.value); if (it != impl_->link_proxies.end()) { if (it->second && it->second->proxy) { @@ -1730,6 +1967,14 @@ Status Client::RemoveLink(LinkId link) { } pw_thread_loop_unlock(impl_->thread_loop); + if (removed) { + if (!removed_link.out_node.empty()) { + std::lock_guard lock(impl_->cache_mutex); + auto& sl = impl_->saved_links; + sl.erase(std::remove(sl.begin(), sl.end(), removed_link), sl.end()); + } + impl_->AutoSave(); + } return removed ? Status::Ok() : Status::Error(StatusCode::kNotFound, "link not found"); } @@ -1886,8 +2131,85 @@ Status Client::SaveConfig(std::string_view path) { } } + nlohmann::json links_array = nlohmann::json::array(); + { + std::lock_guard lock(impl_->cache_mutex); + std::vector live; + for (const auto& entry : impl_->link_proxies) { + if (!entry.second) { + continue; + } + auto link_it = impl_->links.find(entry.first); + if (link_it == impl_->links.end()) { + continue; + } + const Link& link = link_it->second; + auto out_port_it = impl_->ports.find(link.output_port.value); + auto in_port_it = impl_->ports.find(link.input_port.value); + if (out_port_it == impl_->ports.end() || in_port_it == impl_->ports.end()) { + continue; + } + auto out_node_it = impl_->nodes.find(out_port_it->second.node.value); + auto in_node_it = impl_->nodes.find(in_port_it->second.node.value); + if (out_node_it == impl_->nodes.end() || in_node_it == impl_->nodes.end()) { + continue; + } + Impl::SavedLink sl{out_node_it->second.name, out_port_it->second.name, + in_node_it->second.name, in_port_it->second.name}; + live.push_back(sl); + nlohmann::json link_obj; + link_obj["out_node"] = sl.out_node; + link_obj["out_port"] = sl.out_port; + link_obj["in_node"] = sl.in_node; + link_obj["in_port"] = sl.in_port; + links_array.push_back(std::move(link_obj)); + } + for (const auto& lp : impl_->saved_link_proxies) { + if (!lp || lp->id == SPA_ID_INVALID) continue; + auto link_it = impl_->links.find(lp->id); + if (link_it == impl_->links.end()) continue; + const Link& link = link_it->second; + auto out_port_it = impl_->ports.find(link.output_port.value); + auto in_port_it = impl_->ports.find(link.input_port.value); + if (out_port_it == impl_->ports.end() || in_port_it == impl_->ports.end()) continue; + auto out_node_it = impl_->nodes.find(out_port_it->second.node.value); + auto in_node_it = impl_->nodes.find(in_port_it->second.node.value); + if (out_node_it == impl_->nodes.end() || in_node_it == impl_->nodes.end()) continue; + Impl::SavedLink sl{out_node_it->second.name, out_port_it->second.name, + in_node_it->second.name, in_port_it->second.name}; + bool dup = false; + for (const auto& l : live) { + if (l == sl) { dup = true; break; } + } + if (!dup) { + live.push_back(sl); + nlohmann::json link_obj; + link_obj["out_node"] = sl.out_node; + link_obj["out_port"] = sl.out_port; + link_obj["in_node"] = sl.in_node; + link_obj["in_port"] = sl.in_port; + links_array.push_back(std::move(link_obj)); + } + } + for (const auto& sl : impl_->saved_links) { + bool already = false; + for (const auto& l : live) { + if (l == sl) { already = true; break; } + } + if (!already) { + nlohmann::json link_obj; + link_obj["out_node"] = sl.out_node; + link_obj["out_port"] = sl.out_port; + link_obj["in_node"] = sl.in_node; + link_obj["in_port"] = sl.in_port; + links_array.push_back(std::move(link_obj)); + } + } + } + j["virtual_nodes"] = std::move(nodes_array); j["route_rules"] = std::move(rules_array); + j["links"] = std::move(links_array); std::string tmp_path = std::string(path) + ".tmp"; std::ofstream file(tmp_path); @@ -1928,6 +2250,8 @@ Status Client::LoadConfig(std::string_view path) { return Status::Error(StatusCode::kInvalidArgument, "config missing version"); } + impl_->loading_config = true; + if (j.contains("route_rules") && j["route_rules"].is_array()) { for (const auto& rule_obj : j["route_rules"]) { try { @@ -1981,6 +2305,35 @@ Status Client::LoadConfig(std::string_view path) { } } + if (j.contains("links") && j["links"].is_array()) { + { + std::lock_guard lock(impl_->cache_mutex); + for (const auto& link_obj : j["links"]) { + try { + std::string out_node = link_obj.value("out_node", ""); + std::string out_port = link_obj.value("out_port", ""); + std::string in_node = link_obj.value("in_node", ""); + std::string in_port = link_obj.value("in_port", ""); + if (out_node.empty() || out_port.empty() || + in_node.empty() || in_port.empty()) { + continue; + } + impl_->saved_links.push_back({out_node, out_port, in_node, in_port}); + } catch (...) { + continue; + } + } + } + + if (conn_status.ok()) { + pw_thread_loop_lock(impl_->thread_loop); + impl_->SchedulePolicySync(); + pw_thread_loop_unlock(impl_->thread_loop); + } + } + + impl_->loading_config = false; + impl_->AutoSave(); return Status::Ok(); }