Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/spur-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ chrono = { workspace = true }
anyhow = { workspace = true }
thiserror = { workspace = true }
prost-types = { workspace = true }
tokio-stream = "0.1"
whoami = "2.1.1"
atty = "0.2.14"
nix = { workspace = true, features = ["user"] }
Expand Down
41 changes: 41 additions & 0 deletions crates/spur-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,48 @@ mod strigger;

use std::path::Path;

/// Propagate the controller address from the on-disk config into the
/// `SPUR_CONTROLLER_ADDR` environment variable so every subcommand picks it
/// up automatically via its `env = "SPUR_CONTROLLER_ADDR"` clap annotation.
///
/// Priority: --controller CLI arg > SPUR_CONTROLLER_ADDR env > config file > default
fn load_controller_addr_from_config() {
    // An explicit environment setting always wins; never overwrite it.
    if std::env::var("SPUR_CONTROLLER_ADDR").is_ok() {
        return;
    }

    // SPUR_CONF may point at a custom config file; otherwise fall back to
    // the conventional system-wide location.
    let config_path = match std::env::var("SPUR_CONF") {
        Ok(custom) => std::path::PathBuf::from(custom),
        Err(_) => std::path::PathBuf::from("/etc/spur/spur.conf"),
    };

    if !config_path.exists() {
        return;
    }

    // A malformed config is silently ignored here; subcommands will then use
    // their built-in default address.
    let Ok(config) = spur_core::config::SlurmConfig::load(&config_path) else {
        return;
    };

    // Host: first configured controller host. Port: trailing colon-separated
    // component of listen_addr (rsplit handles "[::]:6817" correctly).
    let host = match config.controller.hosts.first() {
        Some(h) => h.as_str(),
        None => "localhost",
    };
    let port = config.controller.listen_addr.rsplit(':').next().unwrap_or("6817");
    std::env::set_var("SPUR_CONTROLLER_ADDR", format!("http://{}:{}", host, port));
}

