This commit is contained in:
Joey Yakimowich-Payne 2026-01-30 10:40:52 -07:00
commit ecec82c70e
10 changed files with 809 additions and 21 deletions

View file

@ -1,10 +1,13 @@
#include <algorithm>
#include <atomic>
#include <cerrno>
#include <cmath>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <pipewire/keys.h>
@ -241,6 +244,52 @@ static const pw_stream_events kStreamEvents = {
.process = StreamProcess,
};
struct MeterStreamData {
uint32_t node_id = 0;
std::string target_name;
pw_stream* stream = nullptr;
spa_hook listener{};
std::atomic<float> peak_left{0.0f};
std::atomic<float> peak_right{0.0f};
};
void NodeMeterProcess(void* data) {
auto* meter = static_cast<MeterStreamData*>(data);
if (!meter || !meter->stream) {
return;
}
pw_buffer* buf = pw_stream_dequeue_buffer(meter->stream);
if (!buf || !buf->buffer || buf->buffer->n_datas == 0) {
if (buf) {
pw_stream_queue_buffer(meter->stream, buf);
}
return;
}
spa_data* d = &buf->buffer->datas[0];
if (!d->data || !d->chunk) {
pw_stream_queue_buffer(meter->stream, buf);
return;
}
const float* samples = static_cast<const float*>(d->data);
uint32_t count = d->chunk->size / sizeof(float);
float left = 0.0f;
float right = 0.0f;
for (uint32_t i = 0; i + 1 < count; i += 2) {
float l = std::fabs(samples[i]);
float r = std::fabs(samples[i + 1]);
if (l > left) left = l;
if (r > right) right = r;
}
meter->peak_left.store(left, std::memory_order_relaxed);
meter->peak_right.store(right, std::memory_order_relaxed);
pw_stream_queue_buffer(meter->stream, buf);
}
static const pw_stream_events kNodeMeterEvents = {
.version = PW_VERSION_STREAM_EVENTS,
.process = NodeMeterProcess,
};
} // namespace
Status Status::Ok() {
@ -281,6 +330,13 @@ struct Client::Impl {
std::unordered_map<uint32_t, VolumeState> volume_states;
std::unordered_map<uint32_t, MeterState> meter_states;
std::unordered_set<uint32_t> metered_nodes;
MeterState master_meter;
std::unique_ptr<MeterStreamData> master_meter_data;
std::unordered_map<uint32_t, std::unique_ptr<MeterStreamData>> live_meters;
uint32_t next_rule_id = 1;
std::unordered_map<uint32_t, RouteRule> route_rules;
std::vector<PendingAutoLink> pending_auto_links;
@ -307,6 +363,9 @@ struct Client::Impl {
void ProcessPendingAutoLinks();
void CreateAutoLinkAsync(uint32_t output_port, uint32_t input_port);
void AutoSave();
void SetupMasterMeter();
void TeardownMasterMeter();
void TeardownAllLiveMeters();
static void RegistryGlobal(void* data,
uint32_t id,
@ -730,10 +789,14 @@ Status Client::Impl::ConnectLocked() {
if (!sync_status.ok()) {
return sync_status;
}
SetupMasterMeter();
return Status::Ok();
}
void Client::Impl::DisconnectLocked() {
TeardownMasterMeter();
TeardownAllLiveMeters();
std::unordered_map<uint32_t, std::unique_ptr<LinkProxy>> links;
std::unordered_map<uint32_t, std::unique_ptr<StreamData>> streams;
{
@ -982,6 +1045,74 @@ void Client::Impl::AutoSave() {
}
}
void Client::Impl::SetupMasterMeter() {
if (!thread_loop || !core || master_meter_data) {
return;
}
auto meter = std::make_unique<MeterStreamData>();
pw_properties* props = pw_properties_new(
PW_KEY_MEDIA_TYPE, "Audio",
PW_KEY_MEDIA_CATEGORY, "Capture",
PW_KEY_MEDIA_CLASS, "Stream/Input/Audio",
PW_KEY_STREAM_CAPTURE_SINK, "true",
PW_KEY_STREAM_MONITOR, "true",
PW_KEY_NODE_NAME, "",
nullptr);
meter->stream = pw_stream_new_simple(
pw_thread_loop_get_loop(thread_loop),
"warppipe-meter", props, &kNodeMeterEvents, meter.get());
if (!meter->stream) {
return;
}
uint8_t buffer[512];
spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
spa_audio_info_raw info{};
info.format = SPA_AUDIO_FORMAT_F32;
info.rate = 48000;
info.channels = 2;
info.position[0] = SPA_AUDIO_CHANNEL_FL;
info.position[1] = SPA_AUDIO_CHANNEL_FR;
const spa_pod* params[1];
params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &info);
int res = pw_stream_connect(
meter->stream, PW_DIRECTION_INPUT, PW_ID_ANY,
static_cast<pw_stream_flags>(
PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS),
params, 1);
if (res != 0) {
pw_stream_destroy(meter->stream);
return;
}
master_meter_data = std::move(meter);
}
void Client::Impl::TeardownMasterMeter() {
if (!master_meter_data) {
return;
}
if (master_meter_data->stream) {
pw_stream_destroy(master_meter_data->stream);
}
master_meter_data.reset();
}
void Client::Impl::TeardownAllLiveMeters() {
std::unordered_map<uint32_t, std::unique_ptr<MeterStreamData>> meters;
{
std::lock_guard<std::mutex> lock(cache_mutex);
meters.swap(live_meters);
}
for (auto& entry : meters) {
if (entry.second && entry.second->stream) {
pw_stream_destroy(entry.second->stream);
entry.second->stream = nullptr;
}
}
}
int Client::Impl::MetadataProperty(void* data, uint32_t subject,
const char* key, const char* type,
const char* value) {
@ -1278,6 +1409,140 @@ Result<VolumeState> Client::GetNodeVolume(NodeId node) const {
return {Status::Ok(), it->second};
}
Status Client::EnsureNodeMeter(NodeId node) {
if (node.value == 0) {
return Status::Error(StatusCode::kInvalidArgument, "invalid node id");
}
std::string target_name;
bool capture_sink = false;
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
auto node_it = impl_->nodes.find(node.value);
if (node_it == impl_->nodes.end()) {
return Status::Error(StatusCode::kNotFound, "node not found");
}
impl_->metered_nodes.insert(node.value);
if (impl_->meter_states.find(node.value) == impl_->meter_states.end()) {
impl_->meter_states[node.value] = MeterState{};
}
if (impl_->live_meters.find(node.value) != impl_->live_meters.end()) {
return Status::Ok();
}
target_name = node_it->second.name;
const auto& mc = node_it->second.media_class;
capture_sink = (mc.find("Sink") != std::string::npos ||
mc.find("Duplex") != std::string::npos);
}
if (!impl_->thread_loop || !impl_->core) {
return Status::Ok();
}
pw_thread_loop_lock(impl_->thread_loop);
auto meter = std::make_unique<MeterStreamData>();
meter->node_id = node.value;
meter->target_name = target_name;
pw_properties* props = pw_properties_new(
PW_KEY_MEDIA_TYPE, "Audio",
PW_KEY_MEDIA_CATEGORY, "Capture",
PW_KEY_MEDIA_CLASS, "Stream/Input/Audio",
PW_KEY_TARGET_OBJECT, target_name.c_str(),
PW_KEY_STREAM_MONITOR, "true",
PW_KEY_NODE_NAME, "",
nullptr);
if (capture_sink) {
pw_properties_set(props, PW_KEY_STREAM_CAPTURE_SINK, "true");
}
meter->stream = pw_stream_new_simple(
pw_thread_loop_get_loop(impl_->thread_loop),
"warppipe-node-meter", props, &kNodeMeterEvents, meter.get());
if (!meter->stream) {
pw_thread_loop_unlock(impl_->thread_loop);
return Status::Ok();
}
uint8_t buffer[512];
spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
spa_audio_info_raw info{};
info.format = SPA_AUDIO_FORMAT_F32;
info.rate = 48000;
info.channels = 2;
info.position[0] = SPA_AUDIO_CHANNEL_FL;
info.position[1] = SPA_AUDIO_CHANNEL_FR;
const spa_pod* params[1];
params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &info);
int res = pw_stream_connect(
meter->stream, PW_DIRECTION_INPUT, PW_ID_ANY,
static_cast<pw_stream_flags>(
PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS),
params, 1);
if (res != 0) {
pw_stream_destroy(meter->stream);
pw_thread_loop_unlock(impl_->thread_loop);
return Status::Ok();
}
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
impl_->live_meters[node.value] = std::move(meter);
}
pw_thread_loop_unlock(impl_->thread_loop);
return Status::Ok();
}
Status Client::DisableNodeMeter(NodeId node) {
std::unique_ptr<MeterStreamData> meter;
{
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
impl_->metered_nodes.erase(node.value);
impl_->meter_states.erase(node.value);
auto it = impl_->live_meters.find(node.value);
if (it != impl_->live_meters.end()) {
meter = std::move(it->second);
impl_->live_meters.erase(it);
}
}
if (meter && meter->stream && impl_->thread_loop) {
pw_thread_loop_lock(impl_->thread_loop);
pw_stream_destroy(meter->stream);
meter->stream = nullptr;
pw_thread_loop_unlock(impl_->thread_loop);
}
return Status::Ok();
}
Result<MeterState> Client::NodeMeterPeak(NodeId node) const {
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
auto live_it = impl_->live_meters.find(node.value);
if (live_it != impl_->live_meters.end() && live_it->second) {
MeterState state;
state.peak_left = live_it->second->peak_left.load(std::memory_order_relaxed);
state.peak_right = live_it->second->peak_right.load(std::memory_order_relaxed);
return {Status::Ok(), state};
}
auto it = impl_->meter_states.find(node.value);
if (it == impl_->meter_states.end()) {
return {Status::Error(StatusCode::kNotFound, "node not metered"), {}};
}
return {Status::Ok(), it->second};
}
Result<MeterState> Client::MeterPeak() const {
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
if (impl_->master_meter_data) {
MeterState state;
state.peak_left = impl_->master_meter_data->peak_left.load(std::memory_order_relaxed);
state.peak_right = impl_->master_meter_data->peak_right.load(std::memory_order_relaxed);
return {Status::Ok(), state};
}
return {Status::Ok(), impl_->master_meter};
}
Result<Link> Client::CreateLink(PortId output, PortId input, const LinkOptions& options) {
Status status = impl_->EnsureConnected();
if (!status.ok()) {
@ -1805,6 +2070,38 @@ Result<VolumeState> Client::Test_GetNodeVolume(NodeId node) const {
}
return {Status::Ok(), it->second};
}
Status Client::Test_SetNodeMeterPeak(NodeId node, float left, float right) {
if (!impl_) {
return Status::Error(StatusCode::kUnavailable, "no impl");
}
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
float cl = std::clamp(left, 0.0f, 1.0f);
float cr = std::clamp(right, 0.0f, 1.0f);
impl_->meter_states[node.value] = MeterState{cl, cr};
impl_->metered_nodes.insert(node.value);
auto it = impl_->live_meters.find(node.value);
if (it != impl_->live_meters.end() && it->second) {
it->second->peak_left.store(cl, std::memory_order_relaxed);
it->second->peak_right.store(cr, std::memory_order_relaxed);
}
return Status::Ok();
}
Status Client::Test_SetMasterMeterPeak(float left, float right) {
if (!impl_) {
return Status::Error(StatusCode::kUnavailable, "no impl");
}
std::lock_guard<std::mutex> lock(impl_->cache_mutex);
float cl = std::clamp(left, 0.0f, 1.0f);
float cr = std::clamp(right, 0.0f, 1.0f);
impl_->master_meter = MeterState{cl, cr};
if (impl_->master_meter_data) {
impl_->master_meter_data->peak_left.store(cl, std::memory_order_relaxed);
impl_->master_meter_data->peak_right.store(cr, std::memory_order_relaxed);
}
return Status::Ok();
}
#endif
} // namespace warppipe