diff --git a/wish/cpp/benchmark/CMakeLists.txt b/wish/cpp/benchmark/CMakeLists.txt index e0a5a36..fa6ba7c 100644 --- a/wish/cpp/benchmark/CMakeLists.txt +++ b/wish/cpp/benchmark/CMakeLists.txt @@ -11,6 +11,19 @@ target_link_libraries(benchmark_client benchmark::benchmark ) +add_executable(high_qps_benchmark_client + high_qps_client.cc +) +target_link_libraries(high_qps_benchmark_client + wish_handler + absl::flags + absl::flags_parse + absl::log + absl::log_initialize + "$" + benchmark::benchmark +) + add_executable(tls_benchmark_client tls_client.cc ) diff --git a/wish/cpp/benchmark/benchmark.py b/wish/cpp/benchmark/benchmark.py index 0cb8cea..2ecd6a1 100644 --- a/wish/cpp/benchmark/benchmark.py +++ b/wish/cpp/benchmark/benchmark.py @@ -13,6 +13,7 @@ PLAIN_SERVER_BINARY_NAME = "examples/echo_server" PLAIN_CLIENT_BINARY_NAME = "benchmark/benchmark_client" +HIGH_QPS_CLIENT_BINARY_NAME = "benchmark/high_qps_benchmark_client" def _client_host_from_remote_target(remote_host): @@ -56,13 +57,21 @@ def _start_server(remote_host, server_binary_path, server_binary_name, certs_dir return subprocess.Popen(["ssh", "-tt", remote_host, remote_command], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0) -def run_benchmark(remote_host=None, tls=True): - server_binary_name = TLS_SERVER_BINARY_NAME if tls else PLAIN_SERVER_BINARY_NAME - client_binary_name = TLS_CLIENT_BINARY_NAME if tls else PLAIN_CLIENT_BINARY_NAME +def run_benchmark(remote_host=None, tls=True, high_qps=False): + # The high-QPS client is plain only (no TLS variant). + if high_qps: + server_binary_name = PLAIN_SERVER_BINARY_NAME + client_binary_name = HIGH_QPS_CLIENT_BINARY_NAME + else: + server_binary_name = TLS_SERVER_BINARY_NAME if tls else PLAIN_SERVER_BINARY_NAME + client_binary_name = TLS_CLIENT_BINARY_NAME if tls else PLAIN_CLIENT_BINARY_NAME server_binary_path = os.path.join(BUILD_DIR, server_binary_name) client_binary_path = os.path.join(BUILD_DIR, client_binary_name) - print(f"Running {'TLS' if tls else 'plain'} benchmark ...") + if high_qps: + print("Running high-QPS benchmark (plain) ...") + else: + print(f"Running {'TLS' if tls else 'plain'} benchmark ...") client_host = "127.0.0.1" if remote_host: @@ -71,20 +80,25 @@ def run_benchmark(remote_host=None, tls=True): else: print("Starting local server ...") - server_process = _start_server(remote_host, server_binary_path, server_binary_name, certs_dir=CERTS_DIR if tls else None) + server_process = _start_server(remote_host, server_binary_path, server_binary_name, certs_dir=CERTS_DIR if (tls and not high_qps) else None) time.sleep(2) # wait for server to start if server_process.poll() is not None: raise RuntimeError(f"{server_binary_name} failed to start") + # The high-QPS client uses Iterations() to control measurement time, so + # --benchmark_min_time must be omitted to avoid conflicting with it. + client_cmd = [ + client_binary_path, + "--stderrthreshold=0", + "--benchmark_counters_tabular=true", + f"--host={client_host}", + ] + if not high_qps: + client_cmd.append("--benchmark_min_time=5.0s") + try: - subprocess.run([ - client_binary_path, - "--stderrthreshold=0", - "--benchmark_counters_tabular=true", - "--benchmark_min_time=5.0s", - f"--host={client_host}", - ], capture_output=False, text=True, check=True) + subprocess.run(client_cmd, capture_output=False, text=True, check=True) except subprocess.CalledProcessError as e: print(f"Error running client: {e}") except Exception as e: @@ -105,12 +119,22 @@ def run_benchmark(remote_host=None, tls=True): "--tls", action=argparse.BooleanOptionalAction, default=True, - help="Use TLS (default: enabled). Pass --no-tls for plain HTTP.", + help="Use TLS (default: enabled). Pass --no-tls for plain HTTP. Ignored when --high-qps is set.", + ) + parser.add_argument( + "--high-qps", + action="store_true", + default=False, + help="Run the high-QPS (target-QPS) benchmark instead of the default latency sweep. Always uses the plain server.", ) args = parser.parse_args() - server_binary_name = TLS_SERVER_BINARY_NAME if args.tls else PLAIN_SERVER_BINARY_NAME - client_binary_name = TLS_CLIENT_BINARY_NAME if args.tls else PLAIN_CLIENT_BINARY_NAME + if args.high_qps: + server_binary_name = PLAIN_SERVER_BINARY_NAME + client_binary_name = HIGH_QPS_CLIENT_BINARY_NAME + else: + server_binary_name = TLS_SERVER_BINARY_NAME if args.tls else PLAIN_SERVER_BINARY_NAME + client_binary_name = TLS_CLIENT_BINARY_NAME if args.tls else PLAIN_CLIENT_BINARY_NAME server_binary_path = os.path.join(BUILD_DIR, server_binary_name) client_binary_path = os.path.join(BUILD_DIR, client_binary_name) @@ -127,4 +151,4 @@ def run_benchmark(remote_host=None, tls=True): exit(1) print("Starting benchmarks...") - run_benchmark(remote_host=args.remote_host, tls=args.tls) + run_benchmark(remote_host=args.remote_host, tls=args.tls, high_qps=args.high_qps) diff --git a/wish/cpp/benchmark/high_qps_client.cc b/wish/cpp/benchmark/high_qps_client.cc new file mode 100644 index 0000000..6624223 --- /dev/null +++ b/wish/cpp/benchmark/high_qps_client.cc @@ -0,0 +1,304 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "../src/wish_handler.h" +#include "absl/flags/flag.h" +#include "absl/flags/parse.h" +#include "absl/log/initialize.h" +#include "absl/log/log.h" +#include "benchmark/benchmark.h" + +ABSL_FLAG(std::string, host, "127.0.0.1", "Server host to connect to"); +ABSL_FLAG(int, port, 8080, "Server port to connect to"); + +namespace { + +double PercentileFromSorted(const std::vector& values, double p) { + if (values.empty()) { + return 0.0; + } + const size_t idx = static_cast(p * (values.size() - 1)); + return values[idx]; +} + +struct ClientState { + struct event_base* base = nullptr; + struct bufferevent* bev = nullptr; + WishHandler* handler = nullptr; + + bool connected = false; + bool awaiting_response = false; + std::chrono::steady_clock::time_point request_start; + std::vector latencies_us; +}; + +bool InitConnection(ClientState* client) { + client->base = event_base_new(); + if (!client->base) { + LOG(ERROR) << "event_base_new() failed"; + return false; + } + + struct sockaddr_in sin; + std::memset(&sin, 0, sizeof(sin)); + sin.sin_family = AF_INET; + sin.sin_port = htons(absl::GetFlag(FLAGS_port)); + + const std::string host = absl::GetFlag(FLAGS_host); + + if (inet_pton(AF_INET, host.c_str(), &sin.sin_addr) != 1) { + LOG(ERROR) << "Invalid IPv4 host: " << host; + return false; + } + + client->bev = bufferevent_socket_new(client->base, -1, BEV_OPT_CLOSE_ON_FREE); + if (!client->bev) { + LOG(ERROR) << "bufferevent_socket_new() failed"; + return false; + } + + if (bufferevent_socket_connect(client->bev, + reinterpret_cast(&sin), + sizeof(sin)) < 0) { + LOG(ERROR) << "bufferevent_socket_connect() failed"; + return false; + } + + client->handler = new WishHandler(client->bev, false); + client->handler->SetOnOpen([client]() { + const int fd = bufferevent_getfd(client->bev); + if (fd >= 0) { + int nodelay = 1; + setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof(nodelay)); + } + + client->connected = true; + event_base_loopexit(client->base, nullptr); + }); + + client->handler->SetOnMessage([client](uint8_t opcode, const std::string& msg) { + (void)opcode; + (void)msg; + + if (!client->awaiting_response) { + return; + } + + const auto end = std::chrono::steady_clock::now(); + const double latency_us = + std::chrono::duration(end - client->request_start) + .count(); + client->latencies_us.push_back(latency_us); + client->awaiting_response = false; + event_base_loopexit(client->base, nullptr); + }); + + client->handler->Start(); + + event_base_dispatch(client->base); + return client->connected; +} + +void CleanupConnection(ClientState* client) { + if (client->handler) { + delete client->handler; + client->handler = nullptr; + client->bev = nullptr; + } + + if (client->base) { + event_base_free(client->base); + client->base = nullptr; + } +} + +// Benchmark WiSH throughput under a fixed target QPS. +// +// WiSH uses a single-threaded libevent loop (sequential request/response), so +// concurrency is always 1 and QPS is controlled solely by an inter-request +// sleep interval. A warmup phase calibrates that interval before measurement. +static void BM_WiSHClientHighQPS(benchmark::State& state) { + LOG(INFO) << "Starting QPS benchmark iteration with payload_size=" + << state.range(0) << " bytes, target_qps=" << state.range(1); + + const int payload_size = static_cast(state.range(0)); + const double target_qps = static_cast(state.range(1)); + const std::string payload(payload_size, 'A'); + + ClientState client; + if (!InitConnection(&client)) { + state.SkipWithError("Failed to establish WiSH connection"); + CleanupConnection(&client); + return; + } + + // Warmup: calibrate inter-request sleep to hit target_qps. + constexpr int kWindowMs = 2000; + constexpr int kMaxWindows = 100; + + constexpr double kStableBand = 0.05; + constexpr int kStableNeeded = 3; + int near_target_count = 0; + + constexpr int kPlateauLimit = 10; + constexpr double kPlateauBand = 0.01; + int plateau_count = 0; + + double inter_request_us = 0.0; + double prev_qps = 0.0; + bool error = false; + + for (int w = 0; + !error && + w < kMaxWindows && + near_target_count < kStableNeeded && plateau_count < kPlateauLimit; + ++w) { + int count = 0; + client.latencies_us.clear(); + auto window_start = std::chrono::steady_clock::now(); + auto window_end = window_start + std::chrono::milliseconds(kWindowMs); + + while (std::chrono::steady_clock::now() < window_end) { + if (inter_request_us > 0.0) { + std::this_thread::sleep_for( + std::chrono::microseconds(static_cast(inter_request_us))); + } + client.awaiting_response = true; + client.request_start = std::chrono::steady_clock::now(); + if (client.handler->SendBinary(payload) != 0) { + error = true; + break; + } + event_base_dispatch(client.base); + if (client.awaiting_response) { + error = true; + break; + } + ++count; + } + if (error) break; + + const double elapsed_s = + std::chrono::duration( + std::chrono::steady_clock::now() - window_start) + .count(); + const double measured_qps = (elapsed_s > 0.0) ? count / elapsed_s : 0.0; + VLOG(2) << "[warmup] QPS: " << static_cast(measured_qps) + << " / target: " << static_cast(target_qps); + + if (measured_qps > 0.0) { + const double delta_us = (1.0 / target_qps - 1.0 / measured_qps) * 1e6; + inter_request_us = std::max(0.0, inter_request_us + delta_us); + } + + const bool near_target = + std::abs(measured_qps - target_qps) / target_qps < kStableBand; + const bool plateau = + prev_qps > 0.0 && + std::abs(measured_qps - prev_qps) / prev_qps < kPlateauBand; + + if (near_target) { + ++near_target_count; + } else { + near_target_count = 0; + } + + if (plateau) { + ++plateau_count; + } else { + plateau_count = 0; + } + + VLOG(2) << "[warmup] near_target_count=" << near_target_count + << " plateau_count=" << plateau_count; + prev_qps = measured_qps; + } + + if (error) { + state.SkipWithError("WiSH connection error during warmup"); + CleanupConnection(&client); + return; + } + + LOG(INFO) << "Warmup complete: inter_request_us=" << inter_request_us; + + // Reset accumulated latencies before measurement. + client.latencies_us.clear(); + + std::vector recorded; + const auto measure_start = std::chrono::steady_clock::now(); + + for (auto _ : state) { + if (inter_request_us > 0.0) { + std::this_thread::sleep_for( + std::chrono::microseconds(static_cast(inter_request_us))); + } + client.awaiting_response = true; + client.request_start = std::chrono::steady_clock::now(); + if (client.handler->SendBinary(payload) != 0) { + state.SkipWithError("WishHandler::SendBinary failed"); + break; + } + event_base_dispatch(client.base); + if (client.awaiting_response) { + state.SkipWithError("Connection closed before response"); + break; + } + const double lat = client.latencies_us.back(); + recorded.push_back(lat); + state.SetIterationTime(lat / 1e6); + } + + const auto measure_end = std::chrono::steady_clock::now(); + const double wall_seconds = + std::chrono::duration(measure_end - measure_start).count(); + + if (!recorded.empty()) { + std::sort(recorded.begin(), recorded.end()); + state.counters["p10_us"] = PercentileFromSorted(recorded, 0.10); + state.counters["p50_us"] = PercentileFromSorted(recorded, 0.50); + state.counters["p90_us"] = PercentileFromSorted(recorded, 0.90); + state.counters["p99_us"] = PercentileFromSorted(recorded, 0.99); + state.counters["target_qps"] = target_qps; + state.counters["actual_qps"] = + static_cast(recorded.size()) / wall_seconds; + state.SetItemsProcessed(static_cast(recorded.size())); + } + + CleanupConnection(&client); +} + +// Iterations are scaled per QPS level to target ~5 s of measurement. +// Separate registrations are required because Iterations() is a global setting +// per registration and cannot vary per-args in a single ArgsProduct call. +BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 100})->Iterations(500); +BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 1'000})->Iterations(5000); +BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 1'800})->Iterations(8000); +BENCHMARK(BM_WiSHClientHighQPS)->UseManualTime()->Unit(benchmark::kMicrosecond)->Args({1 << 10, 10'000})->Iterations(50000); + +} // namespace + +int main(int argc, char** argv) { + benchmark::MaybeReenterWithoutASLR(argc, argv); + benchmark::Initialize(&argc, argv); + + absl::ParseCommandLine(argc, argv); + absl::InitializeLog(); + + benchmark::RunSpecifiedBenchmarks(); + benchmark::Shutdown(); + + return 0; +}