potato/src/pipewire/pipewirecontroller.cpp
2026-01-27 19:34:30 -07:00

1031 lines
29 KiB
C++

#include "pipewirecontroller.h"
#include <QDebug>
#include <QMutexLocker>
#include <QByteArray>
#include <QString>
#include <QElapsedTimer>
#include <QThread>
#include <algorithm>
#include <cstring>
#include <cstdlib>
#include <cmath>
#include <pipewire/pipewire.h>
#include <pipewire/keys.h>
#include <pipewire/properties.h>
#include <pipewire/stream.h>
#include <pipewire/node.h>
#include <spa/param/props.h>
#include <spa/param/audio/format-utils.h>
#include <spa/param/audio/raw.h>
#include <spa/utils/dict.h>
#include <spa/utils/defs.h>
#include <spa/utils/type-info.h>
namespace Potato {
/// Capacity, in bytes, of the storage backing the meter ring buffer
/// (the constructor resizes m_meterRingData to this value).
///
/// The ring helpers in this file wrap offsets with `index & (size - 1)`,
/// which is only correct when the capacity is a power of two, so that
/// invariant is enforced at compile time.
static constexpr uint32_t kMeterRingCapacityBytes = 4096;
static_assert(kMeterRingCapacityBytes != 0
                  && (kMeterRingCapacityBytes & (kMeterRingCapacityBytes - 1)) == 0,
              "kMeterRingCapacityBytes must be a non-zero power of two");
/// Appends one float to the single-producer ring backed by @p buffer.
///
/// @param ring   Ring-buffer indices; nothing is written when null.
/// @param buffer Backing storage; its size must be a power of two because
///               offsets are wrapped with `index & (size - 1)`.
/// @param value  Sample to append.
///
/// The value is dropped (not overwritten) when fewer than sizeof(float)
/// bytes are free, or when the indices look inconsistent.
static void writeRingValue(spa_ringbuffer *ring, std::vector<uint8_t> &buffer, float value)
{
    if (!ring || buffer.empty()) {
        return;
    }

    const uint32_t size = static_cast<uint32_t>(buffer.size());
    uint32_t index = 0;

    // spa_ringbuffer_get_write_index() returns the number of bytes currently
    // stored (write index - read index), NOT the free space. Treating it as
    // free space meant an empty ring (fill == 0) was never written to.
    const int32_t filled = spa_ringbuffer_get_write_index(ring, &index);
    if (filled < 0 || static_cast<uint32_t>(filled) > size) {
        return; // underrun/overrun: indices are inconsistent, do not touch the data
    }
    const uint32_t freeBytes = size - static_cast<uint32_t>(filled);
    if (freeBytes < sizeof(float)) {
        return; // ring is full; drop this sample
    }

    const uint32_t offset = index & (size - 1);
    if (offset + sizeof(float) <= size) {
        std::memcpy(buffer.data() + offset, &value, sizeof(float));
    } else {
        // The value straddles the end of the storage: split the copy in two.
        const uint32_t first = size - offset;
        std::memcpy(buffer.data() + offset, &value, first);
        std::memcpy(buffer.data(), reinterpret_cast<const uint8_t*>(&value) + first, sizeof(float) - first);
    }
    spa_ringbuffer_write_update(ring, index + sizeof(float));
}
/// Reads the most recently written float from the ring and consumes
/// everything that was pending.
///
/// @param ring   Ring-buffer indices; returns false when null.
/// @param buffer Backing storage; its size must be a power of two because
///               offsets are wrapped with `index & (size - 1)`.
/// @param value  Receives the latest sample on success; untouched otherwise.
/// @return true when a sample was read, false when the ring held less than
///         one float or its indices were inconsistent.
static bool readRingLatest(spa_ringbuffer *ring, std::vector<uint8_t> &buffer, float &value)
{
    if (!ring || buffer.empty()) {
        return false;
    }

    const uint32_t size = static_cast<uint32_t>(buffer.size());
    uint32_t index = 0;

    // The fill level is signed: negative means underrun, larger than the
    // storage means overrun. Keep it signed so neither is mistaken for data.
    const int32_t filled = spa_ringbuffer_get_read_index(ring, &index);
    if (filled < static_cast<int32_t>(sizeof(float))) {
        return false;
    }
    const uint32_t avail = static_cast<uint32_t>(filled);
    if (avail > size) {
        // Overrun: the stored bytes can no longer be trusted. Skip them so the
        // reader resynchronises with the writer instead of returning garbage.
        spa_ringbuffer_read_update(ring, index + avail);
        return false;
    }

    const uint32_t latestIndex = index + (avail - static_cast<uint32_t>(sizeof(float)));
    const uint32_t offset = latestIndex & (size - 1);
    if (offset + sizeof(float) <= size) {
        std::memcpy(&value, buffer.data() + offset, sizeof(float));
    } else {
        // The value straddles the end of the storage: reassemble it from two parts.
        const uint32_t first = size - offset;
        std::memcpy(&value, buffer.data() + offset, first);
        std::memcpy(reinterpret_cast<uint8_t*>(&value) + first, buffer.data(), sizeof(float) - first);
    }
    // Consume everything: older samples are of no interest to a level meter.
    spa_ringbuffer_read_update(ring, index + avail);
    return true;
}
bool PipeWireController::createVirtualDevice(const QString &name,
const QString &description,
const char *factoryName,
const char *mediaClass,
int channels,
int rate)
{
if (!m_threadLoop || !m_core || name.isEmpty()) {
return false;
}
channels = channels > 0 ? channels : 2;
rate = rate > 0 ? rate : 48000;
const QByteArray nameBytes = name.toUtf8();
const QByteArray descBytes = description.isEmpty() ? nameBytes : description.toUtf8();
const QByteArray channelsBytes = QByteArray::number(channels);
const QByteArray rateBytes = QByteArray::number(rate);
struct spa_dict_item items[] = {
{ PW_KEY_FACTORY_NAME, factoryName },
{ PW_KEY_NODE_NAME, nameBytes.constData() },
{ PW_KEY_NODE_DESCRIPTION, descBytes.constData() },
{ PW_KEY_MEDIA_CLASS, mediaClass },
{ PW_KEY_AUDIO_CHANNELS, channelsBytes.constData() },
{ PW_KEY_AUDIO_RATE, rateBytes.constData() },
{ "object.linger", "true" },
{ PW_KEY_APP_NAME, "Potato-Manager" }
};
struct spa_dict dict = SPA_DICT_INIT(items, SPA_N_ELEMENTS(items));
lock();
auto *proxy = static_cast<struct pw_proxy*>(pw_core_create_object(
m_core,
"adapter",
PW_TYPE_INTERFACE_Node,
PW_VERSION_NODE,
&dict,
0));
unlock();
if (!proxy) {
return false;
}
m_virtualDevices.push_back(proxy);
return true;
}
/// Converts a NUL-terminated UTF-8 C string to a QString.
/// A null pointer yields a default-constructed (null) QString.
static QString toQString(const char *value)
{
    return value != nullptr ? QString::fromUtf8(value) : QString();
}
void registryEventGlobal(void *data, uint32_t id, uint32_t permissions,
const char *type, uint32_t version,
const struct spa_dict *props)
{
Q_UNUSED(permissions)
Q_UNUSED(version)
auto *self = static_cast<PipeWireController*>(data);
if (strcmp(type, PW_TYPE_INTERFACE_Node) == 0) {
self->handleNodeInfo(id, props);
} else if (strcmp(type, PW_TYPE_INTERFACE_Port) == 0) {
self->handlePortInfo(id, props);
} else if (strcmp(type, PW_TYPE_INTERFACE_Link) == 0) {
self->handleLinkInfo(id, props);
}
}
void registryEventGlobalRemove(void *data, uint32_t id)
{
auto *self = static_cast<PipeWireController*>(data);
{
QMutexLocker lock(&self->m_nodesMutex);
if (self->m_nodes.contains(id)) {
self->m_nodes.remove(id);
emit self->nodeRemoved(id);
return;
}
if (self->m_ports.contains(id)) {
self->m_ports.remove(id);
return;
}
if (self->m_links.contains(id)) {
self->m_links.remove(id);
emit self->linkRemoved(id);
return;
}
}
}
/// Core "done" callback. Intentionally a no-op: none of the arguments are used.
void coreEventDone(void *data, uint32_t id, int seq)
{
    static_cast<void>(data);
    static_cast<void>(id);
    static_cast<void>(seq);
}
void coreEventError(void *data, uint32_t id, int seq, int res, const char *message)
{
Q_UNUSED(id)
Q_UNUSED(seq)
auto *self = static_cast<PipeWireController*>(data);
QString errorMsg = QString("PipeWire error (code ")
+ QString::number(res)
+ QString("): ")
+ toQString(message);
qWarning() << errorMsg;
emit self->errorOccurred(errorMsg);
if (res == -EPIPE) {
self->m_connected.storeRelaxed(false);
emit self->connectionLost();
}
}
static const struct pw_registry_events registry_events = []() {
struct pw_registry_events events{};
events.version = PW_VERSION_REGISTRY_EVENTS;
events.global = registryEventGlobal;
events.global_remove = registryEventGlobalRemove;
return events;
}();
static const struct pw_core_events core_events = []() {
struct pw_core_events events{};
events.version = PW_VERSION_CORE_EVENTS;
events.done = coreEventDone;
events.error = coreEventError;
return events;
}();
/// Stream "process" callback for the controller's meter stream.
///
/// Dequeues one buffer, computes the largest absolute float sample in the
/// first data plane, publishes it through m_meterPeak (and the meter ring
/// when it is marked ready), then hands the buffer back to the stream.
///
/// @param data The PipeWireController that owns the stream.
void meterProcess(void *data)
{
    auto *controller = static_cast<PipeWireController*>(data);
    if (controller == nullptr || controller->m_meterStream == nullptr) {
        return;
    }

    struct pw_buffer *pwBuffer = pw_stream_dequeue_buffer(controller->m_meterStream);
    if (pwBuffer == nullptr) {
        return;
    }

    // From here on every exit path must return the buffer to the stream.
    struct spa_buffer *payload = pwBuffer->buffer;
    const bool hasPlane = payload != nullptr && payload->n_datas != 0;
    struct spa_data *plane = hasPlane ? &payload->datas[0] : nullptr;
    if (plane == nullptr || plane->data == nullptr || plane->chunk == nullptr) {
        pw_stream_queue_buffer(controller->m_meterStream, pwBuffer);
        return;
    }

    const float *cursor = static_cast<const float*>(plane->data);
    const float *const end = cursor + plane->chunk->size / sizeof(float);
    float loudest = 0.0f;
    for (; cursor != end; ++cursor) {
        // Argument order keeps the running peak when the sample is NaN.
        loudest = std::max(loudest, std::fabs(*cursor));
    }

    controller->m_meterPeak.store(loudest, std::memory_order_relaxed);
    if (controller->m_meterRingReady.load(std::memory_order_relaxed)) {
        writeRingValue(&controller->m_meterRing, controller->m_meterRingData, loudest);
    }
    pw_stream_queue_buffer(controller->m_meterStream, pwBuffer);
}
static const struct pw_stream_events meter_events = []() {
struct pw_stream_events events{};
events.version = PW_VERSION_STREAM_EVENTS;
events.process = meterProcess;
return events;
}();
/// State of one per-node level meter.
struct NodeMeter {
    /// Id of the metered node. Zero-initialized so a freshly constructed
    /// meter never carries an indeterminate id.
    uint32_t nodeId = 0;
    /// Name passed to the stream as its target object.
    QString targetName;
    /// Capture stream feeding this meter; null until one has been created.
    pw_stream *stream = nullptr;
    /// Most recent peak written by the stream's process callback.
    std::atomic<float> peak{0.0f};
};
/// Stream "process" callback for a per-node meter.
///
/// Dequeues one buffer, computes the largest absolute float sample in the
/// first data plane, stores it in NodeMeter::peak and returns the buffer to
/// the stream.
///
/// @param data The NodeMeter that owns the stream.
static void nodeMeterProcess(void *data)
{
    auto *owner = static_cast<NodeMeter*>(data);
    if (owner == nullptr || owner->stream == nullptr) {
        return;
    }

    struct pw_buffer *pwBuffer = pw_stream_dequeue_buffer(owner->stream);
    if (pwBuffer == nullptr) {
        return;
    }

    // From here on every exit path must return the buffer to the stream.
    struct spa_buffer *payload = pwBuffer->buffer;
    const bool hasPlane = payload != nullptr && payload->n_datas != 0;
    struct spa_data *plane = hasPlane ? &payload->datas[0] : nullptr;
    if (plane == nullptr || plane->data == nullptr || plane->chunk == nullptr) {
        pw_stream_queue_buffer(owner->stream, pwBuffer);
        return;
    }

    const float *cursor = static_cast<const float*>(plane->data);
    const float *const end = cursor + plane->chunk->size / sizeof(float);
    float loudest = 0.0f;
    for (; cursor != end; ++cursor) {
        // Argument order keeps the running peak when the sample is NaN.
        loudest = std::max(loudest, std::fabs(*cursor));
    }

    owner->peak.store(loudest, std::memory_order_relaxed);
    pw_stream_queue_buffer(owner->stream, pwBuffer);
}
static const struct pw_stream_events node_meter_events = []() {
struct pw_stream_events events{};
events.version = PW_VERSION_STREAM_EVENTS;
events.process = nodeMeterProcess;
return events;
}();
/// Allocates the listener hooks and prepares the meter ring buffer.
/// No PipeWire connection is made here.
PipeWireController::PipeWireController(QObject *parent)
    : QObject(parent)
{
    // Value-initialize the hooks: a plain `new spa_hook` leaves every field
    // indeterminate, and hook removal invokes the hook's `removed` callback
    // whenever that pointer is non-null.
    m_registryListener = new spa_hook{};
    m_coreListener = new spa_hook{};

    m_meterRingData.resize(kMeterRingCapacityBytes);
    spa_ringbuffer_init(&m_meterRing);
    m_meterRingReady.store(true, std::memory_order_relaxed);
}
/// Shuts the controller down, then frees the two listener hooks
/// allocated by the constructor.
PipeWireController::~PipeWireController()
{
    shutdown();

    // The hooks may only be freed once shutdown() has finished.
    delete m_coreListener;
    delete m_registryListener;
}
bool PipeWireController::initialize()
{
if (m_initialized.loadRelaxed()) {
qWarning() << "PipeWireController already initialized";
return true;
}
pw_init(nullptr, nullptr);
m_threadLoop = pw_thread_loop_new("Potato-PW", nullptr);
if (!m_threadLoop) {
qCritical() << "Failed to create PipeWire thread loop";
emit errorOccurred("Failed to create PipeWire thread loop");
return false;
}
lock();
m_context = pw_context_new(pw_thread_loop_get_loop(m_threadLoop), nullptr, 0);
if (!m_context) {
unlock();
qCritical() << "Failed to create PipeWire context";
emit errorOccurred("Failed to create PipeWire context");
return false;
}
m_core = pw_context_connect(m_context, nullptr, 0);
if (!m_core) {
unlock();
qCritical() << "Failed to connect to PipeWire daemon";
emit errorOccurred("Failed to connect to PipeWire daemon. Is PipeWire running?");
return false;
}
pw_core_add_listener(m_core, m_coreListener, &core_events, this);
m_registry = pw_core_get_registry(m_core, PW_VERSION_REGISTRY, 0);
if (!m_registry) {
unlock();
qCritical() << "Failed to get PipeWire registry";
emit errorOccurred("Failed to get PipeWire registry");
return false;
}
pw_registry_add_listener(m_registry, m_registryListener, &registry_events, this);
if (!setupMeterStream()) {
qWarning() << "Failed to set up meter stream";
}
unlock();
if (pw_thread_loop_start(m_threadLoop) < 0) {
qCritical() << "Failed to start PipeWire thread loop";
emit errorOccurred("Failed to start PipeWire thread loop");
return false;
}
m_initialized.storeRelaxed(true);
m_connected.storeRelaxed(true);
qInfo() << "PipeWire controller initialized successfully";
return true;
}
void PipeWireController::shutdown()
{
if (!m_initialized.loadRelaxed()) {
return;
}
if (m_threadLoop) {
pw_thread_loop_stop(m_threadLoop);
}
lock();
if (m_registry) {
pw_proxy_destroy(reinterpret_cast<struct pw_proxy*>(m_registry));
m_registry = nullptr;
}
teardownMeterStream();
{
QMutexLocker lock(&m_meterMutex);
for (auto it = m_nodeMeters.begin(); it != m_nodeMeters.end(); ++it) {
NodeMeter *meter = it.value();
if (meter && meter->stream) {
pw_stream_destroy(meter->stream);
}
delete meter;
}
m_nodeMeters.clear();
}
for (auto *proxy : m_virtualDevices) {
if (proxy) {
pw_proxy_destroy(proxy);
}
}
m_virtualDevices.clear();
if (m_core) {
pw_core_disconnect(m_core);
m_core = nullptr;
}
unlock();
if (m_context) {
pw_context_destroy(m_context);
m_context = nullptr;
}
if (m_threadLoop) {
pw_thread_loop_destroy(m_threadLoop);
m_threadLoop = nullptr;
}
pw_deinit();
m_meterRingReady.store(false, std::memory_order_relaxed);
m_meterRingData.clear();
m_initialized.storeRelaxed(false);
m_connected.storeRelaxed(false);
qInfo() << "PipeWire controller shut down";
}
bool PipeWireController::isConnected() const
{
return m_connected.loadRelaxed();
}
/// Returns a copy of all cached nodes, taken under m_nodesMutex.
QVector<NodeInfo> PipeWireController::nodes() const
{
    QMutexLocker guard(&m_nodesMutex);
    const auto cached = m_nodes.values();
    return cached.toVector();
}
/// Returns a copy of the cached node with the given id, or a
/// default-constructed NodeInfo when no such node is cached.
NodeInfo PipeWireController::nodeById(uint32_t id) const
{
    QMutexLocker guard(&m_nodesMutex);
    const auto found = m_nodes.constFind(id);
    if (found == m_nodes.constEnd()) {
        return NodeInfo();
    }
    return found.value();
}
/// Returns a copy of all cached links, taken under m_nodesMutex.
QVector<LinkInfo> PipeWireController::links() const
{
    QMutexLocker guard(&m_nodesMutex);
    const auto cached = m_links.values();
    return cached.toVector();
}
/// Returns the latest peak of the controller-wide meter.
///
/// The newest value in the meter ring wins when the ring is ready and holds
/// one; otherwise the value stored in m_meterPeak is returned.
float PipeWireController::meterPeak() const
{
    const float fallback = m_meterPeak.load(std::memory_order_relaxed);
    if (!m_meterRingReady.load(std::memory_order_relaxed)) {
        return fallback;
    }

    float latest = 0.0f;
    const bool gotSample = readRingLatest(&m_meterRing, m_meterRingData, latest);
    return gotSample ? latest : fallback;
}
/// Sets the volume and mute properties of a node.
///
/// @param nodeId Global id of the node; 0 is rejected.
/// @param volume Desired volume, clamped to [0, 1]; NaN is rejected.
/// @param mute   Desired mute state.
/// @return true when the Props parameter was built and handed to PipeWire
///         without an error; false otherwise. Previously the result of
///         pw_node_set_param() was ignored and true was always returned.
bool PipeWireController::setNodeVolume(uint32_t nodeId, float volume, bool mute)
{
    if (!m_threadLoop || !m_core || !m_registry) {
        return false;
    }
    if (nodeId == 0) {
        return false;
    }
    // std::clamp() passes NaN through unchanged, so reject it explicitly.
    if (std::isnan(volume)) {
        return false;
    }
    volume = std::clamp(volume, 0.0f, 1.0f);

    lock();
    auto *node = static_cast<struct pw_node*>(
        pw_registry_bind(m_registry, nodeId, PW_TYPE_INTERFACE_Node, PW_VERSION_NODE, 0));
    if (!node) {
        unlock();
        return false;
    }

    uint8_t buffer[128];
    spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
    auto *param = reinterpret_cast<const struct spa_pod*>(spa_pod_builder_add_object(
        &builder,
        SPA_TYPE_OBJECT_Props, SPA_PARAM_Props,
        SPA_PROP_volume, SPA_POD_Float(volume),
        SPA_PROP_mute, SPA_POD_Bool(mute)));

    // A null pod means the builder ran out of space; never send that.
    int res = -1;
    if (param) {
        res = pw_node_set_param(node, SPA_PARAM_Props, 0, param);
    }

    // The temporary binding was only needed to send the parameter.
    pw_proxy_destroy(reinterpret_cast<struct pw_proxy*>(node));
    unlock();

    // Negative values are errors; zero and positive values mean the request was accepted.
    return res >= 0;
}
/// Creates a virtual sink by delegating to createVirtualDevice() with the
/// null-audio-sink factory and the "Audio/Sink" media class.
bool PipeWireController::createVirtualSink(const QString &name, const QString &description, int channels, int rate)
{
    const char *const factory = "support.null-audio-sink";
    const char *const mediaClass = "Audio/Sink";
    return createVirtualDevice(name, description, factory, mediaClass, channels, rate);
}
/// Creates a virtual source by delegating to createVirtualDevice() with the
/// null-audio-source factory and the "Audio/Source" media class.
bool PipeWireController::createVirtualSource(const QString &name, const QString &description, int channels, int rate)
{
    const char *const factory = "support.null-audio-source";
    const char *const mediaClass = "Audio/Source";
    return createVirtualDevice(name, description, factory, mediaClass, channels, rate);
}
float PipeWireController::nodeMeterPeak(uint32_t nodeId) const
{
QMutexLocker lock(&m_meterMutex);
if (!m_nodeMeters.contains(nodeId)) {
return 0.0f;
}
NodeMeter *meter = m_nodeMeters.value(nodeId);
if (!meter) {
return 0.0f;
}
return meter->peak.load(std::memory_order_relaxed);
}
void PipeWireController::ensureNodeMeter(uint32_t nodeId, const QString &targetName, bool captureSink)
{
if (!m_threadLoop || !m_core) {
return;
}
{
QMutexLocker lock(&m_meterMutex);
if (m_nodeMeters.contains(nodeId)) {
return;
}
}
auto *meter = new NodeMeter;
meter->nodeId = nodeId;
meter->targetName = targetName;
const QByteArray targetNameBytes = meter->targetName.toUtf8();
struct pw_properties *props = pw_properties_new(
PW_KEY_MEDIA_TYPE, "Audio",
PW_KEY_MEDIA_CATEGORY, "Capture",
PW_KEY_MEDIA_CLASS, "Stream/Input/Audio",
PW_KEY_TARGET_OBJECT, targetNameBytes.constData(),
PW_KEY_STREAM_MONITOR, "true",
nullptr);
if (captureSink) {
pw_properties_set(props, PW_KEY_STREAM_CAPTURE_SINK, "true");
}
lock();
meter->stream = pw_stream_new_simple(
pw_thread_loop_get_loop(m_threadLoop),
"Potato-Node-Meter",
props,
&node_meter_events,
meter);
if (!meter->stream) {
pw_properties_free(props);
unlock();
delete meter;
return;
}
uint8_t buffer[512];
spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
spa_audio_info_raw info{};
info.format = SPA_AUDIO_FORMAT_F32;
info.rate = 48000;
info.channels = 2;
info.position[0] = SPA_AUDIO_CHANNEL_FL;
info.position[1] = SPA_AUDIO_CHANNEL_FR;
const struct spa_pod *params[1];
params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &info);
const int res = pw_stream_connect(
meter->stream,
PW_DIRECTION_INPUT,
PW_ID_ANY,
static_cast<pw_stream_flags>(PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS),
params,
1);
if (res != 0) {
pw_stream_destroy(meter->stream);
unlock();
delete meter;
return;
}
unlock();
QMutexLocker lock(&m_meterMutex);
m_nodeMeters.insert(nodeId, meter);
}
void PipeWireController::removeNodeMeter(uint32_t nodeId)
{
NodeMeter *meter = nullptr;
{
QMutexLocker lock(&m_meterMutex);
if (!m_nodeMeters.contains(nodeId)) {
return;
}
meter = m_nodeMeters.take(nodeId);
}
if (meter && meter->stream) {
lock();
pw_stream_destroy(meter->stream);
unlock();
}
delete meter;
}
uint32_t PipeWireController::createLink(uint32_t outputNodeId, uint32_t outputPortId,
uint32_t inputNodeId, uint32_t inputPortId)
{
Q_UNUSED(outputNodeId)
Q_UNUSED(inputNodeId)
if (!m_connected.loadRelaxed()) {
qWarning() << "Cannot create link: not connected to PipeWire";
return 0;
}
{
QMutexLocker lock(&m_nodesMutex);
for (auto it = m_links.cbegin(); it != m_links.cend(); ++it) {
const LinkInfo &link = it.value();
if (link.outputNodeId == outputNodeId &&
link.outputPortId == outputPortId &&
link.inputNodeId == inputNodeId &&
link.inputPortId == inputPortId) {
return link.id;
}
}
}
lock();
QByteArray outNode = QByteArray::number(outputNodeId);
QByteArray outPort = QByteArray::number(outputPortId);
QByteArray inNode = QByteArray::number(inputNodeId);
QByteArray inPort = QByteArray::number(inputPortId);
struct pw_properties *props = pw_properties_new(
PW_KEY_LINK_OUTPUT_NODE, outNode.constData(),
PW_KEY_LINK_OUTPUT_PORT, outPort.constData(),
PW_KEY_LINK_INPUT_NODE, inNode.constData(),
PW_KEY_LINK_INPUT_PORT, inPort.constData(),
nullptr);
struct pw_proxy *proxy = static_cast<struct pw_proxy*>(pw_core_create_object(
m_core,
"link-factory",
PW_TYPE_INTERFACE_Link,
PW_VERSION_LINK,
&props->dict,
0));
if (!proxy) {
unlock();
qWarning() << "Failed to create link proxy";
pw_properties_free(props);
return 0;
}
unlock();
pw_properties_free(props);
uint32_t createdLinkId = 0;
QElapsedTimer timer;
timer.start();
while (timer.elapsed() < 2000) {
{
QMutexLocker lock(&m_nodesMutex);
for (auto it = m_links.cbegin(); it != m_links.cend(); ++it) {
const LinkInfo &link = it.value();
if (link.outputNodeId == outputNodeId &&
link.outputPortId == outputPortId &&
link.inputNodeId == inputNodeId &&
link.inputPortId == inputPortId) {
createdLinkId = link.id;
break;
}
}
}
if (createdLinkId != 0) {
break;
}
QThread::msleep(10);
}
if (createdLinkId != 0) {
qInfo() << "Link created:" << createdLinkId;
} else {
qWarning() << "Link created but ID not found in registry";
}
return createdLinkId;
}
bool PipeWireController::destroyLink(uint32_t linkId)
{
if (!m_connected.loadRelaxed()) {
qWarning() << "Cannot destroy link: not connected to PipeWire";
return false;
}
LinkInfo linkInfo;
{
QMutexLocker lock(&m_nodesMutex);
if (!m_links.contains(linkId)) {
qWarning() << "Link not found:" << linkId;
return false;
}
linkInfo = m_links.value(linkId);
}
lock();
struct pw_proxy *proxy = static_cast<struct pw_proxy*>(
pw_registry_bind(m_registry, linkId, PW_TYPE_INTERFACE_Link, PW_VERSION_LINK, 0));
if (proxy) {
pw_proxy_destroy(proxy);
}
unlock();
qInfo() << "Link destroy requested:" << linkId;
return true;
}
/// Builds a human-readable text report of the cached graph: object counts,
/// then one entry per node, then one line per link.
/// The whole report is assembled while holding m_nodesMutex.
QString PipeWireController::dumpGraph() const
{
    QMutexLocker guard(&m_nodesMutex);

    QString report;

    // Header with object counts.
    report.append(QString("=== PipeWire Graph Dump ===\n"));
    report.append(QString("Nodes: %1\n").arg(m_nodes.size()));
    report.append(QString("Ports: %1\n").arg(m_ports.size()));
    report.append(QString("Links: %1\n\n").arg(m_links.size()));

    // Node section.
    report.append(QString("=== Nodes ===\n"));
    for (const auto &entry : m_nodes) {
        const QString heading = QString("Node %1: %2\n").arg(entry.id).arg(entry.name);
        report.append(heading);
        report.append(QString(" Description: %1\n").arg(entry.description));
        report.append(QString(" Stable ID: %1\n").arg(entry.stableId));
        report.append(QString(" Input ports: %1\n").arg(entry.inputPorts.size()));
        report.append(QString(" Output ports: %1\n").arg(entry.outputPorts.size()));
    }

    // Link section.
    report.append(QString("\n=== Links ===\n"));
    for (const auto &entry : m_links) {
        const QString line = QString("Link %1: Node %2:%3 -> Node %4:%5\n")
                                 .arg(entry.id)
                                 .arg(entry.outputNodeId).arg(entry.outputPortId)
                                 .arg(entry.inputNodeId).arg(entry.inputPortId);
        report.append(line);
    }

    return report;
}
void PipeWireController::handleNodeInfo(uint32_t id, const struct spa_dict *props)
{
if (!props) {
return;
}
NodeInfo node;
node.id = id;
const char *name = spa_dict_lookup(props, PW_KEY_NODE_NAME);
const char *description = spa_dict_lookup(props, PW_KEY_NODE_DESCRIPTION);
const char *mediaClass = spa_dict_lookup(props, PW_KEY_MEDIA_CLASS);
const char *appName = spa_dict_lookup(props, PW_KEY_APP_NAME);
node.name = name ? toQString(name) : QString("Unknown");
node.description = description ? toQString(description) : node.name;
node.stableId = node.name;
QString mediaClassStr = mediaClass ? toQString(mediaClass) : QString();
QString appNameStr = appName ? toQString(appName) : QString();
node.mediaClass = NodeInfo::mediaClassFromString(mediaClassStr);
node.type = NodeInfo::typeFromProperties(mediaClassStr, appNameStr);
{
QMutexLocker lock(&m_nodesMutex);
for (auto it = m_ports.cbegin(); it != m_ports.cend(); ++it) {
const PortInfo &port = it.value();
if (port.nodeId != id) {
continue;
}
if (port.direction == PW_DIRECTION_INPUT) {
node.inputPorts.append(port);
} else if (port.direction == PW_DIRECTION_OUTPUT) {
node.outputPorts.append(port);
}
}
}
{
QMutexLocker lock(&m_nodesMutex);
bool isNewNode = !m_nodes.contains(id);
m_nodes.insert(id, node);
if (isNewNode) {
emit nodeAdded(node);
qDebug() << "Node added:" << node.id << node.name;
} else {
emit nodeChanged(node);
qDebug() << "Node changed:" << node.id << node.name;
}
}
}
void PipeWireController::handlePortInfo(uint32_t id, const struct spa_dict *props)
{
if (!props) {
return;
}
const char *name = spa_dict_lookup(props, PW_KEY_PORT_NAME);
const char *directionStr = spa_dict_lookup(props, PW_KEY_PORT_DIRECTION);
const char *nodeIdStr = spa_dict_lookup(props, PW_KEY_NODE_ID);
uint32_t direction = 0;
if (directionStr) {
if (strcmp(directionStr, "in") == 0) {
direction = PW_DIRECTION_INPUT;
} else if (strcmp(directionStr, "out") == 0) {
direction = PW_DIRECTION_OUTPUT;
}
}
QString portName = name ? toQString(name)
: QString("port_") + QString::number(id);
uint32_t nodeId = nodeIdStr ? static_cast<uint32_t>(atoi(nodeIdStr)) : 0;
PortInfo port(id, nodeId, portName, direction);
bool emitChanged = false;
NodeInfo nodeSnapshot;
{
QMutexLocker lock(&m_nodesMutex);
m_ports.insert(id, port);
if (nodeId != 0 && m_nodes.contains(nodeId)) {
NodeInfo &node = m_nodes[nodeId];
auto &ports = (direction == PW_DIRECTION_INPUT) ? node.inputPorts : node.outputPorts;
for (int i = 0; i < ports.size(); ++i) {
if (ports.at(i).id == id) {
ports.removeAt(i);
break;
}
}
ports.append(port);
nodeSnapshot = node;
emitChanged = true;
}
}
if (emitChanged) {
emit nodeChanged(nodeSnapshot);
}
qDebug() << "Port added:" << id << portName << "direction:" << direction;
}
/// Records an announced link in the link cache and emits linkAdded.
///
/// @param id    Global id of the link.
/// @param props Link properties; the call is ignored when null. An endpoint
///              whose property is missing is recorded as 0.
void PipeWireController::handleLinkInfo(uint32_t id, const struct spa_dict *props)
{
    if (props == nullptr) {
        return;
    }

    // Reads one numeric property; a missing key yields 0.
    const auto idFromProp = [props](const char *key) -> uint32_t {
        const char *text = spa_dict_lookup(props, key);
        return text != nullptr ? static_cast<uint32_t>(atoi(text)) : 0;
    };

    const uint32_t outputNode = idFromProp(PW_KEY_LINK_OUTPUT_NODE);
    const uint32_t outputPort = idFromProp(PW_KEY_LINK_OUTPUT_PORT);
    const uint32_t inputNode = idFromProp(PW_KEY_LINK_INPUT_NODE);
    const uint32_t inputPort = idFromProp(PW_KEY_LINK_INPUT_PORT);

    LinkInfo link(id, outputNode, outputPort, inputNode, inputPort);
    {
        QMutexLocker guard(&m_nodesMutex);
        m_links.insert(id, link);
    }

    emit linkAdded(link);
    qDebug() << "Link added:" << id << "from" << outputNode << ":" << outputPort
             << "to" << inputNode << ":" << inputPort;
}
/// Acquires the PipeWire thread-loop lock; a no-op when no loop exists.
void PipeWireController::lock()
{
    if (m_threadLoop == nullptr) {
        return;
    }
    pw_thread_loop_lock(m_threadLoop);
}
/// Releases the PipeWire thread-loop lock; a no-op when no loop exists.
void PipeWireController::unlock()
{
    if (m_threadLoop == nullptr) {
        return;
    }
    pw_thread_loop_unlock(m_threadLoop);
}
bool PipeWireController::setupMeterStream()
{
if (!m_threadLoop || !m_core) {
return false;
}
struct pw_properties *props = pw_properties_new(
PW_KEY_MEDIA_TYPE, "Audio",
PW_KEY_MEDIA_CATEGORY, "Capture",
PW_KEY_MEDIA_CLASS, "Stream/Input/Audio",
PW_KEY_STREAM_CAPTURE_SINK, "true",
PW_KEY_STREAM_MONITOR, "true",
nullptr);
m_meterStream = pw_stream_new_simple(
pw_thread_loop_get_loop(m_threadLoop),
"Potato-Meter",
props,
&meter_events,
this);
if (!m_meterStream) {
pw_properties_free(props);
return false;
}
uint8_t buffer[512];
spa_pod_builder builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
spa_audio_info_raw info{};
info.format = SPA_AUDIO_FORMAT_F32;
info.rate = 48000;
info.channels = 2;
info.position[0] = SPA_AUDIO_CHANNEL_FL;
info.position[1] = SPA_AUDIO_CHANNEL_FR;
const struct spa_pod *params[1];
params[0] = spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, &info);
const int res = pw_stream_connect(
m_meterStream,
PW_DIRECTION_INPUT,
PW_ID_ANY,
static_cast<pw_stream_flags>(PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS),
params,
1);
return res == 0;
}
/// Destroys the controller-wide meter stream, if any, and clears the pointer.
void PipeWireController::teardownMeterStream()
{
    if (m_meterStream != nullptr) {
        pw_stream_destroy(m_meterStream);
        m_meterStream = nullptr;
    }
}
} // namespace Potato