warp-pipe/perf/warppipe_perf.cpp
2026-01-29 21:13:18 -07:00

479 lines
16 KiB
C++

#include <chrono>
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <optional>
#include <string>
#include <vector>
#include <unistd.h>
#include <warppipe/warppipe.hpp>
namespace {
struct Options {
std::string mode = "create-destroy";
std::string type = "sink";
std::string target;
uint32_t count = 200;
uint32_t events = 100;
uint32_t links = 200;
uint32_t batch = 0;
uint32_t rate = 48000;
uint32_t channels = 2;
};
bool ParseUInt(const char* value, uint32_t* out) {
if (!value || !out) {
return false;
}
char* end = nullptr;
unsigned long parsed = std::strtoul(value, &end, 10);
if (!end || end == value) {
return false;
}
*out = static_cast<uint32_t>(parsed);
return true;
}
void PrintUsage() {
std::cout << "warppipe_perf usage:\n"
<< " --mode create-destroy|registry|links|policy|e2e\n"
<< " --type sink|source|both\n"
<< " --count N (default 200, per-type when --type both)\n"
<< " --events N (registry mode, default 100)\n"
<< " --links N (links mode, default 200)\n"
<< " --batch N (links mode, batch size)\n"
<< " --rate N (default 48000)\n"
<< " --channels N (default 2)\n"
<< " --target <node-name> (loopback target, optional)\n";
}
bool ParseArgs(int argc, char* argv[], Options* options) {
for (int i = 1; i < argc; ++i) {
std::string arg = argv[i];
if (arg == "--mode" && i + 1 < argc) {
options->mode = argv[++i];
} else if (arg == "--type" && i + 1 < argc) {
options->type = argv[++i];
} else if (arg == "--count" && i + 1 < argc) {
if (!ParseUInt(argv[++i], &options->count)) {
return false;
}
} else if (arg == "--events" && i + 1 < argc) {
if (!ParseUInt(argv[++i], &options->events)) {
return false;
}
} else if (arg == "--links" && i + 1 < argc) {
if (!ParseUInt(argv[++i], &options->links)) {
return false;
}
} else if (arg == "--batch" && i + 1 < argc) {
if (!ParseUInt(argv[++i], &options->batch)) {
return false;
}
} else if (arg == "--rate" && i + 1 < argc) {
if (!ParseUInt(argv[++i], &options->rate)) {
return false;
}
} else if (arg == "--channels" && i + 1 < argc) {
if (!ParseUInt(argv[++i], &options->channels)) {
return false;
}
} else if (arg == "--target" && i + 1 < argc) {
options->target = argv[++i];
} else if (arg == "--help" || arg == "-h") {
return false;
} else {
return false;
}
}
if (options->type != "sink" && options->type != "source" && options->type != "both") {
return false;
}
if (options->mode != "create-destroy" && options->mode != "registry" &&
options->mode != "links" && options->mode != "policy" &&
options->mode != "e2e") {
return false;
}
return true;
}
std::string Prefix() {
return "warppipe-perf-" + std::to_string(static_cast<long>(getpid()));
}
double ToMillis(std::chrono::steady_clock::duration duration) {
return std::chrono::duration_cast<std::chrono::duration<double, std::milli>>(duration).count();
}
std::optional<warppipe::PortId> FindPort(warppipe::Client* client,
warppipe::NodeId node,
bool want_input) {
for (int attempt = 0; attempt < 50; ++attempt) {
auto ports = client->ListPorts(node);
if (ports.ok()) {
for (const auto& port : ports.value) {
if (port.is_input == want_input) {
return port.id;
}
}
}
usleep(5000);
}
return std::nullopt;
}
} // namespace
int main(int argc, char* argv[]) {
Options options;
if (!ParseArgs(argc, argv, &options)) {
PrintUsage();
return 2;
}
warppipe::ConnectionOptions connection;
connection.application_name = "warppipe-perf";
auto client = warppipe::Client::Create(connection);
if (!client.ok()) {
std::cerr << "warppipe_perf: failed to connect: " << client.status.message << "\n";
return 1;
}
warppipe::VirtualNodeOptions node_options;
node_options.format.rate = options.rate;
node_options.format.channels = options.channels;
node_options.display_name = "warppipe-perf";
node_options.group = "warppipe-perf";
if (!options.target.empty()) {
node_options.behavior = warppipe::VirtualBehavior::kLoopback;
node_options.target_node = options.target;
}
const bool is_source = options.type == "source";
const bool is_both = options.type == "both";
const std::string prefix = Prefix();
if (options.mode == "create-destroy") {
std::vector<warppipe::NodeId> sink_nodes;
std::vector<warppipe::NodeId> source_nodes;
sink_nodes.reserve(options.count);
if (is_both) {
source_nodes.reserve(options.count);
}
auto start = std::chrono::steady_clock::now();
if (!is_source || is_both) {
for (uint32_t i = 0; i < options.count; ++i) {
std::string name = prefix + "-sink-" + std::to_string(i);
warppipe::Result<warppipe::VirtualSink> sink_result =
client.value->CreateVirtualSink(name, node_options);
if (!sink_result.ok()) {
std::cerr << "create failed at " << i << ": " << sink_result.status.message << "\n";
break;
}
sink_nodes.push_back(sink_result.value.node);
}
}
if (is_source || is_both) {
for (uint32_t i = 0; i < options.count; ++i) {
std::string name = prefix + "-source-" + std::to_string(i);
warppipe::Result<warppipe::VirtualSource> source_result =
client.value->CreateVirtualSource(name, node_options);
if (!source_result.ok()) {
std::cerr << "create failed at " << i << ": " << source_result.status.message << "\n";
break;
}
source_nodes.push_back(source_result.value.node);
}
}
auto created = std::chrono::steady_clock::now();
for (const auto& node : sink_nodes) {
client.value->RemoveNode(node);
}
for (const auto& node : source_nodes) {
client.value->RemoveNode(node);
}
auto destroyed = std::chrono::steady_clock::now();
const double create_ms = ToMillis(created - start);
const double destroy_ms = ToMillis(destroyed - created);
const double total_ms = ToMillis(destroyed - start);
const double ops = static_cast<double>(sink_nodes.size() + source_nodes.size());
std::cout << "create_count=" << static_cast<size_t>(ops) << "\n"
<< "create_ms=" << std::fixed << std::setprecision(2) << create_ms << "\n"
<< "destroy_ms=" << destroy_ms << "\n"
<< "total_ms=" << total_ms << "\n";
if (total_ms > 0.0) {
std::cout << "ops_per_sec=" << (ops / (total_ms / 1000.0)) << "\n";
}
return 0;
}
if (options.mode == "registry") {
std::vector<warppipe::NodeId> nodes;
nodes.reserve(options.count);
for (uint32_t i = 0; i < options.count; ++i) {
std::string name = prefix + "-node-" + std::to_string(i);
auto result = client.value->CreateVirtualSink(name, node_options);
if (!result.ok()) {
std::cerr << "create failed at " << i << ": " << result.status.message << "\n";
break;
}
nodes.push_back(result.value.node);
}
auto list_start = std::chrono::steady_clock::now();
auto listed = client.value->ListNodes();
auto list_end = std::chrono::steady_clock::now();
if (!listed.ok()) {
std::cerr << "ListNodes failed: " << listed.status.message << "\n";
}
auto events_start = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < options.events; ++i) {
std::string name = prefix + "-event-" + std::to_string(i);
auto result = client.value->CreateVirtualSink(name, node_options);
if (!result.ok()) {
break;
}
client.value->RemoveNode(result.value.node);
}
auto events_end = std::chrono::steady_clock::now();
for (const auto& node : nodes) {
client.value->RemoveNode(node);
}
const double list_ms = ToMillis(list_end - list_start);
const double events_ms = ToMillis(events_end - events_start);
std::cout << "registry_nodes=" << nodes.size() << "\n"
<< "list_ms=" << std::fixed << std::setprecision(2) << list_ms << "\n"
<< "event_ops=" << options.events << "\n"
<< "event_ms=" << events_ms << "\n";
return 0;
}
if (options.mode == "links") {
const uint32_t pair_count = options.links;
const uint32_t batch = options.batch == 0 ? pair_count : options.batch;
double total_create_ms = 0.0;
double total_remove_ms = 0.0;
size_t total_links = 0;
for (uint32_t start_index = 0; start_index < pair_count; start_index += batch) {
const uint32_t batch_count = std::min(batch, pair_count - start_index);
std::vector<warppipe::NodeId> sinks;
std::vector<warppipe::NodeId> sources;
sinks.reserve(batch_count);
sources.reserve(batch_count);
for (uint32_t i = 0; i < batch_count; ++i) {
uint32_t index = start_index + i;
std::string sink_name = prefix + "-sink-" + std::to_string(index);
auto sink = client.value->CreateVirtualSink(sink_name, node_options);
if (!sink.ok()) {
std::cerr << "create sink failed at " << index << ": " << sink.status.message << "\n";
break;
}
sinks.push_back(sink.value.node);
std::string source_name = prefix + "-source-" + std::to_string(index);
auto source = client.value->CreateVirtualSource(source_name, node_options);
if (!source.ok()) {
std::cerr << "create source failed at " << index << ": " << source.status.message << "\n";
break;
}
sources.push_back(source.value.node);
}
const size_t pair_limit = std::min(sinks.size(), sources.size());
std::vector<warppipe::LinkId> links;
links.reserve(pair_limit);
std::vector<warppipe::PortId> out_ports;
std::vector<warppipe::PortId> in_ports;
out_ports.reserve(pair_limit);
in_ports.reserve(pair_limit);
for (size_t i = 0; i < pair_limit; ++i) {
auto out_port = FindPort(client.value.get(), sources[i], false);
auto in_port = FindPort(client.value.get(), sinks[i], true);
if (!out_port || !in_port) {
std::cerr << "port lookup failed at " << (start_index + i) << "\n";
break;
}
out_ports.push_back(*out_port);
in_ports.push_back(*in_port);
}
auto create_start = std::chrono::steady_clock::now();
for (size_t i = 0; i < out_ports.size() && i < in_ports.size(); ++i) {
auto link = client.value->CreateLink(out_ports[i], in_ports[i], warppipe::LinkOptions{});
if (!link.ok()) {
std::cerr << "link failed at " << (start_index + i) << ": " << link.status.message << "\n";
break;
}
links.push_back(link.value.id);
}
auto create_end = std::chrono::steady_clock::now();
for (const auto& link_id : links) {
client.value->RemoveLink(link_id);
}
auto remove_end = std::chrono::steady_clock::now();
for (const auto& node : sources) {
client.value->RemoveNode(node);
}
for (const auto& node : sinks) {
client.value->RemoveNode(node);
}
total_links += links.size();
total_create_ms += ToMillis(create_end - create_start);
total_remove_ms += ToMillis(remove_end - create_end);
}
const double total_ms = total_create_ms + total_remove_ms;
std::cout << "link_count=" << total_links << "\n"
<< "link_create_ms=" << std::fixed << std::setprecision(2) << total_create_ms << "\n"
<< "link_remove_ms=" << total_remove_ms << "\n"
<< "link_total_ms=" << total_ms << "\n"
<< "link_batch=" << batch << "\n";
return 0;
}
if (options.mode == "policy") {
auto sink = client.value->CreateVirtualSink(prefix + "-target-sink", node_options);
if (!sink.ok()) {
std::cerr << "create target sink failed: " << sink.status.message << "\n";
return 1;
}
warppipe::RouteRule rule;
rule.match.application_name = "warppipe-perf";
rule.target_node = prefix + "-target-sink";
auto rule_result = client.value->AddRouteRule(rule);
if (!rule_result.ok()) {
std::cerr << "add rule failed: " << rule_result.status.message << "\n";
return 1;
}
std::vector<warppipe::NodeId> sources;
sources.reserve(options.count);
auto start = std::chrono::steady_clock::now();
for (uint32_t i = 0; i < options.count; ++i) {
std::string name = prefix + "-ephemeral-" + std::to_string(i);
auto source = client.value->CreateVirtualSource(name, node_options);
if (!source.ok()) {
std::cerr << "create source failed at " << i << ": "
<< source.status.message << "\n";
break;
}
sources.push_back(source.value.node);
}
auto created = std::chrono::steady_clock::now();
for (const auto& node : sources) {
client.value->RemoveNode(node);
}
auto destroyed = std::chrono::steady_clock::now();
client.value->RemoveRouteRule(rule_result.value);
client.value->RemoveNode(sink.value.node);
const double create_ms = ToMillis(created - start);
const double destroy_ms = ToMillis(destroyed - created);
const double total_ms = ToMillis(destroyed - start);
const double ops = static_cast<double>(sources.size());
std::cout << "policy_sources=" << sources.size() << "\n"
<< "policy_create_ms=" << std::fixed << std::setprecision(2) << create_ms << "\n"
<< "policy_destroy_ms=" << destroy_ms << "\n"
<< "policy_total_ms=" << total_ms << "\n";
if (total_ms > 0.0) {
std::cout << "policy_ops_per_sec=" << (ops / (total_ms / 1000.0)) << "\n";
}
return 0;
}
if (options.mode == "e2e") {
std::vector<double> latencies;
latencies.reserve(options.count);
for (uint32_t i = 0; i < options.count; ++i) {
auto iter_start = std::chrono::steady_clock::now();
std::string sink_name = prefix + "-e2e-sink-" + std::to_string(i);
auto sink = client.value->CreateVirtualSink(sink_name, node_options);
if (!sink.ok()) {
std::cerr << "e2e: create sink failed at " << i << ": "
<< sink.status.message << "\n";
break;
}
warppipe::RouteRule rule;
rule.match.application_name = "warppipe-perf";
rule.target_node = sink_name;
auto rule_result = client.value->AddRouteRule(rule);
if (!rule_result.ok()) {
client.value->RemoveNode(sink.value.node);
std::cerr << "e2e: add rule failed at " << i << ": "
<< rule_result.status.message << "\n";
break;
}
std::string source_name = prefix + "-e2e-src-" + std::to_string(i);
auto source = client.value->CreateVirtualSource(source_name, node_options);
if (!source.ok()) {
client.value->RemoveRouteRule(rule_result.value);
client.value->RemoveNode(sink.value.node);
std::cerr << "e2e: create source failed at " << i << ": "
<< source.status.message << "\n";
break;
}
client.value->RemoveNode(source.value.node);
client.value->RemoveRouteRule(rule_result.value);
client.value->RemoveNode(sink.value.node);
auto iter_end = std::chrono::steady_clock::now();
latencies.push_back(ToMillis(iter_end - iter_start));
}
if (latencies.empty()) {
std::cerr << "e2e: no iterations completed\n";
return 1;
}
std::sort(latencies.begin(), latencies.end());
size_t n = latencies.size();
size_t p50 = n / 2;
size_t p95 = static_cast<size_t>(static_cast<double>(n) * 0.95);
if (p95 >= n) {
p95 = n - 1;
}
double total = 0.0;
for (double v : latencies) {
total += v;
}
std::cout << "e2e_iterations=" << n << "\n"
<< "e2e_total_ms=" << std::fixed << std::setprecision(2) << total << "\n"
<< "e2e_median_ms=" << latencies[p50] << "\n"
<< "e2e_p95_ms=" << latencies[p95] << "\n"
<< "e2e_min_ms=" << latencies[0] << "\n"
<< "e2e_max_ms=" << latencies[n - 1] << "\n";
return 0;
}
PrintUsage();
return 2;
}