From 975f14797034e0eae08c28d5ccc8217b75787447 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 22 Jun 2026 23:55:52 +0530 Subject: [PATCH 1/8] generated code and TLS certs --- Cargo.toml | 1 + grpc-benchmark/Cargo.toml | 20 + grpc-benchmark/README.md | 9 + grpc-benchmark/build.rs | 65 ++++ grpc-benchmark/data/tls/ca.pem | 20 + grpc-benchmark/data/tls/server.key | 28 ++ grpc-benchmark/data/tls/server.pem | 22 ++ .../grpc/testing/benchmark_service.proto | 48 +++ .../proto/grpc/testing/control.proto | 288 +++++++++++++++ .../proto/grpc/testing/messages.proto | 346 ++++++++++++++++++ .../proto/grpc/testing/payloads.proto | 44 +++ .../testing/report_qps_scenario_service.proto | 30 ++ grpc-benchmark/proto/grpc/testing/stats.proto | 77 ++++ .../proto/grpc/testing/worker_service.proto | 45 +++ grpc-benchmark/src/lib.rs | 41 +++ 15 files changed, 1084 insertions(+) create mode 100644 grpc-benchmark/Cargo.toml create mode 100644 grpc-benchmark/README.md create mode 100644 grpc-benchmark/build.rs create mode 100644 grpc-benchmark/data/tls/ca.pem create mode 100644 grpc-benchmark/data/tls/server.key create mode 100644 grpc-benchmark/data/tls/server.pem create mode 100644 grpc-benchmark/proto/grpc/testing/benchmark_service.proto create mode 100644 grpc-benchmark/proto/grpc/testing/control.proto create mode 100644 grpc-benchmark/proto/grpc/testing/messages.proto create mode 100644 grpc-benchmark/proto/grpc/testing/payloads.proto create mode 100644 grpc-benchmark/proto/grpc/testing/report_qps_scenario_service.proto create mode 100644 grpc-benchmark/proto/grpc/testing/stats.proto create mode 100644 grpc-benchmark/proto/grpc/testing/worker_service.proto create mode 100644 grpc-benchmark/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index dce26972f..c492e8b15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "examples", "codegen", "grpc", + "grpc-benchmark", "grpc-google", "grpc-protobuf", "grpc-protobuf-build", diff --git a/grpc-benchmark/Cargo.toml b/grpc-benchmark/Cargo.toml new file mode 100644 index 000000000..96e72d85c --- /dev/null +++ b/grpc-benchmark/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "grpc-benchmark" +version = "0.0.0-alpha" +edition = "2024" +authors = ["gRPC Authors"] +license = "MIT" +publish = false +rust-version = { workspace = true } + +[dependencies] +grpc = { path = "../grpc" } +grpc-protobuf = { path = "../grpc-protobuf" } +prost = "0.14" +protobuf = { version = "4.34.0-release" } +tonic = { path = "../tonic" } +tonic-prost = { path = "../tonic-prost" } + +[build-dependencies] +tonic-prost-build = { path = "../tonic-prost-build" } +grpc-protobuf-build = { path = "../grpc-protobuf-build" } diff --git a/grpc-benchmark/README.md b/grpc-benchmark/README.md new file mode 100644 index 000000000..7e1b85ae4 --- /dev/null +++ b/grpc-benchmark/README.md @@ -0,0 +1,9 @@ +# gRPC Benchmarking Framework Code + +This directory contains the worker, server and client implementations for the +[gRPC benchmarking framework](https://grpc.io/docs/guides/benchmarking/). The +driver code resides in the +[grpc/grpc repository](https://github.com/grpc/grpc/blob/master/tools/run_tests/performance/README.md) +along with instructions to run the benchmarks. The benchmarks continuously +monitor gRPC performance to provide performance tracking though the +[performance dashboard](https://grafana-dot-grpc-testing.appspot.com/). diff --git a/grpc-benchmark/build.rs b/grpc-benchmark/build.rs new file mode 100644 index 000000000..96c422be3 --- /dev/null +++ b/grpc-benchmark/build.rs @@ -0,0 +1,65 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * + */ + +use std::env; +use std::path::PathBuf; + +fn main() { + let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); + + grpc_protobuf_build::CodeGen::new() + .include("proto/grpc/testing") + .inputs([ + "worker_service.proto", + "control.proto", + "payloads.proto", + "stats.proto", + ]) + .output_dir(out_dir.join("worker_service")) + .client_only() + .compile() + .unwrap(); + + let behchmark_service_grpc = out_dir.join("benchmark_service/grpc"); + let behchmark_service_tonic = out_dir.join("benchmark_service/tonic"); + + grpc_protobuf_build::CodeGen::new() + .include("proto/grpc/testing") + .inputs(["benchmark_service.proto", "messages.proto"]) + .output_dir(behchmark_service_grpc) + .client_only() + .compile() + .unwrap(); + + // TODO: Use gRPC server when available. + let _ = std::fs::create_dir(behchmark_service_tonic.clone()); + tonic_prost_build::configure() + .out_dir(behchmark_service_tonic) + .build_client(false) + .compile_protos( + &["proto/grpc/testing/benchmark_service.proto"], + &["proto/grpc/testing/"], + ) + .unwrap(); +} diff --git a/grpc-benchmark/data/tls/ca.pem b/grpc-benchmark/data/tls/ca.pem new file mode 100644 index 000000000..49d39cd8e --- /dev/null +++ b/grpc-benchmark/data/tls/ca.pem @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDWjCCAkKgAwIBAgIUWrP0VvHcy+LP6UuYNtiL9gBhD5owDQYJKoZIhvcNAQEL +BQAwVjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTIw +MDMxNzE4NTk1MVoXDTMwMDMxNTE4NTk1MVowVjELMAkGA1UEBhMCQVUxEzARBgNV +BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0 +ZDEPMA0GA1UEAwwGdGVzdGNhMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC +AQEAsGL0oXflF0LzoM+Bh+qUU9yhqzw2w8OOX5mu/iNCyUOBrqaHi7mGHx73GD01 +diNzCzvlcQqdNIH6NQSL7DTpBjca66jYT9u73vZe2MDrr1nVbuLvfu9850cdxiUO +Inv5xf8+sTHG0C+a+VAvMhsLiRjsq+lXKRJyk5zkbbsETybqpxoJ+K7CoSy3yc/k +QIY3TipwEtwkKP4hzyo6KiGd/DPexie4nBUInN3bS1BUeNZ5zeaIC2eg3bkeeW7c +qT55b+Yen6CxY0TEkzBK6AKt/WUialKMgT0wbTxRZO7kUCH3Sq6e/wXeFdJ+HvdV +LPlAg5TnMaNpRdQih/8nRFpsdwIDAQABoyAwHjAMBgNVHRMEBTADAQH/MA4GA1Ud +DwEB/wQEAwICBDANBgkqhkiG9w0BAQsFAAOCAQEAkTrKZjBrJXHps/HrjNCFPb5a +THuGPCSsepe1wkKdSp1h4HGRpLoCgcLysCJ5hZhRpHkRihhef+rFHEe60UePQO3S +CVTtdJB4CYWpcNyXOdqefrbJW5QNljxgi6Fhvs7JJkBqdXIkWXtFk2eRgOIP2Eo9 +/OHQHlYnwZFrk6sp4wPyR+A95S0toZBcyDVz7u+hOW0pGK3wviOe9lvRgj/H3Pwt +bewb0l+MhRig0/DVHamyVxrDRbqInU1/GTNCwcZkXKYFWSf92U+kIcTth24Q1gcw +eZiLl5FfrWokUNytFElXob0V0a5/kbhiLc3yWmvWqHTpqCALbVyF+rKJo2f5Kw== +-----END CERTIFICATE----- diff --git a/grpc-benchmark/data/tls/server.key b/grpc-benchmark/data/tls/server.key new file mode 100644 index 000000000..086462992 --- /dev/null +++ b/grpc-benchmark/data/tls/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDnE443EknxvxBq +6+hvn/t09hl8hx366EBYvZmVM/NC+7igXRAjiJiA/mIaCvL3MS0Iz5hBLxSGICU+ +WproA3GCIFITIwcf/ETyWj/5xpgZ4AKrLrjQmmX8mhwUajfF3UvwMJrCOVqPp67t +PtP+2kBXaqrXdvnvXR41FsIB8V7zIAuIZB6bHQhiGVlc1sgZYsE2EGG9WMmHtS86 +qkAOTjG2XyjmPTGAwhGDpYkYrpzp99IiDh4/Veai81hn0ssQkbry0XRD/Ig3jcHh +23WiriPNJ0JsbgXUSLKRPZObA9VgOLy2aXoN84IMaeK3yy+cwSYG/99w93fUZJte +MXwz4oYZAgMBAAECggEBAIVn2Ncai+4xbH0OLWckabwgyJ4IM9rDc0LIU368O1kU +koais8qP9dujAWgfoh3sGh/YGgKn96VnsZjKHlyMgF+r4TaDJn3k2rlAOWcurGlj +1qaVlsV4HiEzp7pxiDmHhWvp4672Bb6iBG+bsjCUOEk/n9o9KhZzIBluRhtxCmw5 +nw4Do7z00PTvN81260uPWSc04IrytvZUiAIx/5qxD72bij2xJ8t/I9GI8g4FtoVB +8pB6S/hJX1PZhh9VlU6Yk+TOfOVnbebG4W5138LkB835eqk3Zz0qsbc2euoi8Hxi +y1VGwQEmMQ63jXz4c6g+X55ifvUK9Jpn5E8pq+pMd7ECgYEA93lYq+Cr54K4ey5t +sWMa+ye5RqxjzgXj2Kqr55jb54VWG7wp2iGbg8FMlkQwzTJwebzDyCSatguEZLuB +gRGroRnsUOy9vBvhKPOch9bfKIl6qOgzMJB267fBVWx5ybnRbWN/I7RvMQf3k+9y +biCIVnxDLEEYyx7z85/5qxsXg/MCgYEA7wmWKtCTn032Hy9P8OL49T0X6Z8FlkDC +Rk42ygrc/MUbugq9RGUxcCxoImOG9JXUpEtUe31YDm2j+/nbvrjl6/bP2qWs0V7l +dTJl6dABP51pCw8+l4cWgBBX08Lkeen812AAFNrjmDCjX6rHjWHLJcpS18fnRRkP +V1d/AHWX7MMCgYEA6Gsw2guhp0Zf2GCcaNK5DlQab8OL4Hwrpttzo4kuTlwtqNKp +Q9H4al9qfF4Cr1TFya98+EVYf8yFRM3NLNjZpe3gwYf2EerlJj7VLcahw0KKzoN1 +QBENfwgPLRk5sDkx9VhSmcfl/diLroZdpAwtv3vo4nEoxeuGFbKTGx3Qkf0CgYEA +xyR+dcb05Ygm3w4klHQTowQ10s1H80iaUcZBgQuR1ghEtDbUPZHsoR5t1xCB02ys +DgAwLv1bChIvxvH/L6KM8ovZ2LekBX4AviWxoBxJnfz/EVau98B0b1auRN6eSC83 +FRuGldlSOW1z/nSh8ViizSYE5H5HX1qkXEippvFRE88CgYB3Bfu3YQY60ITWIShv +nNkdcbTT9eoP9suaRJjw92Ln+7ZpALYlQMKUZmJ/5uBmLs4RFwUTQruLOPL4yLTH +awADWUzs3IRr1fwn9E+zM8JVyKCnUEM3w4N5UZskGO2klashAd30hWO+knRv/y0r +uGIYs9Ek7YXlXIRVrzMwcsrt1w== +-----END PRIVATE KEY----- diff --git a/grpc-benchmark/data/tls/server.pem b/grpc-benchmark/data/tls/server.pem new file mode 100644 index 000000000..88244f856 --- /dev/null +++ b/grpc-benchmark/data/tls/server.pem @@ -0,0 +1,22 @@ +-----BEGIN CERTIFICATE----- +MIIDtDCCApygAwIBAgIUbJfTREJ6k6/+oInWhV1O1j3ZT0IwDQYJKoZIhvcNAQEL +BQAwVjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTIw +MDMxODAzMTA0MloXDTMwMDMxNjAzMTA0MlowZTELMAkGA1UEBhMCVVMxETAPBgNV +BAgMCElsbGlub2lzMRAwDgYDVQQHDAdDaGljYWdvMRUwEwYDVQQKDAxFeGFtcGxl +LCBDby4xGjAYBgNVBAMMESoudGVzdC5nb29nbGUuY29tMIIBIjANBgkqhkiG9w0B +AQEFAAOCAQ8AMIIBCgKCAQEA5xOONxJJ8b8Qauvob5/7dPYZfIcd+uhAWL2ZlTPz +Qvu4oF0QI4iYgP5iGgry9zEtCM+YQS8UhiAlPlqa6ANxgiBSEyMHH/xE8lo/+caY +GeACqy640Jpl/JocFGo3xd1L8DCawjlaj6eu7T7T/tpAV2qq13b5710eNRbCAfFe +8yALiGQemx0IYhlZXNbIGWLBNhBhvVjJh7UvOqpADk4xtl8o5j0xgMIRg6WJGK6c +6ffSIg4eP1XmovNYZ9LLEJG68tF0Q/yIN43B4dt1oq4jzSdCbG4F1EiykT2TmwPV +YDi8tml6DfOCDGnit8svnMEmBv/fcPd31GSbXjF8M+KGGQIDAQABo2swaTAJBgNV +HRMEAjAAMAsGA1UdDwQEAwIF4DBPBgNVHREESDBGghAqLnRlc3QuZ29vZ2xlLmZy +ghh3YXRlcnpvb2kudGVzdC5nb29nbGUuYmWCEioudGVzdC55b3V0dWJlLmNvbYcE +wKgBAzANBgkqhkiG9w0BAQsFAAOCAQEAS8hDQA8PSgipgAml7Q3/djwQ644ghWQv +C2Kb+r30RCY1EyKNhnQnIIh/OUbBZvh0M0iYsy6xqXgfDhCB93AA6j0i5cS8fkhH +Jl4RK0tSkGQ3YNY4NzXwQP/vmUgfkw8VBAZ4Y4GKxppdATjffIW+srbAmdDruIRM +wPeikgOoRrXf0LA1fi4TqxARzeRwenQpayNfGHTvVF9aJkl8HoaMunTAdG5pIVcr +9GKi/gEMpXUJbbVv3U5frX1Wo4CFo+rZWJ/LyCMeb0jciNLxSdMwj/E/ZuExlyeZ +gc9ctPjSMvgSyXEKv6Vwobleeg88V2ZgzenziORoWj4KszG/lbQZvg== +-----END CERTIFICATE----- diff --git a/grpc-benchmark/proto/grpc/testing/benchmark_service.proto b/grpc-benchmark/proto/grpc/testing/benchmark_service.proto new file mode 100644 index 000000000..661269f8d --- /dev/null +++ b/grpc-benchmark/proto/grpc/testing/benchmark_service.proto @@ -0,0 +1,48 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. +syntax = "proto3"; + +package grpc.testing; + +import "messages.proto"; + +option java_multiple_files = true; +option java_outer_classname = "BenchmarkServiceProto"; +option java_package = "io.grpc.testing"; + +service BenchmarkService { + // One request followed by one response. + // The server returns the client payload as-is. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // Repeated sequence of one request followed by one response. + // Should be called streaming ping-pong + // The server returns the client payload as-is on each response + rpc StreamingCall(stream SimpleRequest) returns (stream SimpleResponse); + + // Single-sided unbounded streaming from client to server + // The server returns the client payload as-is once the client does WritesDone + rpc StreamingFromClient(stream SimpleRequest) returns (SimpleResponse); + + // Single-sided unbounded streaming from server to client + // The server repeatedly returns the client payload as-is + rpc StreamingFromServer(SimpleRequest) returns (stream SimpleResponse); + + // Two-sided unbounded streaming between server to client + // Both sides send the content of their own choice to the other + rpc StreamingBothWays(stream SimpleRequest) returns (stream SimpleResponse); +} diff --git a/grpc-benchmark/proto/grpc/testing/control.proto b/grpc-benchmark/proto/grpc/testing/control.proto new file mode 100644 index 000000000..a8e543de0 --- /dev/null +++ b/grpc-benchmark/proto/grpc/testing/control.proto @@ -0,0 +1,288 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.testing; + +import "payloads.proto"; +import "stats.proto"; + +option java_multiple_files = true; +option java_outer_classname = "ControlProto"; +option java_package = "io.grpc.testing"; + +enum ClientType { + // Many languages support a basic distinction between using + // sync or async client, and this allows the specification + SYNC_CLIENT = 0; + ASYNC_CLIENT = 1; + OTHER_CLIENT = 2; // used for some language-specific variants + CALLBACK_CLIENT = 3; +} + +enum ServerType { + SYNC_SERVER = 0; + ASYNC_SERVER = 1; + ASYNC_GENERIC_SERVER = 2; + OTHER_SERVER = 3; // used for some language-specific variants + CALLBACK_SERVER = 4; +} + +enum RpcType { + UNARY = 0; + STREAMING = 1; + STREAMING_FROM_CLIENT = 2; + STREAMING_FROM_SERVER = 3; + STREAMING_BOTH_WAYS = 4; +} + +// Parameters of poisson process distribution, which is a good representation +// of activity coming in from independent identical stationary sources. +message PoissonParams { + // The rate of arrivals (a.k.a. lambda parameter of the exp distribution). + double offered_load = 1; +} + +// Once an RPC finishes, immediately start a new one. +// No configuration parameters needed. +message ClosedLoopParams {} + +message LoadParams { + oneof load { + ClosedLoopParams closed_loop = 1; + PoissonParams poisson = 2; + } +} + +// presence of SecurityParams implies use of TLS +message SecurityParams { + bool use_test_ca = 1; + string server_host_override = 2; + string cred_type = 3; +} + +message ChannelArg { + string name = 1; + oneof value { + string str_value = 2; + int32 int_value = 3; + } +} + +message ClientConfig { + // List of targets to connect to. At least one target needs to be specified. + repeated string server_targets = 1; + ClientType client_type = 2; + SecurityParams security_params = 3; + // How many concurrent RPCs to start for each channel. + // For synchronous client, use a separate thread for each outstanding RPC. + int32 outstanding_rpcs_per_channel = 4; + // Number of independent client channels to create. + // i-th channel will connect to server_target[i % server_targets.size()] + int32 client_channels = 5; + // Only for async client. Number of threads to use to start/manage RPCs. + int32 async_client_threads = 7; + RpcType rpc_type = 8; + // The requested load for the entire client (aggregated over all the threads). + LoadParams load_params = 10; + PayloadConfig payload_config = 11; + HistogramParams histogram_params = 12; + + // Specify the cores we should run the client on, if desired + repeated int32 core_list = 13; + int32 core_limit = 14; + + // If we use an OTHER_CLIENT client_type, this string gives more detail + string other_client_api = 15; + + repeated ChannelArg channel_args = 16; + + // Number of threads that share each completion queue + int32 threads_per_cq = 17; + + // Number of messages on a stream before it gets finished/restarted + int32 messages_per_stream = 18; + + // Use coalescing API when possible. + bool use_coalesce_api = 19; + + // If 0, disabled. Else, specifies the period between gathering latency + // medians in milliseconds. + int32 median_latency_collection_interval_millis = 20; + + // Number of client processes. 0 indicates no restriction. + int32 client_processes = 21; +} + +message ClientStatus { + ClientStats stats = 1; +} + +// Request current stats +message Mark { + // if true, the stats will be reset after taking their snapshot. + bool reset = 1; +} + +message ClientArgs { + oneof argtype { + ClientConfig setup = 1; + Mark mark = 2; + } +} + +message ServerConfig { + ServerType server_type = 1; + SecurityParams security_params = 2; + // Port on which to listen. Zero means pick unused port. + int32 port = 4; + // Only for async server. Number of threads used to serve the requests. + int32 async_server_threads = 7; + // Specify the number of cores to limit server to, if desired + int32 core_limit = 8; + // payload config, used in generic server. + // Note this must NOT be used in proto (non-generic) servers. For proto servers, + // 'response sizes' must be configured from the 'response_size' field of the + // 'SimpleRequest' objects in RPC requests. + PayloadConfig payload_config = 9; + + // Specify the cores we should run the server on, if desired + repeated int32 core_list = 10; + + // If we use an OTHER_SERVER client_type, this string gives more detail + string other_server_api = 11; + + // Number of threads that share each completion queue + int32 threads_per_cq = 12; + + // c++-only options (for now) -------------------------------- + + // Buffer pool size (no buffer pool specified if unset) + int32 resource_quota_size = 1001; + repeated ChannelArg channel_args = 1002; + + // Number of server processes. 0 indicates no restriction. + int32 server_processes = 21; +} + +message ServerArgs { + oneof argtype { + ServerConfig setup = 1; + Mark mark = 2; + } +} + +message ServerStatus { + ServerStats stats = 1; + // the port bound by the server + int32 port = 2; + // Number of cores available to the server + int32 cores = 3; +} + +message CoreRequest {} + +message CoreResponse { + // Number of cores available on the server + int32 cores = 1; +} + +message Void {} + +// A single performance scenario: input to qps_json_driver +message Scenario { + // Human readable name for this scenario + string name = 1; + // Client configuration + ClientConfig client_config = 2; + // Number of clients to start for the test + int32 num_clients = 3; + // Server configuration + ServerConfig server_config = 4; + // Number of servers to start for the test + int32 num_servers = 5; + // Warmup period, in seconds + int32 warmup_seconds = 6; + // Benchmark time, in seconds + int32 benchmark_seconds = 7; + // Number of workers to spawn locally (usually zero) + int32 spawn_local_worker_count = 8; +} + +// A set of scenarios to be run with qps_json_driver +message Scenarios { + repeated Scenario scenarios = 1; +} + +// Basic summary that can be computed from ClientStats and ServerStats +// once the scenario has finished. +message ScenarioResultSummary { + // Total number of operations per second over all clients. What is counted as 1 'operation' depends on the benchmark scenarios: + // For unary benchmarks, an operation is processing of a single unary RPC. + // For streaming benchmarks, an operation is processing of a single ping pong of request and response. + double qps = 1; + // QPS per server core. + double qps_per_server_core = 2; + // The total server cpu load based on system time across all server processes, expressed as percentage of a single cpu core. + // For example, 85 implies 85% of a cpu core, 125 implies 125% of a cpu core. Since we are accumulating the cpu load across all the server + // processes, the value could > 100 when there are multiple servers or a single server using multiple threads and cores. + // Same explanation for the total client cpu load below. + double server_system_time = 3; + // The total server cpu load based on user time across all server processes, expressed as percentage of a single cpu core. (85 => 85%, 125 => 125%) + double server_user_time = 4; + // The total client cpu load based on system time across all client processes, expressed as percentage of a single cpu core. (85 => 85%, 125 => 125%) + double client_system_time = 5; + // The total client cpu load based on user time across all client processes, expressed as percentage of a single cpu core. (85 => 85%, 125 => 125%) + double client_user_time = 6; + + // X% latency percentiles (in nanoseconds) + double latency_50 = 7; + double latency_90 = 8; + double latency_95 = 9; + double latency_99 = 10; + double latency_999 = 11; + + // server cpu usage percentage + double server_cpu_usage = 12; + + // Number of requests that succeeded/failed + double successful_requests_per_second = 13; + double failed_requests_per_second = 14; + + // Number of polls called inside completion queue per request + double client_polls_per_request = 15; + double server_polls_per_request = 16; +} + +// Results of a single benchmark scenario. +message ScenarioResult { + // Inputs used to run the scenario. + Scenario scenario = 1; + // Histograms from all clients merged into one histogram. + HistogramData latencies = 2; + // Client stats for each client + repeated ClientStats client_stats = 3; + // Server stats for each server + repeated ServerStats server_stats = 4; + // Number of cores available to each server + repeated int32 server_cores = 5; + // An after-the-fact computed summary + ScenarioResultSummary summary = 6; + // Information on success or failure of each worker + repeated bool client_success = 7; + repeated bool server_success = 8; + // Number of failed requests (one row per status code seen) + repeated RequestResultCount request_results = 9; +} diff --git a/grpc-benchmark/proto/grpc/testing/messages.proto b/grpc-benchmark/proto/grpc/testing/messages.proto new file mode 100644 index 000000000..66c457308 --- /dev/null +++ b/grpc-benchmark/proto/grpc/testing/messages.proto @@ -0,0 +1,346 @@ +// Copyright 2015-2016 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Message definitions to be used by integration test service definitions. + +syntax = "proto3"; + +package grpc.testing; + +option java_package = "io.grpc.testing.integration"; + +// TODO(dgq): Go back to using well-known types once +// https://github.com/grpc/grpc/issues/6980 has been fixed. +// import "google/protobuf/wrappers.proto"; +message BoolValue { + // The bool value. + bool value = 1; +} + +// The type of payload that should be returned. +enum PayloadType { + // Compressable text format. + COMPRESSABLE = 0; +} + +// A block of data, to simply increase gRPC message size. +message Payload { + // The type of data in body. + PayloadType type = 1; + // Primary contents of payload. + bytes body = 2; +} + +// A protobuf representation for grpc status. This is used by test +// clients to specify a status that the server should attempt to return. +message EchoStatus { + int32 code = 1; + string message = 2; +} + +// The type of route that a client took to reach a server w.r.t. gRPCLB. +// The server must fill in "fallback" if it detects that the RPC reached +// the server via the "gRPCLB fallback" path, and "backend" if it detects +// that the RPC reached the server via "gRPCLB backend" path (i.e. if it got +// the address of this server from the gRPCLB server BalanceLoad RPC). Exactly +// how this detection is done is context and server dependent. +enum GrpclbRouteType { + // Server didn't detect the route that a client took to reach it. + GRPCLB_ROUTE_TYPE_UNKNOWN = 0; + // Indicates that a client reached a server via gRPCLB fallback. + GRPCLB_ROUTE_TYPE_FALLBACK = 1; + // Indicates that a client reached a server as a gRPCLB-given backend. + GRPCLB_ROUTE_TYPE_BACKEND = 2; +} + +// Unary request. +message SimpleRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, server randomly chooses one from other formats. + PayloadType response_type = 1; + + // Desired payload size in the response from the server. + int32 response_size = 2; + + // Optional input payload sent along with the request. + Payload payload = 3; + + // Whether SimpleResponse should include username. + bool fill_username = 4; + + // Whether SimpleResponse should include OAuth scope. + bool fill_oauth_scope = 5; + + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue response_compressed = 6; + + // Whether server should return a given status + EchoStatus response_status = 7; + + // Whether the server should expect this request to be compressed. + BoolValue expect_compressed = 8; + + // Whether SimpleResponse should include server_id. + bool fill_server_id = 9; + + // Whether SimpleResponse should include grpclb_route_type. + bool fill_grpclb_route_type = 10; + + // If set the server should record this metrics report data for the current RPC. + TestOrcaReport orca_per_query_report = 11; +} + +// Unary response, as configured by the request. +message SimpleResponse { + // Payload to increase message size. + Payload payload = 1; + // The user the request came from, for verifying authentication was + // successful when the client expected it. + string username = 2; + // OAuth scope. + string oauth_scope = 3; + + // Server ID. This must be unique among different server instances, + // but the same across all RPC's made to a particular server instance. + string server_id = 4; + // gRPCLB Path. + GrpclbRouteType grpclb_route_type = 5; + + // Server hostname. + string hostname = 6; +} + +// Client-streaming request. +message StreamingInputCallRequest { + // Optional input payload sent along with the request. + Payload payload = 1; + + // Whether the server should expect this request to be compressed. This field + // is "nullable" in order to interoperate seamlessly with servers not able to + // implement the full compression tests by introspecting the call to verify + // the request's compression status. + BoolValue expect_compressed = 2; + + // Not expecting any payload from the response. +} + +// Client-streaming response. +message StreamingInputCallResponse { + // Aggregated size of payloads received from the client. + int32 aggregated_payload_size = 1; +} + +// Configuration for a particular response. +message ResponseParameters { + // Desired payload sizes in responses from the server. + int32 size = 1; + + // Desired interval between consecutive responses in the response stream in + // microseconds. + int32 interval_us = 2; + + // Whether to request the server to compress the response. This field is + // "nullable" in order to interoperate seamlessly with clients not able to + // implement the full compression tests by introspecting the call to verify + // the response's compression status. + BoolValue compressed = 3; +} + +// Server-streaming request. +message StreamingOutputCallRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, the payload from each response in the stream + // might be of different types. This is to simulate a mixed type of payload + // stream. + PayloadType response_type = 1; + + // Configuration for each expected response message. + repeated ResponseParameters response_parameters = 2; + + // Optional input payload sent along with the request. + Payload payload = 3; + + // Whether server should return a given status + EchoStatus response_status = 7; + + // If set the server should update this metrics report data at the OOB server. + TestOrcaReport orca_oob_report = 8; +} + +// Server-streaming response, as configured by the request and parameters. +message StreamingOutputCallResponse { + // Payload to increase response size. + Payload payload = 1; +} + +// For reconnect interop test only. +// Client tells server what reconnection parameters it used. +message ReconnectParams { + int32 max_reconnect_backoff_ms = 1; +} + +// For reconnect interop test only. +// Server tells client whether its reconnects are following the spec and the +// reconnect backoffs it saw. +message ReconnectInfo { + bool passed = 1; + repeated int32 backoff_ms = 2; +} + +message LoadBalancerStatsRequest { + // Request stats for the next num_rpcs sent by client. + int32 num_rpcs = 1; + // If num_rpcs have not completed within timeout_sec, return partial results. + int32 timeout_sec = 2; + // Response header + trailer metadata entries we want the values of. + // Matching of the keys is case-insensitive as per rfc7540#section-8.1.2 + // * (asterisk) is a special value that will return all metadata entries + repeated string metadata_keys = 3; +} + +message LoadBalancerStatsResponse { + enum MetadataType { + UNKNOWN = 0; + INITIAL = 1; + TRAILING = 2; + } + message MetadataEntry { + // Key, exactly as received from the server. Case may be different from what + // was requested in the LoadBalancerStatsRequest) + string key = 1; + // Value, exactly as received from the server. + string value = 2; + // Metadata type + MetadataType type = 3; + } + message RpcMetadata { + // metadata values for each rpc for the keys specified in + // LoadBalancerStatsRequest.metadata_keys. + repeated MetadataEntry metadata = 1; + } + message MetadataByPeer { + // List of RpcMetadata in for each RPC with a given peer + repeated RpcMetadata rpc_metadata = 1; + } + message RpcsByPeer { + // The number of completed RPCs for each peer. + map rpcs_by_peer = 1; + } + // The number of completed RPCs for each peer. + map rpcs_by_peer = 1; + // The number of RPCs that failed to record a remote peer. + int32 num_failures = 2; + map rpcs_by_method = 3; + // All the metadata of all RPCs for each peer. + map metadatas_by_peer = 4; +} + +// Request for retrieving a test client's accumulated stats. +message LoadBalancerAccumulatedStatsRequest {} + +// Accumulated stats for RPCs sent by a test client. +message LoadBalancerAccumulatedStatsResponse { + // The total number of RPCs have ever issued for each type. + // Deprecated: use stats_per_method.rpcs_started instead. + map num_rpcs_started_by_method = 1 [deprecated = true]; + // The total number of RPCs have ever completed successfully for each type. + // Deprecated: use stats_per_method.result instead. + map num_rpcs_succeeded_by_method = 2 [deprecated = true]; + // The total number of RPCs have ever failed for each type. + // Deprecated: use stats_per_method.result instead. + map num_rpcs_failed_by_method = 3 [deprecated = true]; + + message MethodStats { + // The number of RPCs that were started for this method. + int32 rpcs_started = 1; + + // The number of RPCs that completed with each status for this method. The + // key is the integral value of a google.rpc.Code; the value is the count. + map result = 2; + } + + // Per-method RPC statistics. The key is the RpcType in string form; e.g. + // 'EMPTY_CALL' or 'UNARY_CALL' + map stats_per_method = 4; +} + +// Configurations for a test client. +message ClientConfigureRequest { + // Type of RPCs to send. + enum RpcType { + EMPTY_CALL = 0; + UNARY_CALL = 1; + } + + // Metadata to be attached for the given type of RPCs. + message Metadata { + RpcType type = 1; + string key = 2; + string value = 3; + } + + // The types of RPCs the client sends. + repeated RpcType types = 1; + // The collection of custom metadata to be attached to RPCs sent by the client. + repeated Metadata metadata = 2; + // The deadline to use, in seconds, for all RPCs. If unset or zero, the + // client will use the default from the command-line. + int32 timeout_sec = 3; +} + +// Response for updating a test client's configuration. +message ClientConfigureResponse {} + +message MemorySize { + int64 rss = 1; +} + +// Metrics data the server will update and send to the client. It mirrors orca load report +// https://github.com/cncf/xds/blob/eded343319d09f30032952beda9840bbd3dcf7ac/xds/data/orca/v3/orca_load_report.proto#L15, +// but avoids orca dependency. Used by both per-query and out-of-band reporting tests. +message TestOrcaReport { + double cpu_utilization = 1; + double memory_utilization = 2; + map request_cost = 3; + map utilization = 4; +} + +// Status that will be return to callers of the Hook method +message SetReturnStatusRequest { + int32 grpc_code_to_return = 1; + string grpc_status_description = 2; +} + +message HookRequest { + enum HookRequestCommand { + // Default value + UNSPECIFIED = 0; + // Start the HTTP endpoint + START = 1; + // Stop + STOP = 2; + // Return from HTTP GET/POST + RETURN = 3; + } + HookRequestCommand command = 1; + int32 grpc_code_to_return = 2; + string grpc_status_description = 3; + // Server port to listen to + int32 server_port = 4; +} + +message HookResponse {} diff --git a/grpc-benchmark/proto/grpc/testing/payloads.proto b/grpc-benchmark/proto/grpc/testing/payloads.proto new file mode 100644 index 000000000..3ecd06858 --- /dev/null +++ b/grpc-benchmark/proto/grpc/testing/payloads.proto @@ -0,0 +1,44 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.testing; + +option java_multiple_files = true; +option java_outer_classname = "PayloadsProto"; +option java_package = "io.grpc.testing"; + +message ByteBufferParams { + int32 req_size = 1; + int32 resp_size = 2; +} + +message SimpleProtoParams { + int32 req_size = 1; + int32 resp_size = 2; +} + +message ComplexProtoParams { + // TODO (vpai): Fill this in once the details of complex, representative + // protos are decided +} + +message PayloadConfig { + oneof payload { + ByteBufferParams bytebuf_params = 1; + SimpleProtoParams simple_params = 2; + ComplexProtoParams complex_params = 3; + } +} diff --git a/grpc-benchmark/proto/grpc/testing/report_qps_scenario_service.proto b/grpc-benchmark/proto/grpc/testing/report_qps_scenario_service.proto new file mode 100644 index 000000000..d29cad875 --- /dev/null +++ b/grpc-benchmark/proto/grpc/testing/report_qps_scenario_service.proto @@ -0,0 +1,30 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. +syntax = "proto3"; + +package grpc.testing; + +import "control.proto"; + +option java_multiple_files = true; +option java_outer_classname = "ReportQpsScenarioServiceProto"; +option java_package = "io.grpc.testing"; + +service ReportQpsScenarioService { + // Report results of a QPS test benchmark scenario. + rpc ReportScenario(ScenarioResult) returns (Void); +} diff --git a/grpc-benchmark/proto/grpc/testing/stats.proto b/grpc-benchmark/proto/grpc/testing/stats.proto new file mode 100644 index 000000000..99c999f5a --- /dev/null +++ b/grpc-benchmark/proto/grpc/testing/stats.proto @@ -0,0 +1,77 @@ +// Copyright 2015 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +package grpc.testing; + +option java_outer_classname = "Stats"; +option java_package = "io.grpc.benchmarks.proto"; + +message ServerStats { + // wall clock time change in seconds since last reset + double time_elapsed = 1; + + // change in user time (in seconds) used by the server since last reset + double time_user = 2; + + // change in server time (in seconds) used by the server process and all + // threads since last reset + double time_system = 3; + + // change in total cpu time of the server (data from proc/stat) + uint64 total_cpu_time = 4; + + // change in idle time of the server (data from proc/stat) + uint64 idle_cpu_time = 5; + + // Number of polls called inside completion queue + uint64 cq_poll_count = 6; +} + +// Histogram params based on grpc/support/histogram.c +message HistogramParams { + double resolution = 1; // first bucket is [0, 1 + resolution) + double max_possible = 2; // use enough buckets to allow this value +} + +// Histogram data based on grpc/support/histogram.c +message HistogramData { + repeated uint32 bucket = 1; + double min_seen = 2; + double max_seen = 3; + double sum = 4; + double sum_of_squares = 5; + double count = 6; +} + +message RequestResultCount { + int32 status_code = 1; + int64 count = 2; +} + +message ClientStats { + // Latency histogram. Data points are in nanoseconds. + HistogramData latencies = 1; + + // See ServerStats for details. + double time_elapsed = 2; + double time_user = 3; + double time_system = 4; + + // Number of failed requests (one row per status code seen) + repeated RequestResultCount request_results = 5; + + // Number of polls called inside completion queue + uint64 cq_poll_count = 6; +} diff --git a/grpc-benchmark/proto/grpc/testing/worker_service.proto b/grpc-benchmark/proto/grpc/testing/worker_service.proto new file mode 100644 index 000000000..b49595d95 --- /dev/null +++ b/grpc-benchmark/proto/grpc/testing/worker_service.proto @@ -0,0 +1,45 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. +syntax = "proto3"; + +package grpc.testing; + +import "control.proto"; + +service WorkerService { + // Start server with specified workload. + // First request sent specifies the ServerConfig followed by ServerStatus + // response. After that, a "Mark" can be sent anytime to request the latest + // stats. Closing the stream will initiate shutdown of the test server + // and once the shutdown has finished, the OK status is sent to terminate + // this RPC. + rpc RunServer(stream ServerArgs) returns (stream ServerStatus); + + // Start client with specified workload. + // First request sent specifies the ClientConfig followed by ClientStatus + // response. After that, a "Mark" can be sent anytime to request the latest + // stats. Closing the stream will initiate shutdown of the test client + // and once the shutdown has finished, the OK status is sent to terminate + // this RPC. + rpc RunClient(stream ClientArgs) returns (stream ClientStatus); + + // Just return the core count - unary call + rpc CoreCount(CoreRequest) returns (CoreResponse); + + // Quit this worker + rpc QuitWorker(Void) returns (Void); +} diff --git a/grpc-benchmark/src/lib.rs b/grpc-benchmark/src/lib.rs new file mode 100644 index 000000000..d4b6aea7d --- /dev/null +++ b/grpc-benchmark/src/lib.rs @@ -0,0 +1,41 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * + */ + +#[allow(unused)] +mod generated { + pub(crate) mod worker_service { + grpc::include_proto!("worker_service", "worker_service"); + } + + pub(crate) mod benchmark_service_grpc { + grpc::include_proto!("benchmark_service/grpc", "benchmark_service"); + } + + pub(crate) mod benchmark_service_tonic { + include!(concat!( + env!("OUT_DIR"), + "/benchmark_service/tonic/grpc.testing.rs" + )); + } +} From 78f6c3d4f8adbdce69cc5f1ae529a3a0528b1208 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 23 Jun 2026 11:41:45 +0530 Subject: [PATCH 2/8] exactly copy grpc-protos --- grpc-benchmark/Cargo.toml | 4 +- grpc-benchmark/build.rs | 35 +++++++--------- grpc-benchmark/proto/grpc/core/stats.proto | 38 +++++++++++++++++ .../grpc/testing/benchmark_service.proto | 2 +- .../proto/grpc/testing/control.proto | 41 ++++++++++++------- .../proto/grpc/testing/messages.proto | 10 ++++- .../proto/grpc/testing/payloads.proto | 2 +- .../testing/report_qps_scenario_service.proto | 6 +-- grpc-benchmark/proto/grpc/testing/stats.proto | 18 ++++++-- .../proto/grpc/testing/worker_service.proto | 6 ++- grpc-benchmark/src/lib.rs | 9 ++-- 11 files changed, 117 insertions(+), 54 deletions(-) create mode 100644 grpc-benchmark/proto/grpc/core/stats.proto diff --git a/grpc-benchmark/Cargo.toml b/grpc-benchmark/Cargo.toml index 96e72d85c..701eb60ec 100644 --- a/grpc-benchmark/Cargo.toml +++ b/grpc-benchmark/Cargo.toml @@ -1,10 +1,8 @@ [package] name = "grpc-benchmark" -version = "0.0.0-alpha" edition = "2024" authors = ["gRPC Authors"] license = "MIT" -publish = false rust-version = { workspace = true } [dependencies] @@ -12,9 +10,11 @@ grpc = { path = "../grpc" } grpc-protobuf = { path = "../grpc-protobuf" } prost = "0.14" protobuf = { version = "4.34.0-release" } +protobuf-well-known-types = { version = "4.34.0-release" } tonic = { path = "../tonic" } tonic-prost = { path = "../tonic-prost" } [build-dependencies] tonic-prost-build = { path = "../tonic-prost-build" } grpc-protobuf-build = { path = "../grpc-protobuf-build" } +protobuf-well-known-types = { version = "4.34.0-release" } diff --git a/grpc-benchmark/build.rs b/grpc-benchmark/build.rs index 96c422be3..0b21771a2 100644 --- a/grpc-benchmark/build.rs +++ b/grpc-benchmark/build.rs @@ -27,39 +27,34 @@ use std::path::PathBuf; fn main() { let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); + let dependencies = protobuf_well_known_types::get_dependency("protobuf_well_known_types") + .into_iter() + .map(|d| d.into()) + .collect(); grpc_protobuf_build::CodeGen::new() - .include("proto/grpc/testing") + .include("proto") .inputs([ - "worker_service.proto", - "control.proto", - "payloads.proto", - "stats.proto", + "grpc/testing/worker_service.proto", + "grpc/testing/benchmark_service.proto", + "grpc/testing/messages.proto", + "grpc/testing/control.proto", + "grpc/testing/payloads.proto", + "grpc/testing/stats.proto", + "grpc/core/stats.proto", ]) - .output_dir(out_dir.join("worker_service")) + .dependencies(dependencies) .client_only() .compile() .unwrap(); - let behchmark_service_grpc = out_dir.join("benchmark_service/grpc"); - let behchmark_service_tonic = out_dir.join("benchmark_service/tonic"); - - grpc_protobuf_build::CodeGen::new() - .include("proto/grpc/testing") - .inputs(["benchmark_service.proto", "messages.proto"]) - .output_dir(behchmark_service_grpc) - .client_only() - .compile() - .unwrap(); + let behchmark_service_tonic = out_dir.join("tonic"); // TODO: Use gRPC server when available. let _ = std::fs::create_dir(behchmark_service_tonic.clone()); tonic_prost_build::configure() .out_dir(behchmark_service_tonic) .build_client(false) - .compile_protos( - &["proto/grpc/testing/benchmark_service.proto"], - &["proto/grpc/testing/"], - ) + .compile_protos(&["proto/grpc/testing/benchmark_service.proto"], &["proto"]) .unwrap(); } diff --git a/grpc-benchmark/proto/grpc/core/stats.proto b/grpc-benchmark/proto/grpc/core/stats.proto new file mode 100644 index 000000000..ac181b043 --- /dev/null +++ b/grpc-benchmark/proto/grpc/core/stats.proto @@ -0,0 +1,38 @@ +// Copyright 2017 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.core; + +message Bucket { + double start = 1; + uint64 count = 2; +} + +message Histogram { + repeated Bucket buckets = 1; +} + +message Metric { + string name = 1; + oneof value { + uint64 count = 10; + Histogram histogram = 11; + } +} + +message Stats { + repeated Metric metrics = 1; +} diff --git a/grpc-benchmark/proto/grpc/testing/benchmark_service.proto b/grpc-benchmark/proto/grpc/testing/benchmark_service.proto index 661269f8d..feddee7de 100644 --- a/grpc-benchmark/proto/grpc/testing/benchmark_service.proto +++ b/grpc-benchmark/proto/grpc/testing/benchmark_service.proto @@ -18,7 +18,7 @@ syntax = "proto3"; package grpc.testing; -import "messages.proto"; +import "grpc/testing/messages.proto"; option java_multiple_files = true; option java_outer_classname = "BenchmarkServiceProto"; diff --git a/grpc-benchmark/proto/grpc/testing/control.proto b/grpc-benchmark/proto/grpc/testing/control.proto index a8e543de0..e309e5f9c 100644 --- a/grpc-benchmark/proto/grpc/testing/control.proto +++ b/grpc-benchmark/proto/grpc/testing/control.proto @@ -14,14 +14,15 @@ syntax = "proto3"; -package grpc.testing; +import "grpc/testing/payloads.proto"; +import "grpc/testing/stats.proto"; +import "google/protobuf/timestamp.proto"; -import "payloads.proto"; -import "stats.proto"; +package grpc.testing; option java_multiple_files = true; -option java_outer_classname = "ControlProto"; option java_package = "io.grpc.testing"; +option java_outer_classname = "ControlProto"; enum ClientType { // Many languages support a basic distinction between using @@ -63,7 +64,7 @@ message LoadParams { oneof load { ClosedLoopParams closed_loop = 1; PoissonParams poisson = 2; - } + }; } // presence of SecurityParams implies use of TLS @@ -126,9 +127,7 @@ message ClientConfig { int32 client_processes = 21; } -message ClientStatus { - ClientStats stats = 1; -} +message ClientStatus { ClientStats stats = 1; } // Request current stats message Mark { @@ -192,14 +191,16 @@ message ServerStatus { int32 cores = 3; } -message CoreRequest {} +message CoreRequest { +} message CoreResponse { // Number of cores available on the server int32 cores = 1; } -message Void {} +message Void { +} // A single performance scenario: input to qps_json_driver message Scenario { @@ -228,16 +229,17 @@ message Scenarios { // Basic summary that can be computed from ClientStats and ServerStats // once the scenario has finished. -message ScenarioResultSummary { +message ScenarioResultSummary +{ // Total number of operations per second over all clients. What is counted as 1 'operation' depends on the benchmark scenarios: - // For unary benchmarks, an operation is processing of a single unary RPC. - // For streaming benchmarks, an operation is processing of a single ping pong of request and response. + // For unary benchmarks, an operation is processing of a single unary RPC. + // For streaming benchmarks, an operation is processing of a single ping pong of request and response. double qps = 1; // QPS per server core. double qps_per_server_core = 2; // The total server cpu load based on system time across all server processes, expressed as percentage of a single cpu core. - // For example, 85 implies 85% of a cpu core, 125 implies 125% of a cpu core. Since we are accumulating the cpu load across all the server - // processes, the value could > 100 when there are multiple servers or a single server using multiple threads and cores. + // For example, 85 implies 85% of a cpu core, 125 implies 125% of a cpu core. Since we are accumulating the cpu load across all the server + // processes, the value could > 100 when there are multiple servers or a single server using multiple threads and cores. // Same explanation for the total client cpu load below. double server_system_time = 3; // The total server cpu load based on user time across all server processes, expressed as percentage of a single cpu core. (85 => 85%, 125 => 125%) @@ -264,6 +266,15 @@ message ScenarioResultSummary { // Number of polls called inside completion queue per request double client_polls_per_request = 15; double server_polls_per_request = 16; + + // Queries per CPU-sec over all servers or clients + double server_queries_per_cpu_sec = 17; + double client_queries_per_cpu_sec = 18; + + + // Start and end time for the test scenario + google.protobuf.Timestamp start_time = 19; + google.protobuf.Timestamp end_time =20; } // Results of a single benchmark scenario. diff --git a/grpc-benchmark/proto/grpc/testing/messages.proto b/grpc-benchmark/proto/grpc/testing/messages.proto index 66c457308..93979bf4f 100644 --- a/grpc-benchmark/proto/grpc/testing/messages.proto +++ b/grpc-benchmark/proto/grpc/testing/messages.proto @@ -158,6 +158,10 @@ message ResponseParameters { // implement the full compression tests by introspecting the call to verify // the response's compression status. BoolValue compressed = 3; + + // Whether to request the server to send the requesting peer's socket + // address in the response. + BoolValue fill_peer_socket_address = 4; } // Server-streaming request. @@ -185,6 +189,9 @@ message StreamingOutputCallRequest { message StreamingOutputCallResponse { // Payload to increase response size. Payload payload = 1; + + // The peer's socket address if requested. + string peer_socket_address = 2; } // For reconnect interop test only. @@ -343,4 +350,5 @@ message HookRequest { int32 server_port = 4; } -message HookResponse {} +message HookResponse { +} diff --git a/grpc-benchmark/proto/grpc/testing/payloads.proto b/grpc-benchmark/proto/grpc/testing/payloads.proto index 3ecd06858..8cbc9db6c 100644 --- a/grpc-benchmark/proto/grpc/testing/payloads.proto +++ b/grpc-benchmark/proto/grpc/testing/payloads.proto @@ -17,8 +17,8 @@ syntax = "proto3"; package grpc.testing; option java_multiple_files = true; -option java_outer_classname = "PayloadsProto"; option java_package = "io.grpc.testing"; +option java_outer_classname = "PayloadsProto"; message ByteBufferParams { int32 req_size = 1; diff --git a/grpc-benchmark/proto/grpc/testing/report_qps_scenario_service.proto b/grpc-benchmark/proto/grpc/testing/report_qps_scenario_service.proto index d29cad875..77b243aac 100644 --- a/grpc-benchmark/proto/grpc/testing/report_qps_scenario_service.proto +++ b/grpc-benchmark/proto/grpc/testing/report_qps_scenario_service.proto @@ -16,13 +16,13 @@ // of unary/streaming requests/responses. syntax = "proto3"; -package grpc.testing; +import "grpc/testing/control.proto"; -import "control.proto"; +package grpc.testing; option java_multiple_files = true; -option java_outer_classname = "ReportQpsScenarioServiceProto"; option java_package = "io.grpc.testing"; +option java_outer_classname = "ReportQpsScenarioServiceProto"; service ReportQpsScenarioService { // Report results of a QPS test benchmark scenario. diff --git a/grpc-benchmark/proto/grpc/testing/stats.proto b/grpc-benchmark/proto/grpc/testing/stats.proto index 99c999f5a..1f0fae4e5 100644 --- a/grpc-benchmark/proto/grpc/testing/stats.proto +++ b/grpc-benchmark/proto/grpc/testing/stats.proto @@ -1,4 +1,4 @@ -// Copyright 2015 The gRPC Authors +// Copyright 2015 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -11,12 +11,16 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + syntax = "proto3"; package grpc.testing; -option java_outer_classname = "Stats"; -option java_package = "io.grpc.benchmarks.proto"; +import "grpc/core/stats.proto"; + +option java_multiple_files = true; +option java_package = "io.grpc.testing"; +option java_outer_classname = "StatsProto"; message ServerStats { // wall clock time change in seconds since last reset @@ -37,11 +41,14 @@ message ServerStats { // Number of polls called inside completion queue uint64 cq_poll_count = 6; + + // Core library stats + grpc.core.Stats core_stats = 7; } // Histogram params based on grpc/support/histogram.c message HistogramParams { - double resolution = 1; // first bucket is [0, 1 + resolution) + double resolution = 1; // first bucket is [0, 1 + resolution) double max_possible = 2; // use enough buckets to allow this value } @@ -74,4 +81,7 @@ message ClientStats { // Number of polls called inside completion queue uint64 cq_poll_count = 6; + + // Core library stats + grpc.core.Stats core_stats = 7; } diff --git a/grpc-benchmark/proto/grpc/testing/worker_service.proto b/grpc-benchmark/proto/grpc/testing/worker_service.proto index b49595d95..ff3aa2931 100644 --- a/grpc-benchmark/proto/grpc/testing/worker_service.proto +++ b/grpc-benchmark/proto/grpc/testing/worker_service.proto @@ -16,9 +16,13 @@ // of unary/streaming requests/responses. syntax = "proto3"; +import "grpc/testing/control.proto"; + package grpc.testing; -import "control.proto"; +option java_multiple_files = true; +option java_package = "io.grpc.testing"; +option java_outer_classname = "WorkerServiceProto"; service WorkerService { // Start server with specified workload. diff --git a/grpc-benchmark/src/lib.rs b/grpc-benchmark/src/lib.rs index d4b6aea7d..8e4490db3 100644 --- a/grpc-benchmark/src/lib.rs +++ b/grpc-benchmark/src/lib.rs @@ -25,17 +25,14 @@ #[allow(unused)] mod generated { pub(crate) mod worker_service { - grpc::include_proto!("worker_service", "worker_service"); + grpc::include_proto!("grpc/testing", "worker_service"); } pub(crate) mod benchmark_service_grpc { - grpc::include_proto!("benchmark_service/grpc", "benchmark_service"); + grpc::include_proto!("grpc/testing", "benchmark_service"); } pub(crate) mod benchmark_service_tonic { - include!(concat!( - env!("OUT_DIR"), - "/benchmark_service/tonic/grpc.testing.rs" - )); + include!(concat!(env!("OUT_DIR"), "/tonic/grpc.testing.rs")); } } From df57997685429e2827e74a4e0d8aa1baacb00df8 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 23 Jun 2026 13:17:17 +0530 Subject: [PATCH 3/8] use tonic worker server --- grpc-benchmark/Cargo.toml | 3 +-- grpc-benchmark/build.rs | 27 +++++++++++---------------- grpc-benchmark/src/lib.rs | 15 +++++++++------ 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/grpc-benchmark/Cargo.toml b/grpc-benchmark/Cargo.toml index 701eb60ec..acaab30e4 100644 --- a/grpc-benchmark/Cargo.toml +++ b/grpc-benchmark/Cargo.toml @@ -9,12 +9,11 @@ rust-version = { workspace = true } grpc = { path = "../grpc" } grpc-protobuf = { path = "../grpc-protobuf" } prost = "0.14" +prost-types = "0.14" protobuf = { version = "4.34.0-release" } -protobuf-well-known-types = { version = "4.34.0-release" } tonic = { path = "../tonic" } tonic-prost = { path = "../tonic-prost" } [build-dependencies] tonic-prost-build = { path = "../tonic-prost-build" } grpc-protobuf-build = { path = "../grpc-protobuf-build" } -protobuf-well-known-types = { version = "4.34.0-release" } diff --git a/grpc-benchmark/build.rs b/grpc-benchmark/build.rs index 0b21771a2..48d4d904c 100644 --- a/grpc-benchmark/build.rs +++ b/grpc-benchmark/build.rs @@ -27,34 +27,29 @@ use std::path::PathBuf; fn main() { let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap()); - let dependencies = protobuf_well_known_types::get_dependency("protobuf_well_known_types") - .into_iter() - .map(|d| d.into()) - .collect(); - grpc_protobuf_build::CodeGen::new() .include("proto") .inputs([ - "grpc/testing/worker_service.proto", "grpc/testing/benchmark_service.proto", "grpc/testing/messages.proto", - "grpc/testing/control.proto", - "grpc/testing/payloads.proto", - "grpc/testing/stats.proto", - "grpc/core/stats.proto", ]) - .dependencies(dependencies) .client_only() .compile() .unwrap(); - let behchmark_service_tonic = out_dir.join("tonic"); + let services_tonic = out_dir.join("tonic"); - // TODO: Use gRPC server when available. - let _ = std::fs::create_dir(behchmark_service_tonic.clone()); + // TODO: Use gRPC servers when available. + let _ = std::fs::create_dir(services_tonic.clone()); tonic_prost_build::configure() - .out_dir(behchmark_service_tonic) + .out_dir(services_tonic) .build_client(false) - .compile_protos(&["proto/grpc/testing/benchmark_service.proto"], &["proto"]) + .compile_protos( + &[ + "grpc/testing/benchmark_service.proto", + "grpc/testing/worker_service.proto", + ], + &["proto"], + ) .unwrap(); } diff --git a/grpc-benchmark/src/lib.rs b/grpc-benchmark/src/lib.rs index 8e4490db3..7d648bce2 100644 --- a/grpc-benchmark/src/lib.rs +++ b/grpc-benchmark/src/lib.rs @@ -24,15 +24,18 @@ #[allow(unused)] mod generated { - pub(crate) mod worker_service { - grpc::include_proto!("grpc/testing", "worker_service"); - } - pub(crate) mod benchmark_service_grpc { grpc::include_proto!("grpc/testing", "benchmark_service"); } - pub(crate) mod benchmark_service_tonic { - include!(concat!(env!("OUT_DIR"), "/tonic/grpc.testing.rs")); + pub(crate) mod services { + pub(crate) mod grpc { + pub(crate) mod core { + include!(concat!(env!("OUT_DIR"), "/tonic/grpc.core.rs")); + } + pub(crate) mod testing { + include!(concat!(env!("OUT_DIR"), "/tonic/grpc.testing.rs")); + } + } } } From c55fa212cae8b4760f5bb4271fcf2861c6d107e9 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Fri, 26 Jun 2026 13:14:46 +0530 Subject: [PATCH 4/8] worker and server implementation --- grpc-benchmark/Cargo.toml | 6 +- grpc-benchmark/src/lib.rs | 15 ++- grpc-benchmark/src/main.rs | 94 ++++++++++++++ grpc-benchmark/src/server.rs | 236 +++++++++++++++++++++++++++++++++++ grpc-benchmark/src/worker.rs | 155 +++++++++++++++++++++++ 5 files changed, 499 insertions(+), 7 deletions(-) create mode 100644 grpc-benchmark/src/main.rs create mode 100644 grpc-benchmark/src/server.rs create mode 100644 grpc-benchmark/src/worker.rs diff --git a/grpc-benchmark/Cargo.toml b/grpc-benchmark/Cargo.toml index acaab30e4..de7581627 100644 --- a/grpc-benchmark/Cargo.toml +++ b/grpc-benchmark/Cargo.toml @@ -6,12 +6,16 @@ license = "MIT" rust-version = { workspace = true } [dependencies] +async-stream = "0.3" grpc = { path = "../grpc" } grpc-protobuf = { path = "../grpc-protobuf" } +nix = { version = "0.31.3", features = ["resource"] } prost = "0.14" prost-types = "0.14" protobuf = { version = "4.34.0-release" } -tonic = { path = "../tonic" } +tokio = { version = "1.37.0", features = ["sync", "macros"] } +tokio-stream = "0.1.18" +tonic = { path = "../tonic", features = ["tls-aws-lc"] } tonic-prost = { path = "../tonic-prost" } [build-dependencies] diff --git a/grpc-benchmark/src/lib.rs b/grpc-benchmark/src/lib.rs index 7d648bce2..f420adae3 100644 --- a/grpc-benchmark/src/lib.rs +++ b/grpc-benchmark/src/lib.rs @@ -23,19 +23,22 @@ */ #[allow(unused)] -mod generated { - pub(crate) mod benchmark_service_grpc { +pub mod generated { + pub mod benchmark_service_grpc { grpc::include_proto!("grpc/testing", "benchmark_service"); } - pub(crate) mod services { - pub(crate) mod grpc { - pub(crate) mod core { + pub mod services { + pub mod grpc { + pub mod core { include!(concat!(env!("OUT_DIR"), "/tonic/grpc.core.rs")); } - pub(crate) mod testing { + pub mod testing { include!(concat!(env!("OUT_DIR"), "/tonic/grpc.testing.rs")); } } } } + +mod server; +pub mod worker; diff --git a/grpc-benchmark/src/main.rs b/grpc-benchmark/src/main.rs new file mode 100644 index 000000000..c9518633e --- /dev/null +++ b/grpc-benchmark/src/main.rs @@ -0,0 +1,94 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * + */ + +use std::env; +use std::process; +use std::time::Duration; + +use grpc_benchmark::generated::services::grpc::testing::worker_service_server::WorkerServiceServer; +use grpc_benchmark::worker::WorkerServer; +use tokio::sync::mpsc; +use tokio::time; +use tonic::transport::Server; + +#[derive(Debug)] +struct Args { + /// Port to expose grpc.testing.WorkerService, Used by driver to initiate + /// work. + driver_port: u16, +} + +async fn run_worker(args: Args) -> Result<(), Box> { + let addr = format!("0.0.0.0:{}", args.driver_port).parse().unwrap(); + let (tx, mut rx) = mpsc::channel(1); + + let svc = WorkerServiceServer::new(WorkerServer::new(tx)); + + Server::builder() + .add_service(svc) + .serve_with_shutdown(addr, async { + rx.recv().await; + // Wait for the quit_worker response to be sent. + time::sleep(Duration::from_secs(1)).await; + }) + .await?; + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // The default Tokio runtime uses 1 thread per logical processor. While the + // testing framework supports specifying the thread count in the test config, + // the tests that run on k8s use specific machine sizes and don't depend on + // the clients/servers to restrict their resource usage. Tokio doesn't + // support nested runtimes, so adding support for per test thread config + // is not presently supported. + + let mut driver_port = None; + + // Skip the first argument (the binary name itself). + for arg in env::args().skip(1) { + if let Some(port_str) = arg.strip_prefix("--driver_port=") { + driver_port = Some(port_str.parse::().unwrap_or_else(|_| { + eprintln!("Error: --driver_port must be a valid u16 integer."); + process::exit(1); + })); + } else { + eprintln!("Warning: Unrecognized argument '{}'", arg); + } + } + + let Some(dp) = driver_port else { + eprintln!("Usage: worker --driver_port="); + process::exit(1); + }; + + let args = Args { driver_port: dp }; + + println!("{:?}", args); + run_worker(args).await?; + + Ok(()) +} diff --git a/grpc-benchmark/src/server.rs b/grpc-benchmark/src/server.rs new file mode 100644 index 000000000..d69e24131 --- /dev/null +++ b/grpc-benchmark/src/server.rs @@ -0,0 +1,236 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * + */ + +use std::pin::Pin; +use std::sync::Arc; +use std::time::Instant; + +use nix::sys::resource::Usage; +use nix::sys::resource::UsageWho; +use nix::sys::resource::getrusage; +use nix::sys::time::TimeValLike; +use tokio::sync::Notify; +use tokio_stream::Stream; +use tokio_stream::StreamExt; +use tonic::Request; +use tonic::Response; +use tonic::Status; +use tonic::Streaming; +use tonic::transport::Identity; +use tonic::transport::Server; +use tonic::transport::ServerTlsConfig; + +use crate::generated::services::grpc::testing::Payload; +use crate::generated::services::grpc::testing::PayloadType; +use crate::generated::services::grpc::testing::ServerConfig; +use crate::generated::services::grpc::testing::ServerStats; +use crate::generated::services::grpc::testing::SimpleProtoParams; +use crate::generated::services::grpc::testing::SimpleRequest; +use crate::generated::services::grpc::testing::SimpleResponse; +use crate::generated::services::grpc::testing::benchmark_service_server::BenchmarkService; +use crate::generated::services::grpc::testing::benchmark_service_server::BenchmarkServiceServer; +use crate::generated::services::grpc::testing::payload_config::Payload::BytebufParams; +use crate::generated::services::grpc::testing::payload_config::Payload::ComplexParams; +use crate::generated::services::grpc::testing::payload_config::Payload::SimpleParams; + +const DEFAULT_PORT: usize = 50055; +const SERVER_PEM: &[u8] = include_bytes!("../data/tls/server.pem"); +const SERVER_KEY: &[u8] = include_bytes!("../data/tls/server.key"); + +pub struct BenchmarkServer { + last_reset_time: Instant, + last_rusage: Usage, + shutdown_notify: Arc, + pub port: usize, +} + +impl BenchmarkServer { + pub fn start(config: ServerConfig) -> Result { + println!("Starting benchmark server with config: {:?}", config); + + let mut server_builder = Server::builder(); + // Parse security config. + if let Some(securit_params) = config.security_params { + let tls_config = if securit_params.use_test_ca { + ServerTlsConfig::new().identity(Identity::from_pem(SERVER_PEM, SERVER_KEY)) + } else { + ServerTlsConfig::new() + }; + server_builder = server_builder.tls_config(tls_config).map_err(|err| { + Status::invalid_argument(format!("failed to create TLS config: {err}")) + })?; + }; + + // Parse payload config. + let payload_type = match config.payload_config { + Some(payload_config) => payload_config.payload.ok_or(Status::invalid_argument( + "payload missing in payload_config", + ))?, + None => SimpleParams(SimpleProtoParams::default()), + }; + + let router = match payload_type { + BytebufParams(_) | ComplexParams(_) => { + return Err(Status::unimplemented("codec not implemented.")); + } + SimpleParams(_) => { + let server = BenchmarkServiceServer::new(ProtoServer {}); + server_builder.add_service(server) + } + }; + + let shutdown_notify = Arc::new(Notify::new()); + let shutdown_notify_copy = shutdown_notify.clone(); + let port = if config.port > 0 { + config.port as usize + } else { + DEFAULT_PORT + }; + let addr = format!("[::]:{}", port).parse().unwrap(); + tokio::spawn(router.serve_with_shutdown(addr, async move { + shutdown_notify_copy.notified().await; + println!("BenchmarkServer is shutting down.") + })); + + Ok(BenchmarkServer { + last_reset_time: Instant::now(), + last_rusage: getrusage(UsageWho::RUSAGE_SELF).map_err(|err| { + Status::internal(format!("failed to query system resource usage: {err}")) + })?, + shutdown_notify, + port, + }) + } + + pub fn get_stats(&mut self, reset: bool) -> Result { + let now = Instant::now(); + let wall_time_elapsed = now.duration_since(self.last_reset_time); + let latest_rusage = getrusage(UsageWho::RUSAGE_SELF).map_err(|err| { + Status::internal(format!("failed to query system resource usage: {err}")) + })?; + let user_time = latest_rusage.user_time() - self.last_rusage.user_time(); + let system_time = latest_rusage.system_time() - self.last_rusage.system_time(); + + if reset { + self.last_rusage = latest_rusage; + self.last_reset_time = now; + } + + Ok(ServerStats { + time_elapsed: wall_time_elapsed.as_nanos() as f64 / 1e9, + time_user: user_time.num_nanoseconds() as f64 / 1e9, + time_system: system_time.num_nanoseconds() as f64 / 1e9, + // The following fields are not set by Java and Go. + idle_cpu_time: 0, + cq_poll_count: 0, + total_cpu_time: 0, + core_stats: None, + }) + } +} + +#[derive(Debug)] +struct ProtoServer {} + +#[tonic::async_trait] +impl BenchmarkService for ProtoServer { + async fn unary_call( + &self, + request: Request, + ) -> Result, Status> { + Ok(Response::new(SimpleResponse { + payload: Some(Payload { + r#type: PayloadType::Compressable as i32, + body: vec![0; request.into_inner().response_size as usize], + }), + username: String::new(), + oauth_scope: String::new(), + server_id: String::new(), + grpclb_route_type: 0, + hostname: String::new(), + })) + } + + type StreamingCallStream = + Pin> + Send + 'static>>; + + async fn streaming_call( + &self, + request: Request>, + ) -> Result, Status> { + let mut inbound = request.into_inner(); + + let output = async_stream::try_stream! { + while let Some(simple_request) = inbound.next().await { + let request = simple_request?; + yield SimpleResponse { + payload: Some(Payload { + r#type: PayloadType::Compressable as i32, + body: vec![0; request.response_size as usize], + }), + username: String::new(), + oauth_scope: String::new(), + server_id: String::new(), + grpclb_route_type: 0, + hostname: String::new(), + }; + } + }; + + Ok(Response::new(Box::pin(output) as Self::StreamingCallStream)) + } + + async fn streaming_from_client( + &self, + _request: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("method unimplemented")) + } + + type StreamingFromServerStream = + Pin> + Send + 'static>>; + + async fn streaming_from_server( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("method unimplemented")) + } + + type StreamingBothWaysStream = + Pin> + Send + 'static>>; + + async fn streaming_both_ways( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("method unimplemented")) + } +} + +impl Drop for BenchmarkServer { + fn drop(&mut self) { + self.shutdown_notify.notify_one(); + } +} diff --git a/grpc-benchmark/src/worker.rs b/grpc-benchmark/src/worker.rs new file mode 100644 index 000000000..09a0e95d9 --- /dev/null +++ b/grpc-benchmark/src/worker.rs @@ -0,0 +1,155 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * + */ + +use std::pin::Pin; +use std::result::Result; +use std::thread::available_parallelism; + +use tokio::sync::mpsc; +use tokio_stream::Stream; +use tokio_stream::StreamExt; +use tonic::Request; +use tonic::Response; +use tonic::Status; +use tonic::Streaming; + +use crate::generated::services::grpc::testing::ClientArgs; +use crate::generated::services::grpc::testing::ClientStatus; +use crate::generated::services::grpc::testing::CoreRequest; +use crate::generated::services::grpc::testing::CoreResponse; +use crate::generated::services::grpc::testing::ServerArgs; +use crate::generated::services::grpc::testing::ServerStatus; +use crate::generated::services::grpc::testing::Void; +use crate::generated::services::grpc::testing::server_args::Argtype; +use crate::generated::services::grpc::testing::worker_service_server::WorkerService; +use crate::server::BenchmarkServer; + +pub struct WorkerServer { + shutdown_channel: mpsc::Sender<()>, +} + +impl WorkerServer { + pub fn new(shutdown_channel: mpsc::Sender<()>) -> Self { + WorkerServer { shutdown_channel } + } +} + +fn core_count() -> Result { + let cores = available_parallelism() + .map_err(|e| Status::internal(format!("failed to determine core count: {e}")))? + .get() as i32; + + Ok(cores) +} + +#[tonic::async_trait] +impl WorkerService for WorkerServer { + // Server streaming response type for the RunServer method. + type RunServerStream = + Pin> + Send + 'static>>; + + async fn run_server( + &self, + request: Request>, + ) -> Result, Status> { + println!("Handling server stream."); + let mut stream = request.into_inner(); + + let output = async_stream::try_stream! { + let mut benchmark_server: Option = None; + + while let Some(request) = stream.next().await { + let request = request?; + let mut reset_stats = false; + + let argtype = request.argtype + .ok_or_else(|| Status::invalid_argument("missing request.argtype"))?; + + match argtype { + Argtype::Setup(server_config) => { + println!("Server creation requested."); + + if benchmark_server.take().is_some() { + eprintln!("Server setup received when server already exists, shutting down the existing server"); + } + + let server = BenchmarkServer::start(server_config).map_err(|status| { + println!("Error while creating server: {:?}", status); + status + })?; + + benchmark_server = Some(server); + } + Argtype::Mark(mark) => { + println!("Server stats requested."); + + benchmark_server.as_ref().ok_or_else(|| { + Status::invalid_argument("server does not exist when mark received") + })?; + + reset_stats = mark.reset; + } + }; + + let server = benchmark_server.as_mut().unwrap(); + let stats = server.get_stats(reset_stats)?; + + yield ServerStatus { + stats: Some(stats), + cores: core_count()?, + port: server.port as i32, + }; + } + }; + + Ok(Response::new(Box::pin(output) as Self::RunServerStream)) + } + + type RunClientStream = + Pin> + Send + 'static>>; + + async fn run_client( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("")) + } + + async fn core_count( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(CoreResponse { + cores: core_count()?, + })) + } + + async fn quit_worker(&self, _request: Request) -> Result, Status> { + self.shutdown_channel + .send(()) + .await + .map(|_| Response::new(Void {})) + .map_err(|err| Status::internal(format!("failed to stop server: {err}"))) + } +} From b9bce39aed4ed46fbe5800b65a2374cfae49fd8d Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Fri, 26 Jun 2026 13:51:05 +0530 Subject: [PATCH 5/8] fix non-linux builds --- grpc-benchmark/src/lib.rs | 1 + grpc-benchmark/src/main.rs | 67 ++++++++++++++++++-------------------- 2 files changed, 33 insertions(+), 35 deletions(-) diff --git a/grpc-benchmark/src/lib.rs b/grpc-benchmark/src/lib.rs index f420adae3..0a5b02534 100644 --- a/grpc-benchmark/src/lib.rs +++ b/grpc-benchmark/src/lib.rs @@ -21,6 +21,7 @@ * IN THE SOFTWARE. * */ +#![cfg(target_os = "linux")] #[allow(unused)] pub mod generated { diff --git a/grpc-benchmark/src/main.rs b/grpc-benchmark/src/main.rs index c9518633e..f2090fba4 100644 --- a/grpc-benchmark/src/main.rs +++ b/grpc-benchmark/src/main.rs @@ -22,41 +22,36 @@ * */ -use std::env; -use std::process; -use std::time::Duration; +#[cfg(target_os = "linux")] +mod app { + use std::time::Duration; -use grpc_benchmark::generated::services::grpc::testing::worker_service_server::WorkerServiceServer; -use grpc_benchmark::worker::WorkerServer; -use tokio::sync::mpsc; -use tokio::time; -use tonic::transport::Server; + use grpc_benchmark::generated::services::grpc::testing::worker_service_server::WorkerServiceServer; + use grpc_benchmark::worker::WorkerServer; + use tokio::sync::mpsc; + use tokio::time; + use tonic::transport::Server; -#[derive(Debug)] -struct Args { - /// Port to expose grpc.testing.WorkerService, Used by driver to initiate - /// work. - driver_port: u16, -} - -async fn run_worker(args: Args) -> Result<(), Box> { - let addr = format!("0.0.0.0:{}", args.driver_port).parse().unwrap(); - let (tx, mut rx) = mpsc::channel(1); + pub async fn run_worker(worker_port: u16) -> Result<(), Box> { + let addr = format!("0.0.0.0:{}", worker_port).parse().unwrap(); + let (tx, mut rx) = mpsc::channel(1); - let svc = WorkerServiceServer::new(WorkerServer::new(tx)); + let svc = WorkerServiceServer::new(WorkerServer::new(tx)); - Server::builder() - .add_service(svc) - .serve_with_shutdown(addr, async { - rx.recv().await; - // Wait for the quit_worker response to be sent. - time::sleep(Duration::from_secs(1)).await; - }) - .await?; + Server::builder() + .add_service(svc) + .serve_with_shutdown(addr, async { + rx.recv().await; + // Wait for the quit_worker response to be sent. + time::sleep(Duration::from_secs(1)).await; + }) + .await?; - Ok(()) + Ok(()) + } } +#[cfg(target_os = "linux")] #[tokio::main] async fn main() -> Result<(), Box> { // The default Tokio runtime uses 1 thread per logical processor. While the @@ -69,11 +64,11 @@ async fn main() -> Result<(), Box> { let mut driver_port = None; // Skip the first argument (the binary name itself). - for arg in env::args().skip(1) { + for arg in std::env::args().skip(1) { if let Some(port_str) = arg.strip_prefix("--driver_port=") { driver_port = Some(port_str.parse::().unwrap_or_else(|_| { eprintln!("Error: --driver_port must be a valid u16 integer."); - process::exit(1); + std::process::exit(1); })); } else { eprintln!("Warning: Unrecognized argument '{}'", arg); @@ -82,13 +77,15 @@ async fn main() -> Result<(), Box> { let Some(dp) = driver_port else { eprintln!("Usage: worker --driver_port="); - process::exit(1); + std::process::exit(1); }; - let args = Args { driver_port: dp }; - - println!("{:?}", args); - run_worker(args).await?; + app::run_worker(dp).await?; Ok(()) } + +#[cfg(not(target_os = "linux"))] +fn main() { + println!("This benchmark worker is only supported on Linux."); +} From 16a11dd768a5fbeddcd9aea99822081ed192b402 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 29 Jun 2026 16:25:25 +0530 Subject: [PATCH 6/8] fix merge --- grpc-benchmark/data/tls/server.key | 28 ---------------------------- grpc-benchmark/data/tls/server.pem | 22 ---------------------- 2 files changed, 50 deletions(-) delete mode 100644 grpc-benchmark/data/tls/server.key delete mode 100644 grpc-benchmark/data/tls/server.pem diff --git a/grpc-benchmark/data/tls/server.key b/grpc-benchmark/data/tls/server.key deleted file mode 100644 index 086462992..000000000 --- a/grpc-benchmark/data/tls/server.key +++ /dev/null @@ -1,28 +0,0 @@ ------BEGIN PRIVATE KEY----- -MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDnE443EknxvxBq -6+hvn/t09hl8hx366EBYvZmVM/NC+7igXRAjiJiA/mIaCvL3MS0Iz5hBLxSGICU+ -WproA3GCIFITIwcf/ETyWj/5xpgZ4AKrLrjQmmX8mhwUajfF3UvwMJrCOVqPp67t -PtP+2kBXaqrXdvnvXR41FsIB8V7zIAuIZB6bHQhiGVlc1sgZYsE2EGG9WMmHtS86 -qkAOTjG2XyjmPTGAwhGDpYkYrpzp99IiDh4/Veai81hn0ssQkbry0XRD/Ig3jcHh -23WiriPNJ0JsbgXUSLKRPZObA9VgOLy2aXoN84IMaeK3yy+cwSYG/99w93fUZJte -MXwz4oYZAgMBAAECggEBAIVn2Ncai+4xbH0OLWckabwgyJ4IM9rDc0LIU368O1kU -koais8qP9dujAWgfoh3sGh/YGgKn96VnsZjKHlyMgF+r4TaDJn3k2rlAOWcurGlj -1qaVlsV4HiEzp7pxiDmHhWvp4672Bb6iBG+bsjCUOEk/n9o9KhZzIBluRhtxCmw5 -nw4Do7z00PTvN81260uPWSc04IrytvZUiAIx/5qxD72bij2xJ8t/I9GI8g4FtoVB -8pB6S/hJX1PZhh9VlU6Yk+TOfOVnbebG4W5138LkB835eqk3Zz0qsbc2euoi8Hxi -y1VGwQEmMQ63jXz4c6g+X55ifvUK9Jpn5E8pq+pMd7ECgYEA93lYq+Cr54K4ey5t -sWMa+ye5RqxjzgXj2Kqr55jb54VWG7wp2iGbg8FMlkQwzTJwebzDyCSatguEZLuB -gRGroRnsUOy9vBvhKPOch9bfKIl6qOgzMJB267fBVWx5ybnRbWN/I7RvMQf3k+9y -biCIVnxDLEEYyx7z85/5qxsXg/MCgYEA7wmWKtCTn032Hy9P8OL49T0X6Z8FlkDC -Rk42ygrc/MUbugq9RGUxcCxoImOG9JXUpEtUe31YDm2j+/nbvrjl6/bP2qWs0V7l -dTJl6dABP51pCw8+l4cWgBBX08Lkeen812AAFNrjmDCjX6rHjWHLJcpS18fnRRkP -V1d/AHWX7MMCgYEA6Gsw2guhp0Zf2GCcaNK5DlQab8OL4Hwrpttzo4kuTlwtqNKp -Q9H4al9qfF4Cr1TFya98+EVYf8yFRM3NLNjZpe3gwYf2EerlJj7VLcahw0KKzoN1 -QBENfwgPLRk5sDkx9VhSmcfl/diLroZdpAwtv3vo4nEoxeuGFbKTGx3Qkf0CgYEA -xyR+dcb05Ygm3w4klHQTowQ10s1H80iaUcZBgQuR1ghEtDbUPZHsoR5t1xCB02ys -DgAwLv1bChIvxvH/L6KM8ovZ2LekBX4AviWxoBxJnfz/EVau98B0b1auRN6eSC83 -FRuGldlSOW1z/nSh8ViizSYE5H5HX1qkXEippvFRE88CgYB3Bfu3YQY60ITWIShv -nNkdcbTT9eoP9suaRJjw92Ln+7ZpALYlQMKUZmJ/5uBmLs4RFwUTQruLOPL4yLTH -awADWUzs3IRr1fwn9E+zM8JVyKCnUEM3w4N5UZskGO2klashAd30hWO+knRv/y0r -uGIYs9Ek7YXlXIRVrzMwcsrt1w== ------END PRIVATE KEY----- diff --git a/grpc-benchmark/data/tls/server.pem b/grpc-benchmark/data/tls/server.pem deleted file mode 100644 index 88244f856..000000000 --- a/grpc-benchmark/data/tls/server.pem +++ /dev/null @@ -1,22 +0,0 @@ ------BEGIN CERTIFICATE----- -MIIDtDCCApygAwIBAgIUbJfTREJ6k6/+oInWhV1O1j3ZT0IwDQYJKoZIhvcNAQEL -BQAwVjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM -GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTIw -MDMxODAzMTA0MloXDTMwMDMxNjAzMTA0MlowZTELMAkGA1UEBhMCVVMxETAPBgNV -BAgMCElsbGlub2lzMRAwDgYDVQQHDAdDaGljYWdvMRUwEwYDVQQKDAxFeGFtcGxl -LCBDby4xGjAYBgNVBAMMESoudGVzdC5nb29nbGUuY29tMIIBIjANBgkqhkiG9w0B -AQEFAAOCAQ8AMIIBCgKCAQEA5xOONxJJ8b8Qauvob5/7dPYZfIcd+uhAWL2ZlTPz -Qvu4oF0QI4iYgP5iGgry9zEtCM+YQS8UhiAlPlqa6ANxgiBSEyMHH/xE8lo/+caY -GeACqy640Jpl/JocFGo3xd1L8DCawjlaj6eu7T7T/tpAV2qq13b5710eNRbCAfFe -8yALiGQemx0IYhlZXNbIGWLBNhBhvVjJh7UvOqpADk4xtl8o5j0xgMIRg6WJGK6c -6ffSIg4eP1XmovNYZ9LLEJG68tF0Q/yIN43B4dt1oq4jzSdCbG4F1EiykT2TmwPV -YDi8tml6DfOCDGnit8svnMEmBv/fcPd31GSbXjF8M+KGGQIDAQABo2swaTAJBgNV -HRMEAjAAMAsGA1UdDwQEAwIF4DBPBgNVHREESDBGghAqLnRlc3QuZ29vZ2xlLmZy -ghh3YXRlcnpvb2kudGVzdC5nb29nbGUuYmWCEioudGVzdC55b3V0dWJlLmNvbYcE -wKgBAzANBgkqhkiG9w0BAQsFAAOCAQEAS8hDQA8PSgipgAml7Q3/djwQ644ghWQv -C2Kb+r30RCY1EyKNhnQnIIh/OUbBZvh0M0iYsy6xqXgfDhCB93AA6j0i5cS8fkhH -Jl4RK0tSkGQ3YNY4NzXwQP/vmUgfkw8VBAZ4Y4GKxppdATjffIW+srbAmdDruIRM -wPeikgOoRrXf0LA1fi4TqxARzeRwenQpayNfGHTvVF9aJkl8HoaMunTAdG5pIVcr -9GKi/gEMpXUJbbVv3U5frX1Wo4CFo+rZWJ/LyCMeb0jciNLxSdMwj/E/ZuExlyeZ -gc9ctPjSMvgSyXEKv6Vwobleeg88V2ZgzenziORoWj4KszG/lbQZvg== ------END CERTIFICATE----- From 74f0cd19f026aedc7012f67743200f2c2337a100 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 29 Jun 2026 16:37:55 +0530 Subject: [PATCH 7/8] minor fixes --- grpc-benchmark/src/main.rs | 4 ++-- grpc-benchmark/src/server.rs | 14 +++++++++----- grpc-benchmark/src/worker.rs | 2 +- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/grpc-benchmark/src/main.rs b/grpc-benchmark/src/main.rs index f2090fba4..de2821f84 100644 --- a/grpc-benchmark/src/main.rs +++ b/grpc-benchmark/src/main.rs @@ -58,8 +58,8 @@ async fn main() -> Result<(), Box> { // testing framework supports specifying the thread count in the test config, // the tests that run on k8s use specific machine sizes and don't depend on // the clients/servers to restrict their resource usage. Tokio doesn't - // support nested runtimes, so adding support for per test thread config - // is not presently supported. + // support nested runtimes, so support for per test thread config is not + // presently supported. let mut driver_port = None; diff --git a/grpc-benchmark/src/server.rs b/grpc-benchmark/src/server.rs index 6ec102835..e58bf8514 100644 --- a/grpc-benchmark/src/server.rs +++ b/grpc-benchmark/src/server.rs @@ -54,7 +54,7 @@ use crate::generated::services::grpc::testing::payload_config::Payload::BytebufP use crate::generated::services::grpc::testing::payload_config::Payload::ComplexParams; use crate::generated::services::grpc::testing::payload_config::Payload::SimpleParams; -const DEFAULT_PORT: usize = 50055; +const DEFAULT_PORT: u16 = 50055; const SERVER_PEM: &[u8] = include_bytes!("../data/tls/server1.pem"); const SERVER_KEY: &[u8] = include_bytes!("../data/tls/server1.key"); @@ -62,11 +62,11 @@ pub struct BenchmarkServer { last_reset_time: Instant, last_rusage: Usage, shutdown_notify: Arc, - pub port: usize, + port: u16, } impl BenchmarkServer { - pub fn start(config: ServerConfig) -> Result { + pub(crate) fn start(config: ServerConfig) -> Result { println!("Starting benchmark server with config: {:?}", config); let mut server_builder = Server::builder(); @@ -103,7 +103,7 @@ impl BenchmarkServer { let shutdown_notify = Arc::new(Notify::new()); let shutdown_notify_copy = shutdown_notify.clone(); let port = if config.port > 0 { - config.port as usize + config.port as u16 } else { DEFAULT_PORT }; @@ -123,7 +123,7 @@ impl BenchmarkServer { }) } - pub fn get_stats(&mut self, reset: bool) -> Result { + pub(crate) fn get_stats(&mut self, reset: bool) -> Result { let now = Instant::now(); let wall_time_elapsed = now.duration_since(self.last_reset_time); let latest_rusage = getrusage(UsageWho::RUSAGE_SELF).map_err(|err| { @@ -148,6 +148,10 @@ impl BenchmarkServer { core_stats: None, }) } + + pub(crate) fn port(&self) -> u16 { + self.port + } } #[derive(Debug)] diff --git a/grpc-benchmark/src/worker.rs b/grpc-benchmark/src/worker.rs index 09a0e95d9..5e5c19a84 100644 --- a/grpc-benchmark/src/worker.rs +++ b/grpc-benchmark/src/worker.rs @@ -118,7 +118,7 @@ impl WorkerService for WorkerServer { yield ServerStatus { stats: Some(stats), cores: core_count()?, - port: server.port as i32, + port: server.port() as i32, }; } }; From 7af74987eb85c4f6e4b91df5c918258b4ad3fd03 Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Tue, 30 Jun 2026 15:10:09 +0530 Subject: [PATCH 8/8] empty rusage on non-unix --- grpc-benchmark/Cargo.toml | 4 ++- grpc-benchmark/src/lib.rs | 2 +- grpc-benchmark/src/main.rs | 52 +++++++++++++----------------- grpc-benchmark/src/rusage.rs | 62 ++++++++++++++++++++++++++++++++++++ grpc-benchmark/src/server.rs | 25 ++++++++------- 5 files changed, 102 insertions(+), 43 deletions(-) create mode 100644 grpc-benchmark/src/rusage.rs diff --git a/grpc-benchmark/Cargo.toml b/grpc-benchmark/Cargo.toml index 36c4c306e..986253d37 100644 --- a/grpc-benchmark/Cargo.toml +++ b/grpc-benchmark/Cargo.toml @@ -9,7 +9,6 @@ rust-version = { workspace = true } async-stream = "0.3" grpc = { path = "../grpc" } grpc-protobuf = { path = "../grpc-protobuf" } -nix = { version = "0.31.3", features = ["resource"] } prost = "0.14" prost-types = "0.14" protobuf = { version = "4.35.1-release" } @@ -18,6 +17,9 @@ tokio-stream = "0.1.18" tonic = { path = "../tonic", features = ["tls-aws-lc"] } tonic-prost = { path = "../tonic-prost" } +[target.'cfg(unix)'.dependencies] +nix = { version = "0.31.3", features = ["resource"] } + [build-dependencies] tonic-prost-build = { path = "../tonic-prost-build" } grpc-protobuf-build = { path = "../grpc-protobuf-build" } diff --git a/grpc-benchmark/src/lib.rs b/grpc-benchmark/src/lib.rs index 0a5b02534..ded7e77f9 100644 --- a/grpc-benchmark/src/lib.rs +++ b/grpc-benchmark/src/lib.rs @@ -21,7 +21,6 @@ * IN THE SOFTWARE. * */ -#![cfg(target_os = "linux")] #[allow(unused)] pub mod generated { @@ -41,5 +40,6 @@ pub mod generated { } } +mod rusage; mod server; pub mod worker; diff --git a/grpc-benchmark/src/main.rs b/grpc-benchmark/src/main.rs index de2821f84..81d183f6a 100644 --- a/grpc-benchmark/src/main.rs +++ b/grpc-benchmark/src/main.rs @@ -22,36 +22,35 @@ * */ -#[cfg(target_os = "linux")] -mod app { - use std::time::Duration; +use std::net::IpAddr; +use std::net::Ipv4Addr; +use std::net::SocketAddr; +use std::time::Duration; - use grpc_benchmark::generated::services::grpc::testing::worker_service_server::WorkerServiceServer; - use grpc_benchmark::worker::WorkerServer; - use tokio::sync::mpsc; - use tokio::time; - use tonic::transport::Server; +use grpc_benchmark::generated::services::grpc::testing::worker_service_server::WorkerServiceServer; +use grpc_benchmark::worker::WorkerServer; +use tokio::sync::mpsc; +use tokio::time; +use tonic::transport::Server; - pub async fn run_worker(worker_port: u16) -> Result<(), Box> { - let addr = format!("0.0.0.0:{}", worker_port).parse().unwrap(); - let (tx, mut rx) = mpsc::channel(1); +pub async fn run_worker(worker_port: u16) -> Result<(), Box> { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), worker_port); + let (tx, mut rx) = mpsc::channel(1); - let svc = WorkerServiceServer::new(WorkerServer::new(tx)); + let svc = WorkerServiceServer::new(WorkerServer::new(tx)); - Server::builder() - .add_service(svc) - .serve_with_shutdown(addr, async { - rx.recv().await; - // Wait for the quit_worker response to be sent. - time::sleep(Duration::from_secs(1)).await; - }) - .await?; + Server::builder() + .add_service(svc) + .serve_with_shutdown(addr, async { + rx.recv().await; + // Wait for the quit_worker response to be sent. + time::sleep(Duration::from_secs(1)).await; + }) + .await?; - Ok(()) - } + Ok(()) } -#[cfg(target_os = "linux")] #[tokio::main] async fn main() -> Result<(), Box> { // The default Tokio runtime uses 1 thread per logical processor. While the @@ -80,12 +79,7 @@ async fn main() -> Result<(), Box> { std::process::exit(1); }; - app::run_worker(dp).await?; + run_worker(dp).await?; Ok(()) } - -#[cfg(not(target_os = "linux"))] -fn main() { - println!("This benchmark worker is only supported on Linux."); -} diff --git a/grpc-benchmark/src/rusage.rs b/grpc-benchmark/src/rusage.rs new file mode 100644 index 000000000..d38740796 --- /dev/null +++ b/grpc-benchmark/src/rusage.rs @@ -0,0 +1,62 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * + */ + +#[derive(Debug)] +pub(crate) struct Rusage { + user_time_ns: i64, + system_time_ns: i64, +} + +impl Rusage { + #[cfg(unix)] + pub(crate) fn now() -> Result { + use nix::sys::resource::UsageWho; + use nix::sys::resource::getrusage; + use nix::sys::time::TimeValLike; + + let usage = + getrusage(UsageWho::RUSAGE_SELF).map_err(|e| format!("failed to get rusage: {}", e))?; + + Ok(Rusage { + user_time_ns: usage.user_time().num_nanoseconds(), + system_time_ns: usage.system_time().num_nanoseconds(), + }) + } + + #[cfg(not(unix))] + pub(crate) fn now() -> Result { + Ok(Rusage { + user_time_ns: 0, + system_time_ns: 0, + }) + } + + pub(crate) fn user_time_nanos(&self) -> i64 { + self.user_time_ns + } + + pub(crate) fn system_time_nanos(&self) -> i64 { + self.system_time_ns + } +} diff --git a/grpc-benchmark/src/server.rs b/grpc-benchmark/src/server.rs index e58bf8514..006fcb2e6 100644 --- a/grpc-benchmark/src/server.rs +++ b/grpc-benchmark/src/server.rs @@ -22,14 +22,13 @@ * */ +use std::net::IpAddr; +use std::net::Ipv4Addr; +use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; use std::time::Instant; -use nix::sys::resource::Usage; -use nix::sys::resource::UsageWho; -use nix::sys::resource::getrusage; -use nix::sys::time::TimeValLike; use tokio::sync::Notify; use tokio_stream::Stream; use tokio_stream::StreamExt; @@ -53,6 +52,7 @@ use crate::generated::services::grpc::testing::benchmark_service_server::Benchma use crate::generated::services::grpc::testing::payload_config::Payload::BytebufParams; use crate::generated::services::grpc::testing::payload_config::Payload::ComplexParams; use crate::generated::services::grpc::testing::payload_config::Payload::SimpleParams; +use crate::rusage::Rusage; const DEFAULT_PORT: u16 = 50055; const SERVER_PEM: &[u8] = include_bytes!("../data/tls/server1.pem"); @@ -60,7 +60,7 @@ const SERVER_KEY: &[u8] = include_bytes!("../data/tls/server1.key"); pub struct BenchmarkServer { last_reset_time: Instant, - last_rusage: Usage, + last_rusage: Rusage, shutdown_notify: Arc, port: u16, } @@ -107,7 +107,7 @@ impl BenchmarkServer { } else { DEFAULT_PORT }; - let addr = format!("[::]:{}", port).parse().unwrap(); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port); tokio::spawn(router.serve_with_shutdown(addr, async move { shutdown_notify_copy.notified().await; println!("BenchmarkServer is shutting down.") @@ -115,7 +115,7 @@ impl BenchmarkServer { Ok(BenchmarkServer { last_reset_time: Instant::now(), - last_rusage: getrusage(UsageWho::RUSAGE_SELF).map_err(|err| { + last_rusage: Rusage::now().map_err(|err| { Status::internal(format!("failed to query system resource usage: {err}")) })?, shutdown_notify, @@ -126,11 +126,12 @@ impl BenchmarkServer { pub(crate) fn get_stats(&mut self, reset: bool) -> Result { let now = Instant::now(); let wall_time_elapsed = now.duration_since(self.last_reset_time); - let latest_rusage = getrusage(UsageWho::RUSAGE_SELF).map_err(|err| { + let latest_rusage = Rusage::now().map_err(|err| { Status::internal(format!("failed to query system resource usage: {err}")) })?; - let user_time = latest_rusage.user_time() - self.last_rusage.user_time(); - let system_time = latest_rusage.system_time() - self.last_rusage.system_time(); + let user_time_ns = latest_rusage.user_time_nanos() - self.last_rusage.user_time_nanos(); + let system_time_ns = + latest_rusage.system_time_nanos() - self.last_rusage.system_time_nanos(); if reset { self.last_rusage = latest_rusage; @@ -139,8 +140,8 @@ impl BenchmarkServer { Ok(ServerStats { time_elapsed: wall_time_elapsed.as_nanos() as f64 / 1e9, - time_user: user_time.num_nanoseconds() as f64 / 1e9, - time_system: system_time.num_nanoseconds() as f64 / 1e9, + time_user: user_time_ns as f64 / 1e9, + time_system: system_time_ns as f64 / 1e9, // The following fields are not set by Java and Go. idle_cpu_time: 0, cq_poll_count: 0,