Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/spur-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,17 @@ fn main() -> anyhow::Result<()> {

if let Some(cmd) = canonical {
// Rewrite argv: replace ["spur", "cmd", ...rest] with ["cmd", ...rest]
//
// Special case (issue #53): `spur show node X` should dispatch as
// `scontrol show node X`, not `scontrol node X`. When the user's
// command is "show", insert the implicit "show" subcommand for scontrol.
let implicit_show = args[1].as_str() == "show" && cmd == "scontrol";
let rewritten: Vec<String> = std::iter::once(cmd.to_string())
.chain(if implicit_show {
vec!["show".to_string()]
} else {
vec![]
})
.chain(args[2..].iter().cloned())
.collect();
// Temporarily override process args for the subcommand parser
Expand Down
23 changes: 13 additions & 10 deletions crates/spur-cli/src/sattach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,18 @@ async fn stream_output_only(
}

/// Interactive attach: bidirectional stdin/stdout forwarding via AttachJob RPC.
///
/// Issue #54 fixes:
/// - Use per-byte reads instead of line-buffered reads so interactive programs work
/// - Increase channel buffer to 256 to prevent deadlock under high output
/// - Add connect timeout
async fn interactive_attach(
agent: &mut SlurmAgentClient<tonic::transport::Channel>,
job_id: u32,
) -> Result<()> {
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::io::AsyncReadExt;

let (tx, rx) = tokio::sync::mpsc::channel::<AttachJobInput>(32);
let (tx, rx) = tokio::sync::mpsc::channel::<AttachJobInput>(256);

// Send first message with job_id
tx.send(AttachJobInput {
Expand All @@ -144,21 +149,19 @@ async fn interactive_attach(
.await
.context("failed to send initial attach message")?;

// Spawn stdin reader task
// Spawn stdin reader task — reads raw bytes for interactive use
let tx_stdin = tx.clone();
tokio::spawn(async move {
let stdin = tokio::io::stdin();
let mut reader = BufReader::new(stdin);
let mut line = String::new();
let mut stdin = tokio::io::stdin();
let mut buf = vec![0u8; 4096];
loop {
line.clear();
match reader.read_line(&mut line).await {
match stdin.read(&mut buf).await {
Ok(0) => break, // EOF
Ok(_) => {
Ok(n) => {
if tx_stdin
.send(AttachJobInput {
job_id,
data: line.as_bytes().to_vec(),
data: buf[..n].to_vec(),
})
.await
.is_err()
Expand Down
94 changes: 70 additions & 24 deletions crates/spur-k8s/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ struct Args {
#[arg(long, default_value = "[::]:6818")]
listen: String,

/// Advertised address for spurctld to reach this operator.
/// If unset, falls back to POD_IP env var, then hostname.
/// In K8s, set this to the Service DNS name or use the Downward API
/// to inject the Pod IP (issue #51).
#[arg(long, env = "SPUR_OPERATOR_ADDRESS")]
address: Option<String>,

/// K8s namespace for SpurJobs and Pods
#[arg(long, default_value = "spur")]
namespace: String,
Expand Down Expand Up @@ -82,54 +89,66 @@ async fn main() -> anyhow::Result<()> {
let client = Client::try_default().await?;

let listen_addr: SocketAddr = args.listen.parse()?;
let operator_ip = if listen_addr.ip().is_unspecified() {
// Issue #51: Use explicit --address flag, then POD_IP env var (K8s Downward
// API), then listen IP, then hostname. Pod hostnames are unroutable —
// spurctld can't reach the operator at "spur-k8s-operator-abc123".
let operator_ip = if let Some(ref addr) = args.address {
addr.clone()
} else if let Ok(pod_ip) = std::env::var("POD_IP") {
pod_ip
} else if !listen_addr.ip().is_unspecified() {
listen_addr.ip().to_string()
} else {
hostname::get()
.map(|h| h.to_string_lossy().to_string())
.unwrap_or_else(|_| "127.0.0.1".into())
} else {
listen_addr.ip().to_string()
};
let operator_port = listen_addr.port() as u32;
info!(address = %operator_ip, port = operator_port, "operator will advertise this address to spurctld");

// Spawn health/readiness server
// Spawn health/readiness server (issue #52: retry on failure)
let health_addr: SocketAddr = args.health_addr.parse()?;
let health_ctrl_addr = args.controller_addr.clone();
let health_client = client.clone();
tokio::spawn(async move {
if let Err(e) = health::serve(health_addr, health_client, health_ctrl_addr).await {
tracing::error!(error = %e, "health server exited");
}
run_with_retry("health server", || {
let c = health_client.clone();
let addr = health_ctrl_addr.clone();
Box::pin(health::serve(health_addr, c, addr))
})
.await;
});

// Spawn node watcher
// Spawn node watcher (issue #52: retry on failure)
let nw_client = client.clone();
let nw_ctrl_addr = args.controller_addr.clone();
let nw_op_addr = operator_ip.clone();
let nw_ns = args.namespace.clone();
let nw_selector = args.node_selector.clone();
tokio::spawn(async move {
if let Err(e) = node_watcher::run(
nw_client,
nw_ctrl_addr,
nw_op_addr,
operator_port,
nw_ns,
nw_selector,
)
.await
{
tracing::error!(error = %e, "node watcher exited");
}
run_with_retry("node watcher", || {
let c = nw_client.clone();
let ctrl = nw_ctrl_addr.clone();
let op = nw_op_addr.clone();
let ns = nw_ns.clone();
let sel = nw_selector.clone();
Box::pin(node_watcher::run(c, ctrl, op, operator_port, ns, sel))
})
.await;
});

// Spawn job controller
// Spawn job controller (issue #52: retry on failure)
let jc_client = client.clone();
let jc_ctrl_addr = args.controller_addr.clone();
let jc_ns = args.namespace.clone();
tokio::spawn(async move {
if let Err(e) = job_controller::run(jc_client, jc_ctrl_addr, jc_ns).await {
tracing::error!(error = %e, "job controller exited");
}
run_with_retry("job controller", || {
let c = jc_client.clone();
let ctrl = jc_ctrl_addr.clone();
let ns = jc_ns.clone();
Box::pin(job_controller::run(c, ctrl, ns))
})
.await;
});

// Start virtual agent gRPC server
Expand All @@ -144,6 +163,33 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

/// Run an async task with exponential backoff retry on failure (issue #52).
///
/// If the task exits with an error, it is restarted after a delay that doubles
/// each time (1s → 2s → 4s → ... → 60s max). On success the backoff resets.
///
/// A clean (`Ok`) exit also waits the base delay before restarting: these
/// tasks are expected to run forever, so a task that returns immediately
/// (e.g. due to a misconfiguration) must not spin this loop at 100% CPU and
/// flood the logs with restart messages.
async fn run_with_retry<F, Fut>(name: &str, mut factory: F) -> !
where
    F: FnMut() -> std::pin::Pin<Box<Fut>>,
    Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
    // Base and cap for the exponential backoff schedule.
    const BASE_BACKOFF: std::time::Duration = std::time::Duration::from_secs(1);
    const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(60);

    let mut backoff = BASE_BACKOFF;

    loop {
        match factory().await {
            Ok(()) => {
                tracing::warn!(%name, "task exited cleanly, restarting");
                // Reset the schedule, but still pause briefly so a task that
                // returns instantly cannot hot-loop (see doc comment above).
                backoff = BASE_BACKOFF;
                tokio::time::sleep(BASE_BACKOFF).await;
            }
            Err(e) => {
                tracing::error!(%name, error = %e, backoff_secs = backoff.as_secs(), "task failed, retrying");
                tokio::time::sleep(backoff).await;
                backoff = std::cmp::min(backoff * 2, MAX_BACKOFF);
            }
        }
    }
}

fn generate_crd() {
use kube::CustomResourceExt;
let crd = crd::SpurJob::crd();
Expand Down
2 changes: 1 addition & 1 deletion crates/spur-sched/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl Scheduler for BackfillScheduler {

let required = job_resource_request(job);
let duration = job.spec.time_limit.unwrap_or(Duration::hours(1));
let needed_nodes = job.spec.num_nodes as usize;
let needed_nodes = (job.spec.num_nodes as usize).max(1);

// Find earliest start across needed_nodes
let mut node_starts: Vec<(usize, chrono::DateTime<Utc>)> = suitable
Expand Down
96 changes: 96 additions & 0 deletions crates/spur-tests/src/t07_sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,4 +621,100 @@ mod tests {
let assignments = sched.schedule(&[job], &cluster);
assert_eq!(assignments.len(), 1);
}

// ── Issue #56: edge cases that could crash the scheduler ─────

#[test]
fn t07_27_num_nodes_zero_does_not_panic() {
    // Issue #56: A job with num_nodes=0 should be handled safely
    // instead of panicking on .max().unwrap() with empty iterator.
    reset_job_ids();

    let node_list = make_nodes(2, 64, 256_000);
    let partition_list = vec![make_partition("default", 2)];
    let state = ClusterState {
        nodes: &node_list,
        partitions: &partition_list,
        reservations: &[],
    };

    let mut job = make_job("zero-nodes");
    job.spec.num_nodes = 0;

    // Must not panic — should schedule with 1 node (the minimum)
    let mut scheduler = BackfillScheduler::new(100);
    let result = scheduler.schedule(&[job], &state);
    assert_eq!(result.len(), 1);
    assert_eq!(result[0].nodes.len(), 1);
}

#[test]
fn t07_28_single_idle_node_schedules_immediately() {
    // Issue #56 regression: A single idle node with a single pending
    // job should result in immediate scheduling (no Reason=Priority).
    reset_job_ids();

    let node_list = make_nodes(1, 64, 256_000);
    let partition_list = vec![make_partition("default", 1)];
    let state = ClusterState {
        nodes: &node_list,
        partitions: &partition_list,
        reservations: &[],
    };

    let mut scheduler = BackfillScheduler::new(100);
    let result = scheduler.schedule(&[make_job("simple")], &state);

    assert_eq!(
        result.len(),
        1,
        "single idle node should schedule job immediately"
    );
    assert_eq!(result[0].nodes[0], "node001");
}

#[test]
fn t07_29_constraint_mismatch_not_scheduled() {
    // Issue #56: A job with --constraint=gpu should NOT be scheduled
    // on a node without the "gpu" feature.
    reset_job_ids();

    let plain_nodes = make_nodes(2, 64, 256_000); // nodes have NO features
    let partition_list = vec![make_partition("default", 2)];
    let state = ClusterState {
        nodes: &plain_nodes,
        partitions: &partition_list,
        reservations: &[],
    };

    let mut gpu_job = make_job("gpu-job");
    gpu_job.spec.constraint = Some("gpu".into());

    let mut scheduler = BackfillScheduler::new(100);
    let result = scheduler.schedule(&[gpu_job], &state);
    assert!(
        result.is_empty(),
        "job requiring gpu feature should not schedule on featureless nodes"
    );
}

#[test]
fn t07_30_exclusive_job_needs_idle_node() {
    // Issue #56: An exclusive job should only schedule on a node
    // with zero current allocations.
    reset_job_ids();

    let mut node_list = make_nodes(2, 64, 256_000);
    // node001 gets a partial allocation; node002 stays fully idle
    node_list[0].alloc_resources.cpus = 32;
    let partition_list = vec![make_partition("default", 2)];
    let state = ClusterState {
        nodes: &node_list,
        partitions: &partition_list,
        reservations: &[],
    };

    let mut exclusive_job = make_job("exclusive");
    exclusive_job.spec.exclusive = true;

    let mut scheduler = BackfillScheduler::new(100);
    let result = scheduler.schedule(&[exclusive_job], &state);
    assert_eq!(result.len(), 1);
    // Must land on the idle node002, not the partially allocated node001
    assert_eq!(result[0].nodes[0], "node002");
}
}
Loading
Loading