diff --git a/grpc-benchmark/Cargo.toml b/grpc-benchmark/Cargo.toml index 5b0396e76..986253d37 100644 --- a/grpc-benchmark/Cargo.toml +++ b/grpc-benchmark/Cargo.toml @@ -6,14 +6,20 @@ license = "MIT" rust-version = { workspace = true } [dependencies] +async-stream = "0.3" grpc = { path = "../grpc" } grpc-protobuf = { path = "../grpc-protobuf" } prost = "0.14" prost-types = "0.14" protobuf = { version = "4.35.1-release" } -tonic = { path = "../tonic" } +tokio = { version = "1.37.0", features = ["sync", "macros"] } +tokio-stream = "0.1.18" +tonic = { path = "../tonic", features = ["tls-aws-lc"] } tonic-prost = { path = "../tonic-prost" } +[target.'cfg(unix)'.dependencies] +nix = { version = "0.31.3", features = ["resource"] } + [build-dependencies] tonic-prost-build = { path = "../tonic-prost-build" } grpc-protobuf-build = { path = "../grpc-protobuf-build" } diff --git a/grpc-benchmark/src/lib.rs b/grpc-benchmark/src/lib.rs index 7d648bce2..ded7e77f9 100644 --- a/grpc-benchmark/src/lib.rs +++ b/grpc-benchmark/src/lib.rs @@ -23,19 +23,23 @@ */ #[allow(unused)] -mod generated { - pub(crate) mod benchmark_service_grpc { +pub mod generated { + pub mod benchmark_service_grpc { grpc::include_proto!("grpc/testing", "benchmark_service"); } - pub(crate) mod services { - pub(crate) mod grpc { - pub(crate) mod core { + pub mod services { + pub mod grpc { + pub mod core { include!(concat!(env!("OUT_DIR"), "/tonic/grpc.core.rs")); } - pub(crate) mod testing { + pub mod testing { include!(concat!(env!("OUT_DIR"), "/tonic/grpc.testing.rs")); } } } } + +mod rusage; +mod server; +pub mod worker; diff --git a/grpc-benchmark/src/main.rs b/grpc-benchmark/src/main.rs new file mode 100644 index 000000000..81d183f6a --- /dev/null +++ b/grpc-benchmark/src/main.rs @@ -0,0 +1,85 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * + */ + +use std::net::IpAddr; +use std::net::Ipv4Addr; +use std::net::SocketAddr; +use std::time::Duration; + +use grpc_benchmark::generated::services::grpc::testing::worker_service_server::WorkerServiceServer; +use grpc_benchmark::worker::WorkerServer; +use tokio::sync::mpsc; +use tokio::time; +use tonic::transport::Server; + +pub async fn run_worker(worker_port: u16) -> Result<(), Box> { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), worker_port); + let (tx, mut rx) = mpsc::channel(1); + + let svc = WorkerServiceServer::new(WorkerServer::new(tx)); + + Server::builder() + .add_service(svc) + .serve_with_shutdown(addr, async { + rx.recv().await; + // Wait for the quit_worker response to be sent. + time::sleep(Duration::from_secs(1)).await; + }) + .await?; + + Ok(()) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // The default Tokio runtime uses 1 thread per logical processor. While the + // testing framework supports specifying the thread count in the test config, + // the tests that run on k8s use specific machine sizes and don't depend on + // the clients/servers to restrict their resource usage. Tokio doesn't + // support nested runtimes, so support for per test thread config is not + // presently supported. + + let mut driver_port = None; + + // Skip the first argument (the binary name itself). + for arg in std::env::args().skip(1) { + if let Some(port_str) = arg.strip_prefix("--driver_port=") { + driver_port = Some(port_str.parse::().unwrap_or_else(|_| { + eprintln!("Error: --driver_port must be a valid u16 integer."); + std::process::exit(1); + })); + } else { + eprintln!("Warning: Unrecognized argument '{}'", arg); + } + } + + let Some(dp) = driver_port else { + eprintln!("Usage: worker --driver_port="); + std::process::exit(1); + }; + + run_worker(dp).await?; + + Ok(()) +} diff --git a/grpc-benchmark/src/rusage.rs b/grpc-benchmark/src/rusage.rs new file mode 100644 index 000000000..d38740796 --- /dev/null +++ b/grpc-benchmark/src/rusage.rs @@ -0,0 +1,62 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * + */ + +#[derive(Debug)] +pub(crate) struct Rusage { + user_time_ns: i64, + system_time_ns: i64, +} + +impl Rusage { + #[cfg(unix)] + pub(crate) fn now() -> Result { + use nix::sys::resource::UsageWho; + use nix::sys::resource::getrusage; + use nix::sys::time::TimeValLike; + + let usage = + getrusage(UsageWho::RUSAGE_SELF).map_err(|e| format!("failed to get rusage: {}", e))?; + + Ok(Rusage { + user_time_ns: usage.user_time().num_nanoseconds(), + system_time_ns: usage.system_time().num_nanoseconds(), + }) + } + + #[cfg(not(unix))] + pub(crate) fn now() -> Result { + Ok(Rusage { + user_time_ns: 0, + system_time_ns: 0, + }) + } + + pub(crate) fn user_time_nanos(&self) -> i64 { + self.user_time_ns + } + + pub(crate) fn system_time_nanos(&self) -> i64 { + self.system_time_ns + } +} diff --git a/grpc-benchmark/src/server.rs b/grpc-benchmark/src/server.rs new file mode 100644 index 000000000..006fcb2e6 --- /dev/null +++ b/grpc-benchmark/src/server.rs @@ -0,0 +1,241 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * + */ + +use std::net::IpAddr; +use std::net::Ipv4Addr; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::time::Instant; + +use tokio::sync::Notify; +use tokio_stream::Stream; +use tokio_stream::StreamExt; +use tonic::Request; +use tonic::Response; +use tonic::Status; +use tonic::Streaming; +use tonic::transport::Identity; +use tonic::transport::Server; +use tonic::transport::ServerTlsConfig; + +use crate::generated::services::grpc::testing::Payload; +use crate::generated::services::grpc::testing::PayloadType; +use crate::generated::services::grpc::testing::ServerConfig; +use crate::generated::services::grpc::testing::ServerStats; +use crate::generated::services::grpc::testing::SimpleProtoParams; +use crate::generated::services::grpc::testing::SimpleRequest; +use crate::generated::services::grpc::testing::SimpleResponse; +use crate::generated::services::grpc::testing::benchmark_service_server::BenchmarkService; +use crate::generated::services::grpc::testing::benchmark_service_server::BenchmarkServiceServer; +use crate::generated::services::grpc::testing::payload_config::Payload::BytebufParams; +use crate::generated::services::grpc::testing::payload_config::Payload::ComplexParams; +use crate::generated::services::grpc::testing::payload_config::Payload::SimpleParams; +use crate::rusage::Rusage; + +const DEFAULT_PORT: u16 = 50055; +const SERVER_PEM: &[u8] = include_bytes!("../data/tls/server1.pem"); +const SERVER_KEY: &[u8] = include_bytes!("../data/tls/server1.key"); + +pub struct BenchmarkServer { + last_reset_time: Instant, + last_rusage: Rusage, + shutdown_notify: Arc, + port: u16, +} + +impl BenchmarkServer { + pub(crate) fn start(config: ServerConfig) -> Result { + println!("Starting benchmark server with config: {:?}", config); + + let mut server_builder = Server::builder(); + // Parse security config. + if let Some(securit_params) = config.security_params { + let tls_config = if securit_params.use_test_ca { + ServerTlsConfig::new().identity(Identity::from_pem(SERVER_PEM, SERVER_KEY)) + } else { + ServerTlsConfig::new() + }; + server_builder = server_builder.tls_config(tls_config).map_err(|err| { + Status::invalid_argument(format!("failed to create TLS config: {err}")) + })?; + }; + + // Parse payload config. + let payload_type = match config.payload_config { + Some(payload_config) => payload_config.payload.ok_or(Status::invalid_argument( + "payload missing in payload_config", + ))?, + None => SimpleParams(SimpleProtoParams::default()), + }; + + let router = match payload_type { + BytebufParams(_) | ComplexParams(_) => { + return Err(Status::unimplemented("codec not implemented.")); + } + SimpleParams(_) => { + let server = BenchmarkServiceServer::new(ProtoServer {}); + server_builder.add_service(server) + } + }; + + let shutdown_notify = Arc::new(Notify::new()); + let shutdown_notify_copy = shutdown_notify.clone(); + let port = if config.port > 0 { + config.port as u16 + } else { + DEFAULT_PORT + }; + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port); + tokio::spawn(router.serve_with_shutdown(addr, async move { + shutdown_notify_copy.notified().await; + println!("BenchmarkServer is shutting down.") + })); + + Ok(BenchmarkServer { + last_reset_time: Instant::now(), + last_rusage: Rusage::now().map_err(|err| { + Status::internal(format!("failed to query system resource usage: {err}")) + })?, + shutdown_notify, + port, + }) + } + + pub(crate) fn get_stats(&mut self, reset: bool) -> Result { + let now = Instant::now(); + let wall_time_elapsed = now.duration_since(self.last_reset_time); + let latest_rusage = Rusage::now().map_err(|err| { + Status::internal(format!("failed to query system resource usage: {err}")) + })?; + let user_time_ns = latest_rusage.user_time_nanos() - self.last_rusage.user_time_nanos(); + let system_time_ns = + latest_rusage.system_time_nanos() - self.last_rusage.system_time_nanos(); + + if reset { + self.last_rusage = latest_rusage; + self.last_reset_time = now; + } + + Ok(ServerStats { + time_elapsed: wall_time_elapsed.as_nanos() as f64 / 1e9, + time_user: user_time_ns as f64 / 1e9, + time_system: system_time_ns as f64 / 1e9, + // The following fields are not set by Java and Go. + idle_cpu_time: 0, + cq_poll_count: 0, + total_cpu_time: 0, + core_stats: None, + }) + } + + pub(crate) fn port(&self) -> u16 { + self.port + } +} + +#[derive(Debug)] +struct ProtoServer {} + +#[tonic::async_trait] +impl BenchmarkService for ProtoServer { + async fn unary_call( + &self, + request: Request, + ) -> Result, Status> { + Ok(Response::new(SimpleResponse { + payload: Some(Payload { + r#type: PayloadType::Compressable as i32, + body: vec![0; request.into_inner().response_size as usize], + }), + username: String::new(), + oauth_scope: String::new(), + server_id: String::new(), + grpclb_route_type: 0, + hostname: String::new(), + })) + } + + type StreamingCallStream = + Pin> + Send + 'static>>; + + async fn streaming_call( + &self, + request: Request>, + ) -> Result, Status> { + let mut inbound = request.into_inner(); + + let output = async_stream::try_stream! { + while let Some(simple_request) = inbound.next().await { + let request = simple_request?; + yield SimpleResponse { + payload: Some(Payload { + r#type: PayloadType::Compressable as i32, + body: vec![0; request.response_size as usize], + }), + username: String::new(), + oauth_scope: String::new(), + server_id: String::new(), + grpclb_route_type: 0, + hostname: String::new(), + }; + } + }; + + Ok(Response::new(Box::pin(output) as Self::StreamingCallStream)) + } + + async fn streaming_from_client( + &self, + _request: tonic::Request>, + ) -> Result, Status> { + Err(Status::unimplemented("method unimplemented")) + } + + type StreamingFromServerStream = + Pin> + Send + 'static>>; + + async fn streaming_from_server( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("method unimplemented")) + } + + type StreamingBothWaysStream = + Pin> + Send + 'static>>; + + async fn streaming_both_ways( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("method unimplemented")) + } +} + +impl Drop for BenchmarkServer { + fn drop(&mut self) { + self.shutdown_notify.notify_one(); + } +} diff --git a/grpc-benchmark/src/worker.rs b/grpc-benchmark/src/worker.rs new file mode 100644 index 000000000..5e5c19a84 --- /dev/null +++ b/grpc-benchmark/src/worker.rs @@ -0,0 +1,155 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + * + */ + +use std::pin::Pin; +use std::result::Result; +use std::thread::available_parallelism; + +use tokio::sync::mpsc; +use tokio_stream::Stream; +use tokio_stream::StreamExt; +use tonic::Request; +use tonic::Response; +use tonic::Status; +use tonic::Streaming; + +use crate::generated::services::grpc::testing::ClientArgs; +use crate::generated::services::grpc::testing::ClientStatus; +use crate::generated::services::grpc::testing::CoreRequest; +use crate::generated::services::grpc::testing::CoreResponse; +use crate::generated::services::grpc::testing::ServerArgs; +use crate::generated::services::grpc::testing::ServerStatus; +use crate::generated::services::grpc::testing::Void; +use crate::generated::services::grpc::testing::server_args::Argtype; +use crate::generated::services::grpc::testing::worker_service_server::WorkerService; +use crate::server::BenchmarkServer; + +pub struct WorkerServer { + shutdown_channel: mpsc::Sender<()>, +} + +impl WorkerServer { + pub fn new(shutdown_channel: mpsc::Sender<()>) -> Self { + WorkerServer { shutdown_channel } + } +} + +fn core_count() -> Result { + let cores = available_parallelism() + .map_err(|e| Status::internal(format!("failed to determine core count: {e}")))? + .get() as i32; + + Ok(cores) +} + +#[tonic::async_trait] +impl WorkerService for WorkerServer { + // Server streaming response type for the RunServer method. + type RunServerStream = + Pin> + Send + 'static>>; + + async fn run_server( + &self, + request: Request>, + ) -> Result, Status> { + println!("Handling server stream."); + let mut stream = request.into_inner(); + + let output = async_stream::try_stream! { + let mut benchmark_server: Option = None; + + while let Some(request) = stream.next().await { + let request = request?; + let mut reset_stats = false; + + let argtype = request.argtype + .ok_or_else(|| Status::invalid_argument("missing request.argtype"))?; + + match argtype { + Argtype::Setup(server_config) => { + println!("Server creation requested."); + + if benchmark_server.take().is_some() { + eprintln!("Server setup received when server already exists, shutting down the existing server"); + } + + let server = BenchmarkServer::start(server_config).map_err(|status| { + println!("Error while creating server: {:?}", status); + status + })?; + + benchmark_server = Some(server); + } + Argtype::Mark(mark) => { + println!("Server stats requested."); + + benchmark_server.as_ref().ok_or_else(|| { + Status::invalid_argument("server does not exist when mark received") + })?; + + reset_stats = mark.reset; + } + }; + + let server = benchmark_server.as_mut().unwrap(); + let stats = server.get_stats(reset_stats)?; + + yield ServerStatus { + stats: Some(stats), + cores: core_count()?, + port: server.port() as i32, + }; + } + }; + + Ok(Response::new(Box::pin(output) as Self::RunServerStream)) + } + + type RunClientStream = + Pin> + Send + 'static>>; + + async fn run_client( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("")) + } + + async fn core_count( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(CoreResponse { + cores: core_count()?, + })) + } + + async fn quit_worker(&self, _request: Request) -> Result, Status> { + self.shutdown_channel + .send(()) + .await + .map(|_| Response::new(Void {})) + .map_err(|err| Status::internal(format!("failed to stop server: {err}"))) + } +}