Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@ Release channels have their own copy of this changelog:
`getLatestBlockhash` response together with its context (notably `context.slot`).
### Validator
#### Breaking
* XDP transmit is now enabled by default on Linux in copy mode on an auto-selected CPU
core separate from PoH. Use `--xdp-cpu-cores` to override the XDP CPU assignment.
Use `--xdp-zero-copy` with `--xdp-interface` to opt in to zero copy. Default validator
startup now requires the XDP copy-mode capabilities.
* The default PoH pinned CPU core is now CPU core 10. Use `--poh-pinned-cpu-core` to override it.
#### Deprecations
* `--accounts-db-access-storages-method` is now deprecated and a no-op (the `mmap` value was
deprecated in v4.0.0; mmap mode has now been removed entirely). The flag is still accepted for
backward compatibility, but account storages are always accessed via file I/O.
* `--experimental-poh-pinned-cpu-core` is now deprecated. Use `--poh-pinned-cpu-core` instead.
#### Changes
* Turbine shred ingestion now rejects shreds more than half an epoch in the future (previously up to 2 full epochs ahead was accepted).
### CLI
Expand Down
24 changes: 10 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ members = [
"connection-cache",
"core",
"cost-model",
"cpu-utils",
"download-utils",
"entry",
"faucet",
Expand Down Expand Up @@ -170,6 +171,7 @@ collapsible_if = "allow"
Inflector = "0.11.4"
agave-banking-stage-ingress-types = { path = "banking-stage-ingress-types", version = "=4.2.0-alpha.0", features = ["agave-unstable-api"] }
agave-bls-cert-verify = { path = "bls-cert-verify", version = "=4.2.0-alpha.0", features = ["agave-unstable-api"] }
agave-cpu-utils = { path = "cpu-utils", version = "=4.2.0-alpha.0", features = ["agave-unstable-api"] }
agave-feature-set = { path = "feature-set", version = "=4.2.0-alpha.0", features = ["agave-unstable-api"] }
agave-fs = { path = "fs", version = "=4.2.0-alpha.0", features = ["agave-unstable-api"] }
agave-geyser-plugin-interface = { path = "geyser-plugin-interface", version = "=4.2.0-alpha.0" }
Expand Down Expand Up @@ -223,7 +225,6 @@ chrono-humanize = "0.2.3"
clap = { version = "2.33.1", default-features = false, features = ["suggestions"] }
console = "0.16.3"
const_format = "0.2.36"
core_affinity = "0.8.3"
criterion = "0.8.2"
crossbeam-channel = "0.5.15"
csv = "1.4.0"
Expand Down
15 changes: 15 additions & 0 deletions cpu-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "agave-cpu-utils"
description = "Agave CPU Utils"
version = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }
publish = true

[features]
agave-unstable-api = []

[dependencies]
libc = { workspace = true }
145 changes: 145 additions & 0 deletions cpu-utils/src/affinity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
//! Core CPU affinity operations.

use std::{io, mem, ops::Deref};

const CPU_SETSIZE: usize = libc::CPU_SETSIZE as usize;

/// Identifies a logical CPU (hardware thread) by its kernel-assigned ID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CpuId(usize);

impl CpuId {
pub fn new(cpu: usize) -> io::Result<Self> {
if cpu < CPU_SETSIZE {
Ok(Self(cpu))
} else {
Err(io::Error::from_raw_os_error(libc::EINVAL))
}
}
}

impl Deref for CpuId {
type Target = usize;

fn deref(&self) -> &Self::Target {
&self.0
}
}

/// Set CPU affinity for a thread.
///
/// Restricts the thread to run only on the specified CPUs.
///
/// # Arguments
/// * `thread_id` - Thread ID to set affinity for. `None` means the calling thread.
/// * `cpus` - CPU IDs to bind the thread to. Can be any iterable collection.
///
/// # Examples
///
/// ```no_run
/// # use agave_cpu_utils::*;
/// # fn main() -> std::io::Result<()> {
/// // Pin current thread to CPU 0
/// set_cpu_affinity(None, [CpuId::new(0)?])?;
///
/// // Pin current thread to multiple CPUs
/// set_cpu_affinity(None, [CpuId::new(0)?, CpuId::new(1)?, CpuId::new(2)?])?;
/// # Ok(())
/// # }
/// ```
///
/// # Errors
///
/// Returns [`io::ErrorKind::InvalidInput`] if any CPU ID is too large for `cpu_set_t`.
/// Returns [`io::Error`] if the system call fails, including offline CPUs or CPUs
/// disallowed by the task's hard cpuset/cgroup.
pub fn set_cpu_affinity(
thread_id: Option<libc::pid_t>,
cpus: impl IntoIterator<Item = CpuId>,
) -> io::Result<()> {
// safety: cpu_set_t is a POD type, zero-initialization is standard
let mut cpu_set: libc::cpu_set_t = unsafe { mem::zeroed() };

for cpu in cpus {
// safety: CpuId values are constructed only after validating cpu < CPU_SETSIZE.
unsafe { libc::CPU_SET(*cpu, &mut cpu_set) };
}

let tid = thread_id.unwrap_or(0);

// safety: sched_setaffinity is safe with valid parameters
let result =
unsafe { libc::sched_setaffinity(tid, mem::size_of::<libc::cpu_set_t>(), &cpu_set) };

if result != 0 {
return Err(io::Error::last_os_error());
}

Ok(())
}