fn main() -> anyhow::Result<()> {
// Load controller address from config file (if not set via env var)
load_controller_addr_from_config();

// Multi-call binary: dispatch based on argv[0] (symlink name).
let argv0 = std::env::args().next().unwrap_or_else(|| "spur".into());
let bin_name = Path::new(&argv0)
Expand Down
108 changes: 103 additions & 5 deletions crates/spur-cli/src/sattach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{Context, Result};
use clap::Parser;
use spur_proto::proto::slurm_agent_client::SlurmAgentClient;
use spur_proto::proto::slurm_controller_client::SlurmControllerClient;
use spur_proto::proto::{GetJobRequest, JobState, StreamJobOutputRequest};
use spur_proto::proto::{AttachJobInput, GetJobRequest, JobState, StreamJobOutputRequest};
use std::io::Write;

/// Attach to a running job step's standard I/O.
Expand All @@ -12,10 +12,14 @@ pub struct SattachArgs {
/// Job ID (or job_id.step_id)
pub job_step: String,

/// Stream to attach to
/// Stream to attach to (stdout or stderr) — used in output-only mode
#[arg(long, default_value = "stdout")]
pub output: String,

/// Output-only mode: stream job output without interactive stdin forwarding
#[arg(long)]
pub output_only: bool,

/// Controller address
#[arg(
long,
Expand Down Expand Up @@ -67,22 +71,39 @@ pub async fn main_with_args(args: Vec<String>) -> Result<()> {

// Connect to the first node's agent
let first_node = nodelist.split(',').next().unwrap_or(nodelist).trim();

let agent_addr = format!("http://{}:6818", first_node);
let mut agent = SlurmAgentClient::connect(agent_addr.clone())
.await
.context(format!("failed to connect to agent at {}", agent_addr))?;

if args.output_only {
// Output-only mode: stream stdout/stderr without stdin
stream_output_only(&mut agent, job_id, &args.output).await
} else {
// Interactive mode: bidirectional stdin/stdout forwarding
interactive_attach(&mut agent, job_id).await
}
}

/// Stream job output without interactive input (legacy behavior).
async fn stream_output_only(
agent: &mut SlurmAgentClient<tonic::transport::Channel>,
job_id: u32,
stream_name: &str,
) -> Result<()> {
let mut stream = agent
.stream_job_output(StreamJobOutputRequest {
job_id,
stream: args.output.clone(),
stream: stream_name.to_string(),
})
.await
.context("failed to start output stream")?
.into_inner();

eprintln!("sattach: attached to job {} ({})", job_id, args.output);
eprintln!(
"sattach: streaming {} for job {} (output-only)",
stream_name, job_id
);

let stdout = std::io::stdout();
let mut handle = stdout.lock();
Expand All @@ -106,6 +127,83 @@ pub async fn main_with_args(args: Vec<String>) -> Result<()> {
Ok(())
}

/// Interactive attach: bidirectional stdin/stdout forwarding via AttachJob RPC.
///
/// Sends an initial `AttachJobInput` carrying only the job id, then forwards
/// stdin to the agent line-by-line from a background task while echoing the
/// agent's output chunks to stdout. Returns when the agent signals EOF, the
/// output stream ends, or a stream error occurs.
async fn interactive_attach(
    agent: &mut SlurmAgentClient<tonic::transport::Channel>,
    job_id: u32,
) -> Result<()> {
    use tokio::io::{AsyncBufReadExt, BufReader};

    let (tx, rx) = tokio::sync::mpsc::channel::<AttachJobInput>(32);

    // First message identifies the job; it intentionally carries no payload.
    tx.send(AttachJobInput {
        job_id,
        data: Vec::new(),
    })
    .await
    .context("failed to send initial attach message")?;

    // Forward stdin to the agent line-by-line in a background task.
    let tx_stdin = tx.clone();
    tokio::spawn(async move {
        let mut reader = BufReader::new(tokio::io::stdin());
        let mut line = String::new();
        loop {
            line.clear();
            match reader.read_line(&mut line).await {
                Ok(0) => break, // EOF on stdin
                Ok(_) => {
                    let msg = AttachJobInput {
                        job_id,
                        data: line.as_bytes().to_vec(),
                    };
                    // Receiver dropped => RPC has ended; stop forwarding.
                    if tx_stdin.send(msg).await.is_err() {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
        // tx_stdin drops here, closing the request stream (see below).
    });

    // BUGFIX: drop our local sender now that the stdin task owns its clone.
    // Keeping `tx` alive for the whole function meant the request stream
    // never closed, so the agent could never observe end-of-input even
    // after the user hit Ctrl-D on stdin.
    drop(tx);

    // Make the bidirectional streaming call.
    let response = agent
        .attach_job(tokio_stream::wrappers::ReceiverStream::new(rx))
        .await
        .context("attach_job RPC failed")?;

    let mut out_stream = response.into_inner();
    let stdout = std::io::stdout();
    let mut handle = stdout.lock();

    loop {
        match out_stream.message().await {
            Ok(Some(chunk)) => {
                if chunk.eof {
                    break; // agent signalled end of job output
                }
                if !chunk.data.is_empty() {
                    // Best-effort write: a closed stdout should not abort attach.
                    let _ = handle.write_all(&chunk.data);
                    let _ = handle.flush();
                }
            }
            Ok(None) => break, // server closed the stream
            Err(e) => {
                eprintln!("sattach: stream error: {}", e);
                break;
            }
        }
    }

    Ok(())
}

fn state_name(state: i32) -> &'static str {
match state {
0 => "PENDING",
Expand Down
10 changes: 10 additions & 0 deletions crates/spur-k8s/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl VirtualAgent {
impl SlurmAgent for VirtualAgent {
type StreamJobOutputStream =
tokio_stream::wrappers::ReceiverStream<Result<StreamJobOutputChunk, Status>>;
type AttachJobStream = tokio_stream::wrappers::ReceiverStream<Result<AttachJobOutput, Status>>;

async fn launch_job(
&self,
Expand Down Expand Up @@ -390,6 +391,15 @@ impl SlurmAgent for VirtualAgent {
"output streaming not supported for K8s agent",
))
}

/// Reject interactive attach for the K8s virtual agent.
///
/// There is no real node process whose stdin/stdout could be bridged, so the
/// bidirectional AttachJob RPC is answered with UNIMPLEMENTED — consistent
/// with `stream_job_output` above, which rejects output streaming the same way.
async fn attach_job(
    &self,
    _request: Request<tonic::Streaming<AttachJobInput>>,
) -> Result<Response<Self::AttachJobStream>, Status> {
    Err(Status::unimplemented(
        "interactive attach not supported for K8s agent",
    ))
}
}

impl VirtualAgent {
Expand Down
86 changes: 86 additions & 0 deletions crates/spur-tests/src/t50_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1018,4 +1018,90 @@ address = "http://peer-a:6817"
assert_eq!(req.job_id, 42);
assert_eq!(req.command.len(), 2);
}

// ── Issue #45: sattach interactive attach ─────────────────────

#[test]
fn t50_92_attach_job_proto_messages_exist() {
    // Issue #45: the AttachJob bidirectional streaming RPC must ship with
    // AttachJobInput / AttachJobOutput messages. Construct both to prove the
    // generated types exist and that their fields round-trip.
    let stdin_msg = spur_proto::proto::AttachJobInput {
        job_id: 10,
        data: b"ls\n".to_vec(),
    };
    assert_eq!(stdin_msg.job_id, 10);
    assert_eq!(stdin_msg.data, b"ls\n");

    let stdout_msg = spur_proto::proto::AttachJobOutput {
        data: b"hello\n".to_vec(),
        eof: false,
    };
    assert_eq!(stdout_msg.data, b"hello\n");
    assert!(!stdout_msg.eof);
}

// ── Issue #46: CLI reads config file for controller port ──────

#[test]
fn t50_93_config_controller_addr_from_toml() {
    // Issue #46: the CLI derives the controller address from the config file.
    // Verify SlurmConfig parses custom hosts/ports and that the port can be
    // pulled off the tail of listen_addr the way main.rs does.
    let raw = r#"
cluster_name = "test"
[controller]
listen_addr = "[::]:6821"
hosts = ["ctrl.example.com"]
"#;
    let parsed = spur_core::config::SlurmConfig::from_str(raw).unwrap();
    assert_eq!(parsed.controller.hosts[0], "ctrl.example.com");
    assert_eq!(parsed.controller.listen_addr, "[::]:6821");

    // Port extraction mirrors load_controller_addr_from_config in the CLI;
    // rsplit keeps the IPv6 "[::]" prefix out of the result.
    assert_eq!(parsed.controller.listen_addr.rsplit(':').next(), Some("6821"));
}

// ── Issue #47: nodes auto-join default partition ──────────────

#[test]
fn t50_94_node_auto_partition_assignment() {
    // Issue #47: a node whose hostname matches no partition hostlist must
    // fall back into the default partition so it stays schedulable.
    let mut node = Node::new(
        "dynamic-node".into(),
        ResourceSet {
            cpus: 8,
            memory_mb: 16384,
            ..Default::default()
        },
    );
    assert!(node.partitions.is_empty());

    // Mimic the controller's fallback path: no hostlist matched, so the
    // node joins the default partition.
    let fallback = make_partition("batch", 1);
    if node.partitions.is_empty() {
        node.partitions.push(fallback.name.clone());
    }
    assert_eq!(node.partitions, vec!["batch"]);
}

// ── Issue #48: container image resolution fallback ────────────

#[test]
fn t50_95_container_image_absolute_path_basename_fallback() {
    // Issue #48: when the login node hands the agent an absolute image path
    // that does not exist locally, the agent retries using just the basename
    // inside its local image directory.
    let remote = std::path::Path::new("/var/spool/spur/images/ubuntu+22.04.sqsh");
    let basename = remote
        .file_name()
        .and_then(|f| f.to_str())
        .expect("image path has a UTF-8 basename");
    assert_eq!(basename, "ubuntu+22.04.sqsh");

    // sanitize_name contract: both '/' and ':' are replaced with '+'.
    let image = "ubuntu:22.04";
    let sanitized = image.replace('/', "+").replace(':', "+");
    assert_eq!(sanitized, "ubuntu+22.04");
    assert_eq!(format!("{}.sqsh", sanitized), basename);
}
}
13 changes: 12 additions & 1 deletion crates/spurctld/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ impl ClusterManager {
node.agent_start_time = Some(Utc::now());
node.last_heartbeat = Some(Utc::now());

// Assign to partitions based on config
// Assign to partitions based on config hostlist patterns
let partitions = self.partitions.read();
for part in partitions.iter() {
if let Ok(hosts) = spur_core::hostlist::expand(&part.nodes) {
Expand All @@ -698,6 +698,17 @@ impl ClusterManager {
}
}

// If node didn't match any partition's hostlist, auto-assign to the
// default partition so dynamically-registered nodes are always
// schedulable. This matches Slurm's behavior for unconfigured nodes.
if node.partitions.is_empty() {
if let Some(default_part) = partitions.iter().find(|p| p.is_default) {
node.partitions.push(default_part.name.clone());
} else if let Some(first) = partitions.first() {
node.partitions.push(first.name.clone());
}
}

// Copy features and weight from node config
for nc in &self.config.nodes {
if let Ok(hosts) = spur_core::hostlist::expand(&nc.names) {
Expand Down
Loading
Loading