From 0048acd4e41bcb27296e28175f5595398030411e Mon Sep 17 00:00:00 2001 From: greg Date: Sun, 14 Jun 2026 09:41:01 +0000 Subject: [PATCH 1/4] Replace core_affinity with CPU utils --- Cargo.lock | 24 ++--- Cargo.toml | 3 +- cpu-utils/Cargo.toml | 15 +++ cpu-utils/src/affinity.rs | 145 ++++++++++++++++++++++++++ cpu-utils/src/lib.rs | 26 +++++ cpu-utils/tests/integration_test.rs | 77 ++++++++++++++ dev-bins/Cargo.lock | 22 ++-- poh/Cargo.toml | 2 +- poh/src/poh_service.rs | 22 +++- programs/sbf/Cargo.lock | 24 ++--- validator/Cargo.toml | 2 +- validator/src/commands/run/args.rs | 15 +-- validator/src/commands/run/execute.rs | 35 ++++--- xdp/Cargo.toml | 2 +- xdp/src/lib.rs | 45 -------- xdp/src/transmitter.rs | 49 ++++----- xdp/src/tx_loop.rs | 4 +- 17 files changed, 366 insertions(+), 146 deletions(-) create mode 100644 cpu-utils/Cargo.toml create mode 100644 cpu-utils/src/affinity.rs create mode 100644 cpu-utils/src/lib.rs create mode 100644 cpu-utils/tests/integration_test.rs diff --git a/Cargo.lock b/Cargo.lock index d3b4b987978..ec6db13f474 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,6 +80,13 @@ dependencies = [ "wincode", ] +[[package]] +name = "agave-cpu-utils" +version = "4.2.0-alpha.0" +dependencies = [ + "libc", +] + [[package]] name = "agave-feature-set" version = "4.2.0-alpha.0" @@ -339,6 +346,7 @@ dependencies = [ name = "agave-validator" version = "4.2.0-alpha.0" dependencies = [ + "agave-cpu-utils", "agave-logger", "agave-snapshots", "agave-votor", @@ -349,7 +357,6 @@ dependencies = [ "chrono", "clap 2.33.3", "console 0.16.3", - "core_affinity", "crossbeam-channel", "fd-lock", "indicatif", @@ -531,13 +538,13 @@ dependencies = [ name = "agave-xdp" version = "4.2.0-alpha.0" dependencies = [ + "agave-cpu-utils", "agave-xdp-ebpf", "arc-swap", "arrayvec", "aya", "bytes", "caps", - "core_affinity", "crossbeam-channel", "libc", "log", @@ -2019,17 +2026,6 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" -[[package]] -name = "core_affinity" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" -dependencies = [ - "libc", - "num_cpus", - "winapi", -] - [[package]] name = "cpufeatures" version = "0.2.17" @@ -9207,12 +9203,12 @@ dependencies = [ name = "solana-poh" version = "4.2.0-alpha.0" dependencies = [ + "agave-cpu-utils", "agave-logger", "agave-votor-messages", "arc-swap", "assert_matches", "bincode", - "core_affinity", "criterion", "crossbeam-channel", "log", diff --git a/Cargo.toml b/Cargo.toml index 20446810142..91754922c25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "connection-cache", "core", "cost-model", + "cpu-utils", "download-utils", "entry", "faucet", @@ -170,6 +171,7 @@ collapsible_if = "allow" Inflector = "0.11.4" agave-banking-stage-ingress-types = { path = "banking-stage-ingress-types", version = "=4.2.0-alpha.0", features = ["agave-unstable-api"] } agave-bls-cert-verify = { path = "bls-cert-verify", version = "=4.2.0-alpha.0", features = ["agave-unstable-api"] } +agave-cpu-utils = { path = "cpu-utils", version = "=4.2.0-alpha.0", features = ["agave-unstable-api"] } agave-feature-set = { path = "feature-set", version = "=4.2.0-alpha.0", features = ["agave-unstable-api"] } agave-fs = { path = "fs", version = "=4.2.0-alpha.0", features = ["agave-unstable-api"] } agave-geyser-plugin-interface = { path = "geyser-plugin-interface", version = "=4.2.0-alpha.0" } @@ -223,7 +225,6 @@ chrono-humanize = "0.2.3" clap = { version = "2.33.1", default-features = false, features = ["suggestions"] } console = "0.16.3" const_format = "0.2.36" -core_affinity = "0.8.3" criterion = "0.8.2" crossbeam-channel = "0.5.15" csv = "1.4.0" diff --git a/cpu-utils/Cargo.toml b/cpu-utils/Cargo.toml new file mode 100644 index 00000000000..3958efbf076 --- /dev/null +++ b/cpu-utils/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "agave-cpu-utils" +description = "Agave CPU Utils" +version = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +edition = { workspace = true } +publish = true + +[features] +agave-unstable-api = [] + +[dependencies] +libc = { workspace = true } diff --git a/cpu-utils/src/affinity.rs b/cpu-utils/src/affinity.rs new file mode 100644 index 00000000000..bac514ede83 --- /dev/null +++ b/cpu-utils/src/affinity.rs @@ -0,0 +1,145 @@ +//! Core CPU affinity operations. + +use std::{io, mem, ops::Deref}; + +const CPU_SETSIZE: usize = libc::CPU_SETSIZE as usize; + +/// Identifies a logical CPU (hardware thread) by its kernel-assigned ID. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct CpuId(usize); + +impl CpuId { + pub fn new(cpu: usize) -> io::Result { + if cpu < CPU_SETSIZE { + Ok(Self(cpu)) + } else { + Err(io::Error::from_raw_os_error(libc::EINVAL)) + } + } +} + +impl Deref for CpuId { + type Target = usize; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Set CPU affinity for a thread. +/// +/// Restricts the thread to run only on the specified CPUs. +/// +/// # Arguments +/// * `thread_id` - Thread ID to set affinity for. `None` means the calling thread. +/// * `cpus` - CPU IDs to bind the thread to. Can be any iterable collection. +/// +/// # Examples +/// +/// ```no_run +/// # use agave_cpu_utils::*; +/// # fn main() -> std::io::Result<()> { +/// // Pin current thread to CPU 0 +/// set_cpu_affinity(None, [CpuId::new(0)?])?; +/// +/// // Pin current thread to multiple CPUs +/// set_cpu_affinity(None, [CpuId::new(0)?, CpuId::new(1)?, CpuId::new(2)?])?; +/// # Ok(()) +/// # } +/// ``` +/// +/// # Errors +/// +/// Returns [`io::ErrorKind::InvalidInput`] if any CPU ID is too large for `cpu_set_t`. +/// Returns [`io::Error`] if the system call fails, including offline CPUs or CPUs +/// disallowed by the task's hard cpuset/cgroup. +pub fn set_cpu_affinity( + thread_id: Option, + cpus: impl IntoIterator, +) -> io::Result<()> { + // safety: cpu_set_t is a POD type, zero-initialization is standard + let mut cpu_set: libc::cpu_set_t = unsafe { mem::zeroed() }; + + for cpu in cpus { + // safety: CpuId values are constructed only after validating cpu < CPU_SETSIZE. + unsafe { libc::CPU_SET(*cpu, &mut cpu_set) }; + } + + let tid = thread_id.unwrap_or(0); + + // safety: sched_setaffinity is safe with valid parameters + let result = + unsafe { libc::sched_setaffinity(tid, mem::size_of::(), &cpu_set) }; + + if result != 0 { + return Err(io::Error::last_os_error()); + } + + Ok(()) +} + +/// Get the CPU affinity mask for a thread. +/// +/// Returns a sorted vector of CPU IDs that the thread is allowed to run on. +/// +/// # Arguments +/// * `thread_id` - Thread ID to query. `None` means the calling thread. +/// +/// # Examples +/// +/// ```no_run +/// # use agave_cpu_utils::*; +/// # fn main() -> std::io::Result<()> { +/// let cpus = cpu_affinity(None)?; +/// println!("Thread can run on CPUs: {:?}", cpus); +/// # Ok(()) +/// # } +/// ``` +/// +/// # Errors +/// +/// Returns [`io::Error`] if the system call fails. +pub fn cpu_affinity(thread_id: Option) -> io::Result> { + // safety: cpu_set_t is a POD type, zero-initialization is standard + let mut cpu_set: libc::cpu_set_t = unsafe { mem::zeroed() }; + + let tid = thread_id.unwrap_or(0); + + // safety: sched_getaffinity is safe with valid parameters + let result = + unsafe { libc::sched_getaffinity(tid, mem::size_of::(), &mut cpu_set) }; + + if result != 0 { + return Err(io::Error::last_os_error()); + } + + let mut cpus = Vec::new(); + for cpu in 0..CPU_SETSIZE { + // safety: cpu < CPU_SETSIZE by construction + if unsafe { libc::CPU_ISSET(cpu, &cpu_set) } { + cpus.push(CpuId::new(cpu)?); + } + } + + Ok(cpus) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cpu_id_validation() { + let result = CpuId::new(CPU_SETSIZE); + assert_eq!(result.unwrap_err().raw_os_error(), Some(libc::EINVAL)); + } + + #[test] + fn test_cpu_affinity_returns_sorted() { + let cpus = cpu_affinity(None).expect("failed to query current CPU affinity"); + assert!( + cpus.windows(2).all(|window| *window[0] <= *window[1]), + "cpu_affinity should return sorted CPU list" + ); + } +} diff --git a/cpu-utils/src/lib.rs b/cpu-utils/src/lib.rs new file mode 100644 index 00000000000..7806376f1a6 --- /dev/null +++ b/cpu-utils/src/lib.rs @@ -0,0 +1,26 @@ +#![cfg(all(feature = "agave-unstable-api", target_os = "linux"))] + +//! CPU affinity utilities for Linux systems. +//! +//! This crate provides safe Rust bindings for setting CPU affinity and querying +//! the current task affinity mask. Useful for performance-critical applications +//! that need precise control over thread placement. +//! +//! # Examples +//! +//! ```no_run +//! use agave_cpu_utils::*; +//! +//! # fn main() -> std::io::Result<()> { +//! let allowed = cpu_affinity(None)?; +//! if let Some(&cpu) = allowed.first() { +//! set_cpu_affinity(None, [cpu])?; +//! } +//! # Ok(()) +//! # } +//! ``` +//! + +mod affinity; + +pub use affinity::{CpuId, cpu_affinity, set_cpu_affinity}; diff --git a/cpu-utils/tests/integration_test.rs b/cpu-utils/tests/integration_test.rs new file mode 100644 index 00000000000..12c81cd3fcc --- /dev/null +++ b/cpu-utils/tests/integration_test.rs @@ -0,0 +1,77 @@ +#![cfg(all(feature = "agave-unstable-api", target_os = "linux"))] + +//! Integration tests for agave-cpu-utils +//! +//! These tests require a Linux system. +//! Tests that modify CPU affinity spawn dedicated threads to avoid affecting the +//! test harness. + +use { + agave_cpu_utils::{CpuId, cpu_affinity, set_cpu_affinity}, + std::{io, thread}, +}; + +fn current_affinity() -> Vec { + cpu_affinity(None).expect("failed to query current CPU affinity") +} + +fn handle_affinity_result(result: io::Result<()>, test_name: &str) { + match result { + Ok(()) => {} + Err(e) => panic!("{test_name}: unexpected error: {e:?}"), + } +} + +#[test] +fn test_set_and_get_affinity() { + let affinity = current_affinity(); + let cpu = affinity + .first() + .copied() + .expect("current CPU affinity mask should not be empty"); + + let result = thread::spawn(move || { + set_cpu_affinity(None, [cpu])?; + let affinity = cpu_affinity(None)?; + assert_eq!(affinity, vec![cpu]); + Ok::<(), io::Error>(()) + }) + .join() + .expect("Thread panicked"); + + handle_affinity_result(result, "test_set_and_get_affinity"); +} + +#[test] +fn test_affinity_with_multiple_cpus() { + let available = current_affinity(); + if available.len() < 2 { + eprintln!( + "Skipping test_affinity_with_multiple_cpus: current CPU affinity mask has fewer than \ + 2 CPUs" + ); + return; + } + let (cpu0, cpu1) = (available[0], available[1]); + + let result = thread::spawn(move || { + set_cpu_affinity(None, [cpu0, cpu1])?; + let affinity = cpu_affinity(None)?; + assert_eq!(affinity, vec![cpu0, cpu1]); + Ok::<(), io::Error>(()) + }) + .join() + .expect("Thread panicked"); + + handle_affinity_result(result, "test_affinity_with_multiple_cpus"); +} + +#[test] +fn test_invalid_cpu_index_rejected() { + assert_eq!( + CpuId::new(libc::CPU_SETSIZE as usize) + .unwrap_err() + .raw_os_error(), + Some(libc::EINVAL) + ); +} diff --git a/dev-bins/Cargo.lock b/dev-bins/Cargo.lock index 1ce083cef65..8abdce325e0 100644 --- a/dev-bins/Cargo.lock +++ b/dev-bins/Cargo.lock @@ -411,13 +411,13 @@ dependencies = [ name = "agave-xdp" version = "4.2.0-alpha.0" dependencies = [ + "agave-cpu-utils", "agave-xdp-ebpf", "arc-swap", "arrayvec", "aya", "bytes", "caps", - "core_affinity", "crossbeam-channel", "libc", "log", @@ -1708,17 +1708,6 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" -[[package]] -name = "core_affinity" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" -dependencies = [ - "libc", - "num_cpus", - "winapi", -] - [[package]] name = "cpufeatures" version = "0.2.17" @@ -7798,9 +7787,9 @@ dependencies = [ name = "solana-poh" version = "4.2.0-alpha.0" dependencies = [ + "agave-cpu-utils", "agave-votor-messages", "arc-swap", - "core_affinity", "crossbeam-channel", "log", "qualifier_attr", @@ -9597,6 +9586,13 @@ dependencies = [ "wincode", ] +[[package]] +name = "agave-cpu-utils" +version = "4.2.0-alpha.0" +dependencies = [ + "libc", +] + [[package]] name = "solana-zero-copy" version = "1.0.0" diff --git a/poh/Cargo.toml b/poh/Cargo.toml index 48f136daae3..e75e72cbef6 100644 --- a/poh/Cargo.toml +++ b/poh/Cargo.toml @@ -24,7 +24,7 @@ shuttle-test = ["dep:shuttle"] [dependencies] agave-votor-messages = { workspace = true } arc-swap = { workspace = true } -core_affinity = { workspace = true } +agave-cpu-utils = { workspace = true } crossbeam-channel = { workspace = true } log = { workspace = true } qualifier_attr = { workspace = true } diff --git a/poh/src/poh_service.rs b/poh/src/poh_service.rs index 80b9719e746..0240e71fcf8 100644 --- a/poh/src/poh_service.rs +++ b/poh/src/poh_service.rs @@ -1,5 +1,7 @@ //! The `poh_service` module implements a service that records the passing of //! "ticks", a measure of time in the PoH stream +#[cfg(target_os = "linux")] +use agave_cpu_utils::{CpuId, set_cpu_affinity}; use { crate::{ poh_controller::{PohServiceMessage, PohServiceMessageGuard, PohServiceMessageReceiver}, @@ -109,6 +111,8 @@ impl PohService { ) -> Self { migration_status.set_poh_service_started(); let poh_config = poh_config.clone(); + #[cfg(not(target_os = "linux"))] + let _ = pinned_cpu_core; let tick_producer = Builder::new() .name("solPohTickProd".to_string()) .spawn(move || { @@ -144,11 +148,19 @@ impl PohService { ) } } else { - // PoH service runs in a tight loop, generating hashes as fast as possible. - // Let's dedicate one of the CPU cores to this thread so that it can gain - // from cache performance. - if let Some(cores) = core_affinity::get_core_ids() { - core_affinity::set_for_current(cores[pinned_cpu_core]); + #[cfg(target_os = "linux")] + { + // PoH service runs in a tight loop, generating hashes as fast as possible. + // Let's dedicate one of the CPU cores to this thread so that it can gain + // from cache performance. + let pinned_cpu = CpuId::new(pinned_cpu_core).unwrap(); + info!("Pinning PoH service to CPU core {pinned_cpu_core}"); + set_cpu_affinity(None, [pinned_cpu]).unwrap_or_else(|e| { + panic!( + "Failed to set CPU affinity for PoH service to CPU \ + {pinned_cpu_core}: {e:?}. This is critical for performance." + ) + }); } Self::tick_producer( poh_recorder, diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index d3273019379..136a3a4d955 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -253,6 +253,7 @@ dependencies = [ name = "agave-validator" version = "4.2.0-alpha.0" dependencies = [ + "agave-cpu-utils", "agave-logger", "agave-snapshots", "agave-votor", @@ -261,7 +262,6 @@ dependencies = [ "chrono", "clap", "console 0.16.3", - "core_affinity", "crossbeam-channel", "fd-lock", "indicatif", @@ -399,13 +399,13 @@ dependencies = [ name = "agave-xdp" version = "4.2.0-alpha.0" dependencies = [ + "agave-cpu-utils", "agave-xdp-ebpf", "arc-swap", "arrayvec", "aya", "bytes", "caps", - "core_affinity", "crossbeam-channel", "libc", "log", @@ -1637,17 +1637,6 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" -[[package]] -name = "core_affinity" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a034b3a7b624016c6e13f5df875747cc25f884156aad2abd12b6c46797971342" -dependencies = [ - "libc", - "num_cpus", - "winapi", -] - [[package]] name = "cpufeatures" version = "0.2.17" @@ -7569,9 +7558,9 @@ dependencies = [ name = "solana-poh" version = "4.2.0-alpha.0" dependencies = [ + "agave-cpu-utils", "agave-votor-messages", "arc-swap", - "core_affinity", "crossbeam-channel", "log", "qualifier_attr", @@ -10103,6 +10092,13 @@ dependencies = [ "wincode", ] +[[package]] +name = "agave-cpu-utils" +version = "4.2.0-alpha.0" +dependencies = [ + "libc", +] + [[package]] name = "solana-zero-copy" version = "1.0.0" diff --git a/validator/Cargo.toml b/validator/Cargo.toml index f843577fa3d..cd7b91d4459 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -18,6 +18,7 @@ targets = ["x86_64-unknown-linux-gnu"] agave-unstable-api = [] [dependencies] +agave-cpu-utils = { workspace = true } agave-logger = { workspace = true } agave-snapshots = { workspace = true } agave-votor = { workspace = true } @@ -25,7 +26,6 @@ agave-xdp = { workspace = true } chrono = { workspace = true, features = ["default", "serde"] } clap = { workspace = true } console = { workspace = true } -core_affinity = { workspace = true } crossbeam-channel = { workspace = true } fd-lock = { workspace = true } indicatif = { workspace = true } diff --git a/validator/src/commands/run/args.rs b/validator/src/commands/run/args.rs index 71292bc14ff..f9940919f9b 100644 --- a/validator/src/commands/run/args.rs +++ b/validator/src/commands/run/args.rs @@ -30,7 +30,7 @@ use { solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig, solana_signer::Signer, solana_unified_scheduler_pool::DefaultSchedulerPool, - std::{collections::HashSet, net::SocketAddr, path::PathBuf, str::FromStr}, + std::{collections::HashSet, net::SocketAddr, path::PathBuf}, }; const EXCLUDE_KEY: &str = "account-index-exclude-key"; @@ -867,17 +867,8 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a, .hidden(hidden_unless_forced()) .long("experimental-poh-pinned-cpu-core") .takes_value(true) - .value_name("CPU_CORE_INDEX") - .validator(|s| { - let core_index = usize::from_str(&s).map_err(|e| e.to_string())?; - let max_index = core_affinity::get_core_ids() - .map(|cids| cids.len() - 1) - .unwrap_or(0); - if core_index > max_index { - return Err(format!("core index must be in the range [0, {max_index}]")); - } - Ok(()) - }) + .value_name("CPU_ID") + .validator(is_parsable::) .help("EXPERIMENTAL: Specify which CPU core PoH is pinned to"), ) .arg( diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index f6ab90d6dde..bcf12570d91 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -1,3 +1,5 @@ +#[cfg(target_os = "linux")] +use agave_cpu_utils::{CpuId, cpu_affinity, set_cpu_affinity}; use { crate::{ admin_rpc_service::{self, StakedNodesOverrides, load_staked_nodes_overrides}, @@ -12,7 +14,7 @@ use { snapshot_config::{SnapshotConfig, SnapshotUsage}, }, agave_votor::vote_history_storage, - agave_xdp::{set_cpu_affinity, transmitter::XdpConfig}, + agave_xdp::transmitter::XdpConfig, clap::{ArgMatches, crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit}, crossbeam_channel::unbounded, log::*, @@ -412,20 +414,27 @@ pub fn execute( #[cfg(not(target_os = "linux"))] let xdp_transmit_setup = None; - let reserved = xdp_transmit_config - .map(|xdp| xdp.cpus.clone()) - .unwrap_or_default() - .iter() - .cloned() - .collect::>(); - if !reserved.is_empty() { - let available = core_affinity::get_core_ids() + #[cfg(target_os = "linux")] + { + let reserved = xdp_transmit_config + .as_ref() + .map(|xdp| xdp.cpus.clone()) .unwrap_or_default() .into_iter() - .map(|core_id| core_id.id) - .collect::>(); - let available = available.difference(&reserved); - set_cpu_affinity(available.into_iter().copied()).unwrap(); + .map(CpuId::new) + .collect::>>()?; + if !reserved.is_empty() { + let available = cpu_affinity(None)? + .into_iter() + .filter(|cpu| !reserved.contains(cpu)) + .collect::>(); + if available.is_empty() { + Err(String::from( + "XDP reserved all available CPU cores; no CPU available for the validator main thread", + ))?; + } + set_cpu_affinity(None, available.iter().copied())?; + } } solana_core::validator::report_target_features(); diff --git a/xdp/Cargo.toml b/xdp/Cargo.toml index d4437e904e5..1cb2c40e3d7 100644 --- a/xdp/Cargo.toml +++ b/xdp/Cargo.toml @@ -24,7 +24,7 @@ agave-xdp-ebpf = { workspace = true } arrayvec = { workspace = true } aya = { workspace = true } caps = { workspace = true } -core_affinity = { workspace = true } +agave-cpu-utils = { workspace = true } [lints] workspace = true diff --git a/xdp/src/lib.rs b/xdp/src/lib.rs index 6f149efa5af..daf78473219 100644 --- a/xdp/src/lib.rs +++ b/xdp/src/lib.rs @@ -31,51 +31,6 @@ pub mod transmitter; pub use program::load_xdp_program; use std::{io, net::Ipv4Addr}; -#[cfg(target_os = "linux")] -pub fn set_cpu_affinity(cpus: impl IntoIterator) -> Result<(), io::Error> { - unsafe { - let mut cpu_set = std::mem::zeroed(); - - for cpu in cpus { - libc::CPU_SET(cpu, &mut cpu_set); - } - - let result = libc::sched_setaffinity( - 0, - std::mem::size_of::(), - &cpu_set as *const libc::cpu_set_t, - ); - if result != 0 { - Err(io::Error::last_os_error()) - } else { - Ok(()) - } - } -} - -#[cfg(not(target_os = "linux"))] -pub fn set_cpu_affinity(_cpus: impl IntoIterator) -> Result<(), io::Error> { - unimplemented!() -} - -#[cfg(target_os = "linux")] -pub fn get_cpu() -> Result { - unsafe { - let result = libc::sched_getcpu(); - if result < 0 { - assert_eq!(result, -1); - Err(io::Error::last_os_error()) - } else { - Ok(result as usize) - } - } -} - -#[cfg(not(target_os = "linux"))] -pub fn get_cpu() -> Result { - unimplemented!() -} - /// Returns the IPv4 address of the specified network interface. /// /// If the interface is part of a bonded interface, returns the master's IPv4 address. diff --git a/xdp/src/transmitter.rs b/xdp/src/transmitter.rs index e72b6ad793d..09bc99a7ba0 100644 --- a/xdp/src/transmitter.rs +++ b/xdp/src/transmitter.rs @@ -16,10 +16,10 @@ use { load_xdp_program, route::{RouteTable, Router, RoutingTables}, route_monitor::RouteMonitor, - set_cpu_affinity, tx_loop::{TxLoop, TxLoopBuilder, TxLoopConfigBuilder, TxPacket}, umem::{OwnedUmem, PageAlignedMemory}, }, + agave_cpu_utils::{CpuId, cpu_affinity, set_cpu_affinity}, arc_swap::ArcSwap, arrayvec::ArrayVec, aya::Ebpf, @@ -251,32 +251,33 @@ impl TransmitterBuilder { tx_loop_config_builder.zero_copy(zero_copy); let tx_loop_config = tx_loop_config_builder.build_with_src_device(&dev); - let reserved_cores = cpus.iter().cloned().collect::>(); - let available_cores = core_affinity::get_core_ids() - .expect("linux provide affine cores") + let reserved_cores = cpus + .iter() + .copied() + .map(CpuId::new) + .collect::>>()?; + let unreserved_cores = cpu_affinity(None)? .into_iter() - .map(|core_affinity::CoreId { id }| id) - .collect::>(); - let unreserved_cores = available_cores - .difference(&reserved_cores) - .cloned() + .filter(|core| !reserved_cores.contains(core)) .collect::>(); - let tx_loop_builders = cpus - .into_iter() - .zip(std::iter::repeat_with(|| tx_loop_config.clone())) - .enumerate() - .map(|(i, (cpu_id, config))| { - // since we aren't necessarily allocating from the thread that we intend to run on, - // temporarily switch to the target cpu for each TxLoop to ensure that the Umem region - // is allocated to the correct numa node - set_cpu_affinity([cpu_id]).unwrap(); - let tx_loop_builder = TxLoopBuilder::new(cpu_id, QueueId(i as u64), config, &dev); - // migrate main thread back off of the last xdp reserved cpu - set_cpu_affinity(unreserved_cores.clone()).unwrap(); - tx_loop_builder - }) - .collect::>(); + if unreserved_cores.is_empty() { + return Err("all CPUs are reserved; no CPU available for the main thread".into()); + } + + let mut tx_loop_builders = Vec::with_capacity(cpus.len()); + for (i, cpu_id) in cpus.into_iter().enumerate() { + // since we aren't necessarily allocating from the thread that we intend to run on, + // temporarily switch to the target cpu for each TxLoop to ensure that the Umem region + // is allocated to the correct numa node + let cpu = CpuId::new(cpu_id)?; + set_cpu_affinity(None, [cpu])?; + let tx_loop_builder = + TxLoopBuilder::new(cpu_id, QueueId(i as u64), tx_loop_config.clone(), &dev); + // migrate main thread back off of the last xdp reserved cpu + set_cpu_affinity(None, unreserved_cores.iter().copied())?; + tx_loop_builders.push(tx_loop_builder); + } // switch to higher caps while we setup XDP. We assume that an error in // this function is irrecoverable so we don't try to drop on errors. diff --git a/xdp/src/tx_loop.rs b/xdp/src/tx_loop.rs index 00630cdd7d6..7f9b62f142c 100644 --- a/xdp/src/tx_loop.rs +++ b/xdp/src/tx_loop.rs @@ -14,10 +14,10 @@ use { write_ip_header_for_udp, write_udp_header, }, route::NextHop, - set_cpu_affinity, socket::{Socket, Tx, TxRing}, umem::{Frame, OwnedUmem, PageAlignedMemory, Umem}, }, + agave_cpu_utils::{CpuId, set_cpu_affinity}, crossbeam_channel::{Receiver, Sender, TryRecvError}, libc::{_SC_PAGESIZE, sysconf}, std::{ @@ -240,7 +240,7 @@ impl TxLoop { } = self; // each queue is bound to its own CPU core - set_cpu_affinity([cpu_id]).unwrap(); + set_cpu_affinity(None, [CpuId::new(cpu_id).unwrap()]).unwrap(); let umem = socket.umem(); let umem_tx_capacity = umem.available(); From feee556352b60652899f7939945238c1d53c3c2a Mon Sep 17 00:00:00 2001 From: greg Date: Sun, 14 Jun 2026 09:42:21 +0000 Subject: [PATCH 2/4] Promote PoH pinned CPU core flag --- CHANGELOG.md | 2 + poh/src/poh_service.rs | 2 +- validator/src/cli.rs | 11 +++++ validator/src/commands/run/args.rs | 5 +- validator/src/commands/run/execute.rs | 66 ++++++++++++++++++++++++++- 5 files changed, 80 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e37e6476767..4785a34f22b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,10 +22,12 @@ Release channels have their own copy of this changelog: `getLatestBlockhash` response together with its context (notably `context.slot`). ### Validator #### Breaking +* The default PoH pinned CPU core is now CPU core 10. Use `--poh-pinned-cpu-core` to override it. #### Deprecations * `--accounts-db-access-storages-method` is now deprecated and a no-op (the `mmap` value was deprecated in v4.0.0; mmap mode has now been removed entirely). The flag is still accepted for backward compatibility, but account storages are always accessed via file I/O. +* `--experimental-poh-pinned-cpu-core` is now deprecated. Use `--poh-pinned-cpu-core` instead. #### Changes * Turbine shred ingestion now rejects shreds more than half an epoch in the future (previously up to 2 full epochs ahead was accepted). ### CLI diff --git a/poh/src/poh_service.rs b/poh/src/poh_service.rs index 0240e71fcf8..8ce3c08d18b 100644 --- a/poh/src/poh_service.rs +++ b/poh/src/poh_service.rs @@ -39,7 +39,7 @@ const TARGET_HASH_BATCH_TIME_US: u64 = 50; pub const DEFAULT_HASHES_PER_BATCH: u64 = TARGET_HASH_BATCH_TIME_US * DEFAULT_HASHES_PER_SECOND / 1_000_000; -pub const DEFAULT_PINNED_CPU_CORE: usize = 0; +pub const DEFAULT_PINNED_CPU_CORE: usize = 10; const TARGET_SLOT_ADJUSTMENT_NS: u64 = 50_000_000; diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 7c5484ac007..4011ed283b8 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -158,6 +158,17 @@ fn deprecated_arguments() -> Vec { .conflicts_with("accounts_index_limit"), replaced_by: "accounts-index-limit", ); + add_arg!( + // deprecated in v4.2.0 + Arg::with_name("experimental_poh_pinned_cpu_core") + .long("experimental-poh-pinned-cpu-core") + .takes_value(true) + .value_name("CPU_ID") + .conflicts_with("poh_pinned_cpu_core") + .validator(is_parsable::) + .help("Specify which CPU core PoH is pinned to. Use --poh-pinned-cpu-core instead"), + replaced_by: "poh-pinned-cpu-core", + ); add_arg!( // deprecated in v4.1.0 Arg::with_name("experimental_retransmit_xdp_cpu_cores") diff --git a/validator/src/commands/run/args.rs b/validator/src/commands/run/args.rs index f9940919f9b..c329213df6c 100644 --- a/validator/src/commands/run/args.rs +++ b/validator/src/commands/run/args.rs @@ -864,12 +864,11 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a, ) .arg( Arg::with_name("poh_pinned_cpu_core") - .hidden(hidden_unless_forced()) - .long("experimental-poh-pinned-cpu-core") + .long("poh-pinned-cpu-core") .takes_value(true) .value_name("CPU_ID") .validator(is_parsable::) - .help("EXPERIMENTAL: Specify which CPU core PoH is pinned to"), + .help("Specify which CPU core PoH is pinned to. Defaults to CPU 10 on Linux"), ) .arg( Arg::with_name("poh_hashes_per_batch") diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index bcf12570d91..1cc4110e905 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -93,6 +93,12 @@ pub enum Operation { Run, } +fn parse_poh_pinned_cpu_core(matches: &ArgMatches) -> usize { + value_of(matches, "poh_pinned_cpu_core") + .or_else(|| value_of(matches, "experimental_poh_pinned_cpu_core")) + .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE) +} + pub fn execute( matches: &ArgMatches, solana_version: &str, @@ -167,6 +173,10 @@ pub fn execute( } } + let poh_pinned_cpu_core = parse_poh_pinned_cpu_core(matches); + #[cfg(target_os = "linux")] + info!("PoH pinned CPU core: {poh_pinned_cpu_core}"); + let xdp_transmit_config = if let Some(xdp_cpu_cores) = matches .value_of("xdp_cpu_cores") .or_else(|| matches.value_of("experimental_retransmit_xdp_cpu_cores")) @@ -813,8 +823,7 @@ pub fn execute( // The validator needs to open many files, check that the process has // permission to do so in order to fail quickly and give a direct error enforce_ulimit_nofile: true, - poh_pinned_cpu_core: value_of(matches, "poh_pinned_cpu_core") - .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE), + poh_pinned_cpu_core, poh_hashes_per_batch: value_of(matches, "poh_hashes_per_batch") .unwrap_or(poh_service::DEFAULT_HASHES_PER_BATCH), process_ledger_before_services: matches.is_present("process_ledger_before_services"), @@ -1383,3 +1392,56 @@ fn new_snapshot_config( Ok(snapshot_config) } +#[cfg(all(test, target_os = "linux"))] +mod tests { + use super::*; + + #[test] + fn poh_pinned_cpu_core_defaults_to_configured_default() { + let default_args = cli::DefaultArgs::default(); + let matches = cli::app("test", &default_args).get_matches_from(vec!["agave-validator"]); + + assert_eq!( + parse_poh_pinned_cpu_core(&matches), + poh_service::DEFAULT_PINNED_CPU_CORE + ); + } + + #[test] + fn poh_pinned_cpu_core_uses_stable_arg() { + let default_args = cli::DefaultArgs::default(); + let matches = cli::app("test", &default_args).get_matches_from(vec![ + "agave-validator", + "--poh-pinned-cpu-core", + "0", + ]); + + assert_eq!(parse_poh_pinned_cpu_core(&matches), 0); + } + + #[test] + fn poh_pinned_cpu_core_accepts_deprecated_experimental_arg() { + let default_args = cli::DefaultArgs::default(); + let matches = cli::app("test", &default_args).get_matches_from(vec![ + "agave-validator", + "--experimental-poh-pinned-cpu-core", + "0", + ]); + + assert_eq!(parse_poh_pinned_cpu_core(&matches), 0); + } + + #[test] + fn poh_pinned_cpu_core_args_conflict() { + let default_args = cli::DefaultArgs::default(); + let matches = cli::app("test", &default_args).get_matches_from_safe(vec![ + "agave-validator", + "--poh-pinned-cpu-core", + "0", + "--experimental-poh-pinned-cpu-core", + "0", + ]); + + assert!(matches.is_err()); + } +} From cfb80a96a2c49ef04a4059be1faaa4d7341d96db Mon Sep 17 00:00:00 2001 From: greg Date: Sun, 14 Jun 2026 09:43:08 +0000 Subject: [PATCH 3/4] Validate XDP and PoH CPU separation --- validator/src/commands/run/execute.rs | 95 +++++++++++++++++++++++++-- 1 file changed, 90 insertions(+), 5 deletions(-) diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index 1cc4110e905..dff88b43aee 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -99,6 +99,65 @@ fn parse_poh_pinned_cpu_core(matches: &ArgMatches) -> usize { .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE) } +#[cfg(target_os = "linux")] +fn validate_xdp_cpus(cpus: &[usize], poh_pinned_cpu_core: usize) -> Result<(), String> { + for cpu in cpus { + CpuId::new(*cpu).map_err(|err| format!("invalid XDP CPU core {cpu}: {err}"))?; + } + validate_xdp_cpus_are_separate_from_poh_physical_core( + cpus, + poh_pinned_cpu_core, + read_thread_siblings_list, + ) +} + +#[cfg(target_os = "linux")] +fn read_thread_siblings_list(cpu: usize) -> Result, String> { + let path = Path::new("/sys/devices/system/cpu") + .join(format!("cpu{cpu}")) + .join("topology/thread_siblings_list"); + let cpu_ranges = fs::read_to_string(&path) + .map_err(|err| format!("failed to read {}: {err}", path.display()))?; + solana_clap_utils::input_parsers::parse_cpu_ranges(cpu_ranges.trim()) + .map_err(|err| format!("failed to parse {}: {err}", path.display())) +} + +#[cfg(target_os = "linux")] +fn validate_xdp_cpus_are_separate_from_poh_physical_core( + cpus: &[usize], + poh_pinned_cpu_core: usize, + thread_siblings: F, +) -> Result<(), String> +where + F: Fn(usize) -> Result, String>, +{ + for cpu in cpus { + if cpu_shares_physical_core_with_poh(*cpu, poh_pinned_cpu_core, &thread_siblings)? { + return Err(format!( + "XDP CPU core {cpu} shares a physical core with PoH CPU core \ + {poh_pinned_cpu_core}; provide --xdp-cpu-cores with CPU cores on separate \ + physical cores" + )); + } + } + Ok(()) +} + +#[cfg(target_os = "linux")] +fn cpu_shares_physical_core_with_poh( + cpu: usize, + poh_pinned_cpu_core: usize, + thread_siblings: &F, +) -> Result +where + F: Fn(usize) -> Result, String>, +{ + if cpu == poh_pinned_cpu_core { + return Ok(true); + } + Ok(thread_siblings(cpu)?.contains(&poh_pinned_cpu_core)) +} + pub fn execute( matches: &ArgMatches, solana_version: &str, @@ -186,11 +245,11 @@ pub fn execute( .or_else(|| matches.value_of("experimental_retransmit_xdp_interface")); let xdp_zero_copy = matches.is_present("xdp_zero_copy") || matches.is_present("experimental_retransmit_xdp_zero_copy"); - let config = XdpConfig::new( - xdp_interface, - parse_cpu_ranges(xdp_cpu_cores).unwrap(), - xdp_zero_copy, - ); + let xdp_cpus = parse_cpu_ranges(xdp_cpu_cores).unwrap(); + #[cfg(target_os = "linux")] + validate_xdp_cpus(&xdp_cpus, poh_pinned_cpu_core)?; + let config = XdpConfig::new(xdp_interface, xdp_cpus, xdp_zero_copy); + info!("XDP enabled on CPU cores: {:?}", config.cpus); if bind_addresses.len() > 1 { Err(String::from( "--xdp-cpu-cores cannot be used in a multihoming context", @@ -1444,4 +1503,30 @@ mod tests { assert!(matches.is_err()); } + + fn test_thread_siblings(cpu: usize) -> Result, String> { + Ok(match cpu { + 2 | 10 => vec![2, 10], + 3 | 11 => vec![3, 11], + _ => vec![cpu], + }) + } + + #[test] + fn explicit_xdp_cpu_rejects_poh_physical_core() { + let err = + validate_xdp_cpus_are_separate_from_poh_physical_core(&[2], 10, test_thread_siblings) + .unwrap_err(); + assert!(err.contains("shares a physical core")); + assert!(err.contains("--xdp-cpu-cores")); + assert!(!err.contains("--no-xdp")); + } + + #[test] + fn explicit_xdp_cpu_accepts_separate_physical_core() { + assert!( + validate_xdp_cpus_are_separate_from_poh_physical_core(&[3], 10, test_thread_siblings,) + .is_ok() + ); + } } From 0849ba3ce79088a326523b5065f4d16327ab62d9 Mon Sep 17 00:00:00 2001 From: greg Date: Sun, 14 Jun 2026 09:44:46 +0000 Subject: [PATCH 4/4] Enable XDP copy mode by default --- CHANGELOG.md | 4 + docs/src/operations/running-with-af-xdp.md | 25 +- multinode-demo/bootstrap-validator.sh | 1 + multinode-demo/validator.sh | 1 + scripts/run.sh | 1 + validator/src/cli.rs | 11 +- validator/src/commands/run/args.rs | 36 ++- validator/src/commands/run/execute.rs | 310 +++++++++++++++++++-- 8 files changed, 333 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4785a34f22b..094a3f1cfd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,10 @@ Release channels have their own copy of this changelog: `getLatestBlockhash` response together with its context (notably `context.slot`). ### Validator #### Breaking +* XDP transmit is now enabled by default on Linux in copy mode on an auto-selected CPU + core separate from PoH. Use `--xdp-cpu-cores` to override the XDP CPU assignment. + Use `--xdp-zero-copy` with `--xdp-interface` to opt in to zero copy. Default validator + startup now requires the XDP copy-mode capabilities. * The default PoH pinned CPU core is now CPU core 10. Use `--poh-pinned-cpu-core` to override it. #### Deprecations * `--accounts-db-access-storages-method` is now deprecated and a no-op (the `mmap` value was diff --git a/docs/src/operations/running-with-af-xdp.md b/docs/src/operations/running-with-af-xdp.md index 94b1f57b4a2..67fd7f60ecd 100644 --- a/docs/src/operations/running-with-af-xdp.md +++ b/docs/src/operations/running-with-af-xdp.md @@ -15,27 +15,30 @@ Before rolling out XDP on a production validator, you should test it on your set * **Performance Gain:** Confirm that performance is improved with the new configuration (e.g. lower CPU usage or higher throughput in Turbine’s retransmit stage). * **Metric Visibility:** Verify that you can observe the retransmit-stage metrics, which show time spent sending shreds, to gauge the impact of XDP on network transmission. -To enable XDP in Agave, add the following command-line flags to your validator startup command (using Agave v3.0.9+): +XDP is enabled by default on Linux in Agave. The default XDP configuration uses copy mode on an auto-selected CPU core separate from PoH. To use different CPU cores for XDP, pass: ```bash ---experimental-retransmit-xdp-cpu-cores 1 ---experimental-retransmit-xdp-zero-copy # Do NOT pass this flag when using the bnxt_en driver. ---experimental-poh-pinned-cpu-core 10 +--xdp-cpu-cores 2 ``` -Note that --experimental-retransmit-xdp-zero-copy will avoid using socket buffers for data, but this is only possible when talking directly to the Network Interface Card (NIC). As a result, zero copy cannot be used with the bonded interface itself. When using a bonded network interface, specify the underlying member interface to which the XDP program should be attached: +Zero copy avoids using socket buffers for data, but this is only possible when talking directly to the Network Interface Card (NIC). To opt in to zero copy, pass an explicit physical interface: ```bash ---experimental-retransmit-xdp-interface +--xdp-zero-copy --xdp-interface ``` - Also note that XDP and PoH *must* be assigned to separate (physical) cores. The ---experimental-poh-pinned-cpu-core N flag can be used to move the PoH thread. +Zero copy cannot be used with a bonded interface itself. When using a bonded network interface, specify the underlying member interface to which the XDP program should be attached: -Next, your validator binary will need to have access to a few higher level permissions. The validator process requires the CAP_NET_RAW, CAP_NET_ADMIN, CAP_BPF, and CAP_PERFMON capabilities. These can be configured in the systemd service file by setting CapabilityBoundingSet=CAP_NET_RAW CAP_NET_ADMIN CAP_BPF CAP_PERFMON under the [Service] section or directly on the binary with the command: +```bash +--xdp-zero-copy --xdp-interface +``` + +Also note that XDP and PoH *must* be assigned to separate physical cores. PoH defaults to CPU core 10, and XDP defaults to an auto-selected CPU core separate from PoH. The `--poh-pinned-cpu-core N` flag can be used to move the PoH thread. + +Next, your validator binary will need to have access to a few higher level permissions. With default copy-mode XDP, the validator process requires the CAP_NET_RAW and CAP_NET_ADMIN capabilities. Zero copy additionally requires CAP_BPF and CAP_PERFMON. These capabilities can be configured in the systemd service file by setting CapabilityBoundingSet=CAP_NET_RAW CAP_NET_ADMIN under the [Service] section or directly on the binary with the command: ```bash -sudo setcap cap_net_raw,cap_net_admin,cap_bpf,cap_perfmon=p +sudo setcap cap_net_raw,cap_net_admin=p #this command must be run each time the binary is replaced ``` @@ -78,7 +81,7 @@ modinfo bnxt_en | `igb` / Intel I210 | ✅ Works | ✅ Works w/ caveat | caveat: `igb` requires kernel `>= 6.14` for ZC. Field report: I210 on 6.17 enabled ZC but had severe network degradation/high skips, so fall back to non-ZC if unstable. | | `ixgbe` / Intel X540, X550 | ✅ Works | ⚠️ Mixed / unstable | Alessandro guidance for freeze/link-flap cases: start without ZC while `ixgbe` is debugged. Stay tuned! | | `ice` / Intel E800 | ✅ Works | ✅ Works | `ice` supports native XDP and AF_XDP zero-copy. Caveats: XDP is blocked for frame sizes larger than 3KB | -| `bnxt_en` / Broadcom | ✅ Works | ❌ Does not work | `bnxt_en` works with XDP, but do not pass the zero-copy flag. Broadcom non-ZC can still be reasonably fast. But please get a non-broadcom NIC | +| `bnxt_en` / Broadcom | ✅ Works | ❌ Does not work | `bnxt_en` works with default copy-mode XDP. Broadcom non-ZC can still be reasonably fast. But please get a non-broadcom NIC | | `tg3` / Broadcom | ❌ No native/driver XDP; generic XDP only at best | ❌ Does not work | Broadcom BCM5720 uses the `tg3` driver. Treat as unsupported for Agave/AF_XDP performance work: no native XDP and no AF_XDP zero-copy. | | `r8169` / Realtek | ❌ No native/driver XDP; generic XDP only at best | ❌ Does not work | Realtek NICs using `r8169` should be treated as unsupported for Agave/AF_XDP performance work: no native XDP and no AF_XDP zero-copy.| | `mlx4_en` / Mellanox ConnectX-3 | ❌ Do not use | ❌ Does not work | Driver is no longer supported. Zero-copy does not work. Do not use. | diff --git a/multinode-demo/bootstrap-validator.sh b/multinode-demo/bootstrap-validator.sh index 6b231383e1d..fa75b51a88d 100755 --- a/multinode-demo/bootstrap-validator.sh +++ b/multinode-demo/bootstrap-validator.sh @@ -144,6 +144,7 @@ args+=( --no-wait-for-vote-to-start-leader --full-rpc-api --allow-private-addr + --no-xdp ) default_arg --gossip-port 8001 default_arg --log - diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 0d76fab0284..07bd51a86fb 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -10,6 +10,7 @@ args=( --max-genesis-archive-unpacked-size 1073741824 --no-poh-speed-test --no-os-network-limits-test + --no-xdp ) airdrops_enabled=1 node_sol=500 # 500 SOL: number of SOL to airdrop the node for transaction fees and vote account rent exemption (ignored if airdrops_enabled=0) diff --git a/scripts/run.sh b/scripts/run.sh index 18f13e339b1..5452a8d4dfc 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -121,6 +121,7 @@ args=( --require-tower --no-wait-for-vote-to-start-leader --no-os-network-limits-test + --no-xdp ) # shellcheck disable=SC2086 agave-validator "${args[@]}" $SOLANA_RUN_SH_VALIDATOR_ARGS & diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 4011ed283b8..0158c57266e 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -175,13 +175,12 @@ fn deprecated_arguments() -> Vec { .long("experimental-retransmit-xdp-cpu-cores") .takes_value(true) .value_name("CPU_LIST") + .conflicts_with("no_xdp") .conflicts_with("xdp_cpu_cores") .validator(|value| { validate_cpu_ranges(value, "--experimental-retransmit-xdp-cpu-cores") }) - .help( - "Enable XDP retransmit on the specified CPU cores. Use --xdp-cpu-cores instead", - ), + .help("Use the specified CPU cores for XDP. Use --xdp-cpu-cores instead"), replaced_by: "xdp-cpu-cores", ); add_arg!( @@ -190,9 +189,9 @@ fn deprecated_arguments() -> Vec { .long("experimental-retransmit-xdp-interface") .takes_value(true) .value_name("INTERFACE") + .conflicts_with("no_xdp") .conflicts_with("xdp_interface") - .requires("experimental_retransmit_xdp_cpu_cores") - .help("Network interface to use for XDP retransmit. Use --xdp-interface instead"), + .help("Network interface to use for XDP. Use --xdp-interface instead"), replaced_by: "xdp-interface", ); add_arg!( @@ -200,8 +199,8 @@ fn deprecated_arguments() -> Vec { Arg::with_name("experimental_retransmit_xdp_zero_copy") .long("experimental-retransmit-xdp-zero-copy") .takes_value(false) + .conflicts_with("no_xdp") .conflicts_with("xdp_zero_copy") - .requires("experimental_retransmit_xdp_cpu_cores") .help("Enable XDP zero copy. Use --xdp-zero-copy instead"), replaced_by: "xdp-zero-copy", ); diff --git a/validator/src/commands/run/args.rs b/validator/src/commands/run/args.rs index c329213df6c..e6725f4e560 100644 --- a/validator/src/commands/run/args.rs +++ b/validator/src/commands/run/args.rs @@ -1207,28 +1207,44 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a, .validator(|s| is_within_range(s, 1..)) .help(DefaultSchedulerPool::cli_message()), ) + .arg( + Arg::with_name("no_xdp") + .long("no-xdp") + .takes_value(false) + .conflicts_with("experimental_retransmit_xdp_cpu_cores") + .conflicts_with("experimental_retransmit_xdp_interface") + .conflicts_with("experimental_retransmit_xdp_zero_copy") + .conflicts_with("xdp_cpu_cores") + .conflicts_with("xdp_interface") + .conflicts_with("xdp_zero_copy") + .help("Do not use XDP transmit"), + ) + .arg( + Arg::with_name("xdp_zero_copy") + .long("xdp-zero-copy") + .takes_value(false) + .conflicts_with("no_xdp") + .help("Enable XDP zero copy"), + ) .arg( Arg::with_name("xdp_interface") .long("xdp-interface") .takes_value(true) .value_name("INTERFACE") - .requires("xdp_cpu_cores") - .help("Network interface to use for XDP"), + .conflicts_with("no_xdp") + .help("Network interface to use for XDP. Required when XDP zero copy is enabled"), ) .arg( Arg::with_name("xdp_cpu_cores") .long("xdp-cpu-cores") .takes_value(true) .value_name("CPU_LIST") + .conflicts_with("no_xdp") .validator(|value| validate_cpu_ranges(value, "--xdp-cpu-cores")) - .help("Use the specified CPU cores for XDP"), - ) - .arg( - Arg::with_name("xdp_zero_copy") - .long("xdp-zero-copy") - .takes_value(false) - .requires("xdp_cpu_cores") - .help("Enable XDP zero copy. Requires hardware support"), + .help( + "Use the specified CPU cores for XDP. Defaults to an auto-selected CPU on a \ + physical core separate from PoH", + ), ) .args(&pub_sub_config::args(/*test_validator:*/ false)) .args(&json_rpc_config::args()) diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index dff88b43aee..72033c65629 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -31,9 +31,7 @@ use { create_and_canonicalize_directory, }, }, - solana_clap_utils::input_parsers::{ - keypair_of, keypairs_of, parse_cpu_ranges, pubkey_of, value_of, values_of, - }, + solana_clap_utils::input_parsers::{keypair_of, keypairs_of, pubkey_of, value_of, values_of}, solana_clock::{DEFAULT_SLOTS_PER_EPOCH, Slot}, solana_core::{ banking_stage::transaction_scheduler::scheduler_controller::SchedulerConfig, @@ -87,7 +85,7 @@ use { }, }; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Operation { Initialize, Run, @@ -99,6 +97,83 @@ fn parse_poh_pinned_cpu_core(matches: &ArgMatches) -> usize { .unwrap_or(poh_service::DEFAULT_PINNED_CPU_CORE) } +fn parse_xdp_transmit_config( + matches: &ArgMatches, + bind_addresses: &BindIpAddrs, + operation: Operation, + poh_pinned_cpu_core: usize, +) -> Result, String> { + if matches.is_present("no_xdp") || operation == Operation::Initialize { + return Ok(None); + } + + #[cfg(not(target_os = "linux"))] + { + let _ = (bind_addresses, poh_pinned_cpu_core); + let xdp_config_requested = matches.value_of("xdp_cpu_cores").is_some() + || matches + .value_of("experimental_retransmit_xdp_cpu_cores") + .is_some() + || matches.value_of("xdp_interface").is_some() + || matches + .value_of("experimental_retransmit_xdp_interface") + .is_some() + || matches.is_present("xdp_zero_copy") + || matches.is_present("experimental_retransmit_xdp_zero_copy"); + if xdp_config_requested { + return Err(String::from("XDP is only supported on Linux")); + } + Ok(None) + } + + #[cfg(target_os = "linux")] + { + if bind_addresses.len() > 1 { + return Err(String::from("XDP cannot be used in a multihoming context")); + } + + let xdp_interface = matches + .value_of("xdp_interface") + .or_else(|| matches.value_of("experimental_retransmit_xdp_interface")); + let xdp_zero_copy = matches.is_present("xdp_zero_copy") + || matches.is_present("experimental_retransmit_xdp_zero_copy"); + if xdp_zero_copy && xdp_interface.is_none() { + return Err(String::from( + "XDP zero copy requires an explicit network interface. Use --xdp-interface to \ + select the XDP interface", + )); + } + let xdp_cpu_ranges = matches + .value_of("xdp_cpu_cores") + .or_else(|| matches.value_of("experimental_retransmit_xdp_cpu_cores")); + let xdp_cpus = if let Some(cpu_ranges) = xdp_cpu_ranges { + let cpus = solana_clap_utils::input_parsers::parse_cpu_ranges(cpu_ranges) + .map_err(|err| err.to_string())?; + validate_xdp_cpus(&cpus, poh_pinned_cpu_core)?; + cpus + } else { + let allowed_cpus = cpu_affinity(None) + .map_err(|err| { + format!( + "failed to query CPU affinity for XDP CPU selection: {err}. \ + Provide --xdp-cpu-cores explicitly" + ) + })? + .iter() + .map(|cpu| **cpu) + .collect::>(); + vec![select_default_xdp_cpu( + &allowed_cpus, + poh_pinned_cpu_core, + read_thread_siblings_list, + )?] + }; + + info!("XDP enabled on CPU cores: {xdp_cpus:?}"); + Ok(Some(XdpConfig::new(xdp_interface, xdp_cpus, xdp_zero_copy))) + } +} + #[cfg(target_os = "linux")] fn validate_xdp_cpus(cpus: &[usize], poh_pinned_cpu_core: usize) -> Result<(), String> { for cpu in cpus { @@ -143,6 +218,30 @@ where Ok(()) } +#[cfg(target_os = "linux")] +fn select_default_xdp_cpu( + allowed_cpus: &[usize], + poh_pinned_cpu_core: usize, + thread_siblings: F, +) -> Result +where + F: Fn(usize) -> Result, String>, +{ + CpuId::new(poh_pinned_cpu_core) + .map_err(|err| format!("invalid PoH CPU core {poh_pinned_cpu_core}: {err}"))?; + for cpu in allowed_cpus.iter().rev().copied() { + CpuId::new(cpu).map_err(|err| format!("invalid allowed CPU core {cpu}: {err}"))?; + if !cpu_shares_physical_core_with_poh(cpu, poh_pinned_cpu_core, &thread_siblings)? { + return Ok(cpu); + } + } + + Err(format!( + "XDP requires an available CPU core on a physical core separate from PoH CPU core \ + {poh_pinned_cpu_core}; provide --xdp-cpu-cores explicitly" + )) +} + #[cfg(target_os = "linux")] fn cpu_shares_physical_core_with_poh( cpu: usize, @@ -235,30 +334,8 @@ pub fn execute( let poh_pinned_cpu_core = parse_poh_pinned_cpu_core(matches); #[cfg(target_os = "linux")] info!("PoH pinned CPU core: {poh_pinned_cpu_core}"); - - let xdp_transmit_config = if let Some(xdp_cpu_cores) = matches - .value_of("xdp_cpu_cores") - .or_else(|| matches.value_of("experimental_retransmit_xdp_cpu_cores")) - { - let xdp_interface = matches - .value_of("xdp_interface") - .or_else(|| matches.value_of("experimental_retransmit_xdp_interface")); - let xdp_zero_copy = matches.is_present("xdp_zero_copy") - || matches.is_present("experimental_retransmit_xdp_zero_copy"); - let xdp_cpus = parse_cpu_ranges(xdp_cpu_cores).unwrap(); - #[cfg(target_os = "linux")] - validate_xdp_cpus(&xdp_cpus, poh_pinned_cpu_core)?; - let config = XdpConfig::new(xdp_interface, xdp_cpus, xdp_zero_copy); - info!("XDP enabled on CPU cores: {:?}", config.cpus); - if bind_addresses.len() > 1 { - Err(String::from( - "--xdp-cpu-cores cannot be used in a multihoming context", - ))?; - } - Some(config) - } else { - None - }; + let xdp_transmit_config = + parse_xdp_transmit_config(matches, &bind_addresses, operation, poh_pinned_cpu_core)?; let dynamic_port_range = solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap()) @@ -1451,9 +1528,32 @@ fn new_snapshot_config( Ok(snapshot_config) } + #[cfg(all(test, target_os = "linux"))] mod tests { - use super::*; + use { + super::*, + std::net::{IpAddr, Ipv4Addr}, + }; + + fn xdp_config_for_args( + args: &[&str], + bind_addresses: &BindIpAddrs, + ) -> Result, String> { + xdp_config_for_args_and_operation(args, bind_addresses, Operation::Run) + } + + fn xdp_config_for_args_and_operation( + args: &[&str], + bind_addresses: &BindIpAddrs, + operation: Operation, + ) -> Result, String> { + let default_args = cli::DefaultArgs::default(); + let matches = + cli::app("test", &default_args).get_matches_from([&["agave-validator"], args].concat()); + let poh_pinned_cpu_core = parse_poh_pinned_cpu_core(&matches); + parse_xdp_transmit_config(&matches, bind_addresses, operation, poh_pinned_cpu_core) + } #[test] fn poh_pinned_cpu_core_defaults_to_configured_default() { @@ -1504,6 +1604,142 @@ mod tests { assert!(matches.is_err()); } + #[test] + fn default_xdp_config_uses_copy_mode_and_auto_selected_cpu() { + let bind_addresses = BindIpAddrs::default(); + let config = xdp_config_for_args(&[], &bind_addresses).unwrap().unwrap(); + + assert_eq!(config.interface, None); + assert_eq!(config.cpus.len(), 1); + assert_ne!(config.cpus[0], poh_service::DEFAULT_PINNED_CPU_CORE); + assert!(!config.zero_copy); + } + + #[test] + fn xdp_zero_copy_requires_interface() { + let bind_addresses = BindIpAddrs::default(); + + let err = xdp_config_for_args(&["--xdp-zero-copy"], &bind_addresses).unwrap_err(); + assert!(err.contains("--xdp-interface")); + assert!(!err.contains("--no-xdp")); + } + + #[test] + fn xdp_zero_copy_uses_default_cpu_and_configured_interface() { + let bind_addresses = BindIpAddrs::default(); + let config = xdp_config_for_args( + &["--xdp-zero-copy", "--xdp-interface", "eth0"], + &bind_addresses, + ) + .unwrap() + .unwrap(); + + assert_eq!(config.interface.as_deref(), Some("eth0")); + assert_eq!(config.cpus.len(), 1); + assert_ne!(config.cpus[0], poh_service::DEFAULT_PINNED_CPU_CORE); + assert!(config.zero_copy); + } + + #[test] + fn xdp_zero_copy_accepts_deprecated_args() { + let bind_addresses = BindIpAddrs::default(); + let config = xdp_config_for_args( + &[ + "--experimental-retransmit-xdp-zero-copy", + "--experimental-retransmit-xdp-interface", + "eth0", + ], + &bind_addresses, + ) + .unwrap() + .unwrap(); + + assert_eq!(config.interface.as_deref(), Some("eth0")); + assert!(config.zero_copy); + } + + #[test] + fn no_xdp_returns_no_config() { + let bind_addresses = BindIpAddrs::default(); + assert!( + xdp_config_for_args(&["--no-xdp"], &bind_addresses) + .unwrap() + .is_none() + ); + } + + #[test] + fn init_returns_no_xdp_config() { + let bind_addresses = BindIpAddrs::default(); + assert!( + xdp_config_for_args_and_operation(&[], &bind_addresses, Operation::Initialize) + .unwrap() + .is_none() + ); + assert!( + xdp_config_for_args_and_operation( + &["--xdp-zero-copy"], + &bind_addresses, + Operation::Initialize, + ) + .unwrap() + .is_none() + ); + } + + #[test] + fn xdp_cpu_and_interface_are_configurable_in_copy_mode() { + let bind_addresses = BindIpAddrs::default(); + let config = xdp_config_for_args( + &[ + "--poh-pinned-cpu-core", + "1023", + "--xdp-interface", + "eth0", + "--xdp-cpu-cores", + "2-3", + ], + &bind_addresses, + ) + .unwrap() + .unwrap(); + + assert_eq!(config.interface.as_deref(), Some("eth0")); + assert_eq!(config.cpus, vec![2, 3]); + assert!(!config.zero_copy); + } + + #[test] + fn xdp_requires_single_bind_address() { + let bind_addresses = BindIpAddrs::new(vec![ + IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), + IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)), + ]) + .unwrap(); + + let err = xdp_config_for_args(&[], &bind_addresses).unwrap_err(); + assert!(err.contains("multihoming")); + assert!(!err.contains("--no-xdp")); + assert!( + xdp_config_for_args(&["--no-xdp"], &bind_addresses) + .unwrap() + .is_none() + ); + } + + #[test] + fn no_xdp_conflicts_with_xdp_overrides() { + let default_args = cli::DefaultArgs::default(); + let matches = cli::app("test", &default_args).get_matches_from_safe(vec![ + "agave-validator", + "--no-xdp", + "--xdp-cpu-cores", + "2", + ]); + + assert!(matches.is_err()); + } + fn test_thread_siblings(cpu: usize) -> Result, String> { Ok(match cpu { 2 | 10 => vec![2, 10], @@ -1512,6 +1748,22 @@ mod tests { }) } + #[test] + fn default_xdp_cpu_skips_poh_physical_core() { + assert_eq!( + select_default_xdp_cpu(&[3, 2], 10, test_thread_siblings), + Ok(3) + ); + } + + #[test] + fn default_xdp_cpu_errors_without_separate_physical_core() { + let err = select_default_xdp_cpu(&[2, 10], 10, test_thread_siblings).unwrap_err(); + assert!(err.contains("physical core separate from PoH")); + assert!(err.contains("--xdp-cpu-cores")); + assert!(!err.contains("--no-xdp")); + } + #[test] fn explicit_xdp_cpu_rejects_poh_physical_core() { let err =