Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 48 additions & 23 deletions wish/cpp/benchmark/high_qps_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,15 @@ bool InitConnection(ClientState* client) {
}

const std::string host = absl::GetFlag(FLAGS_host);
const std::string port_str = std::to_string(absl::GetFlag(FLAGS_port));
const std::string port = std::to_string(absl::GetFlag(FLAGS_port));

LOG(INFO) << "Target address: " << host << ":" << port;
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added using this opportunity


struct addrinfo hints{};
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
struct addrinfo* res = nullptr;
const int gai_err = getaddrinfo(host.c_str(), port_str.c_str(), &hints, &res);
const int gai_err = getaddrinfo(host.c_str(), port.c_str(), &hints, &res);
if (gai_err != 0) {
LOG(ERROR) << "getaddrinfo failed for " << host << ": "
<< gai_strerror(gai_err);
Expand Down Expand Up @@ -247,6 +249,8 @@ void CleanupConnection(ClientState* client) {
// Each worker thread owns an independent libevent connection and issues
// requests sequentially, gated by the shared TokenDispenser.
void WorkerLoop(SharedState* ss) {
LOG(INFO) << "Worker thread started";

ClientState client;
if (!InitConnection(&client)) {
LOG(ERROR) << "WorkerLoop: failed to connect, worker exiting";
Expand Down Expand Up @@ -302,14 +306,15 @@ double MeasureQps(std::atomic<int64_t>& completed, int window_ms) {
// TokenDispenser gates all workers, calibrated during warmup.
// With --poisson, inter-arrival times follow an exponential distribution
// (Poisson process), matching typical production traffic patterns.
static void BM_WiSHClientHighQPS(benchmark::State& state) {
LOG(INFO) << "Starting high-QPS benchmark: payload_size=" << state.range(0)
<< " bytes, target_qps=" << state.range(1)
<< ", poisson=" << absl::GetFlag(FLAGS_poisson);

static void BM_PlainText_HighQPS(benchmark::State& state) {
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • clarified that it's for the plain text case
  • removed WiSH as it's clear

const int payload_size = static_cast<int>(state.range(0));
const double target_qps = static_cast<double>(state.range(1));

LOG(INFO) << "Starting benchmark";
LOG(INFO) << " Payload size: " << payload_size << " bytes";
LOG(INFO) << " Target QPS: " << target_qps;
LOG(INFO) << " Poisson arrivals: " << (absl::GetFlag(FLAGS_poisson) ? "enabled" : "disabled");
Comment on lines +313 to +316
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revised format


// Allocate enough workers so no single worker is the bottleneck.
// 500 QPS/worker is conservative; it provides headroom for RTTs up to 2 ms.
const int num_workers =
Expand All @@ -329,14 +334,18 @@ static void BM_WiSHClientHighQPS(benchmark::State& state) {
}

// ── Warmup ───────────────────────────────────────────────────────────────

constexpr int kWindowMs = 2000;
constexpr int kMaxWindows = 100;

constexpr double kStableBand = 0.05;
constexpr int kStableNeeded = 3;
int near_target_count = 0;

constexpr int kPlateauLimit = 10;
constexpr double kPlateauBand = 0.01;
int plateau_count = 0;

double prev_qps = 0.0;

for (int w = 0; w < kMaxWindows &&
Expand Down Expand Up @@ -423,27 +432,43 @@ static void BM_WiSHClientHighQPS(benchmark::State& state) {
for (auto& t : workers) t.join();

// ── Report counters ───────────────────────────────────────────────────────
if (!recorded.empty()) {
std::sort(recorded.begin(), recorded.end());
state.counters["p10_us"] = PercentileFromSorted(recorded, 0.10);
state.counters["p50_us"] = PercentileFromSorted(recorded, 0.50);
state.counters["p90_us"] = PercentileFromSorted(recorded, 0.90);
state.counters["p99_us"] = PercentileFromSorted(recorded, 0.99);
state.counters["target_qps"] = target_qps;
state.counters["actual_qps"] =
static_cast<double>(recorded.size()) / wall_seconds;
state.counters["num_workers"] = static_cast<double>(num_workers);
state.SetItemsProcessed(static_cast<int64_t>(recorded.size()));
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

std::sort(recorded.begin(), recorded.end());

LOG(INFO) << "Result:";
LOG(INFO) << " # of workers: " << num_workers;
LOG(INFO) << " Measured requests: " << recorded.size();
LOG(INFO) << " Measurement duration: " << wall_seconds << " s";
LOG(INFO) << " Actual QPS: " << static_cast<double>(recorded.size()) / wall_seconds;
LOG(INFO) << " p10 latency: " << PercentileFromSorted(recorded, 0.10) << " us";
LOG(INFO) << " p50 latency: " << PercentileFromSorted(recorded, 0.50) << " us";
LOG(INFO) << " p90 latency: " << PercentileFromSorted(recorded, 0.90) << " us";
LOG(INFO) << " p99 latency: " << PercentileFromSorted(recorded, 0.99) << " us";
Comment on lines +438 to +446
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using one row for each result value as it's easier to share the result

}

// Iterations are scaled per QPS level to target ~5 s of measurement.
// Separate registrations are required because Iterations() is a global setting
// per registration and cannot vary per-args in a single ArgsProduct call.
BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 100})->Iterations(500);
BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 1'000})->Iterations(5000);
BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 1'800})->Iterations(8000);
BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 10'000})->Iterations(50000);
// 100 QPS case: 100 QPS x 10 s target window = 1'000 iterations.
BENCHMARK(BM_PlainText_HighQPS)
    ->Unit(benchmark::kMicrosecond)
    ->UseManualTime()
    ->Args({1024, 100})
    ->Iterations(1'000);
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tweaked the number of iterations so that each test case runs for 10 sec

// 1'000 QPS case: 1'000 QPS x 10 s target window = 10'000 iterations.
BENCHMARK(BM_PlainText_HighQPS)
    ->Unit(benchmark::kMicrosecond)
    ->UseManualTime()
    ->Args({1024, 1'000})
    ->Iterations(10'000);
// 1'800 QPS case: every other registration sizes Iterations() as
// target_qps * 10 so each case runs ~10 s of measurement; 800 * 10 here
// gave only ~4.4 s at 1'800 QPS. Fixed to 1'800 * 10 = 18'000 iterations
// to match the stated 10-second-per-case convention.
BENCHMARK(BM_PlainText_HighQPS)
    ->UseManualTime()
    ->Unit(benchmark::kMicrosecond)
    ->Args({1 << 10, 1'800})
    ->Iterations(1'800 * 10);
// 10'000 QPS case: 10'000 QPS x 10 s target window = 100'000 iterations.
BENCHMARK(BM_PlainText_HighQPS)
    ->Unit(benchmark::kMicrosecond)
    ->UseManualTime()
    ->Args({1024, 10'000})
    ->Iterations(100'000);

} // namespace

Expand Down