Milestone 2

This commit is contained in:
Joey Yakimowich-Payne 2026-01-29 17:24:51 -07:00
commit 866f0419ad
19 changed files with 2006 additions and 23 deletions

862
src/warppipe.cpp Normal file
View file

@ -0,0 +1,862 @@
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <mutex>
#include <unordered_map>
#include <utility>
#include <pipewire/keys.h>
#include <pipewire/pipewire.h>
#include <spa/param/audio/format-utils.h>
#include <spa/utils/defs.h>
#include <spa/utils/result.h>
#include <spa/utils/string.h>
#include <warppipe/warppipe.hpp>
namespace warppipe {
namespace {
constexpr int kSyncWaitSeconds = 2;
constexpr uint32_t kDefaultRate = 48000;
constexpr uint32_t kDefaultChannels = 2;
const char* SafeLookup(const spa_dict* dict, const char* key) {
if (!dict || !key) {
return nullptr;
}
return spa_dict_lookup(dict, key);
}
std::string LookupString(const spa_dict* dict, const char* key) {
const char* value = SafeLookup(dict, key);
return value ? std::string(value) : std::string();
}
bool ParseUint32(const char* value, uint32_t* out) {
if (!value || !out) {
return false;
}
char* end = nullptr;
unsigned long parsed = std::strtoul(value, &end, 10);
if (!end || end == value) {
return false;
}
*out = static_cast<uint32_t>(parsed);
return true;
}
bool IsNodeType(const char* type) {
return type && spa_streq(type, PW_TYPE_INTERFACE_Node);
}
bool IsPortType(const char* type) {
return type && spa_streq(type, PW_TYPE_INTERFACE_Port);
}
bool IsLinkType(const char* type) {
return type && spa_streq(type, PW_TYPE_INTERFACE_Link);
}
struct StreamData {
pw_stream* stream = nullptr;
spa_hook listener{};
pw_thread_loop* loop = nullptr;
bool is_source = false;
bool loopback = false;
std::string target_node;
std::string name;
bool ready = false;
bool failed = false;
std::string error;
uint32_t node_id = SPA_ID_INVALID;
uint32_t channels = kDefaultChannels;
uint32_t rate = kDefaultRate;
};
void StreamProcess(void* data) {
auto* stream_data = static_cast<StreamData*>(data);
if (!stream_data || !stream_data->stream) {
return;
}
if (!stream_data->is_source) {
struct pw_buffer* buffer = nullptr;
while ((buffer = pw_stream_dequeue_buffer(stream_data->stream)) != nullptr) {
pw_stream_queue_buffer(stream_data->stream, buffer);
}
return;
}
struct pw_buffer* buffer = nullptr;
while ((buffer = pw_stream_dequeue_buffer(stream_data->stream)) != nullptr) {
struct spa_buffer* spa_buffer = buffer->buffer;
if (!spa_buffer) {
pw_stream_queue_buffer(stream_data->stream, buffer);
continue;
}
const uint32_t stride = sizeof(float) * stream_data->channels;
for (uint32_t i = 0; i < spa_buffer->n_datas; ++i) {
struct spa_data* data_entry = &spa_buffer->datas[i];
if (!data_entry->data || !data_entry->chunk) {
continue;
}
std::memset(data_entry->data, 0, data_entry->maxsize);
uint32_t frames = stride > 0 ? data_entry->maxsize / stride : 0;
if (buffer->requested > 0 && buffer->requested < frames) {
frames = buffer->requested;
}
data_entry->chunk->offset = 0;
data_entry->chunk->stride = stride;
data_entry->chunk->size = frames * stride;
}
pw_stream_queue_buffer(stream_data->stream, buffer);
}
}
void StreamStateChanged(void* data,
enum pw_stream_state,
enum pw_stream_state state,
const char* error) {
auto* stream_data = static_cast<StreamData*>(data);
if (!stream_data) {
return;
}
if (state == PW_STREAM_STATE_ERROR) {
stream_data->failed = true;
if (error) {
stream_data->error = error;
}
}
if (stream_data->stream) {
uint32_t node_id = pw_stream_get_node_id(stream_data->stream);
if (node_id != SPA_ID_INVALID) {
stream_data->node_id = node_id;
stream_data->ready = true;
}
}
if (stream_data->loop) {
pw_thread_loop_signal(stream_data->loop, false);
}
}
static const pw_stream_events kStreamEvents = {
PW_VERSION_STREAM_EVENTS,
.state_changed = StreamStateChanged,
.process = StreamProcess,
};
} // namespace
Status Status::Ok() {
return Status{};
}
Status Status::Error(StatusCode code, std::string message) {
Status status;
status.code = code;
status.message = std::move(message);
return status;
}
bool Status::ok() const {
return code == StatusCode::kOk;
}
struct Client::Impl {
ConnectionOptions options;
pw_thread_loop* thread_loop = nullptr;
pw_context* context = nullptr;
pw_core* core = nullptr;
pw_registry* registry = nullptr;
spa_hook core_listener{};
spa_hook registry_listener{};
bool core_listener_attached = false;
bool registry_listener_attached = false;
bool connected = false;
uint32_t pending_sync = 0;
uint32_t last_sync = 0;
Status last_error = Status::Ok();
std::mutex cache_mutex;
std::unordered_map<uint32_t, NodeInfo> nodes;
std::unordered_map<uint32_t, PortInfo> ports;
std::unordered_map<uint32_t, Link> links;
std::unordered_map<uint32_t, std::unique_ptr<StreamData>> virtual_streams;
Status ConnectLocked();
void DisconnectLocked();
Status SyncLocked();
void ClearCache();
Status EnsureConnected();
Result<uint32_t> CreateVirtualStreamLocked(std::string_view name,
bool is_source,
const VirtualNodeOptions& options);
static void RegistryGlobal(void* data,
uint32_t id,
uint32_t permissions,
const char* type,
uint32_t version,
const spa_dict* props);
static void RegistryGlobalRemove(void* data, uint32_t id);
static void CoreDone(void* data, uint32_t id, int seq);
static void CoreError(void* data, uint32_t id, int seq, int res, const char* message);
};
void Client::Impl::RegistryGlobal(void* data,
uint32_t id,
uint32_t,
const char* type,
uint32_t,
const spa_dict* props) {
auto* impl = static_cast<Client::Impl*>(data);
if (!impl) {
return;
}
std::lock_guard<std::mutex> lock(impl->cache_mutex);
if (IsNodeType(type)) {
NodeInfo info;
info.id = NodeId{id};
info.name = LookupString(props, PW_KEY_NODE_NAME);
info.media_class = LookupString(props, PW_KEY_MEDIA_CLASS);
impl->nodes[id] = std::move(info);
return;
}
if (IsPortType(type)) {
PortInfo info;
info.id = PortId{id};
info.name = LookupString(props, PW_KEY_PORT_NAME);
info.is_input = false;
uint32_t node_id = 0;
if (ParseUint32(SafeLookup(props, PW_KEY_NODE_ID), &node_id)) {
info.node = NodeId{node_id};
}
const char* direction = SafeLookup(props, PW_KEY_PORT_DIRECTION);
if (direction && spa_streq(direction, "in")) {
info.is_input = true;
}
impl->ports[id] = std::move(info);
return;
}
if (IsLinkType(type)) {
Link info;
info.id = LinkId{id};
uint32_t out_port = 0;
uint32_t in_port = 0;
if (ParseUint32(SafeLookup(props, PW_KEY_LINK_OUTPUT_PORT), &out_port)) {
info.output_port = PortId{out_port};
}
if (ParseUint32(SafeLookup(props, PW_KEY_LINK_INPUT_PORT), &in_port)) {
info.input_port = PortId{in_port};
}
impl->links[id] = std::move(info);
}
}
void Client::Impl::RegistryGlobalRemove(void* data, uint32_t id) {
auto* impl = static_cast<Client::Impl*>(data);
if (!impl) {
return;
}
std::lock_guard<std::mutex> lock(impl->cache_mutex);
impl->virtual_streams.erase(id);
auto node_it = impl->nodes.find(id);
if (node_it != impl->nodes.end()) {
impl->nodes.erase(node_it);
std::vector<uint32_t> removed_ports;
for (auto it = impl->ports.begin(); it != impl->ports.end();) {
if (it->second.node.value == id) {
removed_ports.push_back(it->first);
it = impl->ports.erase(it);
} else {
++it;
}
}
for (auto it = impl->links.begin(); it != impl->links.end();) {
bool remove_link = false;
for (uint32_t port_id : removed_ports) {
if (it->second.input_port.value == port_id || it->second.output_port.value == port_id) {
remove_link = true;
break;
}
}
if (remove_link) {
it = impl->links.erase(it);
} else {
++it;
}
}
return;
}
if (impl->ports.erase(id) > 0) {
for (auto it = impl->links.begin(); it != impl->links.end();) {
if (it->second.input_port.value == id || it->second.output_port.value == id) {
it = impl->links.erase(it);
} else {
++it;
}
}
return;
}
impl->links.erase(id);
}
void Client::Impl::CoreDone(void* data, uint32_t, int seq) {
auto* impl = static_cast<Client::Impl*>(data);
if (!impl || !impl->thread_loop) {
return;
}
if (seq >= static_cast<int>(impl->pending_sync)) {
impl->last_sync = static_cast<uint32_t>(seq);
pw_thread_loop_signal(impl->thread_loop, false);
}
}
void Client::Impl::CoreError(void* data, uint32_t, int, int res, const char* message) {
auto* impl = static_cast<Client::Impl*>(data);
if (!impl) {
return;
}
impl->connected = false;
impl->last_error = Status::Error(StatusCode::kUnavailable,
message ? message : spa_strerror(res));
if (impl->thread_loop) {
pw_thread_loop_signal(impl->thread_loop, false);
}
}
Status Client::Impl::SyncLocked() {
if (!core || !thread_loop) {
return Status::Error(StatusCode::kUnavailable, "pipewire core not connected");
}
pending_sync = pw_core_sync(core, PW_ID_CORE, 0);
if (pending_sync == SPA_ID_INVALID) {
return Status::Error(StatusCode::kInternal, "failed to sync with pipewire core");
}
while (last_sync < pending_sync) {
int wait_res = pw_thread_loop_timed_wait(thread_loop, kSyncWaitSeconds);
if (wait_res == -ETIMEDOUT) {
return Status::Error(StatusCode::kTimeout, "timeout waiting for pipewire sync");
}
}
return Status::Ok();
}
void Client::Impl::ClearCache() {
std::lock_guard<std::mutex> lock(cache_mutex);
nodes.clear();
ports.clear();
links.clear();
}
Status Client::Impl::EnsureConnected() {
if (connected) {
return Status::Ok();
}
if (!options.autoconnect) {
return Status::Error(StatusCode::kUnavailable, "pipewire core disconnected");
}
if (!thread_loop) {
return Status::Error(StatusCode::kUnavailable, "pipewire thread loop not running");
}
pw_thread_loop_lock(thread_loop);
Status status = ConnectLocked();
pw_thread_loop_unlock(thread_loop);
return status;
}
Result<uint32_t> Client::Impl::CreateVirtualStreamLocked(std::string_view name,
bool is_source,
const VirtualNodeOptions& options) {
if (!core || !thread_loop) {
return {Status::Error(StatusCode::kUnavailable, "pipewire core not connected"), 0};
}
if (options.format.rate == 0 || options.format.channels == 0) {
return {Status::Error(StatusCode::kInvalidArgument, "invalid audio format"), 0};
}
if (options.behavior == VirtualBehavior::kLoopback && !options.target_node) {
return {Status::Error(StatusCode::kInvalidArgument, "loopback requires target node"), 0};
}
if (options.media_class_override && options.media_class_override->empty()) {
return {Status::Error(StatusCode::kInvalidArgument, "missing media class"), 0};
}
std::string stream_name = name.empty() ? (is_source ? "warppipe-source" : "warppipe-sink")
: std::string(name);
const char* media_class = is_source ? "Audio/Source" : "Audio/Sink";
std::string media_class_value = options.media_class_override ? *options.media_class_override
: std::string(media_class);
if (media_class_value.empty()) {
return {Status::Error(StatusCode::kInvalidArgument, "missing media class"), 0};
}
const char* media_category = is_source ? "Capture" : "Playback";
std::string display_name = options.display_name.empty() ? stream_name : options.display_name;
const char* node_group = options.group.empty() ? nullptr : options.group.c_str();
{
std::lock_guard<std::mutex> lock(cache_mutex);
for (const auto& entry : nodes) {
if (entry.second.name == stream_name) {
return {Status::Error(StatusCode::kInvalidArgument, "duplicate node name"), 0};
}
}
for (const auto& entry : virtual_streams) {
if (entry.second && entry.second->name == stream_name) {
return {Status::Error(StatusCode::kInvalidArgument, "duplicate node name"), 0};
}
}
if (options.behavior == VirtualBehavior::kLoopback && options.target_node) {
bool found_target = false;
for (const auto& entry : nodes) {
if (entry.second.name == *options.target_node) {
found_target = true;
break;
}
}
if (!found_target) {
return {Status::Error(StatusCode::kNotFound, "target node not found"), 0};
}
}
}
pw_properties* props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio",
PW_KEY_MEDIA_CATEGORY, media_category,
PW_KEY_MEDIA_ROLE, "Music",
PW_KEY_MEDIA_CLASS, media_class_value.c_str(),
PW_KEY_NODE_NAME, stream_name.c_str(),
PW_KEY_MEDIA_NAME, display_name.c_str(),
PW_KEY_NODE_DESCRIPTION, display_name.c_str(),
PW_KEY_NODE_VIRTUAL, "true",
nullptr);
if (!props) {
return {Status::Error(StatusCode::kInternal, "failed to allocate stream properties"), 0};
}
if (node_group) {
pw_properties_set(props, PW_KEY_NODE_GROUP, node_group);
}
if (options.behavior == VirtualBehavior::kLoopback && options.target_node) {
pw_properties_set(props, PW_KEY_TARGET_OBJECT, options.target_node->c_str());
}
pw_stream* stream = pw_stream_new(core, stream_name.c_str(), props);
if (!stream) {
return {Status::Error(StatusCode::kUnavailable, "failed to create pipewire stream"), 0};
}
auto stream_data = std::make_unique<StreamData>();
stream_data->stream = stream;
stream_data->loop = thread_loop;
stream_data->is_source = is_source;
stream_data->loopback = options.behavior == VirtualBehavior::kLoopback;
if (options.target_node) {
stream_data->target_node = *options.target_node;
}
stream_data->name = stream_name;
if (options.format.rate != 0) {
stream_data->rate = options.format.rate;
}
if (options.format.channels != 0) {
stream_data->channels = options.format.channels;
}
pw_stream_add_listener(stream, &stream_data->listener, &kStreamEvents, stream_data.get());
const struct spa_pod* params[1];
uint8_t buffer[1024];
spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
spa_audio_info_raw audio_info{};
audio_info.format = SPA_AUDIO_FORMAT_F32;
audio_info.rate = stream_data->rate;
audio_info.channels = stream_data->channels;
params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &audio_info);
enum pw_direction direction = is_source ? PW_DIRECTION_OUTPUT : PW_DIRECTION_INPUT;
enum pw_stream_flags flags = PW_STREAM_FLAG_MAP_BUFFERS;
if (options.behavior == VirtualBehavior::kLoopback && options.target_node) {
flags = static_cast<pw_stream_flags>(flags | PW_STREAM_FLAG_AUTOCONNECT);
}
int res = pw_stream_connect(stream, direction, PW_ID_ANY, flags, params, 1);
if (res < 0) {
pw_stream_destroy(stream);
return {Status::Error(StatusCode::kUnavailable, "failed to connect pipewire stream"), 0};
}
uint32_t node_id = pw_stream_get_node_id(stream);
int wait_attempts = 0;
while (node_id == SPA_ID_INVALID && !stream_data->failed && wait_attempts < 3) {
int wait_res = pw_thread_loop_timed_wait(thread_loop, kSyncWaitSeconds);
if (wait_res == -ETIMEDOUT) {
break;
}
node_id = stream_data->node_id;
++wait_attempts;
}
if (stream_data->failed) {
std::string error = stream_data->error.empty() ? "stream entered error state" : stream_data->error;
pw_stream_destroy(stream);
return {Status::Error(StatusCode::kUnavailable, std::move(error)), 0};
}
if (node_id == SPA_ID_INVALID) {
pw_stream_destroy(stream);
return {Status::Error(StatusCode::kTimeout, "timed out waiting for stream node id"), 0};
}
stream_data->node_id = node_id;
stream_data->ready = true;
virtual_streams.emplace(node_id, std::move(stream_data));
return {Status::Ok(), node_id};
}
Status Client::Impl::ConnectLocked() {
if (connected) {
return Status::Ok();
}
if (!thread_loop) {
return Status::Error(StatusCode::kInternal, "thread loop not initialized");
}
pw_loop* loop = pw_thread_loop_get_loop(thread_loop);
if (!context) {
context = pw_context_new(loop, nullptr, 0);
if (!context) {
return Status::Error(StatusCode::kUnavailable, "failed to create pipewire context");
}
}
pw_properties* props = pw_properties_new(PW_KEY_APP_NAME, options.application_name.c_str(), nullptr);
if (options.remote_name && !options.remote_name->empty()) {
pw_properties_set(props, PW_KEY_REMOTE_NAME, options.remote_name->c_str());
}
core = pw_context_connect(context, props, 0);
if (!core) {
return Status::Error(StatusCode::kUnavailable, "failed to connect to pipewire core");
}
static const pw_core_events core_events = {
PW_VERSION_CORE_EVENTS,
.done = CoreDone,
.error = CoreError,
};
pw_core_add_listener(core, &core_listener, &core_events, this);
core_listener_attached = true;
registry = pw_core_get_registry(core, PW_VERSION_REGISTRY, 0);
if (!registry) {
return Status::Error(StatusCode::kUnavailable, "failed to get pipewire registry");
}
static const pw_registry_events registry_events = {
PW_VERSION_REGISTRY_EVENTS,
.global = RegistryGlobal,
.global_remove = RegistryGlobalRemove,
};
pw_registry_add_listener(registry, &registry_listener, &registry_events, this);
registry_listener_attached = true;
connected = true;
last_error = Status::Ok();
ClearCache();
return SyncLocked();
}
void Client::Impl::DisconnectLocked() {
for (auto& entry : virtual_streams) {
StreamData* stream_data = entry.second.get();
if (stream_data && stream_data->stream) {
pw_stream_disconnect(stream_data->stream);
pw_stream_destroy(stream_data->stream);
stream_data->stream = nullptr;
}
}
virtual_streams.clear();
if (registry_listener_attached) {
spa_hook_remove(&registry_listener);
registry_listener_attached = false;
}
if (core_listener_attached) {
spa_hook_remove(&core_listener);
core_listener_attached = false;
}
if (registry) {
pw_proxy_destroy(reinterpret_cast<pw_proxy*>(registry));
registry = nullptr;
}
if (core) {
pw_core_disconnect(core);
core = nullptr;
}
connected = false;
ClearCache();
}
Client::Client(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
Client::Client(Client&&) noexcept = default;
Client& Client::operator=(Client&&) noexcept = default;
Client::~Client() {
if (impl_) {
Shutdown();
}
}
Result<std::unique_ptr<Client>> Client::Create(const ConnectionOptions& options) {
pw_init(nullptr, nullptr);
if (options.threading == ThreadingMode::kCallerThread) {
return {Status::Error(StatusCode::kNotImplemented, "caller thread mode not implemented"), {}};
}
auto impl = std::make_unique<Impl>();
impl->options = options;
impl->thread_loop = pw_thread_loop_new("warppipe", nullptr);
if (!impl->thread_loop) {
return {Status::Error(StatusCode::kUnavailable, "failed to create pipewire thread loop"), {}};
}
if (pw_thread_loop_start(impl->thread_loop) != 0) {
pw_thread_loop_destroy(impl->thread_loop);
impl->thread_loop = nullptr;
return {Status::Error(StatusCode::kUnavailable, "failed to start pipewire thread loop"), {}};
}
pw_thread_loop_lock(impl->thread_loop);
Status status = impl->ConnectLocked();
pw_thread_loop_unlock(impl->thread_loop);
if (!status.ok()) {
pw_thread_loop_lock(impl->thread_loop);
impl->DisconnectLocked();
if (impl->context) {
pw_context_destroy(impl->context);
impl->context = nullptr;
}
pw_thread_loop_unlock(impl->thread_loop);
pw_thread_loop_stop(impl->thread_loop);
pw_thread_loop_destroy(impl->thread_loop);
impl->thread_loop = nullptr;
return {status, {}};
}
return {Status::Ok(), std::unique_ptr<Client>(new Client(std::move(impl)))};
}
Status Client::Shutdown() {
if (!impl_) {
return Status::Ok();
}
if (impl_->thread_loop) {
pw_thread_loop_lock(impl_->thread_loop);
impl_->DisconnectLocked();
if (impl_->context) {
pw_context_destroy(impl_->context);
impl_->context = nullptr;
}
pw_thread_loop_unlock(impl_->thread_loop);
pw_thread_loop_stop(impl_->thread_loop);
pw_thread_loop_destroy(impl_->thread_loop);
impl_->thread_loop = nullptr;
}
pw_deinit();
return Status::Ok();
}
Result<std::vector<NodeInfo>> Client::ListNodes() {
Status status = impl_->EnsureConnected();
if (!status.ok()) {
return {status, {}};
}
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
std::vector<NodeInfo> items;
items.reserve(impl_->nodes.size());
for (const auto& entry : impl_->nodes) {
items.push_back(entry.second);
}
return {Status::Ok(), std::move(items)};
}
Result<std::vector<PortInfo>> Client::ListPorts(NodeId node) {
Status status = impl_->EnsureConnected();
if (!status.ok()) {
return {status, {}};
}
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
std::vector<PortInfo> items;
for (const auto& entry : impl_->ports) {
if (entry.second.node.value == node.value) {
items.push_back(entry.second);
}
}
return {Status::Ok(), std::move(items)};
}
Result<std::vector<Link>> Client::ListLinks() {
Status status = impl_->EnsureConnected();
if (!status.ok()) {
return {status, {}};
}
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
std::vector<Link> items;
items.reserve(impl_->links.size());
for (const auto& entry : impl_->links) {
items.push_back(entry.second);
}
return {Status::Ok(), std::move(items)};
}
Result<VirtualSink> Client::CreateVirtualSink(std::string_view name,
const VirtualNodeOptions& options) {
Status status = impl_->EnsureConnected();
if (!status.ok()) {
return {status, {}};
}
std::string name_value = name.empty() ? std::string() : std::string(name);
pw_thread_loop_lock(impl_->thread_loop);
auto result = impl_->CreateVirtualStreamLocked(name_value, false, options);
pw_thread_loop_unlock(impl_->thread_loop);
if (!result.ok()) {
return {result.status, {}};
}
VirtualSink sink;
sink.node = NodeId{result.value};
sink.name = name_value.empty() ? "warppipe-sink" : name_value;
return {Status::Ok(), std::move(sink)};
}
Result<VirtualSource> Client::CreateVirtualSource(std::string_view name,
const VirtualNodeOptions& options) {
Status status = impl_->EnsureConnected();
if (!status.ok()) {
return {status, {}};
}
std::string name_value = name.empty() ? std::string() : std::string(name);
pw_thread_loop_lock(impl_->thread_loop);
auto result = impl_->CreateVirtualStreamLocked(name_value, true, options);
pw_thread_loop_unlock(impl_->thread_loop);
if (!result.ok()) {
return {result.status, {}};
}
VirtualSource source;
source.node = NodeId{result.value};
source.name = name_value.empty() ? "warppipe-source" : name_value;
return {Status::Ok(), std::move(source)};
}
Status Client::RemoveNode(NodeId node) {
Status status = impl_->EnsureConnected();
if (!status.ok()) {
return status;
}
pw_thread_loop_lock(impl_->thread_loop);
auto it = impl_->virtual_streams.find(node.value);
if (it == impl_->virtual_streams.end()) {
pw_thread_loop_unlock(impl_->thread_loop);
return Status::Error(StatusCode::kNotFound, "node not managed by warppipe");
}
StreamData* stream_data = it->second.get();
if (stream_data && stream_data->stream) {
pw_stream_disconnect(stream_data->stream);
pw_stream_destroy(stream_data->stream);
stream_data->stream = nullptr;
}
impl_->virtual_streams.erase(it);
pw_thread_loop_unlock(impl_->thread_loop);
return Status::Ok();
}
Result<Link> Client::CreateLink(PortId, PortId, const LinkOptions&) {
return {Status::Error(StatusCode::kNotImplemented, "create link not implemented"), {}};
}
Status Client::RemoveLink(LinkId) {
return Status::Error(StatusCode::kNotImplemented, "remove link not implemented");
}
Result<RuleId> Client::AddRouteRule(const RouteRule&) {
return {Status::Error(StatusCode::kNotImplemented, "add route rule not implemented"), {}};
}
Status Client::RemoveRouteRule(RuleId) {
return Status::Error(StatusCode::kNotImplemented, "remove route rule not implemented");
}
Status Client::SaveConfig(std::string_view) {
return Status::Error(StatusCode::kNotImplemented, "save config not implemented");
}
Status Client::LoadConfig(std::string_view) {
return Status::Error(StatusCode::kNotImplemented, "load config not implemented");
}
#ifdef WARPPIPE_TESTING
Status Client::Test_InsertNode(const NodeInfo& node) {
if (!impl_) {
return Status::Error(StatusCode::kInternal, "client not initialized");
}
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
impl_->nodes[node.id.value] = node;
return Status::Ok();
}
Status Client::Test_InsertPort(const PortInfo& port) {
if (!impl_) {
return Status::Error(StatusCode::kInternal, "client not initialized");
}
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
impl_->ports[port.id.value] = port;
return Status::Ok();
}
Status Client::Test_InsertLink(const Link& link) {
if (!impl_) {
return Status::Error(StatusCode::kInternal, "client not initialized");
}
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
impl_->links[link.id.value] = link;
return Status::Ok();
}
Status Client::Test_RemoveGlobal(uint32_t id) {
if (!impl_) {
return Status::Error(StatusCode::kInternal, "client not initialized");
}
Client::Impl::RegistryGlobalRemove(impl_.get(), id);
return Status::Ok();
}
Status Client::Test_ForceDisconnect() {
if (!impl_ || !impl_->thread_loop) {
return Status::Error(StatusCode::kInternal, "thread loop not initialized");
}
pw_thread_loop_lock(impl_->thread_loop);
impl_->DisconnectLocked();
pw_thread_loop_unlock(impl_->thread_loop);
return Status::Ok();
}
#endif
} // namespace warppipe