Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion crates/spur-cli/src/sinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,17 @@ fn resolve_partition_field(
node_state_str(*most_common)
}
}
'N' => part.nodes.clone(),
'N' => {
if !nodes.is_empty() {
nodes
.iter()
.map(|n| n.name.as_str())
.collect::<Vec<_>>()
.join(",")
} else {
part.nodes.clone()
}
}
'c' => part.total_cpus.to_string(),
_ => "?".into(),
}
Expand Down
36 changes: 36 additions & 0 deletions crates/spur-tests/src/t01_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,40 @@ mod tests {
assert_eq!(node_rank_0, 0);
assert_eq!(node_rank_1, 1);
}

// ── T01.18–20: srun step mode (SPUR_JOB_ID env) ──────────────

#[test]
fn t01_18_srun_step_mode_env_var_name() {
    // srun decides it is running inside a batch job by looking for the
    // SPUR_JOB_ID environment variable; this pins the exact name that
    // sbatch is expected to export.
    let env_var = "SPUR_JOB_ID";
    let namespaced = env_var.strip_prefix("SPUR_").is_some();
    assert!(namespaced, "must use SPUR_ namespace");
    assert!(env_var.ends_with("JOB_ID"), "must identify the job");
}

#[test]
fn t01_19_srun_step_mode_job_id_parse() {
    // With SPUR_JOB_ID="42" in the environment, srun must obtain
    // job_id = 42 as a u32.
    let parsed: u32 = "42".parse().expect("SPUR_JOB_ID must be a valid u32");
    assert_eq!(parsed, 42);

    // A non-numeric value must be rejected by the parse.
    let bad_parse = "not-a-number".parse::<u32>();
    assert!(bad_parse.is_err());
}

#[test]
fn t01_20_srun_step_env_vars_set() {
    // Env vars exported to the child process when srun runs as a job step.
    let step_env = ["SPUR_JOB_ID", "SPUR_STEP_ID", "SPUR_NODEID", "SPUR_PROCID"];

    // Each must follow the SPUR_ convention and appear exactly once.
    let mut seen = std::collections::HashSet::new();
    step_env.iter().for_each(|v| {
        assert!(v.starts_with("SPUR_"), "must use SPUR_ prefix: {}", v);
        assert!(seen.insert(v), "duplicate env var: {}", v);
    });
    assert_eq!(seen.len(), 4);
}
}
109 changes: 109 additions & 0 deletions crates/spur-tests/src/t07_sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,113 @@ mod tests {
let assignments = sched.schedule(&[job], &cluster);
assert_eq!(assignments.len(), 0, "no node has 'gpu' feature");
}

// ── T07.18–20: Federation peer config ────────────────────────

#[test]
fn t07_18_federation_no_peers_by_default() {
    use spur_core::config::FederationConfig;
    // A default FederationConfig carries no peers, so the scheduler
    // must never attempt to forward jobs anywhere.
    let default_fed = FederationConfig::default();
    assert_eq!(default_fed.clusters.len(), 0);
}

#[test]
fn t07_19_federation_forward_decision() {
    // If local scheduling covers fewer assignments than there are pending
    // jobs, and federation peers are configured, the unscheduled remainder
    // should be forwarded. This tests the decision logic only (not the
    // RPC call itself).
    let pending_count = 3usize;
    let assigned_count = 1usize;

    // Peers configured: a local shortfall must trigger forwarding.
    let has_federation = true;
    let should_forward = has_federation && assigned_count < pending_count;
    assert!(
        should_forward,
        "should forward when local can't schedule all"
    );

    // No peers configured: the same shortfall must NOT trigger forwarding.
    // A named flag is used instead of the previous literal `false && ...`,
    // which was a constant boolean expression (clippy:
    // overly_complex_bool_expr) that dead-coded the comparison the test
    // is meant to exercise.
    let has_federation = false;
    let should_forward_no_peers = has_federation && assigned_count < pending_count;
    assert!(!should_forward_no_peers, "no peers → no forward");
}

#[test]
fn t07_20_federation_peer_address_format() {
    use spur_core::config::ClusterPeer;
    // tonic's connector only accepts http:// or https:// URIs, so every
    // configured peer address must carry one of those schemes.
    let peer = ClusterPeer {
        address: "http://hpc-east-ctrl:6817".into(),
        name: "hpc-east".into(),
    };
    let scheme_ok =
        peer.address.starts_with("http://") || peer.address.starts_with("https://");
    assert!(scheme_ok, "peer address must be http(s): {}", peer.address);
}

// ── T07.21–23: Power management state transitions ─────────────

#[test]
fn t07_21_suspended_node_not_schedulable() {
    reset_job_ids();
    // Two identical nodes, the first of which is powered down.
    let mut cluster_nodes = make_nodes(2, 64, 256_000);
    cluster_nodes[0].state = NodeState::Suspended;

    let parts = vec![make_partition("default", 2)];
    let state = ClusterState {
        nodes: &cluster_nodes,
        partitions: &parts,
        reservations: &[],
    };

    let mut sched = BackfillScheduler::new(100);
    let job = make_job_with_resources("train", 1, 1, 1, Some(60));
    let assignments = sched.schedule(&[job], &state);

    // Only the awake node may receive the job.
    assert_eq!(assignments.len(), 1);
    assert_eq!(assignments[0].nodes[0], "node002");
}

#[test]
fn t07_22_all_suspended_yields_no_assignments() {
    reset_job_ids();
    // Every node in the cluster is suspended — nothing is schedulable.
    let mut cluster_nodes = make_nodes(2, 64, 256_000);
    for node in cluster_nodes.iter_mut() {
        node.state = NodeState::Suspended;
    }

    let parts = vec![make_partition("default", 2)];
    let state = ClusterState {
        nodes: &cluster_nodes,
        partitions: &parts,
        reservations: &[],
    };

    let mut sched = BackfillScheduler::new(100);
    let job = make_job_with_resources("train", 1, 1, 1, Some(60));
    let assignments = sched.schedule(&[job], &state);
    assert_eq!(assignments.len(), 0, "all nodes suspended");
}

#[test]
fn t07_23_power_config_suspend_timeout_gate() {
    use spur_core::config::PowerConfig;

    // suspend_timeout_secs == None gates power management off entirely.
    let disabled = PowerConfig {
        suspend_timeout_secs: None,
        suspend_command: None,
        resume_command: None,
    };
    assert!(disabled.suspend_timeout_secs.is_none(), "power mgmt disabled");

    // A set timeout (plus suspend/resume commands) turns it on.
    let enabled = PowerConfig {
        suspend_timeout_secs: Some(300),
        suspend_command: Some("systemctl suspend".into()),
        resume_command: Some("wake-on-lan aa:bb:cc:dd:ee:ff".into()),
    };
    assert_eq!(enabled.suspend_timeout_secs, Some(300));
    assert!(enabled.suspend_command.is_some());
    assert!(enabled.resume_command.is_some());
}
}
90 changes: 90 additions & 0 deletions crates/spur-tests/src/t50_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,4 +799,94 @@ mod tests {
assert_eq!(parts[4], "*"); // day of week
assert_eq!(parts[5], "sbatch /path/to/script.sh"); // command
}

// ── T50.75–78: Federation config parsing ─────────────────────

#[test]
fn t50_75_federation_config_default_empty() {
    use spur_core::config::FederationConfig;
    // A default config ships with zero federation peers.
    let default_cfg = FederationConfig::default();
    assert_eq!(default_cfg.clusters.len(), 0);
}

#[test]
fn t50_76_federation_cluster_peer_fields() {
    use spur_core::config::ClusterPeer;
    // Both public fields of a peer must round-trip the values they
    // were constructed with.
    let peer = ClusterPeer {
        address: "http://ctrl-b:6817".into(),
        name: "cluster-b".into(),
    };
    assert_eq!(peer.address, "http://ctrl-b:6817");
    assert_eq!(peer.name, "cluster-b");
}

#[test]
fn t50_77_federation_config_with_peers() {
    use spur_core::config::{ClusterPeer, FederationConfig};
    // Build a two-peer federation from (name, address) pairs.
    let peer_specs = [
        ("east", "http://east-ctrl:6817"),
        ("west", "http://west-ctrl:6817"),
    ];
    let fed = FederationConfig {
        clusters: peer_specs
            .iter()
            .map(|(name, addr)| ClusterPeer {
                name: (*name).into(),
                address: (*addr).into(),
            })
            .collect(),
    };
    assert_eq!(fed.clusters.len(), 2);
    assert_eq!(fed.clusters[0].name, "east");
    assert_eq!(fed.clusters[1].address, "http://west-ctrl:6817");
}

#[test]
fn t50_78_federation_config_toml_roundtrip() {
    use spur_core::config::SlurmConfig;
    // A [[federation.clusters]] table in the config TOML must surface
    // as a parsed peer entry on cfg.federation.
    let toml = r#"
cluster_name = "test"

[controller]
listen_addr = "[::]:6817"
state_dir = "/tmp/spur-test"

[[federation.clusters]]
name = "peer-a"
address = "http://peer-a:6817"
"#;
    let parsed = SlurmConfig::from_str(toml).unwrap();
    let peers = &parsed.federation.clusters;
    assert_eq!(peers.len(), 1);
    assert_eq!(peers[0].name, "peer-a");
}

// ── T50.79–81: PMIx env var names ────────────────────────────

#[test]
fn t50_79_pmix_env_var_names_correct() {
    // Canonical PMIx env var names consumed by OpenMPI 5+ and srun:
    // each must carry the PMIX_ prefix and be fully uppercase.
    ["PMIX_RANK", "PMIX_SIZE", "PMIX_NAMESPACE"]
        .iter()
        .for_each(|name| {
            assert!(name.starts_with("PMIX_"), "expected PMIX_ prefix: {}", name);
            assert_eq!(*name, name.to_uppercase(), "must be uppercase: {}", name);
        });
}

#[test]
fn t50_80_pmix_namespace_format() {
    // The PMIx namespace for a job is "spur." followed by the job id.
    let job_id: u32 = 42;
    let ns = format!("spur.{}", job_id);
    assert!(ns.starts_with("spur."));
    assert_eq!(ns, "spur.42");
}

#[test]
fn t50_81_ompi_compat_env_vars() {
    // OpenMPI's direct-bootstrap env vars mirror PMIx rank/size; both
    // must share the OMPI_COMM_WORLD_ prefix.
    let ompi_vars = ["OMPI_COMM_WORLD_RANK", "OMPI_COMM_WORLD_SIZE"];
    assert!(ompi_vars.iter().all(|v| v.starts_with("OMPI_COMM_WORLD_")));
}
}
21 changes: 15 additions & 6 deletions crates/spurctld/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ struct Args {
#[arg(short = 'f', long, default_value = "/etc/spur/spur.conf")]
config: PathBuf,

/// gRPC listen address
#[arg(long, default_value = "[::]:6817")]
listen: String,
/// gRPC listen address (overrides config file)
#[arg(long)]
listen: Option<String>,

/// State directory
#[arg(long, default_value = "/var/spool/spur")]
Expand Down Expand Up @@ -48,21 +48,30 @@ async fn main() -> anyhow::Result<()> {
info!(version = env!("CARGO_PKG_VERSION"), "spurctld starting");

// Load config if it exists, otherwise use defaults
let config = if args.config.exists() {
let mut config = if args.config.exists() {
spur_core::config::SlurmConfig::load(&args.config)?
} else {
info!("no config file found, using defaults");
spur_core::config::SlurmConfig {
cluster_name: "spur".into(),
controller: spur_core::config::ControllerConfig {
listen_addr: args.listen.clone(),
listen_addr: "[::]:6817".into(),
state_dir: args.state_dir.to_string_lossy().into(),
..Default::default()
},
..default_config()
}
};

// CLI --listen overrides config file; otherwise use config's listen_addr.
let listen_addr = args
.listen
.clone()
.unwrap_or_else(|| config.controller.listen_addr.clone());

// Keep config in sync so downstream code sees the final address.
config.controller.listen_addr = listen_addr.clone();

// Initialize cluster manager
let cluster = Arc::new(ClusterManager::new(config.clone(), &args.state_dir)?);

Expand All @@ -83,7 +92,7 @@ async fn main() -> anyhow::Result<()> {
});

// Start gRPC server
let addr = args.listen.parse()?;
let addr = listen_addr.parse()?;
info!(%addr, "gRPC server listening");
server::serve(addr, cluster).await?;

Expand Down
Loading
Loading