diff --git a/wish/cpp/benchmark/high_qps_client.cc b/wish/cpp/benchmark/high_qps_client.cc index 0cabb7c..a54bbbb 100644 --- a/wish/cpp/benchmark/high_qps_client.cc +++ b/wish/cpp/benchmark/high_qps_client.cc @@ -169,13 +169,15 @@ bool InitConnection(ClientState* client) { } const std::string host = absl::GetFlag(FLAGS_host); - const std::string port_str = std::to_string(absl::GetFlag(FLAGS_port)); + const std::string port = std::to_string(absl::GetFlag(FLAGS_port)); + + LOG(INFO) << "Target address: " << host << ":" << port; struct addrinfo hints{}; hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; struct addrinfo* res = nullptr; - const int gai_err = getaddrinfo(host.c_str(), port_str.c_str(), &hints, &res); + const int gai_err = getaddrinfo(host.c_str(), port.c_str(), &hints, &res); if (gai_err != 0) { LOG(ERROR) << "getaddrinfo failed for " << host << ": " << gai_strerror(gai_err); @@ -247,6 +249,8 @@ void CleanupConnection(ClientState* client) { // Each worker thread owns an independent libevent connection and issues // requests sequentially, gated by the shared TokenDispenser. void WorkerLoop(SharedState* ss) { + LOG(INFO) << "Worker thread started"; + ClientState client; if (!InitConnection(&client)) { LOG(ERROR) << "WorkerLoop: failed to connect, worker exiting"; @@ -302,14 +306,15 @@ double MeasureQps(std::atomic<int64_t>& completed, int window_ms) { // TokenDispenser gates all workers, calibrated during warmup. // With --poisson, inter-arrival times follow an exponential distribution // (Poisson process), matching typical production traffic patterns. 
-static void BM_WiSHClientHighQPS(benchmark::State& state) { - LOG(INFO) << "Starting high-QPS benchmark: payload_size=" << state.range(0) - << " bytes, target_qps=" << state.range(1) - << ", poisson=" << absl::GetFlag(FLAGS_poisson); - +static void BM_PlainText_HighQPS(benchmark::State& state) { const int payload_size = static_cast<int>(state.range(0)); const double target_qps = static_cast<double>(state.range(1)); + LOG(INFO) << "Starting benchmark"; + LOG(INFO) << " Payload size: " << payload_size << " bytes"; + LOG(INFO) << " Target QPS: " << target_qps; + LOG(INFO) << " Poisson arrivals: " << (absl::GetFlag(FLAGS_poisson) ? "enabled" : "disabled"); + // Allocate enough workers so no single worker is the bottleneck. // 500 QPS/worker is conservative; it provides headroom for RTTs up to 2 ms. const int num_workers = @@ -329,14 +334,18 @@ static void BM_WiSHClientHighQPS(benchmark::State& state) { } // ── Warmup ─────────────────────────────────────────────────────────────── + constexpr int kWindowMs = 2000; constexpr int kMaxWindows = 100; + constexpr double kStableBand = 0.05; constexpr int kStableNeeded = 3; int near_target_count = 0; + constexpr int kPlateauLimit = 10; constexpr double kPlateauBand = 0.01; int plateau_count = 0; + double prev_qps = 0.0; for (int w = 0; w < kMaxWindows && @@ -423,27 +432,43 @@ static void BM_WiSHClientHighQPS(benchmark::State& state) { for (auto& t : workers) t.join(); // ── Report counters ─────────────────────────────────────────────────────── - if (!recorded.empty()) { - std::sort(recorded.begin(), recorded.end()); - state.counters["p10_us"] = PercentileFromSorted(recorded, 0.10); - state.counters["p50_us"] = PercentileFromSorted(recorded, 0.50); - state.counters["p90_us"] = PercentileFromSorted(recorded, 0.90); - state.counters["p99_us"] = PercentileFromSorted(recorded, 0.99); - state.counters["target_qps"] = target_qps; - state.counters["actual_qps"] = - static_cast<double>(recorded.size()) / wall_seconds; - state.counters["num_workers"] = 
static_cast<double>(num_workers); - state.SetItemsProcessed(static_cast<int64_t>(recorded.size())); - } + + std::sort(recorded.begin(), recorded.end()); + + LOG(INFO) << "Result:"; + LOG(INFO) << " # of workers: " << num_workers; + LOG(INFO) << " Measured requests: " << recorded.size(); + LOG(INFO) << " Measurement duration: " << wall_seconds << " s"; + LOG(INFO) << " Actual QPS: " << static_cast<double>(recorded.size()) / wall_seconds; + LOG(INFO) << " p10 latency: " << PercentileFromSorted(recorded, 0.10) << " us"; + LOG(INFO) << " p50 latency: " << PercentileFromSorted(recorded, 0.50) << " us"; + LOG(INFO) << " p90 latency: " << PercentileFromSorted(recorded, 0.90) << " us"; + LOG(INFO) << " p99 latency: " << PercentileFromSorted(recorded, 0.99) << " us"; } // Iterations are scaled per QPS level to target ~5 s of measurement. // Separate registrations are required because Iterations() is a global setting // per registration and cannot vary per-args in a single ArgsProduct call. -BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 100})->Iterations(500); -BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 1'000})->Iterations(5000); -BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 1'800})->Iterations(8000); -BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 10'000})->Iterations(50000); +BENCHMARK(BM_PlainText_HighQPS) + ->UseManualTime() + ->Unit(benchmark::kMicrosecond) + ->Args({1 << 10, 100}) + ->Iterations(100 * 10); +BENCHMARK(BM_PlainText_HighQPS) + ->UseManualTime() + ->Unit(benchmark::kMicrosecond) + ->Args({1 << 10, 1'000}) + ->Iterations(1'000 * 10); +BENCHMARK(BM_PlainText_HighQPS) + ->UseManualTime() + ->Unit(benchmark::kMicrosecond) + ->Args({1 << 10, 1'800}) + ->Iterations(800 * 10); +BENCHMARK(BM_PlainText_HighQPS) + ->UseManualTime() + ->Unit(benchmark::kMicrosecond) + ->Args({1 << 10, 
10'000}) + ->Iterations(10'000 * 10); } // namespace