diff --git a/CHANGELOG.md b/CHANGELOG.md index b8c9ec0b80a..63572f52d40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,10 +23,16 @@ Release channels have their own copy of this changelog: `getLatestBlockhash` response together with its context (notably `context.slot`). ### Validator #### Breaking +* XDP transmit is now enabled by default on Linux in copy mode on CPU core 1. Use + `--xdp-cpu-cores` to override the XDP CPU assignment. Use `--xdp-zero-copy` with + `--xdp-interface` to opt in to zero copy. Default validator startup now requires the XDP + copy-mode capabilities. +* The default PoH pinned CPU core is now CPU core 10. Use `--poh-pinned-cpu-core` to override it. #### Deprecations * `--accounts-db-access-storages-method` is now deprecated and a no-op (the `mmap` value was deprecated in v4.0.0; mmap mode has now been removed entirely). The flag is still accepted for backward compatibility, but account storages are always accessed via file I/O. +* `--experimental-poh-pinned-cpu-core` is now deprecated. Use `--poh-pinned-cpu-core` instead. #### Changes * Turbine shred ingestion now rejects shreds more than half an epoch in the future (previously up to 2 full epochs ahead was accepted). ### CLI diff --git a/docs/src/operations/running-with-af-xdp.md b/docs/src/operations/running-with-af-xdp.md index 94b1f57b4a2..85b4f62c3ce 100644 --- a/docs/src/operations/running-with-af-xdp.md +++ b/docs/src/operations/running-with-af-xdp.md @@ -15,27 +15,30 @@ Before rolling out XDP on a production validator, you should test it on your set * **Performance Gain:** Confirm that performance is improved with the new configuration (e.g. lower CPU usage or higher throughput in Turbine’s retransmit stage). * **Metric Visibility:** Verify that you can observe the retransmit-stage metrics, which show time spent sending shreds, to gauge the impact of XDP on network transmission. 
-To enable XDP in Agave, add the following command-line flags to your validator startup command (using Agave v3.0.9+): +XDP is enabled by default on Linux in Agave. The default XDP configuration uses CPU core 1 and copy mode. To use different CPU cores for XDP, pass: ```bash ---experimental-retransmit-xdp-cpu-cores 1 ---experimental-retransmit-xdp-zero-copy # Do NOT pass this flag when using the bnxt_en driver. ---experimental-poh-pinned-cpu-core 10 +--xdp-cpu-cores 2 ``` -Note that --experimental-retransmit-xdp-zero-copy will avoid using socket buffers for data, but this is only possible when talking directly to the Network Interface Card (NIC). As a result, zero copy cannot be used with the bonded interface itself. When using a bonded network interface, specify the underlying member interface to which the XDP program should be attached: +Zero copy avoids using socket buffers for data, but this is only possible when talking directly to the Network Interface Card (NIC). To opt in to zero copy, pass an explicit physical interface: ```bash ---experimental-retransmit-xdp-interface +--xdp-zero-copy --xdp-interface ``` - Also note that XDP and PoH *must* be assigned to separate (physical) cores. The ---experimental-poh-pinned-cpu-core N flag can be used to move the PoH thread. +Zero copy cannot be used with a bonded interface itself. When using a bonded network interface, specify the underlying member interface to which the XDP program should be attached: -Next, your validator binary will need to have access to a few higher level permissions. The validator process requires the CAP_NET_RAW, CAP_NET_ADMIN, CAP_BPF, and CAP_PERFMON capabilities. These can be configured in the systemd service file by setting CapabilityBoundingSet=CAP_NET_RAW CAP_NET_ADMIN CAP_BPF CAP_PERFMON under the [Service] section or directly on the binary with the command: +```bash +--xdp-zero-copy --xdp-interface +``` + +Also note that XDP and PoH *must* be assigned to separate (physical) cores. 
PoH defaults to CPU core 10, and XDP defaults to CPU core 1. The --poh-pinned-cpu-core N flag can be used to move the PoH thread. + +Next, your validator binary will need to have access to a few higher level permissions. With default copy-mode XDP, the validator process requires the CAP_NET_RAW and CAP_NET_ADMIN capabilities. Zero copy additionally requires CAP_BPF and CAP_PERFMON. These capabilities can be configured in the systemd service file by setting CapabilityBoundingSet=CAP_NET_RAW CAP_NET_ADMIN under the [Service] section or directly on the binary with the command: ```bash -sudo setcap cap_net_raw,cap_net_admin,cap_bpf,cap_perfmon=p +sudo setcap cap_net_raw,cap_net_admin=p #this command must be run each time the binary is replaced ``` @@ -78,7 +81,7 @@ modinfo bnxt_en | `igb` / Intel I210 | ✅ Works | ✅ Works w/ caveat | caveat: `igb` requires kernel `>= 6.14` for ZC. Field report: I210 on 6.17 enabled ZC but had severe network degradation/high skips, so fall back to non-ZC if unstable. | | `ixgbe` / Intel X540, X550 | ✅ Works | ⚠️ Mixed / unstable | Alessandro guidance for freeze/link-flap cases: start without ZC while `ixgbe` is debugged. Stay tuned! | | `ice` / Intel E800 | ✅ Works | ✅ Works | `ice` supports native XDP and AF_XDP zero-copy. Caveats: XDP is blocked for frame sizes larger than 3KB | -| `bnxt_en` / Broadcom | ✅ Works | ❌ Does not work | `bnxt_en` works with XDP, but do not pass the zero-copy flag. Broadcom non-ZC can still be reasonably fast. But please get a non-broadcom NIC | +| `bnxt_en` / Broadcom | ✅ Works | ❌ Does not work | `bnxt_en` works with default copy-mode XDP. Broadcom non-ZC can still be reasonably fast. But please get a non-broadcom NIC | | `tg3` / Broadcom | ❌ No native/driver XDP; generic XDP only at best | ❌ Does not work | Broadcom BCM5720 uses the `tg3` driver. Treat as unsupported for Agave/AF_XDP performance work: no native XDP and no AF_XDP zero-copy. 
| | `r8169` / Realtek | ❌ No native/driver XDP; generic XDP only at best | ❌ Does not work | Realtek NICs using `r8169` should be treated as unsupported for Agave/AF_XDP performance work: no native XDP and no AF_XDP zero-copy.| | `mlx4_en` / Mellanox ConnectX-3 | ❌ Do not use | ❌ Does not work | Driver is no longer supported. Zero-copy does not work. Do not use. | diff --git a/multinode-demo/bootstrap-validator.sh b/multinode-demo/bootstrap-validator.sh index 6b231383e1d..fa75b51a88d 100755 --- a/multinode-demo/bootstrap-validator.sh +++ b/multinode-demo/bootstrap-validator.sh @@ -144,6 +144,7 @@ args+=( --no-wait-for-vote-to-start-leader --full-rpc-api --allow-private-addr + --no-xdp ) default_arg --gossip-port 8001 default_arg --log - diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 0d76fab0284..07bd51a86fb 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -10,6 +10,7 @@ args=( --max-genesis-archive-unpacked-size 1073741824 --no-poh-speed-test --no-os-network-limits-test + --no-xdp ) airdrops_enabled=1 node_sol=500 # 500 SOL: number of SOL to airdrop the node for transaction fees and vote account rent exemption (ignored if airdrops_enabled=0) diff --git a/poh/src/poh_service.rs b/poh/src/poh_service.rs index c8dbc60ef95..82cb74913a6 100644 --- a/poh/src/poh_service.rs +++ b/poh/src/poh_service.rs @@ -40,7 +40,7 @@ pub const DEFAULT_HASHES_PER_BATCH: u64 = TARGET_HASH_BATCH_TIME_US * DEFAULT_HASHES_PER_SECOND / 1_000_000; #[cfg(target_os = "linux")] -pub const DEFAULT_PINNED_CPU_CORE: Option<usize> = Some(0); +pub const DEFAULT_PINNED_CPU_CORE: Option<usize> = Some(10); #[cfg(not(target_os = "linux"))] pub const DEFAULT_PINNED_CPU_CORE: Option<usize> = None; @@ -154,12 +154,12 @@ impl PohService { #[cfg(target_os = "linux")] if let Some(pinned_cpu_core) = pinned_cpu_core { // PoH service runs in a tight loop, generating hashes as fast as possible. 
- // Let's dedicate one of the CPU cores to this thread so that it can gain - // from cache performance. + // Dedicate one CPU core to this thread for cache performance. let pinned_cpu = CpuId::new(pinned_cpu_core).unwrap(); + info!("Pinning PoH service to CPU core {pinned_cpu_core}"); set_cpu_affinity(None, [pinned_cpu]).unwrap_or_else(|e| { panic!( - "Failed to set CPU affinity for POH service to CPU \ + "Failed to set CPU affinity for PoH service to CPU \ {pinned_cpu_core}: {e:?}. This is critical for performance." ) }); diff --git a/scripts/run.sh b/scripts/run.sh index 18f13e339b1..5452a8d4dfc 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -121,6 +121,7 @@ args=( --require-tower --no-wait-for-vote-to-start-leader --no-os-network-limits-test + --no-xdp ) # shellcheck disable=SC2086 agave-validator "${args[@]}" $SOLANA_RUN_SH_VALIDATOR_ARGS & diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 7c5484ac007..0158c57266e 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -158,19 +158,29 @@ fn deprecated_arguments() -> Vec { .conflicts_with("accounts_index_limit"), replaced_by: "accounts-index-limit", ); + add_arg!( + // deprecated in v4.2.0 + Arg::with_name("experimental_poh_pinned_cpu_core") + .long("experimental-poh-pinned-cpu-core") + .takes_value(true) + .value_name("CPU_ID") + .conflicts_with("poh_pinned_cpu_core") + .validator(is_parsable::<usize>) + .help("Specify which CPU core PoH is pinned to. Use --poh-pinned-cpu-core instead"), + replaced_by: "poh-pinned-cpu-core", + ); add_arg!( // deprecated in v4.1.0 Arg::with_name("experimental_retransmit_xdp_cpu_cores") .long("experimental-retransmit-xdp-cpu-cores") .takes_value(true) .value_name("CPU_LIST") + .conflicts_with("no_xdp") .conflicts_with("xdp_cpu_cores") .validator(|value| { validate_cpu_ranges(value, "--experimental-retransmit-xdp-cpu-cores") }) - .help( - "Enable XDP retransmit on the specified CPU cores. 
Use --xdp-cpu-cores instead", - ), + .help("Use the specified CPU cores for XDP. Use --xdp-cpu-cores instead"), replaced_by: "xdp-cpu-cores", ); add_arg!( @@ -179,9 +189,9 @@ fn deprecated_arguments() -> Vec { .long("experimental-retransmit-xdp-interface") .takes_value(true) .value_name("INTERFACE") + .conflicts_with("no_xdp") .conflicts_with("xdp_interface") - .requires("experimental_retransmit_xdp_cpu_cores") - .help("Network interface to use for XDP retransmit. Use --xdp-interface instead"), + .help("Network interface to use for XDP. Use --xdp-interface instead"), replaced_by: "xdp-interface", ); add_arg!( @@ -189,8 +199,8 @@ fn deprecated_arguments() -> Vec { Arg::with_name("experimental_retransmit_xdp_zero_copy") .long("experimental-retransmit-xdp-zero-copy") .takes_value(false) + .conflicts_with("no_xdp") .conflicts_with("xdp_zero_copy") - .requires("experimental_retransmit_xdp_cpu_cores") .help("Enable XDP zero copy. Use --xdp-zero-copy instead"), replaced_by: "xdp-zero-copy", ); diff --git a/validator/src/commands/run/args.rs b/validator/src/commands/run/args.rs index 6ac31cda870..e6725f4e560 100644 --- a/validator/src/commands/run/args.rs +++ b/validator/src/commands/run/args.rs @@ -30,7 +30,7 @@ use { solana_send_transaction_service::send_transaction_service::Config as SendTransactionServiceConfig, solana_signer::Signer, solana_unified_scheduler_pool::DefaultSchedulerPool, - std::{collections::HashSet, net::SocketAddr, path::PathBuf, str::FromStr}, + std::{collections::HashSet, net::SocketAddr, path::PathBuf}, }; const EXCLUDE_KEY: &str = "account-index-exclude-key"; @@ -864,12 +864,11 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a, ) .arg( Arg::with_name("poh_pinned_cpu_core") - .hidden(hidden_unless_forced()) - .long("experimental-poh-pinned-cpu-core") + .long("poh-pinned-cpu-core") .takes_value(true) .value_name("CPU_ID") - .validator(|s| usize::from_str(&s).map(|_| ()).map_err(|e| e.to_string())) - .help("Specify 
which CPU core PoH is pinned to"), + .validator(is_parsable::<usize>) + .help("Specify which CPU core PoH is pinned to. Defaults to CPU 10 on Linux"), ) .arg( Arg::with_name("poh_hashes_per_batch") @@ -1208,28 +1207,44 @@ pub fn add_args<'a>(app: App<'a, 'a>, default_args: &'a DefaultArgs) -> App<'a, .validator(|s| is_within_range(s, 1..)) .help(DefaultSchedulerPool::cli_message()), ) + .arg( + Arg::with_name("no_xdp") + .long("no-xdp") + .takes_value(false) + .conflicts_with("experimental_retransmit_xdp_cpu_cores") + .conflicts_with("experimental_retransmit_xdp_interface") + .conflicts_with("experimental_retransmit_xdp_zero_copy") + .conflicts_with("xdp_cpu_cores") + .conflicts_with("xdp_interface") + .conflicts_with("xdp_zero_copy") + .help("Do not use XDP transmit"), + ) + .arg( + Arg::with_name("xdp_zero_copy") + .long("xdp-zero-copy") + .takes_value(false) + .conflicts_with("no_xdp") + .help("Enable XDP zero copy"), + ) .arg( Arg::with_name("xdp_interface") .long("xdp-interface") .takes_value(true) .value_name("INTERFACE") - .requires("xdp_cpu_cores") - .help("Network interface to use for XDP"), + .conflicts_with("no_xdp") + .help("Network interface to use for XDP. Required when XDP zero copy is enabled"), ) .arg( Arg::with_name("xdp_cpu_cores") .long("xdp-cpu-cores") .takes_value(true) .value_name("CPU_LIST") + .conflicts_with("no_xdp") .validator(|value| validate_cpu_ranges(value, "--xdp-cpu-cores")) - .help("Use the specified CPU cores for XDP"), - ) - .arg( - Arg::with_name("xdp_zero_copy") - .long("xdp-zero-copy") - .takes_value(false) - .requires("xdp_cpu_cores") - .help("Enable XDP zero copy. Requires hardware support"), + .help( + "Use the specified CPU cores for XDP. 
Defaults to an auto-selected CPU on a \ + physical core separate from PoH", + ), ) .args(&pub_sub_config::args(/*test_validator:*/ false)) .args(&json_rpc_config::args()) diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index 37b441b9836..fe153c2fc9b 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -1,3 +1,5 @@ +#[cfg(target_os = "linux")] +use agave_cpu_utils::{CpuId, cpu_affinity, set_cpu_affinity}; use { crate::{ admin_rpc_service::{self, StakedNodesOverrides, load_staked_nodes_overrides}, @@ -12,6 +14,7 @@ use { snapshot_config::{SnapshotConfig, SnapshotUsage}, }, agave_votor::vote_history_storage, + agave_xdp::transmitter::XdpConfig, clap::{ArgMatches, crate_name, value_t, value_t_or_exit, values_t, values_t_or_exit}, crossbeam_channel::unbounded, log::*, @@ -81,15 +84,190 @@ use { sync::{Arc, RwLock, atomic::AtomicBool}, }, }; -#[cfg(target_os = "linux")] -use {agave_xdp::transmitter::XdpConfig, solana_clap_utils::input_parsers::parse_cpu_ranges}; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Operation { Initialize, Run, } +fn parse_poh_pinned_cpu_core(matches: &ArgMatches) -> Option<usize> { + #[cfg(target_os = "linux")] + { + value_of(matches, "poh_pinned_cpu_core") + .or_else(|| value_of(matches, "experimental_poh_pinned_cpu_core")) + .or(poh_service::DEFAULT_PINNED_CPU_CORE) + } + #[cfg(not(target_os = "linux"))] + { + let _ = matches; + None + } +} + +fn parse_xdp_transmit_config( + matches: &ArgMatches, + bind_addresses: &BindIpAddrs, + operation: Operation, + poh_pinned_cpu_core: Option<usize>, +) -> Result<Option<XdpConfig>, String> { + if matches.is_present("no_xdp") || operation == Operation::Initialize { + return Ok(None); + } + + #[cfg(not(target_os = "linux"))] + { + let _ = (bind_addresses, poh_pinned_cpu_core); + let xdp_config_requested = matches.value_of("xdp_cpu_cores").is_some() + || matches + 
.value_of("experimental_retransmit_xdp_cpu_cores") + .is_some() + || matches.value_of("xdp_interface").is_some() + || matches + .value_of("experimental_retransmit_xdp_interface") + .is_some() + || matches.is_present("xdp_zero_copy") + || matches.is_present("experimental_retransmit_xdp_zero_copy"); + if xdp_config_requested { + return Err(String::from("XDP is only supported on Linux")); + } + Ok(None) + } + + #[cfg(target_os = "linux")] + { + let poh_pinned_cpu_core = poh_pinned_cpu_core.ok_or_else(|| { + String::from("XDP requires PoH to be pinned to a CPU core") + })?; + if bind_addresses.len() > 1 { + return Err(String::from("XDP cannot be used in a multihoming context")); + } + + let xdp_interface = matches + .value_of("xdp_interface") + .or_else(|| matches.value_of("experimental_retransmit_xdp_interface")); + let xdp_zero_copy = matches.is_present("xdp_zero_copy") + || matches.is_present("experimental_retransmit_xdp_zero_copy"); + if xdp_zero_copy && xdp_interface.is_none() { + return Err(String::from( + "XDP zero copy requires an explicit network interface. Use --xdp-interface to \ + select the XDP interface", + )); + } + let xdp_cpu_ranges = matches + .value_of("xdp_cpu_cores") + .or_else(|| matches.value_of("experimental_retransmit_xdp_cpu_cores")); + let xdp_cpus = if let Some(cpu_ranges) = xdp_cpu_ranges { + let cpus = solana_clap_utils::input_parsers::parse_cpu_ranges(cpu_ranges) + .map_err(|err| err.to_string())?; + validate_xdp_cpus(&cpus, poh_pinned_cpu_core)?; + cpus + } else { + let allowed_cpus = cpu_affinity(None) + .map_err(|err| { + format!( + "failed to query CPU affinity for XDP CPU selection: {err}. \ + Provide --xdp-cpu-cores explicitly" + ) + })? + .iter() + .map(|cpu| **cpu) + .collect::>(); + vec![select_default_xdp_cpu( + &allowed_cpus, + poh_pinned_cpu_core, + read_thread_siblings_list, + )?] 
+ }; + + info!("XDP enabled on CPU cores: {xdp_cpus:?}"); + Ok(Some(XdpConfig::new(xdp_interface, xdp_cpus, xdp_zero_copy))) + } +} + +#[cfg(target_os = "linux")] +fn validate_xdp_cpus(cpus: &[usize], poh_pinned_cpu_core: usize) -> Result<(), String> { + for cpu in cpus { + CpuId::new(*cpu).map_err(|err| format!("invalid XDP CPU core {cpu}: {err}"))?; + } + validate_xdp_cpus_are_separate_from_poh_physical_core( + cpus, + poh_pinned_cpu_core, + read_thread_siblings_list, + ) +} + +#[cfg(target_os = "linux")] +fn read_thread_siblings_list(cpu: usize) -> Result<Vec<usize>, String> { + let path = Path::new("/sys/devices/system/cpu") + .join(format!("cpu{cpu}")) + .join("topology/thread_siblings_list"); + let cpu_ranges = fs::read_to_string(&path) + .map_err(|err| format!("failed to read {}: {err}", path.display()))?; + solana_clap_utils::input_parsers::parse_cpu_ranges(cpu_ranges.trim()) + .map_err(|err| format!("failed to parse {}: {err}", path.display())) +} + +#[cfg(target_os = "linux")] +fn validate_xdp_cpus_are_separate_from_poh_physical_core<F>( + cpus: &[usize], + poh_pinned_cpu_core: usize, + thread_siblings: F, +) -> Result<(), String> +where + F: Fn(usize) -> Result<Vec<usize>, String>, +{ + for cpu in cpus { + if cpu_shares_physical_core_with_poh(*cpu, poh_pinned_cpu_core, &thread_siblings)? 
{ + return Err(format!( + "XDP CPU core {cpu} shares a physical core with PoH CPU core \ + {poh_pinned_cpu_core}; provide --xdp-cpu-cores with CPU cores on separate \ + physical cores" + )); + } + } + Ok(()) +} + +#[cfg(target_os = "linux")] +fn select_default_xdp_cpu<F>( + allowed_cpus: &[usize], + poh_pinned_cpu_core: usize, + thread_siblings: F, +) -> Result<usize, String> +where + F: Fn(usize) -> Result<Vec<usize>, String>, +{ + CpuId::new(poh_pinned_cpu_core) + .map_err(|err| format!("invalid PoH CPU core {poh_pinned_cpu_core}: {err}"))?; + for cpu in allowed_cpus.iter().rev().copied() { + CpuId::new(cpu).map_err(|err| format!("invalid allowed CPU core {cpu}: {err}"))?; + if !cpu_shares_physical_core_with_poh(cpu, poh_pinned_cpu_core, &thread_siblings)? { + return Ok(cpu); + } + } + + Err(format!( + "XDP requires an available CPU core on a physical core separate from PoH CPU core \ + {poh_pinned_cpu_core}; provide --xdp-cpu-cores explicitly" + )) +} + +#[cfg(target_os = "linux")] +fn cpu_shares_physical_core_with_poh<F>( + cpu: usize, + poh_pinned_cpu_core: usize, + thread_siblings: &F, +) -> Result<bool, String> +where + F: Fn(usize) -> Result<Vec<usize>, String>, +{ + if cpu == poh_pinned_cpu_core { + return Ok(true); + } + Ok(thread_siblings(cpu)?.contains(&poh_pinned_cpu_core)) +} + pub fn execute( matches: &ArgMatches, solana_version: &str, @@ -163,30 +341,18 @@ pub fn execute( Err(format!("invalid entrypoint address: {addr}"))?; } } + + let poh_pinned_cpu_core = parse_poh_pinned_cpu_core(matches); #[cfg(target_os = "linux")] - let xdp_transmit_config = if let Some(xdp_cpu_cores) = matches - .value_of("xdp_cpu_cores") - .or_else(|| matches.value_of("experimental_retransmit_xdp_cpu_cores")) { - let xdp_interface = matches - .value_of("xdp_interface") - .or_else(|| matches.value_of("experimental_retransmit_xdp_interface")); - let xdp_zero_copy = matches.is_present("xdp_zero_copy") - || matches.is_present("experimental_retransmit_xdp_zero_copy"); - let config = XdpConfig::new( - xdp_interface, - 
parse_cpu_ranges(xdp_cpu_cores).unwrap(), - xdp_zero_copy, - ); - if bind_addresses.len() > 1 { - Err(String::from( - "--xdp-cpu-cores cannot be used in a multihoming context", - ))?; + if let Some(poh_pinned_cpu_core) = poh_pinned_cpu_core { + info!("PoH pinned CPU core: {poh_pinned_cpu_core}"); + } else { + info!("PoH is not pinned to a CPU core"); } - Some(config) - } else { - None - }; + } + let xdp_transmit_config = + parse_xdp_transmit_config(matches, &bind_addresses, operation, poh_pinned_cpu_core)?; let dynamic_port_range = solana_net_utils::parse_port_range(matches.value_of("dynamic_port_range").unwrap()) @@ -426,11 +592,27 @@ pub fn execute( let (xdp_transmit_setup, xdp_network_config_report) = (None, None); #[cfg(target_os = "linux")] - let poh_pinned_cpu_core = - value_of(matches, "poh_pinned_cpu_core").or(poh_service::DEFAULT_PINNED_CPU_CORE); - - #[cfg(not(target_os = "linux"))] - let poh_pinned_cpu_core = None; + { + let reserved = xdp_transmit_config + .as_ref() + .map(|xdp| xdp.cpus.clone()) + .unwrap_or_default() + .into_iter() + .map(CpuId::new) + .collect::>>()?; + if !reserved.is_empty() { + let available = cpu_affinity(None)? 
+ .into_iter() + .filter(|cpu| !reserved.contains(cpu)) + .collect::>(); + if available.is_empty() { + Err(String::from( + "XDP reserved all available CPU cores; no CPU available for the validator main thread", + ))?; + } + set_cpu_affinity(None, available.iter().copied())?; + } + } solana_core::validator::report_target_features(); @@ -1378,3 +1560,257 @@ fn new_snapshot_config( Ok(snapshot_config) } + +#[cfg(all(test, target_os = "linux"))] +mod tests { + use { + super::*, + std::net::{IpAddr, Ipv4Addr}, + }; + + fn xdp_config_for_args( + args: &[&str], + bind_addresses: &BindIpAddrs, + ) -> Result<Option<XdpConfig>, String> { + xdp_config_for_args_and_operation(args, bind_addresses, Operation::Run) + } + + fn xdp_config_for_args_and_operation( + args: &[&str], + bind_addresses: &BindIpAddrs, + operation: Operation, + ) -> Result<Option<XdpConfig>, String> { + let default_args = cli::DefaultArgs::default(); + let matches = + cli::app("test", &default_args).get_matches_from([&["agave-validator"], args].concat()); + let poh_pinned_cpu_core = parse_poh_pinned_cpu_core(&matches); + parse_xdp_transmit_config(&matches, bind_addresses, operation, poh_pinned_cpu_core) + } + + #[test] + fn poh_pinned_cpu_core_defaults_to_configured_default() { + let default_args = cli::DefaultArgs::default(); + let matches = cli::app("test", &default_args).get_matches_from(vec!["agave-validator"]); + + assert_eq!( + parse_poh_pinned_cpu_core(&matches), + poh_service::DEFAULT_PINNED_CPU_CORE + ); + } + + #[test] + fn poh_pinned_cpu_core_uses_stable_arg() { + let default_args = cli::DefaultArgs::default(); + let matches = cli::app("test", &default_args).get_matches_from(vec![ + "agave-validator", + "--poh-pinned-cpu-core", + "0", + ]); + + assert_eq!(parse_poh_pinned_cpu_core(&matches), Some(0)); + } + + #[test] + fn poh_pinned_cpu_core_accepts_deprecated_experimental_arg() { + let default_args = cli::DefaultArgs::default(); + let matches = cli::app("test", &default_args).get_matches_from(vec![ + "agave-validator", + 
"--experimental-poh-pinned-cpu-core", + "0", + ]); + + assert_eq!(parse_poh_pinned_cpu_core(&matches), Some(0)); + } + + #[test] + fn poh_pinned_cpu_core_args_conflict() { + let default_args = cli::DefaultArgs::default(); + let matches = cli::app("test", &default_args).get_matches_from_safe(vec![ + "agave-validator", + "--poh-pinned-cpu-core", + "0", + "--experimental-poh-pinned-cpu-core", + "0", + ]); + + assert!(matches.is_err()); + } + + #[test] + fn default_xdp_config_uses_copy_mode_and_auto_selected_cpu() { + let bind_addresses = BindIpAddrs::default(); + let config = xdp_config_for_args(&[], &bind_addresses).unwrap().unwrap(); + + assert_eq!(config.interface, None); + assert_eq!(config.cpus.len(), 1); + assert_ne!(Some(config.cpus[0]), poh_service::DEFAULT_PINNED_CPU_CORE); + assert!(!config.zero_copy); + } + + #[test] + fn xdp_zero_copy_requires_interface() { + let bind_addresses = BindIpAddrs::default(); + + let err = xdp_config_for_args(&["--xdp-zero-copy"], &bind_addresses).unwrap_err(); + assert!(err.contains("--xdp-interface")); + assert!(!err.contains("--no-xdp")); + } + + #[test] + fn xdp_zero_copy_uses_default_cpu_and_configured_interface() { + let bind_addresses = BindIpAddrs::default(); + let config = xdp_config_for_args( + &["--xdp-zero-copy", "--xdp-interface", "eth0"], + &bind_addresses, + ) + .unwrap() + .unwrap(); + + assert_eq!(config.interface.as_deref(), Some("eth0")); + assert_eq!(config.cpus.len(), 1); + assert_ne!(Some(config.cpus[0]), poh_service::DEFAULT_PINNED_CPU_CORE); + assert!(config.zero_copy); + } + + #[test] + fn xdp_zero_copy_accepts_deprecated_args() { + let bind_addresses = BindIpAddrs::default(); + let config = xdp_config_for_args( + &[ + "--experimental-retransmit-xdp-zero-copy", + "--experimental-retransmit-xdp-interface", + "eth0", + ], + &bind_addresses, + ) + .unwrap() + .unwrap(); + + assert_eq!(config.interface.as_deref(), Some("eth0")); + assert!(config.zero_copy); + } + + #[test] + fn no_xdp_returns_no_config() { + 
let bind_addresses = BindIpAddrs::default(); + assert!( + xdp_config_for_args(&["--no-xdp"], &bind_addresses) + .unwrap() + .is_none() + ); + } + + #[test] + fn init_returns_no_xdp_config() { + let bind_addresses = BindIpAddrs::default(); + assert!( + xdp_config_for_args_and_operation(&[], &bind_addresses, Operation::Initialize) + .unwrap() + .is_none() + ); + assert!( + xdp_config_for_args_and_operation( + &["--xdp-zero-copy"], + &bind_addresses, + Operation::Initialize, + ) + .unwrap() + .is_none() + ); + } + + #[test] + fn xdp_cpu_and_interface_are_configurable_in_copy_mode() { + let bind_addresses = BindIpAddrs::default(); + let config = xdp_config_for_args( + &[ + "--poh-pinned-cpu-core", + "1023", + "--xdp-interface", + "eth0", + "--xdp-cpu-cores", + "2-3", + ], + &bind_addresses, + ) + .unwrap() + .unwrap(); + + assert_eq!(config.interface.as_deref(), Some("eth0")); + assert_eq!(config.cpus, vec![2, 3]); + assert!(!config.zero_copy); + } + + #[test] + fn xdp_requires_single_bind_address() { + let bind_addresses = BindIpAddrs::new(vec![ + IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8)), + IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)), + ]) + .unwrap(); + + let err = xdp_config_for_args(&[], &bind_addresses).unwrap_err(); + assert!(err.contains("multihoming")); + assert!(!err.contains("--no-xdp")); + assert!( + xdp_config_for_args(&["--no-xdp"], &bind_addresses) + .unwrap() + .is_none() + ); + } + + #[test] + fn no_xdp_conflicts_with_xdp_overrides() { + let default_args = cli::DefaultArgs::default(); + let matches = cli::app("test", &default_args).get_matches_from_safe(vec![ + "agave-validator", + "--no-xdp", + "--xdp-cpu-cores", + "2", + ]); + + assert!(matches.is_err()); + } + + fn test_thread_siblings(cpu: usize) -> Result<Vec<usize>, String> { + Ok(match cpu { + 2 | 10 => vec![2, 10], + 3 | 11 => vec![3, 11], + _ => vec![cpu], + }) + } + + #[test] + fn default_xdp_cpu_skips_poh_physical_core() { + assert_eq!( + select_default_xdp_cpu(&[3, 2], 10, test_thread_siblings), + Ok(3) + ); + 
} + + #[test] + fn default_xdp_cpu_errors_without_separate_physical_core() { + let err = select_default_xdp_cpu(&[2, 10], 10, test_thread_siblings).unwrap_err(); + assert!(err.contains("physical core separate from PoH")); + assert!(err.contains("--xdp-cpu-cores")); + assert!(!err.contains("--no-xdp")); + } + + #[test] + fn explicit_xdp_cpu_rejects_poh_physical_core() { + let err = + validate_xdp_cpus_are_separate_from_poh_physical_core(&[2], 10, test_thread_siblings) + .unwrap_err(); + assert!(err.contains("shares a physical core")); + assert!(err.contains("--xdp-cpu-cores")); + assert!(!err.contains("--no-xdp")); + } + + #[test] + fn explicit_xdp_cpu_accepts_separate_physical_core() { + assert!( + validate_xdp_cpus_are_separate_from_poh_physical_core(&[3], 10, test_thread_siblings,) + .is_ok() + ); + } +} diff --git a/xdp/src/tx_loop.rs b/xdp/src/tx_loop.rs index 80f383628c6..7f9b62f142c 100644 --- a/xdp/src/tx_loop.rs +++ b/xdp/src/tx_loop.rs @@ -17,7 +17,7 @@ use { socket::{Socket, Tx, TxRing}, umem::{Frame, OwnedUmem, PageAlignedMemory, Umem}, }, - agave_cpu_utils::set_cpu_affinity, + agave_cpu_utils::{CpuId, set_cpu_affinity}, crossbeam_channel::{Receiver, Sender, TryRecvError}, libc::{_SC_PAGESIZE, sysconf}, std::{ @@ -240,7 +240,7 @@ impl TxLoop { } = self; // each queue is bound to its own CPU core - set_cpu_affinity(None, [agave_cpu_utils::CpuId::new(cpu_id).unwrap()]).unwrap(); + set_cpu_affinity(None, [CpuId::new(cpu_id).unwrap()]).unwrap(); let umem = socket.umem(); let umem_tx_capacity = umem.available();
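
Reviewer note: the default XDP CPU selection added in `validator/src/commands/run/execute.rs` can be tried outside the validator. The following is a minimal sketch of that logic, not the code in this change. `parse_cpu_list` and `example_siblings` are hypothetical stand-ins for `solana_clap_utils::input_parsers::parse_cpu_ranges` and for reading `/sys/devices/system/cpu/cpuN/topology/thread_siblings_list`, and the `CpuId` validation is left out.

```rust
/// Parse one CPU id, e.g. "10".
fn parse_cpu(text: &str) -> Result<usize, String> {
    text.trim()
        .parse::<usize>()
        .map_err(|err| format!("invalid CPU id {text:?}: {err}"))
}

/// Parse a Linux CPU list such as "2,10" or "0-3,8" into individual CPU ids.
/// Hypothetical stand-in for the `parse_cpu_ranges` helper used in the diff.
fn parse_cpu_list(list: &str) -> Result<Vec<usize>, String> {
    let mut cpus = Vec::new();
    for part in list.trim().split(',').filter(|part| !part.is_empty()) {
        match part.split_once('-') {
            Some((start, end)) => {
                let (start, end) = (parse_cpu(start)?, parse_cpu(end)?);
                if start > end {
                    return Err(format!("invalid CPU range {part:?}"));
                }
                cpus.extend(start..=end);
            }
            None => cpus.push(parse_cpu(part)?),
        }
    }
    Ok(cpus)
}

/// True when `cpu` is the PoH CPU itself or one of its hyperthread siblings.
fn shares_physical_core<F>(cpu: usize, poh_cpu: usize, siblings: &F) -> Result<bool, String>
where
    F: Fn(usize) -> Result<Vec<usize>, String>,
{
    if cpu == poh_cpu {
        return Ok(true);
    }
    Ok(siblings(cpu)?.contains(&poh_cpu))
}

/// Walk the allowed CPUs from last to first and return the first one that
/// sits on a different physical core than PoH.
fn select_default_xdp_cpu<F>(
    allowed: &[usize],
    poh_cpu: usize,
    siblings: F,
) -> Result<usize, String>
where
    F: Fn(usize) -> Result<Vec<usize>, String>,
{
    for cpu in allowed.iter().rev().copied() {
        if !shares_physical_core(cpu, poh_cpu, &siblings)? {
            return Ok(cpu);
        }
    }
    Err(format!(
        "no allowed CPU is on a physical core separate from PoH CPU core {poh_cpu}"
    ))
}

/// Assumed sample topology: CPUs 2/10 and 3/11 are hyperthread pairs,
/// every other CPU is alone on its core.
fn example_siblings(cpu: usize) -> Result<Vec<usize>, String> {
    match cpu {
        2 | 10 => parse_cpu_list("2,10\n"),
        3 | 11 => parse_cpu_list("3,11\n"),
        _ => Ok(vec![cpu]),
    }
}
```

With the sample topology and PoH on CPU 10, `select_default_xdp_cpu(&[3, 2], 10, example_siblings)` returns `Ok(3)`: CPU 2 is skipped because it is a hyperthread sibling of CPU 10. An allowed set of only `[2, 10]` produces an error, which matches the case where the validator asks for an explicit `--xdp-cpu-cores`.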