#include #include #include #include #include #include #include #include #include #include #include namespace { struct Options { std::string mode = "create-destroy"; std::string type = "sink"; std::string target; uint32_t count = 200; uint32_t events = 100; uint32_t links = 200; uint32_t batch = 0; uint32_t rate = 48000; uint32_t channels = 2; }; bool ParseUInt(const char* value, uint32_t* out) { if (!value || !out) { return false; } char* end = nullptr; unsigned long parsed = std::strtoul(value, &end, 10); if (!end || end == value) { return false; } *out = static_cast(parsed); return true; } void PrintUsage() { std::cout << "warppipe_perf usage:\n" << " --mode create-destroy|registry|links|policy|e2e\n" << " --type sink|source|both\n" << " --count N (default 200, per-type when --type both)\n" << " --events N (registry mode, default 100)\n" << " --links N (links mode, default 200)\n" << " --batch N (links mode, batch size)\n" << " --rate N (default 48000)\n" << " --channels N (default 2)\n" << " --target (loopback target, optional)\n"; } bool ParseArgs(int argc, char* argv[], Options* options) { for (int i = 1; i < argc; ++i) { std::string arg = argv[i]; if (arg == "--mode" && i + 1 < argc) { options->mode = argv[++i]; } else if (arg == "--type" && i + 1 < argc) { options->type = argv[++i]; } else if (arg == "--count" && i + 1 < argc) { if (!ParseUInt(argv[++i], &options->count)) { return false; } } else if (arg == "--events" && i + 1 < argc) { if (!ParseUInt(argv[++i], &options->events)) { return false; } } else if (arg == "--links" && i + 1 < argc) { if (!ParseUInt(argv[++i], &options->links)) { return false; } } else if (arg == "--batch" && i + 1 < argc) { if (!ParseUInt(argv[++i], &options->batch)) { return false; } } else if (arg == "--rate" && i + 1 < argc) { if (!ParseUInt(argv[++i], &options->rate)) { return false; } } else if (arg == "--channels" && i + 1 < argc) { if (!ParseUInt(argv[++i], &options->channels)) { return false; } } else if (arg == "--target" && i + 1 < argc) { options->target = argv[++i]; } else if (arg == "--help" || arg == "-h") { return false; } else { return false; } } if (options->type != "sink" && options->type != "source" && options->type != "both") { return false; } if (options->mode != "create-destroy" && options->mode != "registry" && options->mode != "links" && options->mode != "policy" && options->mode != "e2e") { return false; } return true; } std::string Prefix() { return "warppipe-perf-" + std::to_string(static_cast(getpid())); } double ToMillis(std::chrono::steady_clock::duration duration) { return std::chrono::duration_cast>(duration).count(); } std::optional FindPort(warppipe::Client* client, warppipe::NodeId node, bool want_input) { for (int attempt = 0; attempt < 50; ++attempt) { auto ports = client->ListPorts(node); if (ports.ok()) { for (const auto& port : ports.value) { if (port.is_input == want_input) { return port.id; } } } usleep(5000); } return std::nullopt; } } // namespace int main(int argc, char* argv[]) { Options options; if (!ParseArgs(argc, argv, &options)) { PrintUsage(); return 2; } warppipe::ConnectionOptions connection; connection.application_name = "warppipe-perf"; auto client = warppipe::Client::Create(connection); if (!client.ok()) { std::cerr << "warppipe_perf: failed to connect: " << client.status.message << "\n"; return 1; } warppipe::VirtualNodeOptions node_options; node_options.format.rate = options.rate; node_options.format.channels = options.channels; node_options.display_name = "warppipe-perf"; node_options.group = "warppipe-perf"; if (!options.target.empty()) { node_options.behavior = warppipe::VirtualBehavior::kLoopback; node_options.target_node = options.target; } const bool is_source = options.type == "source"; const bool is_both = options.type == "both"; const std::string prefix = Prefix(); if (options.mode == "create-destroy") { std::vector sink_nodes; std::vector source_nodes; sink_nodes.reserve(options.count); if (is_both) { source_nodes.reserve(options.count); } auto start = std::chrono::steady_clock::now(); if (!is_source || is_both) { for (uint32_t i = 0; i < options.count; ++i) { std::string name = prefix + "-sink-" + std::to_string(i); warppipe::Result sink_result = client.value->CreateVirtualSink(name, node_options); if (!sink_result.ok()) { std::cerr << "create failed at " << i << ": " << sink_result.status.message << "\n"; break; } sink_nodes.push_back(sink_result.value.node); } } if (is_source || is_both) { for (uint32_t i = 0; i < options.count; ++i) { std::string name = prefix + "-source-" + std::to_string(i); warppipe::Result source_result = client.value->CreateVirtualSource(name, node_options); if (!source_result.ok()) { std::cerr << "create failed at " << i << ": " << source_result.status.message << "\n"; break; } source_nodes.push_back(source_result.value.node); } } auto created = std::chrono::steady_clock::now(); for (const auto& node : sink_nodes) { client.value->RemoveNode(node); } for (const auto& node : source_nodes) { client.value->RemoveNode(node); } auto destroyed = std::chrono::steady_clock::now(); const double create_ms = ToMillis(created - start); const double destroy_ms = ToMillis(destroyed - created); const double total_ms = ToMillis(destroyed - start); const double ops = static_cast(sink_nodes.size() + source_nodes.size()); std::cout << "create_count=" << static_cast(ops) << "\n" << "create_ms=" << std::fixed << std::setprecision(2) << create_ms << "\n" << "destroy_ms=" << destroy_ms << "\n" << "total_ms=" << total_ms << "\n"; if (total_ms > 0.0) { std::cout << "ops_per_sec=" << (ops / (total_ms / 1000.0)) << "\n"; } return 0; } if (options.mode == "registry") { std::vector nodes; nodes.reserve(options.count); for (uint32_t i = 0; i < options.count; ++i) { std::string name = prefix + "-node-" + std::to_string(i); auto result = client.value->CreateVirtualSink(name, node_options); if (!result.ok()) { std::cerr << "create failed at " << i << ": " << result.status.message << "\n"; break; } nodes.push_back(result.value.node); } auto list_start = std::chrono::steady_clock::now(); auto listed = client.value->ListNodes(); auto list_end = std::chrono::steady_clock::now(); if (!listed.ok()) { std::cerr << "ListNodes failed: " << listed.status.message << "\n"; } auto events_start = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < options.events; ++i) { std::string name = prefix + "-event-" + std::to_string(i); auto result = client.value->CreateVirtualSink(name, node_options); if (!result.ok()) { break; } client.value->RemoveNode(result.value.node); } auto events_end = std::chrono::steady_clock::now(); for (const auto& node : nodes) { client.value->RemoveNode(node); } const double list_ms = ToMillis(list_end - list_start); const double events_ms = ToMillis(events_end - events_start); std::cout << "registry_nodes=" << nodes.size() << "\n" << "list_ms=" << std::fixed << std::setprecision(2) << list_ms << "\n" << "event_ops=" << options.events << "\n" << "event_ms=" << events_ms << "\n"; return 0; } if (options.mode == "links") { const uint32_t pair_count = options.links; const uint32_t batch = options.batch == 0 ? pair_count : options.batch; double total_create_ms = 0.0; double total_remove_ms = 0.0; size_t total_links = 0; for (uint32_t start_index = 0; start_index < pair_count; start_index += batch) { const uint32_t batch_count = std::min(batch, pair_count - start_index); std::vector sinks; std::vector sources; sinks.reserve(batch_count); sources.reserve(batch_count); for (uint32_t i = 0; i < batch_count; ++i) { uint32_t index = start_index + i; std::string sink_name = prefix + "-sink-" + std::to_string(index); auto sink = client.value->CreateVirtualSink(sink_name, node_options); if (!sink.ok()) { std::cerr << "create sink failed at " << index << ": " << sink.status.message << "\n"; break; } sinks.push_back(sink.value.node); std::string source_name = prefix + "-source-" + std::to_string(index); auto source = client.value->CreateVirtualSource(source_name, node_options); if (!source.ok()) { std::cerr << "create source failed at " << index << ": " << source.status.message << "\n"; break; } sources.push_back(source.value.node); } const size_t pair_limit = std::min(sinks.size(), sources.size()); std::vector links; links.reserve(pair_limit); std::vector out_ports; std::vector in_ports; out_ports.reserve(pair_limit); in_ports.reserve(pair_limit); for (size_t i = 0; i < pair_limit; ++i) { auto out_port = FindPort(client.value.get(), sources[i], false); auto in_port = FindPort(client.value.get(), sinks[i], true); if (!out_port || !in_port) { std::cerr << "port lookup failed at " << (start_index + i) << "\n"; break; } out_ports.push_back(*out_port); in_ports.push_back(*in_port); } auto create_start = std::chrono::steady_clock::now(); for (size_t i = 0; i < out_ports.size() && i < in_ports.size(); ++i) { auto link = client.value->CreateLink(out_ports[i], in_ports[i], warppipe::LinkOptions{}); if (!link.ok()) { std::cerr << "link failed at " << (start_index + i) << ": " << link.status.message << "\n"; break; } links.push_back(link.value.id); } auto create_end = std::chrono::steady_clock::now(); for (const auto& link_id : links) { client.value->RemoveLink(link_id); } auto remove_end = std::chrono::steady_clock::now(); for (const auto& node : sources) { client.value->RemoveNode(node); } for (const auto& node : sinks) { client.value->RemoveNode(node); } total_links += links.size(); total_create_ms += ToMillis(create_end - create_start); total_remove_ms += ToMillis(remove_end - create_end); } const double total_ms = total_create_ms + total_remove_ms; std::cout << "link_count=" << total_links << "\n" << "link_create_ms=" << std::fixed << std::setprecision(2) << total_create_ms << "\n" << "link_remove_ms=" << total_remove_ms << "\n" << "link_total_ms=" << total_ms << "\n" << "link_batch=" << batch << "\n"; return 0; } if (options.mode == "policy") { auto sink = client.value->CreateVirtualSink(prefix + "-target-sink", node_options); if (!sink.ok()) { std::cerr << "create target sink failed: " << sink.status.message << "\n"; return 1; } warppipe::RouteRule rule; rule.match.application_name = "warppipe-perf"; rule.target_node = prefix + "-target-sink"; auto rule_result = client.value->AddRouteRule(rule); if (!rule_result.ok()) { std::cerr << "add rule failed: " << rule_result.status.message << "\n"; return 1; } std::vector sources; sources.reserve(options.count); auto start = std::chrono::steady_clock::now(); for (uint32_t i = 0; i < options.count; ++i) { std::string name = prefix + "-ephemeral-" + std::to_string(i); auto source = client.value->CreateVirtualSource(name, node_options); if (!source.ok()) { std::cerr << "create source failed at " << i << ": " << source.status.message << "\n"; break; } sources.push_back(source.value.node); } auto created = std::chrono::steady_clock::now(); for (const auto& node : sources) { client.value->RemoveNode(node); } auto destroyed = std::chrono::steady_clock::now(); client.value->RemoveRouteRule(rule_result.value); client.value->RemoveNode(sink.value.node); const double create_ms = ToMillis(created - start); const double destroy_ms = ToMillis(destroyed - created); const double total_ms = ToMillis(destroyed - start); const double ops = static_cast(sources.size()); std::cout << "policy_sources=" << sources.size() << "\n" << "policy_create_ms=" << std::fixed << std::setprecision(2) << create_ms << "\n" << "policy_destroy_ms=" << destroy_ms << "\n" << "policy_total_ms=" << total_ms << "\n"; if (total_ms > 0.0) { std::cout << "policy_ops_per_sec=" << (ops / (total_ms / 1000.0)) << "\n"; } return 0; } if (options.mode == "e2e") { std::vector latencies; latencies.reserve(options.count); for (uint32_t i = 0; i < options.count; ++i) { auto iter_start = std::chrono::steady_clock::now(); std::string sink_name = prefix + "-e2e-sink-" + std::to_string(i); auto sink = client.value->CreateVirtualSink(sink_name, node_options); if (!sink.ok()) { std::cerr << "e2e: create sink failed at " << i << ": " << sink.status.message << "\n"; break; } warppipe::RouteRule rule; rule.match.application_name = "warppipe-perf"; rule.target_node = sink_name; auto rule_result = client.value->AddRouteRule(rule); if (!rule_result.ok()) { client.value->RemoveNode(sink.value.node); std::cerr << "e2e: add rule failed at " << i << ": " << rule_result.status.message << "\n"; break; } std::string source_name = prefix + "-e2e-src-" + std::to_string(i); auto source = client.value->CreateVirtualSource(source_name, node_options); if (!source.ok()) { client.value->RemoveRouteRule(rule_result.value); client.value->RemoveNode(sink.value.node); std::cerr << "e2e: create source failed at " << i << ": " << source.status.message << "\n"; break; } client.value->RemoveNode(source.value.node); client.value->RemoveRouteRule(rule_result.value); client.value->RemoveNode(sink.value.node); auto iter_end = std::chrono::steady_clock::now(); latencies.push_back(ToMillis(iter_end - iter_start)); } if (latencies.empty()) { std::cerr << "e2e: no iterations completed\n"; return 1; } std::sort(latencies.begin(), latencies.end()); size_t n = latencies.size(); size_t p50 = n / 2; size_t p95 = static_cast(static_cast(n) * 0.95); if (p95 >= n) { p95 = n - 1; } double total = 0.0; for (double v : latencies) { total += v; } std::cout << "e2e_iterations=" << n << "\n" << "e2e_total_ms=" << std::fixed << std::setprecision(2) << total << "\n" << "e2e_median_ms=" << latencies[p50] << "\n" << "e2e_p95_ms=" << latencies[p95] << "\n" << "e2e_min_ms=" << latencies[0] << "\n" << "e2e_max_ms=" << latencies[n - 1] << "\n"; return 0; } PrintUsage(); return 2; }