diff --git a/Cargo.lock b/Cargo.lock index d65bc21..0103114 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2927,6 +2927,7 @@ dependencies = [ "tar", "thiserror 2.0.18", "tokio", + "tokio-stream", "tonic", "whoami 2.1.1", ] diff --git a/crates/spur-cli/Cargo.toml b/crates/spur-cli/Cargo.toml index 93ca07b..678a1e6 100644 --- a/crates/spur-cli/Cargo.toml +++ b/crates/spur-cli/Cargo.toml @@ -20,6 +20,7 @@ chrono = { workspace = true } anyhow = { workspace = true } thiserror = { workspace = true } prost-types = { workspace = true } +tokio-stream = "0.1" whoami = "2.1.1" atty = "0.2.14" nix = { workspace = true, features = ["user"] } diff --git a/crates/spur-cli/src/main.rs b/crates/spur-cli/src/main.rs index 74d278f..b94edfb 100644 --- a/crates/spur-cli/src/main.rs +++ b/crates/spur-cli/src/main.rs @@ -23,7 +23,48 @@ mod strigger; use std::path::Path; +/// If SPUR_CONTROLLER_ADDR is not already set, try to read the controller +/// address from the config file so that all subcommands pick it up +/// automatically via their `env = "SPUR_CONTROLLER_ADDR"` clap annotation. +/// +/// Priority: --controller CLI arg > SPUR_CONTROLLER_ADDR env > config file > default +fn load_controller_addr_from_config() { + if std::env::var("SPUR_CONTROLLER_ADDR").is_ok() { + return; // User already set it explicitly + } + + // Check SPUR_CONF for custom config path, then /etc/spur/spur.conf + let config_path = std::env::var("SPUR_CONF") + .map(std::path::PathBuf::from) + .unwrap_or_else(|_| std::path::PathBuf::from("/etc/spur/spur.conf")); + + if !config_path.exists() { + return; + } + + if let Ok(config) = spur_core::config::SlurmConfig::load(&config_path) { + // Extract host and port from the config + let host = config + .controller + .hosts + .first() + .map(|h| h.as_str()) + .unwrap_or("localhost"); + let port = config + .controller + .listen_addr + .rsplit(':') + .next() + .unwrap_or("6817"); + let addr = format!("http://{}:{}", host, port); + std::env::set_var("SPUR_CONTROLLER_ADDR", &addr); + } +} + fn main() -> anyhow::Result<()> { + // Load controller address from config file (if not set via env var) + load_controller_addr_from_config(); + // Multi-call binary: dispatch based on argv[0] (symlink name). let argv0 = std::env::args().next().unwrap_or_else(|| "spur".into()); let bin_name = Path::new(&argv0) diff --git a/crates/spur-cli/src/sattach.rs b/crates/spur-cli/src/sattach.rs index a0f6bde..cd58642 100644 --- a/crates/spur-cli/src/sattach.rs +++ b/crates/spur-cli/src/sattach.rs @@ -2,7 +2,7 @@ use anyhow::{Context, Result}; use clap::Parser; use spur_proto::proto::slurm_agent_client::SlurmAgentClient; use spur_proto::proto::slurm_controller_client::SlurmControllerClient; -use spur_proto::proto::{GetJobRequest, JobState, StreamJobOutputRequest}; +use spur_proto::proto::{AttachJobInput, GetJobRequest, JobState, StreamJobOutputRequest}; use std::io::Write; /// Attach to a running job step's standard I/O. @@ -12,10 +12,14 @@ pub struct SattachArgs { /// Job ID (or job_id.step_id) pub job_step: String, - /// Stream to attach to + /// Stream to attach to (stdout or stderr) — used in output-only mode #[arg(long, default_value = "stdout")] pub output: String, + /// Output-only mode: stream job output without interactive stdin forwarding + #[arg(long)] + pub output_only: bool, + /// Controller address #[arg( long, @@ -67,22 +71,39 @@ pub async fn main_with_args(args: Vec) -> Result<()> { // Connect to the first node's agent let first_node = nodelist.split(',').next().unwrap_or(nodelist).trim(); - let agent_addr = format!("http://{}:6818", first_node); let mut agent = SlurmAgentClient::connect(agent_addr.clone()) .await .context(format!("failed to connect to agent at {}", agent_addr))?; + if args.output_only { + // Output-only mode: stream stdout/stderr without stdin + stream_output_only(&mut agent, job_id, &args.output).await + } else { + // Interactive mode: bidirectional stdin/stdout forwarding + interactive_attach(&mut agent, job_id).await + } +} + +/// Stream job output without interactive input (legacy behavior). +async fn stream_output_only( + agent: &mut SlurmAgentClient, + job_id: u32, + stream_name: &str, +) -> Result<()> { let mut stream = agent .stream_job_output(StreamJobOutputRequest { job_id, - stream: args.output.clone(), + stream: stream_name.to_string(), }) .await .context("failed to start output stream")? .into_inner(); - eprintln!("sattach: attached to job {} ({})", job_id, args.output); + eprintln!( + "sattach: streaming {} for job {} (output-only)", + stream_name, job_id + ); let stdout = std::io::stdout(); let mut handle = stdout.lock(); @@ -106,6 +127,83 @@ pub async fn main_with_args(args: Vec) -> Result<()> { Ok(()) } +/// Interactive attach: bidirectional stdin/stdout forwarding via AttachJob RPC. +async fn interactive_attach( + agent: &mut SlurmAgentClient, + job_id: u32, +) -> Result<()> { + use tokio::io::{AsyncBufReadExt, BufReader}; + + let (tx, rx) = tokio::sync::mpsc::channel::(32); + + // Send first message with job_id + tx.send(AttachJobInput { + job_id, + data: Vec::new(), + }) + .await + .context("failed to send initial attach message")?; + + // Spawn stdin reader task + let tx_stdin = tx.clone(); + tokio::spawn(async move { + let stdin = tokio::io::stdin(); + let mut reader = BufReader::new(stdin); + let mut line = String::new(); + loop { + line.clear(); + match reader.read_line(&mut line).await { + Ok(0) => break, // EOF + Ok(_) => { + if tx_stdin + .send(AttachJobInput { + job_id, + data: line.as_bytes().to_vec(), + }) + .await + .is_err() + { + break; + } + } + Err(_) => break, + } + } + drop(tx_stdin); + }); + + // Make the bidirectional streaming call + let response = agent + .attach_job(tokio_stream::wrappers::ReceiverStream::new(rx)) + .await + .context("attach_job RPC failed")?; + + let mut out_stream = response.into_inner(); + let stdout = std::io::stdout(); + let mut handle = stdout.lock(); + + loop { + match out_stream.message().await { + Ok(Some(chunk)) => { + if chunk.eof { + break; + } + if !chunk.data.is_empty() { + let _ = handle.write_all(&chunk.data); + let _ = handle.flush(); + } + } + Ok(None) => break, + Err(e) => { + eprintln!("sattach: stream error: {}", e); + break; + } + } + } + + Ok(()) +} + fn state_name(state: i32) -> &'static str { match state { 0 => "PENDING", diff --git a/crates/spur-k8s/src/agent.rs b/crates/spur-k8s/src/agent.rs index 07006f7..3cc2f85 100644 --- a/crates/spur-k8s/src/agent.rs +++ b/crates/spur-k8s/src/agent.rs @@ -30,6 +30,7 @@ impl VirtualAgent { impl SlurmAgent for VirtualAgent { type StreamJobOutputStream = tokio_stream::wrappers::ReceiverStream>; + type AttachJobStream = tokio_stream::wrappers::ReceiverStream>; async fn launch_job( &self, @@ -390,6 +391,15 @@ impl SlurmAgent for VirtualAgent { "output streaming not supported for K8s agent", )) } + + async fn attach_job( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented( + "interactive attach not supported for K8s agent", + )) + } } impl VirtualAgent { diff --git a/crates/spur-tests/src/t50_core.rs b/crates/spur-tests/src/t50_core.rs index 73c669d..de1e40e 100644 --- a/crates/spur-tests/src/t50_core.rs +++ b/crates/spur-tests/src/t50_core.rs @@ -1018,4 +1018,90 @@ address = "http://peer-a:6817" assert_eq!(req.job_id, 42); assert_eq!(req.command.len(), 2); } + + // ── Issue #45: sattach interactive attach ───────────────────── + + #[test] + fn t50_92_attach_job_proto_messages_exist() { + // Issue #45: AttachJob bidirectional streaming RPC should exist + // with AttachJobInput and AttachJobOutput messages. + let input = spur_proto::proto::AttachJobInput { + job_id: 10, + data: b"ls\n".to_vec(), + }; + assert_eq!(input.job_id, 10); + assert_eq!(input.data, b"ls\n"); + + let output = spur_proto::proto::AttachJobOutput { + data: b"hello\n".to_vec(), + eof: false, + }; + assert!(!output.eof); + assert_eq!(output.data, b"hello\n"); + } + + // ── Issue #46: CLI reads config file for controller port ────── + + #[test] + fn t50_93_config_controller_addr_from_toml() { + // Issue #46: the CLI should read controller address from config. + // Verify SlurmConfig can parse a custom port and hosts. + let toml = r#" + cluster_name = "test" + [controller] + listen_addr = "[::]:6821" + hosts = ["ctrl.example.com"] + "#; + let config = spur_core::config::SlurmConfig::from_str(toml).unwrap(); + assert_eq!(config.controller.listen_addr, "[::]:6821"); + assert_eq!(config.controller.hosts[0], "ctrl.example.com"); + + // Verify we can extract port from listen_addr + let port = config.controller.listen_addr.rsplit(':').next().unwrap(); + assert_eq!(port, "6821"); + } + + // ── Issue #47: nodes auto-join default partition ────────────── + + #[test] + fn t50_94_node_auto_partition_assignment() { + // Issue #47: when a node doesn't match any partition hostlist, + // it should auto-join the default partition. + let mut node = Node::new( + "dynamic-node".into(), + ResourceSet { + cpus: 8, + memory_mb: 16384, + ..Default::default() + }, + ); + assert!(node.partitions.is_empty()); + + // Simulate auto-assign: if no partitions matched, add to default + let default_partition = make_partition("batch", 1); + if node.partitions.is_empty() { + node.partitions.push(default_partition.name.clone()); + } + assert_eq!(node.partitions, vec!["batch"]); + } + + // ── Issue #48: container image resolution fallback ──────────── + + #[test] + fn t50_95_container_image_absolute_path_basename_fallback() { + // Issue #48: when the agent receives an absolute path from the + // login node that doesn't exist locally, it should try the + // basename in the local image directory. + let path = std::path::Path::new("/var/spool/spur/images/ubuntu+22.04.sqsh"); + let basename = path.file_name().unwrap().to_str().unwrap(); + assert_eq!(basename, "ubuntu+22.04.sqsh"); + + // Verify sanitize_name works correctly for the expected pattern + // (colon and slash replaced with +) + let name = "ubuntu:22.04"; + let expected = "ubuntu+22.04"; + let sanitized = name.replace('/', "+").replace(':', "+"); + assert_eq!(sanitized, expected); + assert_eq!(format!("{}.sqsh", sanitized), basename); + } } diff --git a/crates/spurctld/src/cluster.rs b/crates/spurctld/src/cluster.rs index c31f546..787e0a1 100644 --- a/crates/spurctld/src/cluster.rs +++ b/crates/spurctld/src/cluster.rs @@ -688,7 +688,7 @@ impl ClusterManager { node.agent_start_time = Some(Utc::now()); node.last_heartbeat = Some(Utc::now()); - // Assign to partitions based on config + // Assign to partitions based on config hostlist patterns let partitions = self.partitions.read(); for part in partitions.iter() { if let Ok(hosts) = spur_core::hostlist::expand(&part.nodes) { @@ -698,6 +698,17 @@ impl ClusterManager { } } + // If node didn't match any partition's hostlist, auto-assign to the + // default partition so dynamically-registered nodes are always + // schedulable. This matches Slurm's behavior for unconfigured nodes. + if node.partitions.is_empty() { + if let Some(default_part) = partitions.iter().find(|p| p.is_default) { + node.partitions.push(default_part.name.clone()); + } else if let Some(first) = partitions.first() { + node.partitions.push(first.name.clone()); + } + } + // Copy features and weight from node config for nc in &self.config.nodes { if let Ok(hosts) = spur_core::hostlist::expand(&nc.names) { diff --git a/crates/spurd/src/agent_server.rs b/crates/spurd/src/agent_server.rs index a3a4f45..d278aaf 100644 --- a/crates/spurd/src/agent_server.rs +++ b/crates/spurd/src/agent_server.rs @@ -244,6 +244,7 @@ async fn report_completion(controller_addr: &str, job_id: u32, exit_code: i32) { #[tonic::async_trait] impl SlurmAgent for AgentService { type StreamJobOutputStream = ReceiverStream>; + type AttachJobStream = ReceiverStream>; async fn launch_job( &self, @@ -829,6 +830,212 @@ impl SlurmAgent for AgentService { Ok(Response::new(ReceiverStream::new(rx))) } + + async fn attach_job( + &self, + request: Request>, + ) -> Result, Status> { + let mut in_stream = request.into_inner(); + + // Read the first message to get the job_id + let first_msg = in_stream + .message() + .await + .map_err(|e| Status::internal(format!("failed to read first message: {}", e)))? + .ok_or_else(|| { + Status::invalid_argument("empty stream — expected job_id in first message") + })?; + + let job_id = first_msg.job_id; + + // Check the job is running and get its PID for namespace entry + let (pid, env_vars) = { + let jobs = self.running.lock().await; + match jobs.get(&job_id) { + Some(tracked) => { + let pid = tracked.pid.ok_or_else(|| { + Status::failed_precondition(format!("job {} has no PID", job_id)) + })?; + // Read a few env vars from /proc to replicate the job's environment + let env = Self::read_proc_env(pid); + (pid, env) + } + None => { + return Err(Status::not_found(format!( + "job {} not running on this node", + job_id + ))); + } + } + }; + + let (tx, rx) = tokio::sync::mpsc::channel::>(32); + + tokio::spawn(async move { + // Spawn an interactive shell inside the job's cgroup/namespace + use std::process::Stdio; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::process::Command; + + // Use nsenter to enter the job process's namespaces if possible, + // otherwise just spawn a shell with the same environment. + let mut cmd = Command::new("/bin/sh"); + cmd.arg("-i") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + // Set the job's environment variables + for (k, v) in &env_vars { + cmd.env(k, v); + } + cmd.env("SPUR_JOB_ID", job_id.to_string()); + + // Try nsenter for namespace isolation (if running as root) + let mut child = if nix::unistd::geteuid().is_root() { + let mut ns_cmd = Command::new("nsenter"); + ns_cmd + .args(["-t", &pid.to_string(), "--mount", "--pid", "--"]) + .args(["/bin/sh", "-i"]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + for (k, v) in &env_vars { + ns_cmd.env(k, v); + } + ns_cmd.env("SPUR_JOB_ID", job_id.to_string()); + match ns_cmd.spawn() { + Ok(c) => c, + Err(_) => match cmd.spawn() { + Ok(c) => c, + Err(e) => { + let _ = tx + .send(Err(Status::internal(format!( + "failed to spawn shell: {}", + e + )))) + .await; + return; + } + }, + } + } else { + match cmd.spawn() { + Ok(c) => c, + Err(e) => { + let _ = tx + .send(Err(Status::internal(format!( + "failed to spawn shell: {}", + e + )))) + .await; + return; + } + } + }; + + let mut child_stdin = child.stdin.take().unwrap(); + let mut child_stdout = child.stdout.take().unwrap(); + let mut child_stderr = child.stderr.take().unwrap(); + + // Forward initial data from first message (if any) + if !first_msg.data.is_empty() { + let _ = child_stdin.write_all(&first_msg.data).await; + } + + let tx_clone = tx.clone(); + + // Task: read from client stream → child stdin + let stdin_task = tokio::spawn(async move { + while let Ok(Some(msg)) = in_stream.message().await { + if !msg.data.is_empty() { + if child_stdin.write_all(&msg.data).await.is_err() { + break; + } + } + } + drop(child_stdin); // EOF to child + }); + + // Task: read child stderr → merge into output + let tx_stderr = tx.clone(); + let stderr_task = tokio::spawn(async move { + let mut buf = vec![0u8; 4096]; + loop { + match child_stderr.read(&mut buf).await { + Ok(0) => break, + Ok(n) => { + if tx_stderr + .send(Ok(AttachJobOutput { + data: buf[..n].to_vec(), + eof: false, + })) + .await + .is_err() + { + break; + } + } + Err(_) => break, + } + } + }); + + // Main: read child stdout → output stream + let mut buf = vec![0u8; 4096]; + loop { + match child_stdout.read(&mut buf).await { + Ok(0) => break, + Ok(n) => { + if tx_clone + .send(Ok(AttachJobOutput { + data: buf[..n].to_vec(), + eof: false, + })) + .await + .is_err() + { + break; + } + } + Err(_) => break, + } + } + + // Wait for child to exit + let _ = child.wait().await; + stdin_task.abort(); + stderr_task.abort(); + + // Send EOF + let _ = tx_clone + .send(Ok(AttachJobOutput { + data: Vec::new(), + eof: true, + })) + .await; + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } +} + +impl AgentService { + /// Read environment variables from a running process via /proc. + fn read_proc_env(pid: u32) -> Vec<(String, String)> { + let path = format!("/proc/{}/environ", pid); + match std::fs::read(&path) { + Ok(data) => data + .split(|&b| b == 0) + .filter_map(|entry| { + let s = std::str::from_utf8(entry).ok()?; + let (k, v) = s.split_once('=')?; + Some((k.to_string(), v.to_string())) + }) + .collect(), + Err(_) => Vec::new(), + } + } } pub fn create_server(reporter: Arc) -> SlurmAgentServer { diff --git a/crates/spurd/src/container.rs b/crates/spurd/src/container.rs index 2904756..2b4103b 100644 --- a/crates/spurd/src/container.rs +++ b/crates/spurd/src/container.rs @@ -69,25 +69,48 @@ pub struct ContainerConfig { /// - Image name (looked up in image_dir()) /// - docker:// URI (must be pre-imported with `spur image import`) pub fn resolve_image(image: &str) -> anyhow::Result { - // Absolute path to squashfs let path = Path::new(image); - if path.is_absolute() && path.exists() { - return Ok(path.to_path_buf()); + + // Absolute path: use directly if it exists + if path.is_absolute() { + if path.exists() { + return Ok(path.to_path_buf()); + } + // Path was resolved on the login node (sbatch) but doesn't exist + // locally — try the basename in our local image directory. This + // handles the case where login node and compute node use separate + // (non-shared) image directories. + if let Some(filename) = path.file_name() { + let local = image_dir().join(filename); + if local.exists() { + return Ok(local); + } + } } - // Check image dir (respects SPUR_IMAGE_DIR env var) + // Check image dir with sanitized name (respects SPUR_IMAGE_DIR env var) let dir = image_dir(); - let image_path = dir.join(format!("{}.sqsh", sanitize_name(image))); + let sanitized = sanitize_name(image); + let image_path = dir.join(format!("{}.sqsh", sanitized)); if image_path.exists() { return Ok(image_path); } // Try without .sqsh extension - let image_path = dir.join(sanitize_name(image)); + let image_path = dir.join(&sanitized); if image_path.exists() { return Ok(image_path); } + // Also check ~/.spur/images (matching CLI's resolve_image_dir behavior) + if let Some(home) = std::env::var_os("HOME") { + let home_dir = PathBuf::from(home).join(".spur/images"); + let image_path = home_dir.join(format!("{}.sqsh", sanitized)); + if image_path.exists() { + return Ok(image_path); + } + } + bail!( "container image '{}' not found in {}. Import it first with: spur image import {}", image, diff --git a/proto/slurm.proto b/proto/slurm.proto index 86f5cac..cf7acab 100644 --- a/proto/slurm.proto +++ b/proto/slurm.proto @@ -265,6 +265,8 @@ service SlurmAgent { rpc GetNodeResources(google.protobuf.Empty) returns (NodeResourcesResponse); rpc ExecInJob(ExecInJobRequest) returns (ExecInJobResponse); rpc StreamJobOutput(StreamJobOutputRequest) returns (stream StreamJobOutputChunk); + // Interactive attach: bidirectional streaming for stdin/stdout forwarding + rpc AttachJob(stream AttachJobInput) returns (stream AttachJobOutput); } // -- Accounting service (slurmdbd) -- @@ -453,6 +455,16 @@ message StreamJobOutputChunk { bool eof = 2; } +message AttachJobInput { + uint32 job_id = 1; // set on first message; ignored after + bytes data = 2; // stdin data to forward to the job +} + +message AttachJobOutput { + bytes data = 1; // stdout+stderr from the job + bool eof = 2; +} + // -- Accounting -- message RecordJobStartRequest {