Milestone 3

This commit is contained in:
Joey Yakimowich-Payne 2026-01-29 17:47:31 -07:00
commit 691eb327d0
6 changed files with 507 additions and 24 deletions

View file

@ -6,6 +6,7 @@
#include <utility>
#include <pipewire/keys.h>
#include <pipewire/link.h>
#include <pipewire/pipewire.h>
#include <spa/param/audio/format-utils.h>
@ -76,6 +77,58 @@ struct StreamData {
uint32_t rate = kDefaultRate;
};
struct LinkProxy {
pw_proxy* proxy = nullptr;
spa_hook listener{};
pw_thread_loop* loop = nullptr;
bool done = false;
bool failed = false;
std::string error;
uint32_t id = SPA_ID_INVALID;
};
void LinkProxyBound(void* data, uint32_t global_id) {
auto* link = static_cast<LinkProxy*>(data);
if (!link) {
return;
}
link->id = global_id;
link->done = true;
if (link->loop) {
pw_thread_loop_signal(link->loop, false);
}
}
void LinkProxyRemoved(void* data) {
auto* link = static_cast<LinkProxy*>(data);
if (!link) {
return;
}
link->done = true;
if (link->loop) {
pw_thread_loop_signal(link->loop, false);
}
}
void LinkProxyError(void* data, int, int res, const char* message) {
auto* link = static_cast<LinkProxy*>(data);
if (!link) {
return;
}
link->failed = true;
link->error = message ? message : spa_strerror(res);
if (link->loop) {
pw_thread_loop_signal(link->loop, false);
}
}
static const pw_proxy_events kLinkProxyEvents = {
PW_VERSION_PROXY_EVENTS,
.bound = LinkProxyBound,
.removed = LinkProxyRemoved,
.error = LinkProxyError,
};
void StreamProcess(void* data) {
auto* stream_data = static_cast<StreamData*>(data);
if (!stream_data || !stream_data->stream) {
@ -187,6 +240,7 @@ struct Client::Impl {
std::unordered_map<uint32_t, PortInfo> ports;
std::unordered_map<uint32_t, Link> links;
std::unordered_map<uint32_t, std::unique_ptr<StreamData>> virtual_streams;
std::unordered_map<uint32_t, std::unique_ptr<LinkProxy>> link_proxies;
Status ConnectLocked();
void DisconnectLocked();
@ -270,6 +324,7 @@ void Client::Impl::RegistryGlobalRemove(void* data, uint32_t id) {
std::lock_guard<std::mutex> lock(impl->cache_mutex);
impl->virtual_streams.erase(id);
impl->link_proxies.erase(id);
auto node_it = impl->nodes.find(id);
if (node_it != impl->nodes.end()) {
impl->nodes.erase(node_it);
@ -519,7 +574,10 @@ Result<uint32_t> Client::Impl::CreateVirtualStreamLocked(std::string_view name,
stream_data->node_id = node_id;
stream_data->ready = true;
virtual_streams.emplace(node_id, std::move(stream_data));
{
std::lock_guard<std::mutex> lock(cache_mutex);
virtual_streams.emplace(node_id, std::move(stream_data));
}
return {Status::Ok(), node_id};
}
@ -575,7 +633,21 @@ Status Client::Impl::ConnectLocked() {
}
void Client::Impl::DisconnectLocked() {
for (auto& entry : virtual_streams) {
std::unordered_map<uint32_t, std::unique_ptr<LinkProxy>> links;
std::unordered_map<uint32_t, std::unique_ptr<StreamData>> streams;
{
std::lock_guard<std::mutex> lock(cache_mutex);
links.swap(link_proxies);
streams.swap(virtual_streams);
}
for (auto& entry : links) {
LinkProxy* link = entry.second.get();
if (link && link->proxy) {
pw_proxy_destroy(link->proxy);
link->proxy = nullptr;
}
}
for (auto& entry : streams) {
StreamData* stream_data = entry.second.get();
if (stream_data && stream_data->stream) {
pw_stream_disconnect(stream_data->stream);
@ -583,7 +655,6 @@ void Client::Impl::DisconnectLocked() {
stream_data->stream = nullptr;
}
}
virtual_streams.clear();
if (registry_listener_attached) {
spa_hook_remove(&registry_listener);
registry_listener_attached = false;
@ -772,28 +843,214 @@ Status Client::RemoveNode(NodeId node) {
}
pw_thread_loop_lock(impl_->thread_loop);
auto it = impl_->virtual_streams.find(node.value);
if (it == impl_->virtual_streams.end()) {
pw_thread_loop_unlock(impl_->thread_loop);
return Status::Error(StatusCode::kNotFound, "node not managed by warppipe");
std::unique_ptr<StreamData> owned_stream;
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
auto it = impl_->virtual_streams.find(node.value);
if (it == impl_->virtual_streams.end()) {
pw_thread_loop_unlock(impl_->thread_loop);
return Status::Error(StatusCode::kNotFound, "node not managed by warppipe");
}
owned_stream = std::move(it->second);
impl_->virtual_streams.erase(it);
}
StreamData* stream_data = it->second.get();
if (stream_data && stream_data->stream) {
pw_stream_disconnect(stream_data->stream);
pw_stream_destroy(stream_data->stream);
stream_data->stream = nullptr;
if (owned_stream && owned_stream->stream) {
pw_stream_disconnect(owned_stream->stream);
pw_stream_destroy(owned_stream->stream);
owned_stream->stream = nullptr;
}
impl_->virtual_streams.erase(it);
pw_thread_loop_unlock(impl_->thread_loop);
return Status::Ok();
}
Result<Link> Client::CreateLink(PortId, PortId, const LinkOptions&) {
return {Status::Error(StatusCode::kNotImplemented, "create link not implemented"), {}};
Result<Link> Client::CreateLink(PortId output, PortId input, const LinkOptions& options) {
Status status = impl_->EnsureConnected();
if (!status.ok()) {
return {status, {}};
}
if (output.value == 0 || input.value == 0) {
return {Status::Error(StatusCode::kInvalidArgument, "invalid port id"), {}};
}
pw_thread_loop_lock(impl_->thread_loop);
PortInfo out_port;
PortInfo in_port;
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
auto out_it = impl_->ports.find(output.value);
if (out_it == impl_->ports.end()) {
pw_thread_loop_unlock(impl_->thread_loop);
return {Status::Error(StatusCode::kNotFound, "output port not found"), {}};
}
auto in_it = impl_->ports.find(input.value);
if (in_it == impl_->ports.end()) {
pw_thread_loop_unlock(impl_->thread_loop);
return {Status::Error(StatusCode::kNotFound, "input port not found"), {}};
}
out_port = out_it->second;
in_port = in_it->second;
for (const auto& entry : impl_->links) {
const Link& link = entry.second;
if (link.output_port.value == output.value && link.input_port.value == input.value) {
pw_thread_loop_unlock(impl_->thread_loop);
return {Status::Error(StatusCode::kInvalidArgument, "link already exists"), {}};
}
}
}
if (out_port.is_input || !in_port.is_input) {
pw_thread_loop_unlock(impl_->thread_loop);
return {Status::Error(StatusCode::kInvalidArgument, "port directions do not match"), {}};
}
pw_properties* props = pw_properties_new(nullptr, nullptr);
if (!props) {
pw_thread_loop_unlock(impl_->thread_loop);
return {Status::Error(StatusCode::kInternal, "failed to allocate link properties"), {}};
}
pw_properties_setf(props, PW_KEY_LINK_OUTPUT_PORT, "%u", output.value);
pw_properties_setf(props, PW_KEY_LINK_INPUT_PORT, "%u", input.value);
if (options.passive) {
pw_properties_set(props, PW_KEY_LINK_PASSIVE, "true");
}
if (options.linger) {
pw_properties_set(props, PW_KEY_OBJECT_LINGER, "true");
}
pw_proxy* proxy = reinterpret_cast<pw_proxy*>(
pw_core_create_object(impl_->core,
"link-factory",
PW_TYPE_INTERFACE_Link,
PW_VERSION_LINK,
&props->dict,
0));
pw_properties_free(props);
if (!proxy) {
pw_thread_loop_unlock(impl_->thread_loop);
return {Status::Error(StatusCode::kUnavailable, "failed to create link"), {}};
}
auto link_proxy = std::make_unique<LinkProxy>();
link_proxy->proxy = proxy;
link_proxy->loop = impl_->thread_loop;
pw_proxy_add_listener(proxy, &link_proxy->listener, &kLinkProxyEvents, link_proxy.get());
int wait_attempts = 0;
while (link_proxy->id == SPA_ID_INVALID && !link_proxy->failed && wait_attempts < 3) {
int wait_res = pw_thread_loop_timed_wait(impl_->thread_loop, kSyncWaitSeconds);
if (wait_res == -ETIMEDOUT) {
break;
}
++wait_attempts;
}
if (link_proxy->failed) {
std::string error = link_proxy->error.empty() ? "link creation failed" : link_proxy->error;
pw_proxy_destroy(proxy);
pw_thread_loop_unlock(impl_->thread_loop);
return {Status::Error(StatusCode::kUnavailable, std::move(error)), {}};
}
if (link_proxy->id == SPA_ID_INVALID) {
pw_proxy_destroy(proxy);
pw_thread_loop_unlock(impl_->thread_loop);
return {Status::Error(StatusCode::kTimeout, "timed out waiting for link id"), {}};
}
Link link;
link.id = LinkId{link_proxy->id};
link.output_port = output;
link.input_port = input;
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
impl_->link_proxies.emplace(link_proxy->id, std::move(link_proxy));
impl_->links[link.id.value] = link;
}
pw_thread_loop_unlock(impl_->thread_loop);
return {Status::Ok(), link};
}
Status Client::RemoveLink(LinkId) {
return Status::Error(StatusCode::kNotImplemented, "remove link not implemented");
Result<Link> Client::CreateLinkByName(std::string_view output_node,
std::string_view output_port,
std::string_view input_node,
std::string_view input_port,
const LinkOptions& options) {
Status status = impl_->EnsureConnected();
if (!status.ok()) {
return {status, {}};
}
if (output_node.empty() || output_port.empty() || input_node.empty() || input_port.empty()) {
return {Status::Error(StatusCode::kInvalidArgument, "node or port name missing"), {}};
}
std::optional<PortId> out_id;
std::optional<PortId> in_id;
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
for (const auto& entry : impl_->ports) {
const PortInfo& port = entry.second;
auto node_it = impl_->nodes.find(port.node.value);
if (node_it == impl_->nodes.end()) {
continue;
}
const NodeInfo& node = node_it->second;
if (port.is_input) {
if (node.name == input_node && port.name == input_port) {
in_id = port.id;
}
} else {
if (node.name == output_node && port.name == output_port) {
out_id = port.id;
}
}
if (out_id && in_id) {
break;
}
}
}
if (!out_id || !in_id) {
return {Status::Error(StatusCode::kNotFound, "matching ports not found"), {}};
}
return CreateLink(*out_id, *in_id, options);
}
Status Client::RemoveLink(LinkId link) {
Status status = impl_->EnsureConnected();
if (!status.ok()) {
return status;
}
pw_thread_loop_lock(impl_->thread_loop);
bool removed = false;
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
auto it = impl_->link_proxies.find(link.value);
if (it != impl_->link_proxies.end()) {
if (it->second && it->second->proxy) {
pw_proxy_destroy(it->second->proxy);
}
impl_->link_proxies.erase(it);
impl_->links.erase(link.value);
removed = true;
}
}
if (!removed && impl_->registry) {
int res = pw_registry_destroy(impl_->registry, link.value);
if (res < 0) {
pw_thread_loop_unlock(impl_->thread_loop);
return Status::Error(StatusCode::kNotFound, "link not found");
}
removed = true;
}
pw_thread_loop_unlock(impl_->thread_loop);
return removed ? Status::Ok() : Status::Error(StatusCode::kNotFound, "link not found");
}
Result<RuleId> Client::AddRouteRule(const RouteRule&) {