/// Get the CPU affinity mask for a thread.
///
/// Returns a sorted vector of CPU IDs that the thread is allowed to run on.
///
/// # Arguments
/// * `thread_id` - Thread ID to query. `None` means the calling thread.
///
/// # Examples
///
/// ```no_run
/// # use agave_cpu_utils::*;
/// # fn main() -> std::io::Result<()> {
/// let cpus = cpu_affinity(None)?;
/// println!("Thread can run on CPUs: {:?}", cpus);
/// # Ok(())
/// # }
/// ```
///
/// # Errors
///
/// Returns [`io::Error`] if the system call fails.
pub fn cpu_affinity(thread_id: Option<libc::pid_t>) -> io::Result<Vec<CpuId>> {
// safety: cpu_set_t is a POD type, zero-initialization is standard
let mut cpu_set: libc::cpu_set_t = unsafe { mem::zeroed() };

let tid = thread_id.unwrap_or(0);

// safety: sched_getaffinity is safe with valid parameters
let result =
unsafe { libc::sched_getaffinity(tid, mem::size_of::<libc::cpu_set_t>(), &mut cpu_set) };

if result != 0 {
return Err(io::Error::last_os_error());
}

let mut cpus = Vec::new();
for cpu in 0..CPU_SETSIZE {
// safety: cpu < CPU_SETSIZE by construction
if unsafe { libc::CPU_ISSET(cpu, &cpu_set) } {
cpus.push(CpuId::new(cpu)?);
}
}

Ok(cpus)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_cpu_id_validation() {
let result = CpuId::new(CPU_SETSIZE);
assert_eq!(result.unwrap_err().raw_os_error(), Some(libc::EINVAL));
}

#[test]
fn test_cpu_affinity_returns_sorted() {
let cpus = cpu_affinity(None).expect("failed to query current CPU affinity");
assert!(
cpus.windows(2).all(|window| *window[0] <= *window[1]),
"cpu_affinity should return sorted CPU list"
);
}
}
26 changes: 26 additions & 0 deletions cpu-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#![cfg(all(feature = "agave-unstable-api", target_os = "linux"))]

//! CPU affinity utilities for Linux systems.
//!
//! This crate provides safe Rust bindings for setting CPU affinity and querying
//! the current task affinity mask. Useful for performance-critical applications
//! that need precise control over thread placement.
//!
//! # Examples
//!
//! ```no_run
//! use agave_cpu_utils::*;
//!
//! # fn main() -> std::io::Result<()> {
//! let allowed = cpu_affinity(None)?;
//! if let Some(&cpu) = allowed.first() {
//! set_cpu_affinity(None, [cpu])?;
//! }
//! # Ok(())
//! # }
//! ```
//!

mod affinity;

pub use affinity::{CpuId, cpu_affinity, set_cpu_affinity};
77 changes: 77 additions & 0 deletions cpu-utils/tests/integration_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#![cfg(all(feature = "agave-unstable-api", target_os = "linux"))]

//! Integration tests for agave-cpu-utils
//!
//! These tests require a Linux system.
//! Tests that modify CPU affinity spawn dedicated threads to avoid affecting the
//! test harness.

use {
agave_cpu_utils::{CpuId, cpu_affinity, set_cpu_affinity},
std::{io, thread},
};

fn current_affinity() -> Vec<CpuId> {
cpu_affinity(None).expect("failed to query current CPU affinity")
}

fn handle_affinity_result(result: io::Result<()>, test_name: &str) {
match result {
Ok(()) => {}
Err(e) => panic!("{test_name}: unexpected error: {e:?}"),
}
}

#[test]
fn test_set_and_get_affinity() {
let affinity = current_affinity();
let cpu = affinity
.first()
.copied()
.expect("current CPU affinity mask should not be empty");

let result = thread::spawn(move || {
set_cpu_affinity(None, [cpu])?;
let affinity = cpu_affinity(None)?;
assert_eq!(affinity, vec![cpu]);
Ok::<(), io::Error>(())
})
.join()
.expect("Thread panicked");

handle_affinity_result(result, "test_set_and_get_affinity");
}

#[test]
fn test_affinity_with_multiple_cpus() {
let available = current_affinity();
if available.len() < 2 {
eprintln!(
"Skipping test_affinity_with_multiple_cpus: current CPU affinity mask has fewer than \
2 CPUs"
);
return;
}
let (cpu0, cpu1) = (available[0], available[1]);

let result = thread::spawn(move || {
set_cpu_affinity(None, [cpu0, cpu1])?;
let affinity = cpu_affinity(None)?;
assert_eq!(affinity, vec![cpu0, cpu1]);
Ok::<(), io::Error>(())
})
.join()
.expect("Thread panicked");

handle_affinity_result(result, "test_affinity_with_multiple_cpus");
}

#[test]
fn test_invalid_cpu_index_rejected() {
assert_eq!(
CpuId::new(libc::CPU_SETSIZE as usize)
.unwrap_err()
.raw_os_error(),
Some(libc::EINVAL)
);
}
Loading