From 639389c0505a48b438305066c4a262e74b99f8b9 Mon Sep 17 00:00:00 2001 From: Takeshi Yoshino <4511440+tyoshino@users.noreply.github.com> Date: Wed, 22 Apr 2026 02:36:08 +0000 Subject: [PATCH] feat(wish/cpp): make the high QPS client able to run multiple workers, add an option to use the poisson distribution for inter-arrival time --- wish/cpp/benchmark/high_qps_client.cc | 342 +++++++++++++++++++------- 1 file changed, 247 insertions(+), 95 deletions(-) diff --git a/wish/cpp/benchmark/high_qps_client.cc b/wish/cpp/benchmark/high_qps_client.cc index 6624223..c24e95c 100644 --- a/wish/cpp/benchmark/high_qps_client.cc +++ b/wish/cpp/benchmark/high_qps_client.cc @@ -6,9 +6,14 @@ #include #include +#include #include #include +#include #include +#include +#include +#include #include #include #include @@ -22,6 +27,9 @@ ABSL_FLAG(std::string, host, "127.0.0.1", "Server host to connect to"); ABSL_FLAG(int, port, 8080, "Server port to connect to"); +ABSL_FLAG(bool, poisson, false, + "Model inter-request arrivals as a Poisson process (exponentially " + "distributed inter-arrival times). Default: uniform spacing."); namespace { @@ -33,6 +41,114 @@ double PercentileFromSorted(const std::vector& values, double p) { return values[idx]; } +// Centralized token dispenser: issues one token every interval_us microseconds +// on average. Supports uniform (default) and Poisson inter-arrival modes. +// A dedicated issuer thread maintains a wall-clock-anchored schedule. +// Workers call Acquire() before each request. When interval_us == 0, +// Acquire() returns immediately (unlimited mode). +struct TokenDispenser { + std::mutex mu_; + std::condition_variable cv_; + + int64_t tokens_ = 0; + int64_t interval_us_ = 0; + + // Set before the issuer thread is started; read-only afterwards. + bool poisson_mode_ = false; + + // Used only inside IssuerLoop (single-threaded access; no lock needed). + std::mt19937_64 rng_{std::random_device{}()}; + + void SetInterval(int64_t new_interval_us) { + { + std::lock_guard lk(mu_); + interval_us_ = new_interval_us; + if (new_interval_us == 0) tokens_ = 0; + } + cv_.notify_all(); + } + + int64_t GetInterval() { + std::lock_guard lk(mu_); + return interval_us_; + } + + void IssuerLoop(const std::atomic& stop) { + std::uniform_real_distribution uniform(0.0, 1.0); + std::unique_lock lk(mu_); + auto next = std::chrono::steady_clock::now(); + + while (!stop.load(std::memory_order_relaxed)) { + if (interval_us_ <= 0) { + cv_.wait(lk, [&] { + return stop.load(std::memory_order_relaxed) || interval_us_ > 0; + }); + next = std::chrono::steady_clock::now(); + continue; + } + + const int64_t iv = interval_us_; + const bool woken_early = cv_.wait_until(lk, next, [&] { + return stop.load(std::memory_order_relaxed) || interval_us_ != iv; + }); + + if (stop.load(std::memory_order_relaxed)) break; + + if (woken_early) { + next = std::chrono::steady_clock::now(); + continue; + } + + ++tokens_; + if (poisson_mode_) { + // Sample next inter-arrival time from Exponential(1/iv). + // Use -log(u)*iv where u ~ Uniform(0,1); clamp u away from 0 to + // avoid infinite intervals. + double u = uniform(rng_); + if (u < std::numeric_limits::min()) + u = std::numeric_limits::min(); + next += std::chrono::microseconds( + static_cast(-std::log(u) * static_cast(iv))); + } else { + next += std::chrono::microseconds(iv); + } + cv_.notify_one(); + } + } + + bool Acquire(const std::atomic& stop) { + std::unique_lock lk(mu_); + if (interval_us_ <= 0) return true; + + cv_.wait(lk, [&] { + return stop.load(std::memory_order_relaxed) || + interval_us_ <= 0 || tokens_ > 0; + }); + + if (stop.load(std::memory_order_relaxed)) return false; + if (interval_us_ <= 0) return true; + + --tokens_; + return true; + } +}; + +// State shared by all worker threads. +struct SharedState { + std::string payload; + std::atomic stop{false}; + TokenDispenser dispenser; + + // Completed-RPC counter for QPS measurement. + std::atomic completed{0}; + + // Result queue: workers push latency_us; benchmark loop pops one per iter. + std::mutex result_mu; + std::condition_variable result_cv; + std::deque results; +}; + +// Per-connection state, owned by each worker thread. struct ClientState { struct event_base* base = nullptr; struct bufferevent* bev = nullptr; @@ -41,7 +157,7 @@ struct ClientState { bool connected = false; bool awaiting_response = false; std::chrono::steady_clock::time_point request_start; - std::vector latencies_us; + double last_latency_us = 0.0; }; bool InitConnection(ClientState* client) { @@ -97,10 +213,9 @@ bool InitConnection(ClientState* client) { } const auto end = std::chrono::steady_clock::now(); - const double latency_us = + client->last_latency_us = std::chrono::duration(end - client->request_start) .count(); - client->latencies_us.push_back(latency_us); client->awaiting_response = false; event_base_loopexit(client->base, nullptr); }); @@ -124,147 +239,185 @@ void CleanupConnection(ClientState* client) { } } -// Benchmark WiSH throughput under a fixed target QPS. +// Each worker thread owns an independent libevent connection and issues +// requests sequentially, gated by the shared TokenDispenser. +void WorkerLoop(SharedState* ss) { + ClientState client; + if (!InitConnection(&client)) { + LOG(ERROR) << "WorkerLoop: failed to connect, worker exiting"; + return; + } + + while (!ss->stop.load(std::memory_order_relaxed)) { + if (!ss->dispenser.Acquire(ss->stop)) break; + + client.awaiting_response = true; + client.request_start = std::chrono::steady_clock::now(); + if (client.handler->SendBinary(ss->payload) != 0) { + LOG(ERROR) << "WorkerLoop: SendBinary failed, worker exiting"; + break; + } + + event_base_dispatch(client.base); + if (client.awaiting_response) { + // Response never arrived — connection likely closed. + break; + } + + const double lat = client.last_latency_us; + ss->completed.fetch_add(1, std::memory_order_relaxed); + { + std::lock_guard lk(ss->result_mu); + ss->results.push_back(lat); + } + ss->result_cv.notify_one(); + } + + CleanupConnection(&client); +} + +// Measure actual QPS over a window_ms millisecond window. +double MeasureQps(std::atomic& completed, int window_ms) { + const int64_t before = completed.load(std::memory_order_relaxed); + const auto t0 = std::chrono::steady_clock::now(); + + std::this_thread::sleep_for(std::chrono::milliseconds(window_ms)); + + const int64_t after = completed.load(std::memory_order_relaxed); + const auto t1 = std::chrono::steady_clock::now(); + + const double elapsed_s = std::chrono::duration(t1 - t0).count(); + return static_cast(after - before) / elapsed_s; +} + +// Benchmark WiSH throughput under a fixed target QPS with multiple workers. // -// WiSH uses a single-threaded libevent loop (sequential request/response), so -// concurrency is always 1 and QPS is controlled solely by an inter-request -// sleep interval. A warmup phase calibrates that interval before measurement. +// Workers are sequential per-connection libevent loops; N workers run in +// parallel so that aggregate QPS can exceed 1/RTT. The centralized +// TokenDispenser gates all workers, calibrated during warmup. +// With --poisson, inter-arrival times follow an exponential distribution +// (Poisson process), matching typical production traffic patterns. static void BM_WiSHClientHighQPS(benchmark::State& state) { - LOG(INFO) << "Starting QPS benchmark iteration with payload_size=" - << state.range(0) << " bytes, target_qps=" << state.range(1); + LOG(INFO) << "Starting high-QPS benchmark: payload_size=" << state.range(0) + << " bytes, target_qps=" << state.range(1) + << ", poisson=" << absl::GetFlag(FLAGS_poisson); const int payload_size = static_cast(state.range(0)); const double target_qps = static_cast(state.range(1)); - const std::string payload(payload_size, 'A'); - ClientState client; - if (!InitConnection(&client)) { - state.SkipWithError("Failed to establish WiSH connection"); - CleanupConnection(&client); - return; + // Allocate enough workers so no single worker is the bottleneck. + // 500 QPS/worker is conservative; it provides headroom for RTTs up to 2 ms. + const int num_workers = + std::max(1, static_cast(std::ceil(target_qps / 500.0))); + + SharedState ss; + ss.payload = std::string(payload_size, 'A'); + ss.dispenser.poisson_mode_ = absl::GetFlag(FLAGS_poisson); + + // Spawn the dispenser thread first so it is ready when workers arrive. + std::thread dispenser_thread([&ss] { ss.dispenser.IssuerLoop(ss.stop); }); + + std::vector workers; + workers.reserve(num_workers); + for (int i = 0; i < num_workers; ++i) { + workers.emplace_back(WorkerLoop, &ss); } - // Warmup: calibrate inter-request sleep to hit target_qps. + // ── Warmup ─────────────────────────────────────────────────────────────── constexpr int kWindowMs = 2000; constexpr int kMaxWindows = 100; - constexpr double kStableBand = 0.05; constexpr int kStableNeeded = 3; int near_target_count = 0; - constexpr int kPlateauLimit = 10; constexpr double kPlateauBand = 0.01; int plateau_count = 0; - - double inter_request_us = 0.0; double prev_qps = 0.0; - bool error = false; - for (int w = 0; - !error && - w < kMaxWindows && - near_target_count < kStableNeeded && plateau_count < kPlateauLimit; + for (int w = 0; w < kMaxWindows && + near_target_count < kStableNeeded && plateau_count < kPlateauLimit; ++w) { - int count = 0; - client.latencies_us.clear(); - auto window_start = std::chrono::steady_clock::now(); - auto window_end = window_start + std::chrono::milliseconds(kWindowMs); - - while (std::chrono::steady_clock::now() < window_end) { - if (inter_request_us > 0.0) { - std::this_thread::sleep_for( - std::chrono::microseconds(static_cast(inter_request_us))); - } - client.awaiting_response = true; - client.request_start = std::chrono::steady_clock::now(); - if (client.handler->SendBinary(payload) != 0) { - error = true; - break; - } - event_base_dispatch(client.base); - if (client.awaiting_response) { - error = true; - break; - } - ++count; + ss.completed.store(0, std::memory_order_relaxed); + { + std::lock_guard lk(ss.result_mu); + ss.results.clear(); } - if (error) break; - const double elapsed_s = - std::chrono::duration( - std::chrono::steady_clock::now() - window_start) - .count(); - const double measured_qps = (elapsed_s > 0.0) ? count / elapsed_s : 0.0; - VLOG(2) << "[warmup] QPS: " << static_cast(measured_qps) + const double measured = MeasureQps(ss.completed, kWindowMs); + VLOG(2) << "[warmup] QPS: " << static_cast(measured) << " / target: " << static_cast(target_qps); - if (measured_qps > 0.0) { - const double delta_us = (1.0 / target_qps - 1.0 / measured_qps) * 1e6; - inter_request_us = std::max(0.0, inter_request_us + delta_us); + // Adjust only the interval knob. Workers are fixed in number; the interval + // is the sole rate-control lever. + if (measured > 0.0) { + const int64_t current_us = ss.dispenser.GetInterval(); + const int64_t new_us = std::max( + 0, current_us + static_cast( + (1.0 / target_qps - 1.0 / measured) * 1e6)); + ss.dispenser.SetInterval(new_us); } const bool near_target = - std::abs(measured_qps - target_qps) / target_qps < kStableBand; + std::abs(measured - target_qps) / target_qps < kStableBand; const bool plateau = prev_qps > 0.0 && - std::abs(measured_qps - prev_qps) / prev_qps < kPlateauBand; + std::abs(measured - prev_qps) / prev_qps < kPlateauBand; - if (near_target) { + if (near_target) ++near_target_count; - } else { + else near_target_count = 0; - } - - if (plateau) { + if (plateau) ++plateau_count; - } else { + else plateau_count = 0; - } VLOG(2) << "[warmup] near_target_count=" << near_target_count << " plateau_count=" << plateau_count; - prev_qps = measured_qps; + prev_qps = measured; } - if (error) { - state.SkipWithError("WiSH connection error during warmup"); - CleanupConnection(&client); - return; - } - - LOG(INFO) << "Warmup complete: inter_request_us=" << inter_request_us; + LOG(INFO) << "Warmup complete: num_workers=" << num_workers + << ", interval_us=" << ss.dispenser.GetInterval(); - // Reset accumulated latencies before measurement. - client.latencies_us.clear(); + // Reset before measurement. + ss.completed.store(0, std::memory_order_relaxed); + { + std::lock_guard lk(ss.result_mu); + ss.results.clear(); + } + // ── Measurement loop ───────────────────────────────────────────────────── std::vector recorded; const auto measure_start = std::chrono::steady_clock::now(); for (auto _ : state) { - if (inter_request_us > 0.0) { - std::this_thread::sleep_for( - std::chrono::microseconds(static_cast(inter_request_us))); - } - client.awaiting_response = true; - client.request_start = std::chrono::steady_clock::now(); - if (client.handler->SendBinary(payload) != 0) { - state.SkipWithError("WishHandler::SendBinary failed"); - break; - } - event_base_dispatch(client.base); - if (client.awaiting_response) { - state.SkipWithError("Connection closed before response"); - break; + double latency_us; + { + std::unique_lock lk(ss.result_mu); + ss.result_cv.wait(lk, [&ss] { + return ss.stop.load(std::memory_order_relaxed) || !ss.results.empty(); + }); + if (ss.results.empty()) break; + latency_us = ss.results.front(); + ss.results.pop_front(); } - const double lat = client.latencies_us.back(); - recorded.push_back(lat); - state.SetIterationTime(lat / 1e6); + recorded.push_back(latency_us); + state.SetIterationTime(latency_us / 1e6); } const auto measure_end = std::chrono::steady_clock::now(); const double wall_seconds = std::chrono::duration(measure_end - measure_start).count(); + // ── Teardown ───────────────────────────────────────────────────────────── + ss.stop.store(true, std::memory_order_relaxed); + ss.result_cv.notify_all(); + ss.dispenser.cv_.notify_all(); + dispenser_thread.join(); + for (auto& t : workers) t.join(); + + // ── Report counters ─────────────────────────────────────────────────────── if (!recorded.empty()) { std::sort(recorded.begin(), recorded.end()); state.counters["p10_us"] = PercentileFromSorted(recorded, 0.10); @@ -274,10 +427,9 @@ static void BM_WiSHClientHighQPS(benchmark::State& state) { state.counters["target_qps"] = target_qps; state.counters["actual_qps"] = static_cast(recorded.size()) / wall_seconds; + state.counters["num_workers"] = static_cast(num_workers); state.SetItemsProcessed(static_cast(recorded.size())); } - - CleanupConnection(&client); } // Iterations are scaled per QPS level to target ~5 s of measurement.