Fix crashes
This commit is contained in:
parent
f4f5a69531
commit
5962c03b96
1 changed files with 47 additions and 72 deletions
119
src/warppipe.cpp
119
src/warppipe.cpp
|
|
@ -124,6 +124,7 @@ struct StreamData {
|
||||||
struct LinkProxy {
|
struct LinkProxy {
|
||||||
pw_proxy* proxy = nullptr;
|
pw_proxy* proxy = nullptr;
|
||||||
spa_hook listener{};
|
spa_hook listener{};
|
||||||
|
bool listener_attached = false;
|
||||||
pw_thread_loop* loop = nullptr;
|
pw_thread_loop* loop = nullptr;
|
||||||
bool done = false;
|
bool done = false;
|
||||||
bool failed = false;
|
bool failed = false;
|
||||||
|
|
@ -133,6 +134,20 @@ struct LinkProxy {
|
||||||
uint32_t input_port = 0;
|
uint32_t input_port = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void DetachLinkProxy(LinkProxy* link, bool destroy_proxy) {
|
||||||
|
if (!link) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (link->listener_attached) {
|
||||||
|
spa_hook_remove(&link->listener);
|
||||||
|
link->listener_attached = false;
|
||||||
|
}
|
||||||
|
if (destroy_proxy && link->proxy) {
|
||||||
|
pw_proxy_destroy(link->proxy);
|
||||||
|
}
|
||||||
|
link->proxy = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
void LinkProxyBound(void* data, uint32_t global_id) {
|
void LinkProxyBound(void* data, uint32_t global_id) {
|
||||||
auto* link = static_cast<LinkProxy*>(data);
|
auto* link = static_cast<LinkProxy*>(data);
|
||||||
if (!link) {
|
if (!link) {
|
||||||
|
|
@ -150,6 +165,8 @@ void LinkProxyRemoved(void* data) {
|
||||||
if (!link) {
|
if (!link) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
link->listener_attached = false;
|
||||||
|
link->proxy = nullptr;
|
||||||
link->done = true;
|
link->done = true;
|
||||||
if (link->loop) {
|
if (link->loop) {
|
||||||
pw_thread_loop_signal(link->loop, false);
|
pw_thread_loop_signal(link->loop, false);
|
||||||
|
|
@ -161,6 +178,8 @@ void LinkProxyError(void* data, int, int res, const char* message) {
|
||||||
if (!link) {
|
if (!link) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
link->listener_attached = false;
|
||||||
|
link->proxy = nullptr;
|
||||||
link->failed = true;
|
link->failed = true;
|
||||||
link->error = message ? message : spa_strerror(res);
|
link->error = message ? message : spa_strerror(res);
|
||||||
if (link->loop) {
|
if (link->loop) {
|
||||||
|
|
@ -697,14 +716,6 @@ void Client::Impl::RegistryGlobalRemove(void* data, uint32_t id) {
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(impl->cache_mutex);
|
std::lock_guard<std::mutex> lock(impl->cache_mutex);
|
||||||
impl->virtual_streams.erase(id);
|
impl->virtual_streams.erase(id);
|
||||||
auto link_it = impl->link_proxies.find(id);
|
|
||||||
if (link_it != impl->link_proxies.end()) {
|
|
||||||
if (link_it->second && link_it->second->proxy) {
|
|
||||||
spa_hook_remove(&link_it->second->listener);
|
|
||||||
link_it->second->proxy = nullptr;
|
|
||||||
}
|
|
||||||
impl->link_proxies.erase(link_it);
|
|
||||||
}
|
|
||||||
auto node_it = impl->nodes.find(id);
|
auto node_it = impl->nodes.find(id);
|
||||||
if (node_it != impl->nodes.end()) {
|
if (node_it != impl->nodes.end()) {
|
||||||
impl->nodes.erase(node_it);
|
impl->nodes.erase(node_it);
|
||||||
|
|
@ -1138,11 +1149,7 @@ void Client::Impl::DisconnectLocked() {
|
||||||
streams.swap(virtual_streams);
|
streams.swap(virtual_streams);
|
||||||
}
|
}
|
||||||
for (auto& entry : links) {
|
for (auto& entry : links) {
|
||||||
LinkProxy* link = entry.second.get();
|
DetachLinkProxy(entry.second.get(), false);
|
||||||
if (link) {
|
|
||||||
spa_hook_remove(&link->listener);
|
|
||||||
link->proxy = nullptr;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
for (auto& entry : streams) {
|
for (auto& entry : streams) {
|
||||||
StreamData* stream_data = entry.second.get();
|
StreamData* stream_data = entry.second.get();
|
||||||
|
|
@ -1157,17 +1164,11 @@ void Client::Impl::DisconnectLocked() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (auto& entry : auto_link_proxies) {
|
for (auto& entry : auto_link_proxies) {
|
||||||
if (entry) {
|
DetachLinkProxy(entry.get(), false);
|
||||||
spa_hook_remove(&entry->listener);
|
|
||||||
entry->proxy = nullptr;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
auto_link_proxies.clear();
|
auto_link_proxies.clear();
|
||||||
for (auto& entry : saved_link_proxies) {
|
for (auto& entry : saved_link_proxies) {
|
||||||
if (entry) {
|
DetachLinkProxy(entry.get(), false);
|
||||||
spa_hook_remove(&entry->listener);
|
|
||||||
entry->proxy = nullptr;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
saved_link_proxies.clear();
|
saved_link_proxies.clear();
|
||||||
for (auto& entry : node_proxies) {
|
for (auto& entry : node_proxies) {
|
||||||
|
|
@ -1290,13 +1291,17 @@ void Client::Impl::EnforceRulesForLink(uint32_t link_id, uint32_t out_port,
|
||||||
|
|
||||||
if (!should_destroy) return;
|
if (!should_destroy) return;
|
||||||
|
|
||||||
if (link_proxies.count(link_id)) return;
|
auto active_link_proxy = link_proxies.find(link_id);
|
||||||
|
if (active_link_proxy != link_proxies.end() &&
|
||||||
|
active_link_proxy->second && active_link_proxy->second->proxy) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
for (const auto& proxy : auto_link_proxies) {
|
for (const auto& proxy : auto_link_proxies) {
|
||||||
if (proxy && proxy->output_port == out_port &&
|
if (proxy && proxy->proxy && proxy->output_port == out_port &&
|
||||||
proxy->input_port == in_port) return;
|
proxy->input_port == in_port) return;
|
||||||
}
|
}
|
||||||
for (const auto& proxy : saved_link_proxies) {
|
for (const auto& proxy : saved_link_proxies) {
|
||||||
if (proxy && proxy->output_port == out_port &&
|
if (proxy && proxy->proxy && proxy->output_port == out_port &&
|
||||||
proxy->input_port == in_port) return;
|
proxy->input_port == in_port) return;
|
||||||
}
|
}
|
||||||
for (const auto& pair : auto_link_claimed_pairs) {
|
for (const auto& pair : auto_link_claimed_pairs) {
|
||||||
|
|
@ -1437,12 +1442,14 @@ void Client::Impl::ProcessPendingAutoLinks() {
|
||||||
if (target_in == in_port) { is_ours = true; break; }
|
if (target_in == in_port) { is_ours = true; break; }
|
||||||
}
|
}
|
||||||
if (!is_ours) {
|
if (!is_ours) {
|
||||||
if (link_proxies.count(link_id)) is_ours = true;
|
auto owned = link_proxies.find(link_id);
|
||||||
|
if (owned != link_proxies.end() && owned->second && owned->second->proxy)
|
||||||
|
is_ours = true;
|
||||||
}
|
}
|
||||||
if (!is_ours) {
|
if (!is_ours) {
|
||||||
uint32_t out_port = link_entry.second.output_port.value;
|
uint32_t out_port = link_entry.second.output_port.value;
|
||||||
for (const auto& proxy : auto_link_proxies) {
|
for (const auto& proxy : auto_link_proxies) {
|
||||||
if (proxy && proxy->output_port == out_port &&
|
if (proxy && proxy->proxy && proxy->output_port == out_port &&
|
||||||
proxy->input_port == in_port) { is_ours = true; break; }
|
proxy->input_port == in_port) { is_ours = true; break; }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1457,7 +1464,7 @@ void Client::Impl::ProcessPendingAutoLinks() {
|
||||||
if (!is_ours) {
|
if (!is_ours) {
|
||||||
uint32_t out_port = link_entry.second.output_port.value;
|
uint32_t out_port = link_entry.second.output_port.value;
|
||||||
for (const auto& proxy : saved_link_proxies) {
|
for (const auto& proxy : saved_link_proxies) {
|
||||||
if (proxy && proxy->output_port == out_port &&
|
if (proxy && proxy->proxy && proxy->output_port == out_port &&
|
||||||
proxy->input_port == in_port) { is_ours = true; break; }
|
proxy->input_port == in_port) { is_ours = true; break; }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1505,6 +1512,7 @@ void Client::Impl::CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port
|
||||||
link_data->output_port = output_port;
|
link_data->output_port = output_port;
|
||||||
link_data->input_port = input_port;
|
link_data->input_port = input_port;
|
||||||
pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents, link_data.get());
|
pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents, link_data.get());
|
||||||
|
link_data->listener_attached = true;
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(cache_mutex);
|
std::lock_guard<std::mutex> lock(cache_mutex);
|
||||||
auto_link_proxies.push_back(std::move(link_data));
|
auto_link_proxies.push_back(std::move(link_data));
|
||||||
|
|
@ -1595,14 +1603,15 @@ void Client::Impl::ProcessSavedLinks() {
|
||||||
if (saved_in == in_port) { is_ours = true; break; }
|
if (saved_in == in_port) { is_ours = true; break; }
|
||||||
}
|
}
|
||||||
if (!is_ours) {
|
if (!is_ours) {
|
||||||
if (link_proxies.count(link_id)) {
|
auto owned = link_proxies.find(link_id);
|
||||||
|
if (owned != link_proxies.end() && owned->second && owned->second->proxy) {
|
||||||
is_ours = true;
|
is_ours = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!is_ours) {
|
if (!is_ours) {
|
||||||
uint32_t out_port = link_entry.second.output_port.value;
|
uint32_t out_port = link_entry.second.output_port.value;
|
||||||
for (const auto& proxy : auto_link_proxies) {
|
for (const auto& proxy : auto_link_proxies) {
|
||||||
if (proxy && proxy->output_port == out_port &&
|
if (proxy && proxy->proxy && proxy->output_port == out_port &&
|
||||||
proxy->input_port == in_port) { is_ours = true; break; }
|
proxy->input_port == in_port) { is_ours = true; break; }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1617,7 +1626,7 @@ void Client::Impl::ProcessSavedLinks() {
|
||||||
if (!is_ours) {
|
if (!is_ours) {
|
||||||
uint32_t out_port = link_entry.second.output_port.value;
|
uint32_t out_port = link_entry.second.output_port.value;
|
||||||
for (const auto& proxy : saved_link_proxies) {
|
for (const auto& proxy : saved_link_proxies) {
|
||||||
if (proxy && proxy->output_port == out_port &&
|
if (proxy && proxy->proxy && proxy->output_port == out_port &&
|
||||||
proxy->input_port == in_port) { is_ours = true; break; }
|
proxy->input_port == in_port) { is_ours = true; break; }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1661,6 +1670,7 @@ void Client::Impl::CreateSavedLinkAsync(uint32_t output_port,
|
||||||
link_data->input_port = input_port;
|
link_data->input_port = input_port;
|
||||||
pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents,
|
pw_proxy_add_listener(proxy, &link_data->listener, &kLinkProxyEvents,
|
||||||
link_data.get());
|
link_data.get());
|
||||||
|
link_data->listener_attached = true;
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(cache_mutex);
|
std::lock_guard<std::mutex> lock(cache_mutex);
|
||||||
saved_link_proxies.push_back(std::move(link_data));
|
saved_link_proxies.push_back(std::move(link_data));
|
||||||
|
|
@ -1720,7 +1730,7 @@ void Client::Impl::AutoSave() {
|
||||||
std::lock_guard<std::mutex> lock(cache_mutex);
|
std::lock_guard<std::mutex> lock(cache_mutex);
|
||||||
std::vector<SavedLink> live;
|
std::vector<SavedLink> live;
|
||||||
for (const auto& entry : link_proxies) {
|
for (const auto& entry : link_proxies) {
|
||||||
if (!entry.second) {
|
if (!entry.second || !entry.second->proxy) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
auto link_it = links.find(entry.first);
|
auto link_it = links.find(entry.first);
|
||||||
|
|
@ -1749,7 +1759,7 @@ void Client::Impl::AutoSave() {
|
||||||
links_array.push_back(std::move(link_obj));
|
links_array.push_back(std::move(link_obj));
|
||||||
}
|
}
|
||||||
for (const auto& lp : saved_link_proxies) {
|
for (const auto& lp : saved_link_proxies) {
|
||||||
if (!lp || lp->id == SPA_ID_INVALID) continue;
|
if (!lp || !lp->proxy || lp->id == SPA_ID_INVALID) continue;
|
||||||
auto link_it = links.find(lp->id);
|
auto link_it = links.find(lp->id);
|
||||||
if (link_it == links.end()) continue;
|
if (link_it == links.end()) continue;
|
||||||
const Link& link = link_it->second;
|
const Link& link = link_it->second;
|
||||||
|
|
@ -2456,6 +2466,7 @@ Result<Link> Client::CreateLink(PortId output, PortId input, const LinkOptions&
|
||||||
link_proxy->output_port = output.value;
|
link_proxy->output_port = output.value;
|
||||||
link_proxy->input_port = input.value;
|
link_proxy->input_port = input.value;
|
||||||
pw_proxy_add_listener(proxy, &link_proxy->listener, &kLinkProxyEvents, link_proxy.get());
|
pw_proxy_add_listener(proxy, &link_proxy->listener, &kLinkProxyEvents, link_proxy.get());
|
||||||
|
link_proxy->listener_attached = true;
|
||||||
|
|
||||||
int wait_attempts = 0;
|
int wait_attempts = 0;
|
||||||
while (link_proxy->id == SPA_ID_INVALID && !link_proxy->failed && wait_attempts < 3) {
|
while (link_proxy->id == SPA_ID_INVALID && !link_proxy->failed && wait_attempts < 3) {
|
||||||
|
|
@ -2476,13 +2487,13 @@ Result<Link> Client::CreateLink(PortId output, PortId input, const LinkOptions&
|
||||||
if (link_proxy->failed) {
|
if (link_proxy->failed) {
|
||||||
std::string error = link_proxy->error.empty() ? "link creation failed" : link_proxy->error;
|
std::string error = link_proxy->error.empty() ? "link creation failed" : link_proxy->error;
|
||||||
remove_pending();
|
remove_pending();
|
||||||
pw_proxy_destroy(proxy);
|
DetachLinkProxy(link_proxy.get(), true);
|
||||||
pw_thread_loop_unlock(impl_->thread_loop);
|
pw_thread_loop_unlock(impl_->thread_loop);
|
||||||
return {Status::Error(StatusCode::kUnavailable, std::move(error)), {}};
|
return {Status::Error(StatusCode::kUnavailable, std::move(error)), {}};
|
||||||
}
|
}
|
||||||
if (link_proxy->id == SPA_ID_INVALID) {
|
if (link_proxy->id == SPA_ID_INVALID) {
|
||||||
remove_pending();
|
remove_pending();
|
||||||
pw_proxy_destroy(proxy);
|
DetachLinkProxy(link_proxy.get(), true);
|
||||||
pw_thread_loop_unlock(impl_->thread_loop);
|
pw_thread_loop_unlock(impl_->thread_loop);
|
||||||
return {Status::Error(StatusCode::kTimeout, "timed out waiting for link id"), {}};
|
return {Status::Error(StatusCode::kTimeout, "timed out waiting for link id"), {}};
|
||||||
}
|
}
|
||||||
|
|
@ -2493,7 +2504,7 @@ Result<Link> Client::CreateLink(PortId output, PortId input, const LinkOptions&
|
||||||
link.input_port = input;
|
link.input_port = input;
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
|
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
|
||||||
impl_->link_proxies.emplace(link_proxy->id, std::move(link_proxy));
|
impl_->link_proxies[link_proxy->id] = std::move(link_proxy);
|
||||||
impl_->links[link.id.value] = link;
|
impl_->links[link.id.value] = link;
|
||||||
std::erase_if(impl_->pending_link_pairs, [&](const auto& p) {
|
std::erase_if(impl_->pending_link_pairs, [&](const auto& p) {
|
||||||
return p.first == output.value && p.second == input.value;
|
return p.first == output.value && p.second == input.value;
|
||||||
|
|
@ -2579,14 +2590,9 @@ Status Client::RemoveLink(LinkId link) {
|
||||||
}
|
}
|
||||||
auto it = impl_->link_proxies.find(link.value);
|
auto it = impl_->link_proxies.find(link.value);
|
||||||
if (it != impl_->link_proxies.end()) {
|
if (it != impl_->link_proxies.end()) {
|
||||||
if (it->second && it->second->proxy) {
|
|
||||||
spa_hook_remove(&it->second->listener);
|
|
||||||
pw_proxy_destroy(it->second->proxy);
|
|
||||||
}
|
|
||||||
if (impl_->registry) {
|
if (impl_->registry) {
|
||||||
pw_registry_destroy(impl_->registry, link.value);
|
pw_registry_destroy(impl_->registry, link.value);
|
||||||
}
|
}
|
||||||
impl_->link_proxies.erase(it);
|
|
||||||
auto link_it2 = impl_->links.find(link.value);
|
auto link_it2 = impl_->links.find(link.value);
|
||||||
if (link_it2 != impl_->links.end()) {
|
if (link_it2 != impl_->links.end()) {
|
||||||
uint32_t op = link_it2->second.output_port.value;
|
uint32_t op = link_it2->second.output_port.value;
|
||||||
|
|
@ -2616,26 +2622,6 @@ Status Client::RemoveLink(LinkId link) {
|
||||||
impl_->links.erase(link_it);
|
impl_->links.erase(link_it);
|
||||||
}
|
}
|
||||||
if (out_port && in_port) {
|
if (out_port && in_port) {
|
||||||
for (auto& p : impl_->saved_link_proxies) {
|
|
||||||
if (p && p->output_port == out_port && p->input_port == in_port) {
|
|
||||||
spa_hook_remove(&p->listener);
|
|
||||||
if (p->proxy) pw_proxy_destroy(p->proxy);
|
|
||||||
p->proxy = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
std::erase_if(impl_->saved_link_proxies, [&](const auto& p) {
|
|
||||||
return p && p->output_port == out_port && p->input_port == in_port;
|
|
||||||
});
|
|
||||||
for (auto& p : impl_->auto_link_proxies) {
|
|
||||||
if (p && p->output_port == out_port && p->input_port == in_port) {
|
|
||||||
spa_hook_remove(&p->listener);
|
|
||||||
if (p->proxy) pw_proxy_destroy(p->proxy);
|
|
||||||
p->proxy = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
std::erase_if(impl_->auto_link_proxies, [&](const auto& p) {
|
|
||||||
return p && p->output_port == out_port && p->input_port == in_port;
|
|
||||||
});
|
|
||||||
std::erase_if(impl_->auto_link_claimed_pairs, [&](const auto& pair) {
|
std::erase_if(impl_->auto_link_claimed_pairs, [&](const auto& pair) {
|
||||||
return pair.first == out_port && pair.second == in_port;
|
return pair.first == out_port && pair.second == in_port;
|
||||||
});
|
});
|
||||||
|
|
@ -2799,18 +2785,7 @@ Status Client::RemoveRouteRule(RuleId id) {
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
|
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
|
||||||
for (const auto& pair : pairs_to_remove) {
|
(void)pairs_to_remove;
|
||||||
for (auto& p : impl_->auto_link_proxies) {
|
|
||||||
if (p && p->output_port == pair.first &&
|
|
||||||
p->input_port == pair.second) {
|
|
||||||
spa_hook_remove(&p->listener);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
std::erase_if(impl_->auto_link_proxies, [&](const auto& p) {
|
|
||||||
return p && p->output_port == pair.first &&
|
|
||||||
p->input_port == pair.second;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
pw_thread_loop_unlock(impl_->thread_loop);
|
pw_thread_loop_unlock(impl_->thread_loop);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue