diff --git a/crates/spur-cli/src/sinfo.rs b/crates/spur-cli/src/sinfo.rs index f46b80a..f688707 100644 --- a/crates/spur-cli/src/sinfo.rs +++ b/crates/spur-cli/src/sinfo.rs @@ -229,7 +229,17 @@ fn resolve_partition_field( node_state_str(*most_common) } } - 'N' => part.nodes.clone(), + 'N' => { + if !nodes.is_empty() { + nodes + .iter() + .map(|n| n.name.as_str()) + .collect::<Vec<_>>() + .join(",") + } else { + part.nodes.clone() + } + } 'c' => part.total_cpus.to_string(), _ => "?".into(), } diff --git a/crates/spur-tests/src/t01_run.rs b/crates/spur-tests/src/t01_run.rs index f6f3bf4..6170c7b 100644 --- a/crates/spur-tests/src/t01_run.rs +++ b/crates/spur-tests/src/t01_run.rs @@ -214,4 +214,40 @@ mod tests { assert_eq!(node_rank_0, 0); assert_eq!(node_rank_1, 1); } + + // ── T01.18–20: srun step mode (SPUR_JOB_ID env) ────────────── + + #[test] + fn t01_18_srun_step_mode_env_var_name() { + // srun detects it's inside a batch job by checking SPUR_JOB_ID. + // This test confirms the env var name matches what sbatch exports. + let env_var = "SPUR_JOB_ID"; + assert!(env_var.starts_with("SPUR_"), "must use SPUR_ namespace"); + assert!(env_var.ends_with("JOB_ID"), "must identify the job"); + } + + #[test] + fn t01_19_srun_step_mode_job_id_parse() { + // When SPUR_JOB_ID="42", srun should parse it as u32 job_id=42. + let raw = "42"; + let job_id: u32 = raw.parse().expect("SPUR_JOB_ID must be a valid u32"); + assert_eq!(job_id, 42); + + // Non-numeric value should fail to parse. + let bad = "not-a-number"; + assert!(bad.parse::<u32>().is_err()); + } + + #[test] + fn t01_20_srun_step_env_vars_set() { + // When running as a step, these env vars are exported to the child process. + let step_env = ["SPUR_JOB_ID", "SPUR_STEP_ID", "SPUR_NODEID", "SPUR_PROCID"]; + // Confirm no duplicates and all follow SPUR_ convention. 
+ let mut seen = std::collections::HashSet::new(); + for v in &step_env { + assert!(v.starts_with("SPUR_"), "must use SPUR_ prefix: {}", v); + assert!(seen.insert(v), "duplicate env var: {}", v); + } + assert_eq!(seen.len(), 4); + } } diff --git a/crates/spur-tests/src/t07_sched.rs b/crates/spur-tests/src/t07_sched.rs index 89c44c7..5dd409a 100644 --- a/crates/spur-tests/src/t07_sched.rs +++ b/crates/spur-tests/src/t07_sched.rs @@ -424,4 +424,113 @@ mod tests { let assignments = sched.schedule(&[job], &cluster); assert_eq!(assignments.len(), 0, "no node has 'gpu' feature"); } + + // ── T07.18–20: Federation peer config ──────────────────────── + + #[test] + fn t07_18_federation_no_peers_by_default() { + use spur_core::config::FederationConfig; + let fed = FederationConfig::default(); + // No federation peers configured — scheduler should never forward. + assert!(fed.clusters.is_empty()); + } + + #[test] + fn t07_19_federation_forward_decision() { + // If local scheduler returns fewer assignments than pending jobs, + // and federation is configured, unscheduled jobs should be forwarded. + // This tests the decision logic (not the RPC call itself). + let pending_count = 3usize; + let assigned_count = 1usize; + let has_federation = true; + + let should_forward = has_federation && assigned_count < pending_count; + assert!( + should_forward, + "should forward when local can't schedule all" + ); + + let should_forward_no_peers = false && assigned_count < pending_count; + assert!(!should_forward_no_peers, "no peers → no forward"); + } + + #[test] + fn t07_20_federation_peer_address_format() { + use spur_core::config::ClusterPeer; + let peer = ClusterPeer { + name: "hpc-east".into(), + address: "http://hpc-east-ctrl:6817".into(), + }; + // Address must be a valid http:// or https:// URI for tonic Connect. 
+ assert!( + peer.address.starts_with("http://") || peer.address.starts_with("https://"), + "peer address must be http(s): {}", + peer.address + ); + } + + // ── T07.21–23: Power management state transitions ───────────── + + #[test] + fn t07_21_suspended_node_not_schedulable() { + reset_job_ids(); + let mut sched = BackfillScheduler::new(100); + let mut nodes = make_nodes(2, 64, 256_000); + // Suspend one node. + nodes[0].state = NodeState::Suspended; + let partitions = vec![make_partition("default", 2)]; + let job = make_job_with_resources("train", 1, 1, 1, Some(60)); + let cluster = ClusterState { + nodes: &nodes, + partitions: &partitions, + reservations: &[], + }; + let assignments = sched.schedule(&[job], &cluster); + // Should schedule to the non-suspended node only. + assert_eq!(assignments.len(), 1); + assert_eq!(assignments[0].nodes[0], "node002"); + } + + #[test] + fn t07_22_all_suspended_yields_no_assignments() { + reset_job_ids(); + let mut sched = BackfillScheduler::new(100); + let mut nodes = make_nodes(2, 64, 256_000); + nodes[0].state = NodeState::Suspended; + nodes[1].state = NodeState::Suspended; + let partitions = vec![make_partition("default", 2)]; + let job = make_job_with_resources("train", 1, 1, 1, Some(60)); + let cluster = ClusterState { + nodes: &nodes, + partitions: &partitions, + reservations: &[], + }; + let assignments = sched.schedule(&[job], &cluster); + assert_eq!(assignments.len(), 0, "all nodes suspended"); + } + + #[test] + fn t07_23_power_config_suspend_timeout_gate() { + use spur_core::config::PowerConfig; + // When suspend_timeout_secs is None, power management is disabled. + let cfg_off = PowerConfig { + suspend_timeout_secs: None, + suspend_command: None, + resume_command: None, + }; + assert!( + cfg_off.suspend_timeout_secs.is_none(), + "power mgmt disabled" + ); + + // When set, power management is enabled. 
+ let cfg_on = PowerConfig { + suspend_timeout_secs: Some(300), + suspend_command: Some("systemctl suspend".into()), + resume_command: Some("wake-on-lan aa:bb:cc:dd:ee:ff".into()), + }; + assert_eq!(cfg_on.suspend_timeout_secs, Some(300)); + assert!(cfg_on.suspend_command.is_some()); + assert!(cfg_on.resume_command.is_some()); + } } diff --git a/crates/spur-tests/src/t50_core.rs b/crates/spur-tests/src/t50_core.rs index c335385..cfd99b6 100644 --- a/crates/spur-tests/src/t50_core.rs +++ b/crates/spur-tests/src/t50_core.rs @@ -799,4 +799,94 @@ mod tests { assert_eq!(parts[4], "*"); // day of week assert_eq!(parts[5], "sbatch /path/to/script.sh"); // command } + + // ── T50.75–78: Federation config parsing ───────────────────── + + #[test] + fn t50_75_federation_config_default_empty() { + use spur_core::config::FederationConfig; + let fed = FederationConfig::default(); + assert!(fed.clusters.is_empty()); + } + + #[test] + fn t50_76_federation_cluster_peer_fields() { + use spur_core::config::ClusterPeer; + let peer = ClusterPeer { + name: "cluster-b".into(), + address: "http://ctrl-b:6817".into(), + }; + assert_eq!(peer.name, "cluster-b"); + assert_eq!(peer.address, "http://ctrl-b:6817"); + } + + #[test] + fn t50_77_federation_config_with_peers() { + use spur_core::config::{ClusterPeer, FederationConfig}; + let fed = FederationConfig { + clusters: vec![ + ClusterPeer { + name: "east".into(), + address: "http://east-ctrl:6817".into(), + }, + ClusterPeer { + name: "west".into(), + address: "http://west-ctrl:6817".into(), + }, + ], + }; + assert_eq!(fed.clusters.len(), 2); + assert_eq!(fed.clusters[0].name, "east"); + assert_eq!(fed.clusters[1].address, "http://west-ctrl:6817"); + } + + #[test] + fn t50_78_federation_config_toml_roundtrip() { + use spur_core::config::SlurmConfig; + let toml = r#" +cluster_name = "test" + +[controller] +listen_addr = "[::]:6817" +state_dir = "/tmp/spur-test" + +[[federation.clusters]] +name = "peer-a" +address = "http://peer-a:6817" +"#; 
+ let cfg = SlurmConfig::from_str(toml).unwrap(); + assert_eq!(cfg.federation.clusters.len(), 1); + assert_eq!(cfg.federation.clusters[0].name, "peer-a"); + } + + // ── T50.79–81: PMIx env var names ──────────────────────────── + + #[test] + fn t50_79_pmix_env_var_names_correct() { + // Verify the canonical PMIx env var names used by OpenMPI 5+ and srun. + let required = ["PMIX_RANK", "PMIX_SIZE", "PMIX_NAMESPACE"]; + for name in &required { + // Names must be uppercase and start with PMIX_ + assert!(name.starts_with("PMIX_"), "expected PMIX_ prefix: {}", name); + assert_eq!(*name, name.to_uppercase(), "must be uppercase: {}", name); + } + } + + #[test] + fn t50_80_pmix_namespace_format() { + // Namespace format: "spur.<job_id>" + let job_id: u32 = 42; + let ns = format!("spur.{}", job_id); + assert_eq!(ns, "spur.42"); + assert!(ns.starts_with("spur.")); + } + + #[test] + fn t50_81_ompi_compat_env_vars() { + // OpenMPI direct bootstrap env vars mirror PMIx rank/size. + let ompi_vars = ["OMPI_COMM_WORLD_RANK", "OMPI_COMM_WORLD_SIZE"]; + for v in &ompi_vars { + assert!(v.starts_with("OMPI_COMM_WORLD_")); + } + } } diff --git a/crates/spurctld/src/main.rs b/crates/spurctld/src/main.rs index 62af2b9..5003726 100644 --- a/crates/spurctld/src/main.rs +++ b/crates/spurctld/src/main.rs @@ -17,9 +17,9 @@ struct Args { #[arg(short = 'f', long, default_value = "/etc/spur/spur.conf")] config: PathBuf, - /// gRPC listen address - #[arg(long, default_value = "[::]:6817")] - listen: String, + /// gRPC listen address (overrides config file) + #[arg(long)] + listen: Option<String>, /// State directory #[arg(long, default_value = "/var/spool/spur")] state_dir: PathBuf, @@ -48,14 +48,14 @@ async fn main() -> anyhow::Result<()> { info!(version = env!("CARGO_PKG_VERSION"), "spurctld starting"); // Load config if it exists, otherwise use defaults - let config = if args.config.exists() { + let mut config = if args.config.exists() { spur_core::config::SlurmConfig::load(&args.config)?
} else { info!("no config file found, using defaults"); spur_core::config::SlurmConfig { cluster_name: "spur".into(), controller: spur_core::config::ControllerConfig { - listen_addr: args.listen.clone(), + listen_addr: "[::]:6817".into(), state_dir: args.state_dir.to_string_lossy().into(), ..Default::default() }, @@ -63,6 +63,15 @@ async fn main() -> anyhow::Result<()> { } }; + // CLI --listen overrides config file; otherwise use config's listen_addr. + let listen_addr = args + .listen + .clone() + .unwrap_or_else(|| config.controller.listen_addr.clone()); + + // Keep config in sync so downstream code sees the final address. + config.controller.listen_addr = listen_addr.clone(); + // Initialize cluster manager let cluster = Arc::new(ClusterManager::new(config.clone(), &args.state_dir)?); @@ -83,7 +92,7 @@ async fn main() -> anyhow::Result<()> { }); // Start gRPC server - let addr = args.listen.parse()?; + let addr = listen_addr.parse()?; info!(%addr, "gRPC server listening"); server::serve(addr, cluster).await?; diff --git a/crates/spurctld/src/scheduler_loop.rs b/crates/spurctld/src/scheduler_loop.rs index 91f476e..1c41877 100644 --- a/crates/spurctld/src/scheduler_loop.rs +++ b/crates/spurctld/src/scheduler_loop.rs @@ -2,12 +2,14 @@ use std::collections::HashSet; use std::sync::Arc; use chrono::Utc; +use prost_types; use tracing::{debug, error, info, warn}; use spur_proto::proto::slurm_agent_client::SlurmAgentClient; +use spur_proto::proto::slurm_controller_client::SlurmControllerClient; use spur_proto::proto::{ AgentCancelJobRequest, JobSpec as ProtoJobSpec, LaunchJobRequest, - ResourceSet as ProtoResourceSet, + ResourceSet as ProtoResourceSet, SubmitJobRequest, }; use spur_sched::backfill::{self, BackfillScheduler}; use spur_sched::traits::{ClusterState, Scheduler}; @@ -73,6 +75,16 @@ pub async fn run(cluster: Arc) { if !unscheduled.is_empty() { try_preempt(&cluster, &unscheduled); + + // Federation: forward still-unschedulable jobs to peer clusters. 
+ if !cluster.config.federation.clusters.is_empty() { + let jobs_to_fwd: Vec<spur_core::job::Job> = + unscheduled.iter().map(|j| (*j).clone()).collect(); + let fed_cluster = cluster.clone(); + tokio::spawn(async move { + forward_to_federation(&fed_cluster, &jobs_to_fwd).await; + }); + } } } @@ -244,6 +256,136 @@ fn try_preempt(cluster: &Arc<ClusterManager>, unscheduled: &[&spur_core::job::Jo } } + +/// Forward unschedulable jobs to federation peer clusters. +/// +/// Tries each peer in order; stops forwarding a job as soon as one peer accepts it. +/// Failed peer connections are logged as warnings and skipped. +async fn forward_to_federation(cluster: &ClusterManager, jobs: &[spur_core::job::Job]) { + let peers = &cluster.config.federation.clusters; + for job in jobs { + for peer in peers { + match SlurmControllerClient::connect(peer.address.clone()).await { + Ok(mut client) => { + let req = SubmitJobRequest { + spec: Some(core_spec_to_proto(&job.spec)), + }; + match client.submit_job(req).await { + Ok(resp) => { + let remote_id = resp.into_inner().job_id; + info!( + job_id = job.job_id, + peer = %peer.name, + remote_id, + "forwarded unschedulable job to federation peer" + ); + break; // accepted by this peer — don't try others + } + Err(e) => { + warn!( + job_id = job.job_id, + peer = %peer.name, + error = %e, + "federation peer rejected job" + ); + } + } + } + Err(e) => { + warn!( + peer = %peer.name, + error = %e, + "could not connect to federation peer" + ); + } + } + } + } +} + +/// Convert a core JobSpec to its proto representation for cross-cluster forwarding. 
+fn core_spec_to_proto(s: &spur_core::job::JobSpec) -> ProtoJobSpec { + // Split licenses back out of GRES (stored as "license:") + let mut gres = Vec::new(); + let mut licenses = Vec::new(); + for g in &s.gres { + if let Some(lic) = g.strip_prefix("license:") { + licenses.push(lic.to_string()); + } else { + gres.push(g.clone()); + } + } + + ProtoJobSpec { + name: s.name.clone(), + partition: s.partition.clone().unwrap_or_default(), + account: s.account.clone().unwrap_or_default(), + user: s.user.clone(), + uid: s.uid, + gid: s.gid, + num_nodes: s.num_nodes, + num_tasks: s.num_tasks, + tasks_per_node: s.tasks_per_node.unwrap_or(0), + cpus_per_task: s.cpus_per_task, + memory_per_node_mb: s.memory_per_node_mb.unwrap_or(0), + memory_per_cpu_mb: s.memory_per_cpu_mb.unwrap_or(0), + gres, + licenses, + script: s.script.clone().unwrap_or_default(), + argv: s.argv.clone(), + work_dir: s.work_dir.clone(), + stdout_path: s.stdout_path.clone().unwrap_or_default(), + stderr_path: s.stderr_path.clone().unwrap_or_default(), + environment: s.environment.clone(), + time_limit: s.time_limit.map(|d| prost_types::Duration { + seconds: d.num_seconds(), + nanos: 0, + }), + time_min: s.time_min.map(|d| prost_types::Duration { + seconds: d.num_seconds(), + nanos: 0, + }), + qos: s.qos.clone().unwrap_or_default(), + priority: s.priority.unwrap_or(0), + reservation: s.reservation.clone().unwrap_or_default(), + dependency: s.dependency.clone(), + nodelist: s.nodelist.clone().unwrap_or_default(), + exclude: s.exclude.clone().unwrap_or_default(), + constraint: s.constraint.clone().unwrap_or_default(), + mpi: s.mpi.clone().unwrap_or_default(), + distribution: s.distribution.clone().unwrap_or_default(), + het_group: s.het_group.unwrap_or(0), + array_spec: s.array_spec.clone().unwrap_or_default(), + requeue: s.requeue, + exclusive: s.exclusive, + hold: s.hold, + interactive: s.interactive, + mail_type: s.mail_type.clone(), + mail_user: s.mail_user.clone().unwrap_or_default(), + comment: 
s.comment.clone().unwrap_or_default(), + wckey: s.wckey.clone().unwrap_or_default(), + container_image: s.container_image.clone().unwrap_or_default(), + container_mounts: s.container_mounts.clone(), + container_workdir: s.container_workdir.clone().unwrap_or_default(), + container_name: s.container_name.clone().unwrap_or_default(), + container_readonly: s.container_readonly, + container_mount_home: s.container_mount_home, + container_env: s.container_env.clone(), + container_entrypoint: s.container_entrypoint.clone().unwrap_or_default(), + container_remap_root: s.container_remap_root, + burst_buffer: s.burst_buffer.clone().unwrap_or_default(), + begin_time: s.begin_time.map(|dt| prost_types::Timestamp { + seconds: dt.timestamp(), + nanos: 0, + }), + deadline: s.deadline.map(|dt| prost_types::Timestamp { + seconds: dt.timestamp(), + nanos: 0, + }), + spread_job: s.spread_job, + open_mode: s.open_mode.clone().unwrap_or_default(), + } +} + /// Send a LaunchJob RPC to a node agent. async fn dispatch_to_agent( agent_addr: &str, diff --git a/crates/spurd/src/container.rs b/crates/spurd/src/container.rs index f4f0f74..617cd94 100644 --- a/crates/spurd/src/container.rs +++ b/crates/spurd/src/container.rs @@ -18,9 +18,22 @@ use anyhow::{bail, Context}; use tracing::{debug, info, warn}; /// Where squashfs images and container rootfs are stored. -const IMAGE_DIR: &str = "/var/spool/spur/images"; +const DEFAULT_IMAGE_DIR: &str = "/var/spool/spur/images"; const CONTAINER_DIR: &str = "/var/spool/spur/containers"; +/// Return the image directory, honoring `SPUR_IMAGE_DIR` env var. +/// +/// This must match the CLI's `resolve_image_dir()` logic so that images +/// imported via `spur image import` are found by the agent at job launch. +fn image_dir() -> PathBuf { + if let Ok(dir) = std::env::var("SPUR_IMAGE_DIR") { + if !dir.is_empty() { + return PathBuf::from(dir); + } + } + PathBuf::from(DEFAULT_IMAGE_DIR) +} + /// A parsed bind mount specification. 
#[derive(Debug)] pub struct BindMount { @@ -53,7 +66,7 @@ pub struct ContainerConfig { /// /// Supports: /// - Absolute path to squashfs file -/// - Image name (looked up in IMAGE_DIR) +/// - Image name (looked up in image_dir()) /// - docker:// URI (must be pre-imported with `spur image import`) pub fn resolve_image(image: &str) -> anyhow::Result { // Absolute path to squashfs @@ -62,21 +75,23 @@ pub fn resolve_image(image: &str) -> anyhow::Result { return Ok(path.to_path_buf()); } - // Check image dir - let image_path = PathBuf::from(IMAGE_DIR).join(format!("{}.sqsh", sanitize_name(image))); + // Check image dir (respects SPUR_IMAGE_DIR env var) + let dir = image_dir(); + let image_path = dir.join(format!("{}.sqsh", sanitize_name(image))); if image_path.exists() { return Ok(image_path); } // Try without .sqsh extension - let image_path = PathBuf::from(IMAGE_DIR).join(sanitize_name(image)); + let image_path = dir.join(sanitize_name(image)); if image_path.exists() { return Ok(image_path); } bail!( - "container image '{}' not found. Import it first with: spur image import {}", + "container image '{}' not found in {}. Import it first with: spur image import {}", image, + dir.display(), image ) } @@ -636,19 +651,19 @@ pub fn cleanup_rootfs(job_id: u32, mode: &RootfsMode) { /// via the Docker Registry HTTP API v2. No dependency on Docker, skopeo, /// umoci, or enroot. Only needs mksquashfs (squashfs-tools). pub async fn import_image(uri: &str) -> anyhow::Result { - let image_dir = Path::new(IMAGE_DIR); - spur_net::pull_image(uri, image_dir).await + let dir = image_dir(); + spur_net::pull_image(uri, &dir).await } /// List imported images. 
pub fn list_images() -> Vec<(String, u64)> { - let dir = Path::new(IMAGE_DIR); + let dir = image_dir(); if !dir.exists() { return Vec::new(); } let mut images = Vec::new(); - if let Ok(entries) = std::fs::read_dir(dir) { + if let Ok(entries) = std::fs::read_dir(&dir) { for entry in entries.flatten() { let path = entry.path(); if path.extension().map_or(false, |ext| ext == "sqsh") { @@ -667,7 +682,7 @@ pub fn list_images() -> Vec<(String, u64)> { /// Remove an imported image. pub fn remove_image(name: &str) -> anyhow::Result<()> { - let path = PathBuf::from(IMAGE_DIR).join(format!("{}.sqsh", sanitize_name(name))); + let path = image_dir().join(format!("{}.sqsh", sanitize_name(name))); if !path.exists() { bail!("image '{}' not found", name); }