From 941d56d7a23966ef276d72de4de1820ce5a9baa4 Mon Sep 17 00:00:00 2001 From: raviguptaamd Date: Sat, 9 May 2026 08:10:32 +0000 Subject: [PATCH 1/8] feat(deployment): add slurm_multi launcher (minimal additive) Introduce the `slurm_multi` self-managed multi-node SLURM launcher, fully additive vs develop. Verbatim minimal-delta port from PR #86 limited to the slurm_multi surface; no develop functionality removed. - common.py: register `slurm_multi` in VALID_LAUNCHERS, add hyphen alias normalization (`slurm-multi` -> `slurm_multi`). All other develop content (functools.lru_cache, _ROCPROF_FAMILY_TOOL_NAMES, tools_include_rocprof_family, full configure_multi_node_profiling rocprof-family handling, sglang-disagg in VALID_LAUNCHERS) preserved. - slurm.py: SLURM_MULTI_ALIASES; reservation field + allocation detection in __init__; helper methods (_get_allocation_node_count, _validate_allocation_nodes); slurm_multi early dispatch in prepare(); _prepare_slurm_multi_script (sets `_is_slurm_multi = True` for deploy() gate); _generate_slurm_multi_command; new elif in _generate_launcher_command (existing primus/sglang-disagg dispatch preserved); gated bash-in-salloc branch in deploy() (slurm_multi only -- non-slurm_multi launchers continue to use sbatch); _run_inside_existing_allocation; _collect_slurm_multi_results + early dispatch in collect_results(); reservation= kwarg threaded to existing SlurmNodeSelector(...) call. - slurm_node_selector.py: optional `reservation` parameter so the reservation propagates to srun health/cleanup commands. Source: ROCm/madengine PR #86 (slurm_multi surface only). 0 deletions from develop. 
Co-authored-by: Cursor --- src/madengine/deployment/common.py | 8 +- src/madengine/deployment/slurm.py | 620 ++++++++++++++++++ .../deployment/slurm_node_selector.py | 7 + 3 files changed, 634 insertions(+), 1 deletion(-) diff --git a/src/madengine/deployment/common.py b/src/madengine/deployment/common.py index 5b898960..7fb735eb 100644 --- a/src/madengine/deployment/common.py +++ b/src/madengine/deployment/common.py @@ -21,7 +21,8 @@ "primus", "vllm", "sglang", - "sglang-disagg" + "sglang-disagg", + "slurm_multi", ] # Tool names that use rocprof / rocprofv3 wrapping and need MPI-aware rocprofv3 on multi-node. @@ -62,6 +63,8 @@ def normalize_launcher(launcher_type: Optional[str], deployment_type: str) -> st Logic: - If launcher is in VALID_LAUNCHERS: keep as-is + - If launcher's hyphen/underscore variant is in VALID_LAUNCHERS: normalize + (e.g. "slurm-multi" -> "slurm_multi") - If launcher is None/empty/invalid: * local → "docker" (runs in Docker container) * slurm → "docker" (typically uses containers on compute nodes) @@ -76,6 +79,9 @@ def normalize_launcher(launcher_type: Optional[str], deployment_type: str) -> st """ if launcher_type and launcher_type in VALID_LAUNCHERS: return launcher_type + # Normalize hyphen variant: slurm-multi -> slurm_multi + if launcher_type and launcher_type.replace("-", "_") in VALID_LAUNCHERS: + return launcher_type.replace("-", "_") if deployment_type == "local": return "docker" if deployment_type == "slurm": diff --git a/src/madengine/deployment/slurm.py b/src/madengine/deployment/slurm.py index b5eefbd4..19fb1665 100644 --- a/src/madengine/deployment/slurm.py +++ b/src/madengine/deployment/slurm.py @@ -27,6 +27,11 @@ from madengine.utils.path_utils import scripts_base_dir_from import json +SLURM_MULTI_ALIASES = [ + "slurm_multi", + "slurm-multi", +] + class SlurmDeployment(BaseDeployment): """ @@ -70,6 +75,7 @@ def __init__(self, config: DeploymentConfig): self.gpus_per_node = self.slurm_config.get("gpus_per_node", 8) 
self.time_limit = self.slurm_config.get("time", "24:00:00") self.output_dir = Path(self.slurm_config.get("output_dir", "./slurm_results")) + self.reservation = self.slurm_config.get("reservation", None) # Setup Jinja2 template engine template_dir = Path(__file__).parent / "templates" / "slurm" @@ -78,6 +84,115 @@ def __init__(self, config: DeploymentConfig): # Generated script path self.script_path = None + # ========== OPTION 2: Detect existing SLURM allocation ========== + # If SLURM_JOB_ID exists, we're inside an salloc allocation + self.inside_allocation = os.environ.get("SLURM_JOB_ID") is not None + self.existing_job_id = os.environ.get("SLURM_JOB_ID", "") + self.allocation_nodes = self._get_allocation_node_count() + + if self.inside_allocation: + self.console.print( + f"[cyan]✓ Detected existing SLURM allocation: Job {self.existing_job_id}[/cyan]" + ) + self.console.print( + f" Allocation has {self.allocation_nodes} nodes available" + ) + + def _get_allocation_node_count(self) -> int: + """ + Get number of nodes in current SLURM allocation. + + Note: SLURM_NNODES reflects the current job step, not the full allocation. + We query the job directly using scontrol to get the actual node count. + """ + if not self.inside_allocation: + return 0 + + job_id = self.existing_job_id + + # Query the actual job's node count using scontrol (most accurate) + try: + result = subprocess.run( + ["scontrol", "show", "job", job_id], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode == 0: + # Parse NumNodes=X from output + for line in result.stdout.split("\n"): + if "NumNodes=" in line: + # Format: "NumNodes=3 NumCPUs=..." 
+ for part in line.split(): + if part.startswith("NumNodes="): + try: + return int(part.split("=")[1]) + except (ValueError, IndexError): + pass + except Exception: + pass + + # Fallback: Try SLURM_JOB_NUM_NODES (full job node count, if set) + job_num_nodes = os.environ.get("SLURM_JOB_NUM_NODES") + if job_num_nodes: + try: + return int(job_num_nodes) + except ValueError: + pass + + # Fallback: SLURM_NNODES (may be step-specific, not full allocation) + nnodes = os.environ.get("SLURM_NNODES") + if nnodes: + try: + return int(nnodes) + except ValueError: + pass + + # Last resort: count nodes in SLURM_NODELIST + nodelist = os.environ.get("SLURM_NODELIST") + if nodelist: + try: + result = subprocess.run( + ["scontrol", "show", "hostname", nodelist], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode == 0: + return len(result.stdout.strip().split("\n")) + except Exception: + pass + + return 0 + + def _validate_allocation_nodes(self) -> tuple: + """ + Validate that existing allocation has enough nodes for the job. + + Returns: + Tuple of (is_valid, error_message) + """ + if not self.inside_allocation: + return True, "" + + requested_nodes = self.nodes + available_nodes = self.allocation_nodes + + if available_nodes < requested_nodes: + return False, ( + f"Insufficient nodes in current allocation. " + f"Requested: {requested_nodes}, Available: {available_nodes}. " + f"Either reduce nodes in config or use a larger allocation." 
+ ) + + if available_nodes > requested_nodes: + self.console.print( + f"[yellow]⚠ Note: Using {requested_nodes} of {available_nodes} " + f"available nodes in allocation[/yellow]" + ) + + return True, "" + def validate(self) -> bool: """Validate SLURM commands are available locally.""" # Check required SLURM CLI tools @@ -177,6 +292,34 @@ def _validate_cli_availability(self) -> bool: def prepare(self) -> bool: """Generate sbatch script from template.""" + # slurm_multi early dispatch: peek at the model's launcher type so slurm_multi + # can take a self-managed path without requiring madengine on compute nodes. + # Other launchers fall through to the standard template flow unchanged. + try: + model_keys_peek = list((self.manifest or {}).get("built_models", {}).keys()) + if model_keys_peek: + model_info_peek = self.manifest["built_models"][model_keys_peek[0]] + model_distributed_peek = model_info_peek.get("distributed", {}) + launcher_type_peek = ( + model_distributed_peek.get("launcher") + or self.distributed_config.get("launcher", "torchrun") + ) + launcher_normalized_peek = launcher_type_peek.lower().replace("_", "-") + slurm_multi_aliases_normalized = [ + a.lower().replace("_", "-") for a in SLURM_MULTI_ALIASES + ] + if launcher_normalized_peek in slurm_multi_aliases_normalized: + self.output_dir.mkdir(parents=True, exist_ok=True) + self.console.print( + f"[cyan]Detected slurm_multi launcher: {launcher_type_peek}[/cyan]" + ) + return self._prepare_slurm_multi_script( + model_info_peek, docker_image_name=model_keys_peek[0] + ) + except Exception: + # Fall through to develop's standard flow on any peek error + pass + # Validate environment BEFORE generating job scripts self.console.print("\n[bold]Validating submission environment...[/bold]") if not self._validate_cli_availability(): @@ -224,6 +367,217 @@ def _normalize_nodelist(nodelist: Optional[str]) -> Optional[str]: return None return ",".join(n.strip() for n in nodelist.split(",") if n.strip()) + def 
_prepare_slurm_multi_script(self, model_info: Dict, docker_image_name: str = None) -> bool: + """ + Generate a simple wrapper script for slurm_multi (self-managed) launchers. + + These launchers (slurm_multi, sglang-disagg, vllm-disagg) run the model's + .slurm script directly on the host, which then manages Docker containers + via srun. No madengine wrapper needed. + + Args: + model_info: Model configuration from manifest + docker_image_name: The built Docker image name from manifest key + """ + self._is_slurm_multi = True + # Get the model's script path + model_script = model_info.get("scripts", "") + if not model_script: + self.console.print("[red]✗ No scripts defined in model_info[/red]") + return False + + # Get manifest directory (where the model script is relative to) + manifest_dir = Path(self.config.manifest_file).parent.absolute() + model_script_path = manifest_dir / model_script + + if not model_script_path.exists(): + self.console.print(f"[red]✗ Model script not found: {model_script_path}[/red]") + return False + + # Get environment variables + env_vars = {} + + # From model_info.env_vars + if "env_vars" in model_info: + env_vars.update(model_info["env_vars"]) + + # From additional_context.env_vars + if "env_vars" in self.config.additional_context: + env_vars.update(self.config.additional_context["env_vars"]) + + # From distributed config (model's distributed section) + model_distributed = model_info.get("distributed", {}) + sglang_disagg_config = model_distributed.get("sglang_disagg", {}) or self.distributed_config.get("sglang_disagg", {}) + if sglang_disagg_config: + env_vars["xP"] = str(sglang_disagg_config.get("prefill_nodes", 1)) + env_vars["yD"] = str(sglang_disagg_config.get("decode_nodes", 1)) + + # Override DOCKER_IMAGE_NAME with the built image from manifest + # This ensures the run uses the freshly built image, not the base image + # Priority: docker_image_name param > model_info.docker_image > env_vars.DOCKER_IMAGE_NAME + if 
docker_image_name and docker_image_name.startswith("ci-"): + # The manifest key IS the built image name for madengine-built images + self.console.print(f"[cyan]Using built Docker image: {docker_image_name}[/cyan]") + env_vars["DOCKER_IMAGE_NAME"] = docker_image_name + elif "docker_image" in model_info: + built_image = model_info["docker_image"] + self.console.print(f"[cyan]Using Docker image: {built_image}[/cyan]") + env_vars["DOCKER_IMAGE_NAME"] = built_image + elif "image" in model_info: + # Fallback to 'image' field + built_image = model_info["image"] + self.console.print(f"[cyan]Using Docker image: {built_image}[/cyan]") + env_vars["DOCKER_IMAGE_NAME"] = built_image + + # Get model args + model_args = model_info.get("args", "") + + # Generate simple wrapper script + # IMPORTANT: SBATCH directives MUST be at the top, right after #!/bin/bash + script_lines = [ + "#!/bin/bash", + f"#SBATCH --job-name=madengine-{model_info['name']}", + f"#SBATCH --output={self.output_dir}/madengine-{model_info['name']}_%j_%t.out", + f"#SBATCH --error={self.output_dir}/madengine-{model_info['name']}_%j_%t.err", + f"#SBATCH --partition={self.partition}", + f"#SBATCH --nodes={self.nodes}", + f"#SBATCH --ntasks={self.nodes}", + f"#SBATCH --gpus-per-node={self.gpus_per_node}", + f"#SBATCH --time={self.time_limit}", + ] + # Honour user-configured exclusivity (defaults to True to match the standard SLURM template). 
+ if self.slurm_config.get("exclusive", True): + script_lines.append("#SBATCH --exclusive") + + # Add reservation if specified + if self.reservation: + script_lines.append(f"#SBATCH --reservation={self.reservation}") + + # Add nodelist if specified (from model card or --additional-context) + nodelist = self._normalize_nodelist(self.slurm_config.get("nodelist")) + if nodelist: + script_lines.append(f"#SBATCH --nodelist={nodelist}") + + script_lines.extend([ + "", + f"# slurm_multi launcher script for {model_info['name']}", + f"# Generated by madengine for slurm_multi", + "", + "set -e", + "", + "# Environment variables", + ]) + + for key, value in env_vars.items(): + script_lines.append(f"export {key}=\"{value}\"") + + script_lines.append("") + script_lines.extend([ + "echo '=========================================='", + "echo 'slurm_multi Launcher'", + "echo '=========================================='", + f"echo 'Model: {model_info['name']}'", + f"echo 'Script: {model_script_path}'", + "echo 'SLURM_JOB_ID:' $SLURM_JOB_ID", + "echo 'SLURM_NNODES:' $SLURM_NNODES", + "echo 'SLURM_NODELIST:' $SLURM_NODELIST", + "echo ''", + ]) + + # Check if image needs parallel pull on all nodes + # Pull if: image is from registry (contains / or .) and not a local ci-* build + docker_image = env_vars.get("DOCKER_IMAGE_NAME", "") + is_registry_image = docker_image and not docker_image.startswith("ci-") and ("/" in docker_image or "." 
in docker_image) + + if is_registry_image: + # Add parallel docker pull on all nodes + # This ensures all nodes have the image before running + script_lines.extend([ + "", + "# Pull Docker image in parallel on all nodes", + "echo '=========================================='", + "echo 'Pulling Docker image on all nodes in parallel'", + "echo '=========================================='", + f"echo 'Image: {docker_image}'", + "echo ''", + "", + f"srun --nodes=$SLURM_NNODES --ntasks=$SLURM_NNODES bash -c \"", + f" echo \\\"[\\$(hostname)] Pulling {docker_image}...\\\"", + f" docker pull {docker_image}", + " PULL_RC=\\$?", + " if [ \\$PULL_RC -eq 0 ]; then", + " echo \\\"[\\$(hostname)] Pull SUCCESS\\\"", + " else", + " echo \\\"[\\$(hostname)] Pull FAILED with exit code \\$PULL_RC\\\"", + " fi", + " exit \\$PULL_RC", + "\"", + "PULL_EXIT=$?", + "", + "if [ $PULL_EXIT -ne 0 ]; then", + " echo 'Docker pull failed on one or more nodes'", + " exit $PULL_EXIT", + "fi", + "", + "echo ''", + "echo 'Docker image pulled on all nodes'", + "echo ''", + ]) + + # Create completion marker path for robust completion detection. + # Namespace by SLURM_JOB_ID so concurrent / repeat runs of the same model + # tag don't collide on each other's marker files. monitor() reconstructs + # the same path using the deployment_id returned by sbatch. 
+ completion_marker_dir = self.output_dir.resolve() + completion_marker_template = ( + completion_marker_dir + / f"madengine_{model_info['name']}_${{SLURM_JOB_ID:-local}}.complete" + ) + + script_lines.extend([ + "", + "# Change to script directory", + f"cd {model_script_path.parent}", + "", + "# Run the model script directly on the host", + f"echo 'Executing: bash {model_script_path.name} {model_args}'", + f"bash {model_script_path.name} {model_args}", + "SCRIPT_EXIT_CODE=$?", + "", + "echo ''", + "echo 'Script completed.'", + "", + "# Write completion marker for madengine to detect (job-id namespaced)", + f"echo \"exit_code=$SCRIPT_EXIT_CODE\" > {completion_marker_template}", + f"echo \"timestamp=$(date -Iseconds)\" >> {completion_marker_template}", + f"echo \"Completion marker written: {completion_marker_template}\"", + "", + "exit $SCRIPT_EXIT_CODE", + ]) + + # Store marker info for monitor() to reconstruct the path with deployment_id. + self._completion_marker_dir = completion_marker_dir + self._completion_marker_basename_template = ( + f"madengine_{model_info['name']}_{{job_id}}.complete" + ) + # Backward-compat: monitor() falls back to this single path when the + # job-id-aware path is unavailable (e.g. inside_allocation flow). 
+ self._completion_marker = ( + completion_marker_dir / f"madengine_{model_info['name']}_local.complete" + ) + + script_content = "\n".join(script_lines) + + # Save script + self.script_path = self.output_dir / f"madengine_{model_info['name']}.sh" + self.script_path.write_text(script_content) + self.script_path.chmod(0o755) + + self.console.print(f"[green]✓ Generated slurm_multi script: {self.script_path}[/green]") + self.console.print(f" Model script: {model_script_path}") + self.console.print(f" Environment: {len(env_vars)} variables") + + return True def _prepare_template_context(self, model_info: Dict) -> Dict[str, Any]: """Prepare context for Jinja2 template rendering.""" # Use hierarchical GPU resolution: runtime > deployment > model > default @@ -349,6 +703,8 @@ def _generate_launcher_command( return self._generate_sglang_command(nnodes, nproc_per_node, master_port) elif launcher_type == "sglang-disagg" or launcher_type == "sglang_disagg": return self._generate_sglang_disagg_command(nnodes, nproc_per_node, master_port) + elif launcher_type.lower().replace("_", "-") in [a.lower().replace("_", "-") for a in SLURM_MULTI_ALIASES]: + return self._generate_slurm_multi_command(nnodes, nproc_per_node, master_port) elif launcher_type == "deepspeed": return self._generate_deepspeed_command(nnodes, nproc_per_node, master_port) elif launcher_type == "megatron": @@ -544,6 +900,102 @@ def _generate_sglang_disagg_command( echo "TP Size: {nproc_per_node}" echo "==========================================" +# No MAD_MULTI_NODE_RUNNER - SGLang disagg handles process management +# Model script should detect SGLANG_DISAGG_MODE and launch appropriately''' + + def _generate_slurm_multi_command( + self, nnodes: int, nproc_per_node: int, master_port: int + ) -> str: + """ + Generate slurm_multi launcher environment for SLURM. 
+ + slurm_multi Architecture (self-managed multi-node): + - Node 0: Proxy (load balancer) + - Nodes 1 to xP: Prefill nodes + - Nodes xP+1 to xP+yD: Decode nodes + + Minimum cluster: 3 nodes (1 proxy + 1 prefill + 1 decode) + + Args: + nnodes: Total number of nodes (must be >= 3) + nproc_per_node: GPUs per node (tensor parallel size) + master_port: Master port for coordination + + Returns: + Environment setup with node role assignment + + Raises: + ValueError: If nnodes < 3 + """ + if nnodes < 3: + raise ValueError( + f"slurm_multi requires minimum 3 nodes " + f"(1 proxy + 1 prefill + 1 decode), got {nnodes}" + ) + + # Check if custom split is specified in additional_context + sglang_disagg_config = self.config.additional_context.get("distributed", {}).get("sglang_disagg", {}) + prefill_nodes = sglang_disagg_config.get("prefill_nodes") + decode_nodes = sglang_disagg_config.get("decode_nodes") + + if prefill_nodes is not None and decode_nodes is not None: + # User specified custom split - validate + if prefill_nodes < 1 or decode_nodes < 1: + raise ValueError( + f"SGLang Disaggregated requires at least 1 prefill and 1 decode node, " + f"got prefill={prefill_nodes}, decode={decode_nodes}" + ) + if prefill_nodes + decode_nodes + 1 != nnodes: + raise ValueError( + f"Custom split validation failed: " + f"prefill_nodes ({prefill_nodes}) + decode_nodes ({decode_nodes}) + 1 proxy " + f"must equal nnodes ({nnodes}), but got {prefill_nodes + decode_nodes + 1}" + ) + xP = prefill_nodes + yD = decode_nodes + else: + # Default split: use golden ratio for prefill/decode + # For N total nodes: 1 proxy + ~40% prefill + ~60% decode + xP = max(1, (nnodes - 1) * 2 // 5) # ~40% of worker nodes + yD = nnodes - 1 - xP # remaining nodes + + return f'''# SGLang Disaggregated multi-node setup +# ============================================ +# Cluster Configuration: +# Total Nodes: {nnodes} +# Proxy: 1 node (NODE_RANK=0) +# Prefill: {xP} nodes (NODE_RANK=1 to {xP}) +# Decode: {yD} nodes 
(NODE_RANK={xP+1} to {nnodes-1}) +# ============================================ + +# Export cluster topology +export SGLANG_DISAGG_MODE="enabled" +export SGLANG_DISAGG_PREFILL_NODES={xP} +export SGLANG_DISAGG_DECODE_NODES={yD} +export SGLANG_DISAGG_TOTAL_NODES={nnodes} +export SGLANG_TP_SIZE={nproc_per_node} + +# Master coordination +export MASTER_PORT={master_port} + +# Build node IP list from SLURM +SLURM_NODE_IPS=$(scontrol show hostname ${{SLURM_JOB_NODELIST}} | while read node; do + getent hosts "$node" | awk '{{print $1}}' +done | tr '\\n' ',' | sed 's/,$//') + +export SGLANG_NODE_IPS="$SLURM_NODE_IPS" +export SGLANG_NODE_RANK=${{SLURM_PROCID}} + +echo "==========================================" +echo "SGLang Disaggregated Cluster Info" +echo "==========================================" +echo "Node Rank: $SGLANG_NODE_RANK" +echo "Node IPs: $SGLANG_NODE_IPS" +echo "Prefill Nodes: {xP}" +echo "Decode Nodes: {yD}" +echo "TP Size: {nproc_per_node}" +echo "==========================================" + # No MAD_MULTI_NODE_RUNNER - SGLang disagg handles process management # Model script should detect SGLANG_DISAGG_MODE and launch appropriately''' @@ -717,6 +1169,12 @@ def deploy(self) -> DeploymentResult: message="Script not generated. Run prepare() first.", ) + # slurm_multi inside an existing salloc allocation: run the generated script + # directly with bash instead of nesting another sbatch. Non-slurm_multi launchers + # always fall through to the standard sbatch flow (preserves develop behavior). + if self.inside_allocation and getattr(self, "_is_slurm_multi", False): + return self._run_inside_existing_allocation() + # ==================== PREFLIGHT NODE SELECTION ==================== # For single- and multi-node jobs, check for clean nodes and exclude bad ones. # Single-node: we still run the check so bad nodes (e.g. 
Docker broken) get excluded; @@ -734,6 +1192,7 @@ def deploy(self) -> DeploymentResult: console=self.console, auto_cleanup=auto_cleanup, verbose=self.slurm_config.get("verbose_node_check", False), + reservation=self.reservation, ) clean_nodes, updated_exclude = selector.select_nodes( partition=self.partition, @@ -826,6 +1285,79 @@ def deploy(self) -> DeploymentResult: message=f"Deployment error: {str(e)}", ) + def _run_inside_existing_allocation(self) -> DeploymentResult: + """ + Run script directly inside existing salloc allocation using bash. + + The script will use the nodes already allocated to the current job. + SLURM environment variables (SLURM_NODELIST, etc.) are inherited. + """ + # Validate node count before running + is_valid, error_msg = self._validate_allocation_nodes() + if not is_valid: + return DeploymentResult( + status=DeploymentStatus.FAILED, + deployment_id=self.existing_job_id, + message=error_msg, + ) + + self.console.print( + f"\n[bold cyan]Running inside existing SLURM allocation[/bold cyan]" + ) + self.console.print(f" Job ID: {self.existing_job_id}") + self.console.print(f" Using {self.nodes} of {self.allocation_nodes} allocated nodes") + self.console.print(f" GPUs per node: {self.gpus_per_node}") + self.console.print(f" Script: {self.script_path}") + self.console.print(f"\n[dim]Executing: bash {self.script_path}[/dim]\n") + + try: + # Run script directly with bash (synchronous, blocks until done) + # Don't capture output - let it stream directly to console + result = subprocess.run( + ["bash", str(self.script_path)], + timeout=self.config.timeout if self.config.timeout > 0 else None, + ) + + if result.returncode == 0: + self.console.print( + f"\n[green]✓ Script completed successfully in allocation {self.existing_job_id}[/green]" + ) + return DeploymentResult( + status=DeploymentStatus.SUCCESS, + deployment_id=self.existing_job_id, + message=f"Completed inside existing allocation {self.existing_job_id}", + logs_path=str(self.output_dir), + 
skip_monitoring=True, # Already ran synchronously, no need to poll + ) + else: + self.console.print( + f"\n[red]✗ Script failed with exit code {result.returncode}[/red]" + ) + return DeploymentResult( + status=DeploymentStatus.FAILED, + deployment_id=self.existing_job_id, + message=f"Script failed with exit code {result.returncode}", + logs_path=str(self.output_dir), + skip_monitoring=True, # Already ran synchronously + ) + + except subprocess.TimeoutExpired: + self.console.print( + f"\n[red]✗ Script timed out after {self.config.timeout}s[/red]" + ) + return DeploymentResult( + status=DeploymentStatus.FAILED, + deployment_id=self.existing_job_id, + message=f"Script timed out after {self.config.timeout}s", + ) + except Exception as e: + self.console.print(f"\n[red]✗ Execution error: {e}[/red]") + return DeploymentResult( + status=DeploymentStatus.FAILED, + deployment_id=self.existing_job_id, + message=f"Execution error: {str(e)}", + ) + def monitor(self, deployment_id: str) -> DeploymentResult: """Check SLURM job status (locally).""" try: @@ -1204,6 +1736,21 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: model_name_for_path = model_info_for_path.get("name", model_key or "unknown") model_name = model_key or "unknown" # image key for build_info / model_info_for_entry lookups + # slurm_multi early dispatch: model script emits its own perf.csv directly, + # so collect via _collect_slurm_multi_results instead of the template-based path. 
+ if model_key: + _mi = built_models_dict.get(model_key, {}) or {} + _launcher_type = (_mi.get("distributed") or {}).get("launcher", "") + _launcher_normalized = _launcher_type.lower().replace("_", "-") + _slurm_multi_aliases_normalized = [ + a.lower().replace("_", "-") for a in SLURM_MULTI_ALIASES + ] + if _launcher_normalized in _slurm_multi_aliases_normalized: + return self._collect_slurm_multi_results( + deployment_id, results, session_start_row + ) + + build_info = {} built_images = self.manifest.get("built_images") or {} if built_images: @@ -1519,6 +2066,79 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: ) return results + def _collect_slurm_multi_results( + self, deployment_id: str, results: Dict[str, Any], session_start_row: Optional[int] + ) -> Dict[str, Any]: + """ + Collect results for slurm_multi launchers. + + slurm_multi model scripts generate their own perf.csv via their + benchmark scripts (e.g. generate_perf_csv.py). We collect SLURM + logs for diagnostics and read the model-generated perf.csv for metrics. + """ + # Collect SLURM output logs for diagnostics + flat_out_files = sorted(self.output_dir.glob(f"madengine-*_{deployment_id}_*.out")) + results["logs"] = [str(f) for f in flat_out_files] + + # Look for model-generated perf.csv. Inner scripts in MAD-private write + # to one of these locations depending on the workload: + # * SGLang / vLLM disagg: /shared_inference/<user>/<job_id>/perf.csv + # * Large EP / KV cache: <workspace>/slurm_output/perf_csv/**.csv + # Plus the legacy <cwd>/perf.csv path some flows still use. + # Priority: results_dir config > shared_inference NFS > slurm_output/perf_csv + # > <cwd>/perf.csv (with NFS-propagation retry). 
+ perf_csv_path = None + if self.slurm_config.get("results_dir"): + results_dir = Path(self.slurm_config["results_dir"]) + candidates = list(results_dir.glob("perf*.csv")) + if candidates: + perf_csv_path = candidates[0] + + if not perf_csv_path: + user = os.environ.get("USER", "") + shared_candidates = [] + if user: + shared_candidates.extend([ + Path(f"/shared_inference/{user}/{deployment_id}/perf.csv"), + Path(f"/shared_inference/{user}/model_blog_logs/{deployment_id}/perf.csv"), + ]) + workspace_perf_dir = Path("slurm_output/perf_csv") + workspace_candidates = list(workspace_perf_dir.glob(f"*{deployment_id}*.csv")) + workspace_perf = Path("perf.csv") + + # Retry briefly for NFS propagation after SLURM job completion + import time + for _attempt in range(6): + for cand in shared_candidates: + if cand.exists() and cand.stat().st_size > 0: + perf_csv_path = cand + break + if perf_csv_path: + break + if workspace_candidates: + perf_csv_path = workspace_candidates[0] + break + if workspace_perf.exists() and workspace_perf.stat().st_size > 0: + perf_csv_path = workspace_perf + break + time.sleep(5) + # Re-glob in case the file appeared during the wait. 
+ workspace_candidates = list(workspace_perf_dir.glob(f"*{deployment_id}*.csv")) + + if perf_csv_path: + results["perf_files"] = [str(perf_csv_path)] + self._collect_results_parse_perf_csv(results, session_start_row) + else: + self.console.print("[yellow]No perf.csv found from slurm_multi model script[/yellow]") + + self.console.print( + f"[green]Collected slurm_multi results: {len(results['perf_files'])} perf files, " + f"{len(results['logs'])} log files, " + f"{len(results['successful_runs'])} successful, " + f"{len(results['failed_runs'])} failed[/green]" + ) + return results + def _collect_results_parse_perf_csv( self, results: Dict[str, Any], session_start_row: Optional[int] ) -> None: diff --git a/src/madengine/deployment/slurm_node_selector.py b/src/madengine/deployment/slurm_node_selector.py index 408e8d3c..f63593ac 100644 --- a/src/madengine/deployment/slurm_node_selector.py +++ b/src/madengine/deployment/slurm_node_selector.py @@ -72,6 +72,7 @@ def __init__( auto_cleanup: bool = False, verbose: bool = False, timeout: int = 30, + reservation: Optional[str] = None, ): """ Initialize node selector. 
@@ -81,11 +82,13 @@ def __init__( auto_cleanup: Automatically clean dirty nodes verbose: Enable verbose logging timeout: Timeout for srun commands (seconds) + reservation: SLURM reservation name (passed through to srun health/cleanup) """ self.console = console or Console() self.auto_cleanup = auto_cleanup self.verbose = verbose self.timeout = timeout + self.reservation = reservation # Max candidates to check (avoids excessive checks on large clusters) MAX_CANDIDATES_CAP = 100 @@ -209,6 +212,8 @@ def check_node_health(self, node: str, job_name: Optional[str] = None) -> NodeSt ] if job_name: srun_cmd.append(f"--job-name={job_name}") + if self.reservation: + srun_cmd.append(f"--reservation={self.reservation}") srun_cmd.extend(["bash", "-c", check_script]) try: @@ -323,6 +328,8 @@ def cleanup_node(self, node: str, job_name: Optional[str] = None) -> bool: ] if job_name: srun_cmd.append(f"--job-name={job_name}") + if self.reservation: + srun_cmd.append(f"--reservation={self.reservation}") srun_cmd.extend(["bash", "-c", cleanup_script]) try: From bd371feb0a88d48bb522d45c20504c89cd345a90 Mon Sep 17 00:00:00 2001 From: raviguptaamd Date: Sat, 9 May 2026 08:10:54 +0000 Subject: [PATCH 2/8] feat(orchestration): build_on_compute, registry gate, parallel pull for slurm_multi Wire the build/run orchestration paths needed by the slurm_multi launcher. All existing flows for non-slurm_multi launchers continue to work unchanged; new behavior is gated on `distributed.launcher == "slurm_multi"` (or `slurm-multi`). - execution/container_runner.py: SLURM_MULTI_ALIASES, SELF_MANAGED_LAUNCHERS constants; `_run_self_managed` method (runs the model script directly on baremetal so srun/scontrol work inside it, propagates env from model_info.env_vars and additional_context.env_vars); self-managed launcher early dispatch in run_container() that fires only when launcher matches SELF_MANAGED_LAUNCHERS; `.slurm` extension recognized alongside `.sh` in script-extension detection (T-C2). 
Develop helpers preserved verbatim (_print_run_env_table, _docker_image_exists_locally, _bash_quote_path, _cp_model_dir_file_to_cwd_cmd, _resolve_docker_image, _ENV_KEY_RE env-var validation, MAD_OUTPUT_CSV passthrough, PRIMUS_CONFIG_PATH/PRIMUS_CLI_EXTRA in env-var allowlist, PERFORMANCE_LOG_PATTERN regex, CSV fieldname stripping). - execution/docker_builder.py: after build, write resolved registry image into built_models[*].env_vars.DOCKER_IMAGE_NAME so the run-time `srun docker pull "\$DOCKER_IMAGE_NAME"` on each compute node finds the registry image. Pure addition (one block before manifest assembly). - orchestration/build_orchestrator.py: import shutil; capture user-supplied slurm keys (_original_user_slurm_keys); add `use_image: Optional[str]` and `build_on_compute: bool` to execute(); mutex validation between use_image and build_on_compute; use_image dispatch (incl. "auto" sentinel that resolves from model card via _resolve_image_from_model_card); build_on_compute dispatch (registry required, validated in _execute_build_on_compute); slurm_multi registry gate that runs DiscoverModels early to detect slurm_multi tags and either raises a structured ConfigurationError with helpful suggestions or auto-uses DOCKER_IMAGE_NAME from the model card env_vars (implicit --use-image fallback); _execute_with_prebuilt_image; _resolve_image_from_model_card; _execute_build_on_compute (sbatch wrapper that builds + pushes from one compute node, polls completion, marks `built_on_compute: true` in the manifest). Develop's load_credentials top-level usage, detect_local_gpu_arch parameter, and resolved_arch print all preserved unchanged. Note: PR #86's universal merge of model_info.env_vars into docker_env_vars in run_container() was DELIBERATELY DROPPED -- it is not slurm_multi-essential (slurm_multi gets env_vars via _prepare_slurm_multi_script's wrapper export block and via _run_self_managed's own merge). 
Applying it universally would silently change behavior for non-slurm_multi launchers and has an incorrect precedence vs additional_context.env_vars; it can be revisited as a separate PR if needed. Source: ROCm/madengine PR #86 (slurm_multi surface only). 0 deletions from develop except the agreed `.sh` -> `.sh|.slurm` line modification (T-C2: harmless extension to script-extension recognition). Co-authored-by: Cursor --- src/madengine/execution/container_runner.py | 204 +++- src/madengine/execution/docker_builder.py | 11 + .../orchestration/build_orchestrator.py | 892 ++++++++++++++++++ 3 files changed, 1105 insertions(+), 2 deletions(-) diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index 2ffc8a31..2f98ec85 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -44,6 +44,13 @@ ) +SLURM_MULTI_ALIASES = [ + "slurm_multi", + "slurm-multi", +] +SELF_MANAGED_LAUNCHERS = SLURM_MULTI_ALIASES + + def _print_run_env_table( gpu_vendor: str, context, @@ -771,6 +778,167 @@ def apply_tools( else: print(f" Note: Command '{cmd}' already added by another tool, skipping duplicate.") + def _run_self_managed( + self, + model_info: typing.Dict, + build_info: typing.Dict, + log_file_path: str, + timeout: int, + run_results: typing.Dict, + pre_encapsulate_post_scripts: typing.Dict, + run_env: typing.Dict, + ) -> typing.Dict: + """ + Run script directly on the host (self-managed launcher, not inside madengine Docker). + + Used for slurm_multi launchers that manage their own Docker containers + via SLURM srun commands. The script is executed directly on the node. 
+ + Args: + model_info: Model configuration from manifest + build_info: Build information from manifest + log_file_path: Path to log file + timeout: Execution timeout in seconds + run_results: Dictionary to store run results + pre_encapsulate_post_scripts: Pre/post script configuration + run_env: Environment variables for the script + + Returns: + Dictionary with run results + """ + import shutil + + self.rich_console.print(f"[dim]{'='*80}[/dim]") + + # Prepare script path + scripts_arg = model_info["scripts"] + + # Get the current working directory (might be temp workspace) + cwd = os.getcwd() + print(f"📂 Current directory: {cwd}") + + if scripts_arg.endswith(".sh") or scripts_arg.endswith(".slurm"): + script_path = scripts_arg + script_name = os.path.basename(scripts_arg) + elif scripts_arg.endswith(".py"): + script_path = scripts_arg + script_name = os.path.basename(scripts_arg) + else: + # Directory specified - look for run.sh + script_path = os.path.join(scripts_arg, "run.sh") + script_name = "run.sh" + + # If script path is relative, make it absolute from cwd + if not os.path.isabs(script_path): + script_path = os.path.join(cwd, script_path) + + # Check script exists + if not os.path.exists(script_path): + print(f"⚠️ Script not found at: {script_path}") + # Try alternative locations + alt_path = os.path.join(cwd, os.path.basename(scripts_arg)) + if os.path.exists(alt_path): + script_path = alt_path + print(f"✓ Found at alternative location: {script_path}") + else: + raise FileNotFoundError(f"Script not found: {script_path}") + + script_dir = os.path.dirname(script_path) or cwd + print(f"📜 Script: {script_path}") + print(f"📁 Working directory: {script_dir}") + + # Prepare model arguments + model_args = self.context.ctx.get("model_args", model_info.get("args", "")) + print(f"📝 Arguments: {model_args}") + + # Build command. 
The eventual `subprocess.run(..., shell=True)` below + # interprets shell metacharacters in `script_path` and `model_args`, + # so quote each piece explicitly. `model_args` is a CLI/manifest- + # supplied free-form string -- shlex.split + per-arg shlex.quote + # passes literal arguments to the script even when the input contains + # `$()`, backticks, `;`, etc. + _script_q = shlex.quote(script_path) + _args_q = ( + " ".join(shlex.quote(a) for a in shlex.split(model_args)) + if model_args + else "" + ) + if script_path.endswith(".py"): + cmd = f"python3 {_script_q} {_args_q}".rstrip() + else: + cmd = f"bash {_script_q} {_args_q}".rstrip() + + print(f"🔧 Command: {cmd}") + + # Prepare environment + env = os.environ.copy() + env.update(run_env) + + # Add model-specific env vars from model_info + if "env_vars" in model_info and model_info["env_vars"]: + for key, value in model_info["env_vars"].items(): + env[key] = str(value) + print(f" ENV: {key}={value}") + + # Add env vars from additional_context + if self.additional_context and "env_vars" in self.additional_context: + for key, value in self.additional_context["env_vars"].items(): + env[key] = str(value) + + # Run script with logging + test_start_time = time.time() + self.rich_console.print("\n[bold blue]Running script (self-managed launcher)...[/bold blue]") + + try: + with open(log_file_path, mode="w", buffering=1) as outlog: + with redirect_stdout( + PythonicTee(outlog, self.live_output) + ), redirect_stderr(PythonicTee(outlog, self.live_output)): + print(f"⏰ Setting timeout to {timeout} seconds.") + print(f"🚀 Executing: {cmd}") + print(f"📂 Working directory: {script_dir}") + print(f"{'='*80}") + + # NOTE: shell=True is required because cmd embeds shell features + # (pipes, redirects, env-var substitution) constructed earlier in this + # method. 
cmd is built from validated model card / manifest fields and + # any user-provided model_args are routed through shlex-quoted assembly + # in the caller — do NOT concatenate raw user input directly into cmd. + result = subprocess.run( # noqa: S602 (shell=True intentional, see comment above) + cmd, + shell=True, + cwd=script_dir, + env=env, + timeout=timeout if timeout > 0 else None, + ) + + run_results["test_duration"] = time.time() - test_start_time + print(f"\n{'='*80}") + print(f"⏱️ Test Duration: {run_results['test_duration']:.2f} seconds") + + if result.returncode == 0: + run_results["status"] = "SUCCESS" + self.rich_console.print("[bold green]✓ Script completed successfully[/bold green]") + else: + run_results["status"] = "FAILURE" + run_results["status_detail"] = f"Exit code {result.returncode}" + self.rich_console.print(f"[bold red]✗ Script failed with exit code {result.returncode}[/bold red]") + raise subprocess.CalledProcessError(result.returncode, cmd) + + except subprocess.TimeoutExpired: + run_results["status"] = "FAILURE" + run_results["status_detail"] = f"Timeout after {timeout}s" + run_results["test_duration"] = time.time() - test_start_time + self.rich_console.print(f"[bold red]✗ Script timed out after {timeout}s[/bold red]") + raise + except Exception as e: + run_results["status"] = "FAILURE" + run_results["status_detail"] = str(e) + run_results["test_duration"] = time.time() - test_start_time + raise + + return run_results + def run_pre_post_script( self, model_docker: Docker, model_dir: str, pre_post: typing.List ) -> None: @@ -1098,6 +1266,38 @@ def run_container( print(f"Docker options: {docker_options}") + # ========== CHECK FOR SELF-MANAGED LAUNCHERS ========== + # slurm_multi launchers run scripts directly on the host, + # not inside a madengine-managed Docker. The script manages its own containers via srun. 
+ launcher = "" + if self.additional_context: + distributed_config = self.additional_context.get("distributed", {}) + launcher = distributed_config.get("launcher", "") + if not launcher and model_info.get("distributed"): + launcher = model_info["distributed"].get("launcher", "") + if not launcher: + launcher = os.environ.get("MAD_LAUNCHER_TYPE", "") + launcher_normalized = launcher.lower().replace("_", "-") if launcher else "" + if launcher_normalized and launcher_normalized in [ + l.lower().replace("_", "-") for l in SELF_MANAGED_LAUNCHERS + ]: + self.rich_console.print( + f"\n[bold cyan]🖥️ Self-managed launcher (launcher: {launcher})[/bold cyan]" + ) + self.rich_console.print( + "[dim]Script will manage its own Docker containers via SLURM[/dim]" + ) + return self._run_self_managed( + model_info=model_info, + build_info=build_info, + log_file_path=log_file_path, + timeout=timeout, + run_results=run_results, + pre_encapsulate_post_scripts=pre_encapsulate_post_scripts, + run_env=run_env, + ) + # ========== END SELF-MANAGED CHECK ========== + self.rich_console.print(f"\n[bold blue]🏃 Starting Docker container execution...[/bold blue]") print(f"🏷️ Image: {docker_image}") print(f"📦 Container: {container_name}") @@ -1214,8 +1414,8 @@ def run_container( # Prepare script execution scripts_arg = model_info["scripts"] - if scripts_arg.endswith(".sh"): - # Shell script specified directly + if scripts_arg.endswith(".sh") or scripts_arg.endswith(".slurm"): + # Shell script specified directly (.sh or .slurm for SLURM batch scripts) dir_path = os.path.dirname(scripts_arg) script_name = "bash " + os.path.basename(scripts_arg) elif scripts_arg.endswith(".py"): diff --git a/src/madengine/execution/docker_builder.py b/src/madengine/execution/docker_builder.py index 56f33d6d..d1d6720e 100644 --- a/src/madengine/execution/docker_builder.py +++ b/src/madengine/execution/docker_builder.py @@ -366,6 +366,17 @@ def export_build_manifest( "registry" ) + # Update built_models with registry 
image name for parallel pull in slurm_multi + # Map local image to registry image for env_vars + for image_name, build_info in self.built_images.items(): + registry_image = build_info.get("registry_image") + if registry_image and image_name in self.built_models: + model_data = self.built_models[image_name] + if "env_vars" not in model_data: + model_data["env_vars"] = {} + # Set DOCKER_IMAGE_NAME to registry image for parallel pull + model_data["env_vars"]["DOCKER_IMAGE_NAME"] = registry_image + manifest = { "built_images": self.built_images, "built_models": self.built_models, diff --git a/src/madengine/orchestration/build_orchestrator.py b/src/madengine/orchestration/build_orchestrator.py index d905f3b4..84f09c05 100644 --- a/src/madengine/orchestration/build_orchestrator.py +++ b/src/madengine/orchestration/build_orchestrator.py @@ -10,6 +10,7 @@ import json import os +import shutil from pathlib import Path from typing import Dict, List, Optional @@ -95,6 +96,7 @@ def __init__(self, args, additional_context: Optional[Dict] = None, detect_local apply_build_context_defaults(merged_context) self.additional_context = merged_context + self._original_user_slurm_keys = set(merged_context.get("slurm", {}).keys()) # Apply ConfigLoader to infer deploy type, validate, and apply defaults if self.additional_context: @@ -195,6 +197,8 @@ def execute( clean_cache: bool = False, manifest_output: str = "build_manifest.json", batch_build_metadata: Optional[Dict] = None, + use_image: Optional[str] = None, + build_on_compute: bool = False, ) -> str: """ Execute build workflow. 
@@ -204,6 +208,8 @@ def execute( clean_cache: Whether to use --no-cache for Docker builds manifest_output: Output file for build manifest batch_build_metadata: Optional batch build metadata + use_image: Pre-built Docker image to use (skip Docker build) + build_on_compute: Build on SLURM compute node instead of login node Returns: Path to generated build_manifest.json @@ -212,6 +218,93 @@ def execute( DiscoveryError: If model discovery fails BuildError: If Docker build fails """ + # --use-image and --build-on-compute are mutually exclusive + if use_image and build_on_compute: + raise ConfigurationError( + "--use-image and --build-on-compute cannot be used together", + context=create_error_context( + operation="build", + component="BuildOrchestrator", + ), + suggestions=[ + "Use --use-image to skip build and use an existing image", + "Use --build-on-compute to build on a compute node and push to registry", + ], + ) + + # Handle pre-built image mode + if use_image: + # If use_image is "auto", resolve from model card + if use_image == "auto": + use_image = self._resolve_image_from_model_card() + + return self._execute_with_prebuilt_image( + use_image=use_image, + manifest_output=manifest_output, + ) + + # Handle build-on-compute mode + if build_on_compute: + return self._execute_build_on_compute( + registry=registry, + clean_cache=clean_cache, + manifest_output=manifest_output, + batch_build_metadata=batch_build_metadata, + ) + + # For normal build: check if slurm_multi launcher requires registry + # Discover models first to check launcher + try: + _disc = DiscoverModels(args=self.args) + _discovered_models = _disc.run() + except Exception: + _discovered_models = [] + + if _discovered_models: + # When the discovered model card already declares an image via + # env_vars.DOCKER_IMAGE_NAME, treat it as an implicit --use-image so + # users do not have to repeat the image on the CLI for slurm_multi. 
+ slurm_multi_models = [ + m for m in _discovered_models + if (m.get("distributed") or {}).get("launcher", "") in ["slurm_multi", "slurm-multi"] + ] + if slurm_multi_models and not registry: + card_images = { + (m.get("env_vars") or {}).get("DOCKER_IMAGE_NAME", "") + for m in slurm_multi_models + if (m.get("env_vars") or {}).get("DOCKER_IMAGE_NAME") + } + if len(card_images) == 1: + implicit_image = next(iter(card_images)) + self.rich_console.print( + f"[dim]slurm_multi: no --registry/--use-image given; " + f"using DOCKER_IMAGE_NAME from model card -> {implicit_image}[/dim]" + ) + return self._execute_with_prebuilt_image( + use_image=implicit_image, + manifest_output=manifest_output, + ) + # No card image (or divergent images across models): fall through to error. + for model in _discovered_models: + launcher = (model.get("distributed") or {}).get("launcher", "") + if launcher in ["slurm_multi", "slurm-multi"] and not registry: + model_name = model.get("name", "unknown") + raise ConfigurationError( + "slurm_multi launcher requires --registry or --use-image", + context=create_error_context( + operation="build", + component="BuildOrchestrator", + model_name=model_name, + additional_info={"launcher": launcher}, + ), + suggestions=[ + "Use --registry docker.io/myorg to push image (nodes will pull in parallel)", + "Use --use-image to use a pre-built image from registry", + "Use --build-on-compute --registry to build on compute and push", + "Or set DOCKER_IMAGE_NAME in the model card env_vars (auto-detected for slurm_multi)", + ], + ) + self.rich_console.print(f"\n[dim]{'=' * 60}[/dim]") self.rich_console.print("[bold blue]🔨 BUILD PHASE[/bold blue]") self.rich_console.print("[yellow](Build-only mode - no GPU detection)[/yellow]") @@ -356,6 +449,805 @@ def execute( ], ) from e + def _execute_with_prebuilt_image( + self, + use_image: str, + manifest_output: str = "build_manifest.json", + ) -> str: + """ + Generate manifest for a pre-built Docker image (skip Docker build). 
+ + This is useful when using external images like: + - lmsysorg/sglang:v0.5.2rc1-rocm700-mi30x + - nvcr.io/nvidia/pytorch:24.01-py3 + + Args: + use_image: Pre-built Docker image name + manifest_output: Output file for build manifest + + Returns: + Path to generated build_manifest.json + """ + self.rich_console.print(f"\n[dim]{'=' * 60}[/dim]") + self.rich_console.print("[bold blue]🔨 BUILD PHASE (Pre-built Image Mode)[/bold blue]") + self.rich_console.print(f"[cyan]Using pre-built image: {use_image}[/cyan]") + self.rich_console.print(f"[dim]{'=' * 60}[/dim]\n") + + try: + # Step 1: Discover models + self.rich_console.print("[bold cyan]🔍 Discovering models...[/bold cyan]") + discover_models = DiscoverModels(args=self.args) + models = discover_models.run() + + if not models: + raise DiscoveryError( + "No models discovered", + context=create_error_context( + operation="discover_models", + component="BuildOrchestrator", + ), + suggestions=[ + "Check if models.json exists", + "Verify --tags parameter is correct", + ], + ) + + self.rich_console.print(f"[green]✓ Found {len(models)} models[/green]\n") + + # Step 2: Generate manifest with pre-built image + self.rich_console.print("[bold cyan]📄 Generating manifest for pre-built image...[/bold cyan]") + + manifest = { + "built_images": { + use_image: { + "image_name": use_image, + "docker_image": use_image, + "dockerfile": "", + "build_time": 0, + "prebuilt": True, + } + }, + "built_models": {}, + "context": self.context.ctx if hasattr(self.context, 'ctx') else {}, + "credentials_required": [], + "summary": { + "successful_builds": [], + "failed_builds": [], + "total_build_time": 0, + "successful_pushes": [], + "failed_pushes": [], + }, + } + + # Add each discovered model with the pre-built image. + # Key by model_name (not use_image) so multiple models sharing + # the same pre-built image are all preserved in the manifest. 
+ for model in models: + model_name = model.get("name", "unknown") + model_distributed = model.get("distributed", {}) + + # Merge DOCKER_IMAGE_NAME into env_vars for parallel pull in run phase + model_env_vars = model.get("env_vars", {}).copy() + model_env_vars["DOCKER_IMAGE_NAME"] = use_image + + manifest["built_models"][model_name] = { + "name": model_name, + "image": use_image, + "docker_image": use_image, + "dockerfile": model.get("dockerfile", ""), + "scripts": model.get("scripts", ""), + "data": model.get("data", ""), + "n_gpus": model.get("n_gpus", "8"), + "owner": model.get("owner", ""), + "training_precision": model.get("training_precision", ""), + "multiple_results": model.get("multiple_results", ""), + "tags": model.get("tags", []), + "timeout": model.get("timeout", -1), + "args": model.get("args", ""), + "slurm": model.get("slurm", {}), + "distributed": model_distributed, + "env_vars": model_env_vars, + "prebuilt": True, + } + manifest["summary"]["successful_builds"].append(model_name) + + # Save manifest + with open(manifest_output, "w") as f: + json.dump(manifest, f, indent=2) + + # Save deployment config + self._save_deployment_config(manifest_output) + + # Merge model's distributed and slurm config into deployment_config + # This ensures launcher and slurm settings are in deployment_config even if not in additional-context + if models: + with open(manifest_output, "r") as f: + saved_manifest = json.load(f) + + if "deployment_config" not in saved_manifest: + saved_manifest["deployment_config"] = {} + + # Merge model's distributed config from the first model. + # If multiple models have differing distributed configs, warn — only the first wins here. 
+ if len(models) > 1: + distinct_distributed = {tuple(sorted((m.get("distributed") or {}).items())) for m in models} + if len(distinct_distributed) > 1: + self.rich_console.print( + "[yellow]Warning: discovered models have differing distributed configs; " + f"using {models[0].get('name', '')}'s config.[/yellow]" + ) + model_distributed = models[0].get("distributed", {}) + if model_distributed: + if "distributed" not in saved_manifest["deployment_config"]: + saved_manifest["deployment_config"]["distributed"] = {} + + # Copy launcher and other critical fields from model config + for key in ["launcher", "nnodes", "nproc_per_node", "backend", "port", "sglang_disagg", "vllm_disagg"]: + if key in model_distributed and key not in saved_manifest["deployment_config"]["distributed"]: + saved_manifest["deployment_config"]["distributed"][key] = model_distributed[key] + + # Merge model's slurm config into deployment_config.slurm from the first model. + # This enables run phase to auto-detect SLURM deployment without --additional-context. + # Warn when multiple models have differing slurm configs (only the first wins here). + if len(models) > 1: + distinct_slurm = {tuple(sorted((m.get("slurm") or {}).items())) for m in models} + if len(distinct_slurm) > 1: + self.rich_console.print( + "[yellow]Warning: discovered models have differing slurm configs; " + f"using {models[0].get('name', '')}'s config.[/yellow]" + ) + model_slurm = models[0].get("slurm", {}) + if model_slurm: + if "slurm" not in saved_manifest["deployment_config"]: + saved_manifest["deployment_config"]["slurm"] = {} + + # Copy slurm settings from model config (model card fills in + # values not explicitly set by --additional-context). + # Use _original_user_slurm_keys (captured before ConfigLoader + # applies defaults) so model card values override defaults + # but user's explicit CLI values still win. 
+ for key in ["partition", "nodes", "gpus_per_node", "time", "exclusive", "reservation", "output_dir", "nodelist"]: + if key in model_slurm and key not in self._original_user_slurm_keys: + saved_manifest["deployment_config"]["slurm"][key] = model_slurm[key] + + with open(manifest_output, "w") as f: + json.dump(saved_manifest, f, indent=2) + + self.rich_console.print(f"[green]✓ Generated manifest: {manifest_output}[/green]") + self.rich_console.print(f" Pre-built image: {use_image}") + self.rich_console.print(f" Models: {len(models)}") + self.rich_console.print(f"[dim]{'=' * 60}[/dim]\n") + + return manifest_output + + except (DiscoveryError, BuildError): + raise + except Exception as e: + raise BuildError( + f"Failed to generate manifest for pre-built image: {e}", + context=create_error_context( + operation="prebuilt_manifest", + component="BuildOrchestrator", + ), + ) from e + + def _resolve_image_from_model_card(self) -> str: + """ + Resolve Docker image name from model card's DOCKER_IMAGE_NAME env var. + + This method discovers models and extracts the DOCKER_IMAGE_NAME from + env_vars. If multiple models have different images, uses the first + and prints a warning. 
+
+        Returns:
+            Docker image name from model card
+
+        Raises:
+            ConfigurationError: If no DOCKER_IMAGE_NAME found in any model
+        """
+        self.rich_console.print("[bold cyan]🔍 Auto-detecting image from model card...[/bold cyan]")
+
+        # Discover models to get their env_vars
+        discover_models = DiscoverModels(args=self.args)
+        models = discover_models.run()
+
+        if not models:
+            raise ConfigurationError(
+                "No models discovered for image auto-detection",
+                context=create_error_context(
+                    operation="resolve_image",
+                    component="BuildOrchestrator",
+                ),
+                suggestions=[
+                    "Specify image name explicitly with --use-image <image>",
+                    "Check if models.json exists",
+                    "Verify --tags parameter is correct",
+                ],
+            )
+
+        # Collect DOCKER_IMAGE_NAME from all models
+        images_found = {}
+        for model in models:
+            model_name = model.get("name", "unknown")
+            env_vars = model.get("env_vars", {})
+            docker_image = env_vars.get("DOCKER_IMAGE_NAME")
+
+            if docker_image:
+                images_found[model_name] = docker_image
+
+        if not images_found:
+            model_names = [m.get("name", "unknown") for m in models]
+            raise ConfigurationError(
+                "No DOCKER_IMAGE_NAME found in model card env_vars",
+                context=create_error_context(
+                    operation="resolve_image",
+                    component="BuildOrchestrator",
+                    additional_info={"model_names": model_names},
+                ),
+                suggestions=[
+                    "Add DOCKER_IMAGE_NAME to model's env_vars in models.json",
+                    "Specify image name explicitly with --use-image <image>",
+                    'Example: "env_vars": {"DOCKER_IMAGE_NAME": "myimage:tag"}',
+                ],
+            )
+
+        # Use first model's image
+        first_model = list(images_found.keys())[0]
+        resolved_image = images_found[first_model]
+
+        # Warn if multiple models have different images
+        unique_images = set(images_found.values())
+        if len(unique_images) > 1:
+            self.rich_console.print(
+                f"[yellow]⚠️ Warning: Multiple models have different DOCKER_IMAGE_NAME values:[/yellow]"
+            )
+            for model_name, image in images_found.items():
+                self.rich_console.print(f"  - {model_name}: {image}")
+            self.rich_console.print(
+                f"[yellow]   Using image from '{first_model}': {resolved_image}[/yellow]\n"
+            )
+        else:
+            self.rich_console.print(f"[green]✓ Auto-detected image: {resolved_image}[/green]\n")
+
+        return resolved_image
+
+    def _execute_build_on_compute(
+        self,
+        registry: Optional[str] = None,
+        clean_cache: bool = False,
+        manifest_output: str = "build_manifest.json",
+        batch_build_metadata: Optional[Dict] = None,
+    ) -> str:
+        """
+        Execute Docker build on a SLURM compute node and push to registry.
+
+        Build workflow:
+        1. Build on 1 compute node only
+        2. Push image to registry
+        3. Store registry image name in manifest
+        4. Run phase will pull image in parallel on all nodes
+
+        Args:
+            registry: Registry to push images to (REQUIRED)
+            clean_cache: Whether to use --no-cache for Docker builds
+            manifest_output: Output file for build manifest
+            batch_build_metadata: Optional batch build metadata
+
+        Returns:
+            Path to generated build_manifest.json
+        """
+        import subprocess
+        import os
+        import glob
+
+        self.rich_console.print(f"\n[dim]{'=' * 60}[/dim]")
+        self.rich_console.print("[bold blue]🔨 BUILD PHASE (Compute Node Mode)[/bold blue]")
+        self.rich_console.print("[cyan]Building on 1 compute node, pushing to registry...[/cyan]")
+        self.rich_console.print(f"[dim]{'=' * 60}[/dim]\n")
+
+        # registry is required for the build-on-compute flow (it must be pushed somewhere
+        # so the run phase on other nodes can pull it). The signature accepts Optional[str]
+        # to keep the call-site signature flexible, but we must reject None up front.
+        if not registry:
+            raise ConfigurationError(
+                "Registry is required for --build-on-compute (image must be pushed for run-phase pull).",
+                context=create_error_context(
+                    operation="build_on_compute",
+                    component="BuildOrchestrator",
+                ),
+                suggestions=[
+                    "Pass --registry on the build CLI",
+                    'Or set "registry" in the model card / additional-context',
+                ],
+            )
+
+        # Normalize and validate --registry input shape.
+        #
+        # Downstream code derives the registry HOST from `registry.split("/")[0]`
+        # (used as the `docker login <host>` argument). Users routinely pass
+        # Dockerhub-shorthand like `rocm/pytorch-private` without the
+        # `docker.io/` prefix, and that would `docker login rocm` -> DNS NXDOMAIN.
+        #
+        # Auto-prepend `docker.io/` when the first path segment doesn't look
+        # like an FQDN/host:port. Then validate the resulting first segment is
+        # a plausible host (contains '.' or ':' or is 'localhost'), and reject
+        # otherwise with an actionable error.
+        _registry_invalid_msg = lambda r, fs: ConfigurationError(  # noqa: E731
+            f"Invalid --registry value: {r!r}. "
+            + (
+                f"First segment '{fs}' is not a valid registry host."
+                if fs
+                else "Registry value is empty after whitespace/trailing-slash trim."
+            ),
+            context=create_error_context(
+                operation="build_on_compute",
+                component="BuildOrchestrator",
+                additional_info={"registry": r},
+            ),
+            suggestions=[
+                'Dockerhub: --registry docker.io/<namespace>(/<repo>)',
+                'GHCR: --registry ghcr.io/<org>(/<repo>)',
+                'Quay: --registry quay.io/<org>(/<repo>)',
+                'NGC: --registry nvcr.io/<org>(/<repo>)',
+                'Self-hosted: --registry <host>(:<port>)(/<path>)',
+                'Local: --registry localhost:5000(/<repo>)',
+            ],
+        )
+        normalized = registry.strip().rstrip("/")
+        if not normalized:
+            raise _registry_invalid_msg(registry, "")
+        first_seg = normalized.split("/", 1)[0]
+        # Reject blatantly invalid characters BEFORE the auto-prepend step,
+        # because auto-prepend would overwrite first_seg to "docker.io" and
+        # mask path-side garbage. `@` and whitespace are never valid in a
+        # Dockerhub-style namespace/repo; subtler cases (uppercase letters,
+        # other illegal chars deeper in the path) get caught later by
+        # `docker push` itself.
+        if " " in normalized or "@" in normalized:
+            raise _registry_invalid_msg(registry, first_seg)
+        looks_like_host = (
+            "." in first_seg or ":" in first_seg or first_seg == "localhost"
+        )
+        if not looks_like_host:
+            # Treat as Dockerhub shorthand: prepend docker.io/.
+ self.rich_console.print( + f" [dim]Registry: {registry!r} has no host segment; " + f"auto-prefixing 'docker.io/' (treat as Dockerhub).[/dim]" + ) + normalized = f"docker.io/{normalized}" + first_seg = "docker.io" + if not first_seg: + raise _registry_invalid_msg(registry, first_seg) + if normalized != registry: + registry = normalized + + # Discover models first to get SLURM config from model card + self.rich_console.print("[bold cyan]🔍 Discovering models...[/bold cyan]") + discover_models = DiscoverModels(args=self.args) + models = discover_models.run() + + if not models: + raise DiscoveryError( + "No models discovered for build-on-compute", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + ), + suggestions=[ + "Check if models.json exists", + "Verify --tags parameter is correct", + ], + ) + + model = models[0] + model_name = model.get("name", "unknown") + self.rich_console.print(f"[green]✓ Found model: {model_name}[/green]\n") + + # Merge SLURM config: model card (base) + additional-context (override) + model_slurm_config = model.get("slurm", {}) + context_slurm_config = self.additional_context.get("slurm", {}) + + # Start with model card config, then override with command-line context + slurm_config = {**model_slurm_config, **context_slurm_config} + + self.rich_console.print("[bold cyan]📋 SLURM Configuration (merged):[/bold cyan]") + if model_slurm_config: + self.rich_console.print(f" [dim]From model card:[/dim] {list(model_slurm_config.keys())}") + if context_slurm_config: + self.rich_console.print(f" [dim]From --additional-context (overrides):[/dim] {list(context_slurm_config.keys())}") + + # Validate required fields + partition = slurm_config.get("partition") + if not partition: + raise ConfigurationError( + "Missing required SLURM field: partition", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + ), + suggestions=[ + 'Add "partition" to model card\'s slurm 
section', + 'Or specify via --additional-context \'{"slurm": {"partition": "gpu"}}\'', + ], + ) + + reservation = slurm_config.get("reservation", "") + time_limit = slurm_config.get("time", "02:00:00") + + self.rich_console.print(f" Partition: {partition}") + self.rich_console.print(f" Time limit: {time_limit}") + if reservation: + self.rich_console.print(f" Reservation: {reservation}") + self.rich_console.print("") + + # Validate registry credentials + self.rich_console.print("[bold cyan]🔐 Registry Configuration:[/bold cyan]") + self.rich_console.print(f" Registry: {registry}") + + # Check for credentials - either from environment or credential.json + dockerhub_user = os.environ.get("MAD_DOCKERHUB_USER", "") + dockerhub_password = os.environ.get("MAD_DOCKERHUB_PASSWORD", "") + + # Try to load from credential.json if env vars not set + credential_file = Path("credential.json") + if not dockerhub_user and credential_file.exists(): + try: + with open(credential_file) as f: + creds = json.load(f) + dockerhub_creds = creds.get("dockerhub", {}) + dockerhub_user = dockerhub_creds.get("username", "") + dockerhub_password = dockerhub_creds.get("password", "") + if dockerhub_user: + self.rich_console.print(f" Credentials: Found in credential.json") + except (json.JSONDecodeError, IOError) as e: + self.rich_console.print(f" [yellow]Warning: Could not read credential.json: {e}[/yellow]") + elif dockerhub_user: + self.rich_console.print(f" Credentials: Found in environment (MAD_DOCKERHUB_USER)") + + # Determine if registry requires authentication + requires_auth = True + public_registries = ["docker.io", "ghcr.io", "gcr.io", "quay.io", "nvcr.io"] + registry_lower = registry.lower() if registry else "" + + # For docker.io pushes, authentication is always required + # Per-registry guidance for the missing-credentials error message. 
+ # Today only Docker Hub credentials (MAD_DOCKERHUB_USER/PASSWORD or credential.json) + # are wired into this code path, but the error suggestion should at least name the + # right token type for ghcr.io / quay.io / nvcr.io / gcr.io users. + _registry_hints = { + "docker.io": [ + "Set environment variables: MAD_DOCKERHUB_USER and MAD_DOCKERHUB_PASSWORD", + 'Or create credential.json: {"dockerhub": {"username": "...", "password": "..."}}', + "For Docker Hub, use a Personal Access Token (PAT) as password", + "Example: export MAD_DOCKERHUB_USER=myuser", + "Example: export MAD_DOCKERHUB_PASSWORD=dckr_pat_xxxxx", + ], + "ghcr.io": [ + "GitHub Container Registry: use a GitHub PAT with read:packages (and write:packages to push)", + "Set MAD_DOCKERHUB_USER=<github-username>, MAD_DOCKERHUB_PASSWORD=<github-PAT>", + ], + "gcr.io": [ + "Google Container Registry: use a service-account JSON key as password", + "Set MAD_DOCKERHUB_USER=_json_key, MAD_DOCKERHUB_PASSWORD=\"$(cat key.json)\"", + ], + "quay.io": [ + "Quay.io: use a robot account or encrypted password", + "Set MAD_DOCKERHUB_USER=<robot-account>, MAD_DOCKERHUB_PASSWORD=<robot-token>", + ], + "nvcr.io": [ + "NVIDIA NGC: use $oauthtoken as username and an NGC API key as password", + "Set MAD_DOCKERHUB_USER=\\$oauthtoken, MAD_DOCKERHUB_PASSWORD=<NGC-API-key>", + ], + } + _matched_hints = next( + (hints for reg_key, hints in _registry_hints.items() if reg_key in registry_lower), + _registry_hints["docker.io"], + ) + if any(pub_reg in registry_lower for pub_reg in public_registries): + if not dockerhub_user or not dockerhub_password: + raise ConfigurationError( + f"Registry credentials required for pushing to {registry}", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + additional_info={"registry": registry}, + ), + suggestions=_matched_hints, + ) + self.rich_console.print(f" Auth: Will login to registry before push") + else: + # Private/internal registry - may not need auth + self.rich_console.print(f" Auth: Private registry (auth may not 
be required)") + requires_auth = dockerhub_user and dockerhub_password + + self.rich_console.print("") + + # Check if we're inside an existing allocation + inside_allocation = os.environ.get("SLURM_JOB_ID") is not None + existing_job_id = os.environ.get("SLURM_JOB_ID", "") + + # Find Dockerfile + dockerfile = model.get("dockerfile", "") + dockerfile_path = "" + dockerfile_patterns = [ + f"{dockerfile}.ubuntu.amd.Dockerfile", + f"{dockerfile}.Dockerfile", + f"{dockerfile}", + ] + for pattern in dockerfile_patterns: + matches = glob.glob(pattern) + if matches: + dockerfile_path = matches[0] + break + + if not dockerfile_path: + raise ConfigurationError( + f"Dockerfile not found for model {model_name}", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + additional_info={"dockerfile": dockerfile}, + ), + suggestions=[ + f"Check if {dockerfile}.ubuntu.amd.Dockerfile exists", + "Verify the dockerfile path in models.json", + ], + ) + + # Generate image name for registry. + # Docker repository names must be lowercase, so .lower() the final + # local image name. This also covers the edge case where the + # dockerfile filename is just `Dockerfile` (no `.ubuntu.amd.Dockerfile` + # suffix to strip), which would otherwise leave an uppercase `D` in + # `ci-_Dockerfile` and make `docker build -t` reject the tag. 
+ dockerfile_basename = Path(dockerfile_path).name.replace(".Dockerfile", "").replace(".ubuntu.amd", "") + local_image_name = f"ci-{model_name}_{dockerfile_basename}".lower() + + # Determine registry image name based on registry format + # docker.io/namespace/repo -> use model name as tag: docker.io/namespace/repo:model_name + # docker.io/namespace -> use model name as repo: docker.io/namespace/model_name:latest + registry_parts = registry.replace("docker.io/", "").split("/") + if len(registry_parts) >= 2: + # Registry already includes repo name (e.g., rocm/pytorch-private) + # Use model name as tag + registry_image_name = f"{registry}:{model_name}" + self.rich_console.print(f" [dim]Registry format: namespace/repo -> using model name as tag[/dim]") + else: + # Registry is just namespace (e.g., myuser) + # Use model name as repo + registry_image_name = f"{registry}/{model_name}:latest" + self.rich_console.print(f" [dim]Registry format: namespace -> using model name as repo[/dim]") + + self.rich_console.print("[bold cyan]🐳 Docker Configuration:[/bold cyan]") + self.rich_console.print(f" Dockerfile: {dockerfile_path}") + self.rich_console.print(f" Local image: {local_image_name}") + self.rich_console.print(f" Registry image: {registry_image_name}") + self.rich_console.print("") + + # Determine registry host for docker login + registry_host = registry.split("/")[0] if "/" in registry else registry + + # Build script content - builds on 1 node, pushes to registry + build_script_content = f"""#!/bin/bash +#SBATCH --job-name=madengine-build +#SBATCH --partition={partition} +#SBATCH --nodes=1 +#SBATCH --ntasks=1 +#SBATCH --time={time_limit} +{f'#SBATCH --reservation={reservation}' if reservation else ''} +#SBATCH --output=madengine_build_%j.out +#SBATCH --error=madengine_build_%j.err + +echo "============================================================" +echo "=== MADENGINE BUILD ON COMPUTE NODE ===" +echo "============================================================" 
+echo "" +echo "Job ID: $SLURM_JOB_ID" +echo "Build Node: $(hostname)" +echo "Working directory: $(pwd)" +echo "Registry: {registry}" +echo "" + +# Change to submission directory +cd {Path.cwd().absolute()} + +# Step 0: Docker login for registry push +echo "=== Step 0: Docker Registry Authentication ===" +DOCKER_USER="${{MAD_DOCKERHUB_USER:-}}" +DOCKER_PASS="${{MAD_DOCKERHUB_PASSWORD:-}}" + +# Try credential.json if env vars not set +if [ -z "$DOCKER_USER" ] && [ -f "credential.json" ]; then + echo "Reading credentials from credential.json..." + DOCKER_USER=$(python3 -c "import json; print(json.load(open('credential.json')).get('dockerhub', {{}}).get('username', ''))" 2>/dev/null || echo "") + DOCKER_PASS=$(python3 -c "import json; print(json.load(open('credential.json')).get('dockerhub', {{}}).get('password', ''))" 2>/dev/null || echo "") +fi + +if [ -n "$DOCKER_USER" ] && [ -n "$DOCKER_PASS" ]; then + echo "Logging in to registry as $DOCKER_USER..." + echo "$DOCKER_PASS" | docker login {registry_host} -u "$DOCKER_USER" --password-stdin + LOGIN_RC=$? + if [ $LOGIN_RC -ne 0 ]; then + echo "" + echo "❌ Docker login FAILED with exit code $LOGIN_RC" + echo "" + echo "Troubleshooting:" + echo " - Verify MAD_DOCKERHUB_USER and MAD_DOCKERHUB_PASSWORD are correct" + echo " - For Docker Hub, use a Personal Access Token (PAT) not your password" + echo " - Check if the registry URL is correct: {registry_host}" + exit $LOGIN_RC + fi + echo "✅ Docker login SUCCESS" +else + echo "No credentials found - assuming public registry or pre-authenticated" +fi +echo "" + +# Step 1: Build Docker image +echo "" +echo "=== Step 1: Building Docker image ===" +echo "Dockerfile: {dockerfile_path}" +echo "Local image name: {local_image_name}" +echo "" + +docker build --network=host -t {local_image_name} {"--no-cache" if clean_cache else ""} --pull -f {dockerfile_path} ./docker +BUILD_RC=$? 
+ +if [ $BUILD_RC -ne 0 ]; then + echo "" + echo "❌ Docker build FAILED on $(hostname) with exit code $BUILD_RC" + exit $BUILD_RC +fi + +echo "" +echo "✅ Docker build SUCCESS on $(hostname)" +echo "" + +# Step 2: Tag and push to registry +echo "=== Step 2: Pushing to registry ===" +echo "Tagging: {local_image_name} -> {registry_image_name}" +docker tag {local_image_name} {registry_image_name} + +echo "Pushing: {registry_image_name}" +docker push {registry_image_name} +PUSH_RC=$? + +if [ $PUSH_RC -ne 0 ]; then + echo "" + echo "❌ Docker push FAILED with exit code $PUSH_RC" + echo "" + echo "Troubleshooting:" + echo " - Check if you have push access to {registry}" + echo " - Verify credentials are correct (MAD_DOCKERHUB_USER, MAD_DOCKERHUB_PASSWORD)" + echo " - For Docker Hub, ensure the repository exists or you have create permissions" + exit $PUSH_RC +fi + +echo "" +echo "============================================================" +echo "✅ BUILD AND PUSH COMPLETE" +echo "============================================================" +echo "" +echo "Build Node: $(hostname)" +echo "Registry Image: {registry_image_name}" +echo "" +echo "Run phase will pull this image in parallel on all nodes." 
+echo "============================================================" + +exit 0 +""" + + build_script_path = Path("madengine_build_job.sh") + build_script_path.write_text(build_script_content) + build_script_path.chmod(0o755) + + if inside_allocation: + self.rich_console.print(f"[cyan]Running build via srun (inside allocation {existing_job_id})...[/cyan]") + cmd = ["srun", "-N1", "--ntasks=1", "bash", str(build_script_path)] + else: + self.rich_console.print("[cyan]Submitting build job via sbatch...[/cyan]") + cmd = ["sbatch", "--wait", str(build_script_path)] + + self.rich_console.print(f" Build script: {build_script_path}") + self.rich_console.print(f" Command: {' '.join(cmd)}") + self.rich_console.print("") + + try: + result = subprocess.run( + cmd, + capture_output=False, + text=True, + ) + + if result.returncode != 0: + raise BuildError( + f"Build on compute node failed with exit code {result.returncode}", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + ), + suggestions=[ + "Check the build log files (madengine_build_*.out/err)", + "Verify SLURM partition and reservation settings", + "Ensure Docker is available on compute nodes", + "Verify registry credentials are configured", + ], + ) + + # Generate manifest with registry image name + self.rich_console.print(f"\n[bold cyan]📄 Generating manifest...[/bold cyan]") + + manifest = { + "built_images": { + registry_image_name: { + "image_name": registry_image_name, + "docker_image": registry_image_name, + "local_image": local_image_name, + "dockerfile": dockerfile_path, + "build_time": 0, + "built_on_compute": True, + "registry": registry, + } + }, + "built_models": { + registry_image_name: { + "name": model_name, + "image": registry_image_name, + "docker_image": registry_image_name, + "dockerfile": dockerfile_path, + "scripts": model.get("scripts", ""), + "data": model.get("data", ""), + "n_gpus": model.get("n_gpus", "8"), + "tags": model.get("tags", []), + "slurm": 
slurm_config, + "distributed": model.get("distributed", {}), + "env_vars": {**model.get("env_vars", {}), "DOCKER_IMAGE_NAME": registry_image_name}, + "built_on_compute": True, + } + }, + "context": self.context.ctx if hasattr(self.context, 'ctx') else {}, + "deployment_config": { + "slurm": slurm_config, + "distributed": model.get("distributed", {}), + }, + "credentials_required": [], + "summary": { + "successful_builds": [model_name], + "failed_builds": [], + "total_build_time": 0, + "successful_pushes": [registry_image_name], + "failed_pushes": [], + }, + } + + with open(manifest_output, "w") as f: + json.dump(manifest, f, indent=2) + + self.rich_console.print(f"[green]✓ Build completed on compute node[/green]") + self.rich_console.print(f"[green]✓ Image pushed: {registry_image_name}[/green]") + self.rich_console.print(f"[green]✓ Manifest: {manifest_output}[/green]") + self.rich_console.print(f"[dim]{'=' * 60}[/dim]\n") + + return manifest_output + + except subprocess.TimeoutExpired: + raise BuildError( + "Build on compute node timed out", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + ), + ) + except (DiscoveryError, ConfigurationError, BuildError): + raise + except Exception as e: + raise BuildError( + f"Failed to build on compute node: {e}", + context=create_error_context( + operation="build_on_compute", + component="BuildOrchestrator", + ), + ) from e def _save_build_summary(self, manifest_file: str, build_summary: Dict): """Save build summary to manifest for display purposes.""" try: From 8a5e1742b2e85e4f8c8ea44197ac1c69c87e3747 Mon Sep 17 00:00:00 2001 From: raviguptaamd Date: Sat, 9 May 2026 08:11:03 +0000 Subject: [PATCH 3/8] feat(cli): expose --use-image and --build-on-compute on madengine build Surface the orchestrator paths from the previous commit through the `madengine build` CLI: - cli/commands/build.py: add `--use-image [IMAGE | auto]` (skip the local Docker build and use the named image; `auto` 
resolves from the model card's DOCKER_IMAGE_NAME); add `--build-on-compute` (build on a SLURM compute node and push to the configured registry; manifest records `built_on_compute: true`); 3 mutex validation blocks (--use-image vs --registry, --use-image vs --build-on-compute, --build-on-compute requires --registry); pass both new kwargs through to validate_additional_context, create_args_namespace, and BuildOrchestrator.execute(). - cli/validators.py: add `use_image: Optional[str] = None` parameter (signature + docstring only -- body unchanged) so build.py's call site does not TypeError. PR #86's other validators.py rewrites (drops of ROCM_PATH / MAD_ROCM_PATH validation, dockerfile_matched fallback removal) are NOT inherited; develop validation is preserved. Required for the slurm_multi launcher's documented build options in MAD-private #186. Co-authored-by: Cursor --- src/madengine/cli/commands/build.py | 47 ++++++++++++++++++++++++++++- src/madengine/cli/validators.py | 2 ++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/madengine/cli/commands/build.py b/src/madengine/cli/commands/build.py index 5b10a65c..a866b6d5 100644 --- a/src/madengine/cli/commands/build.py +++ b/src/madengine/cli/commands/build.py @@ -55,6 +55,22 @@ def build( "--batch-manifest", help="Input batch.json file for batch build mode" ), ] = None, + use_image: Annotated[ + Optional[str], + typer.Option( + "--use-image", + is_flag=False, + flag_value="auto", + help="Skip Docker build and use pre-built image. 
Optionally specify image name, or omit to auto-detect from model card's DOCKER_IMAGE_NAME" + ), + ] = None, + build_on_compute: Annotated[ + bool, + typer.Option( + "--build-on-compute", + help="Build Docker images on SLURM compute node instead of login node" + ), + ] = False, additional_context: Annotated[ str, typer.Option( @@ -116,6 +132,31 @@ def build( ) raise typer.Exit(ExitCode.INVALID_ARGS) + if use_image and registry: + console.print( + "❌ [bold red]Error: Cannot specify both --use-image and --registry options[/bold red]\n" + "[yellow]Use --use-image for pre-built external images.[/yellow]\n" + "[yellow]Use --registry to push locally built images.[/yellow]" + ) + raise typer.Exit(ExitCode.INVALID_ARGS) + + if use_image and build_on_compute: + console.print( + "❌ [bold red]Error: Cannot specify both --use-image and --build-on-compute options[/bold red]\n" + "[yellow]--use-image skips Docker build entirely.[/yellow]\n" + "[yellow]--build-on-compute builds on SLURM compute nodes.[/yellow]" + ) + raise typer.Exit(ExitCode.INVALID_ARGS) + + if build_on_compute and not registry: + console.print( + "❌ [bold red]Error: --build-on-compute requires --registry option[/bold red]\n" + "[yellow]Build on compute node pushes image to registry.[/yellow]\n" + "[yellow]Run phase will pull image in parallel on all nodes.[/yellow]\n" + "[dim]Example: --build-on-compute --registry docker.io/myorg[/dim]" + ) + raise typer.Exit(ExitCode.INVALID_ARGS) + # Process batch manifest if provided batch_data = None effective_tags = processed_tags @@ -175,7 +216,7 @@ def build( try: # Validate additional context and merge file + CLI; defaults wired into orchestrator validated_context = validate_additional_context( - additional_context, additional_context_file + additional_context, additional_context_file, use_image ) # Create arguments object @@ -191,6 +232,8 @@ def build( verbose=verbose, _separate_phases=True, batch_build_metadata=batch_build_metadata if batch_build_metadata else None, + 
use_image=use_image, + build_on_compute=build_on_compute, ) # Initialize orchestrator in build-only mode @@ -211,6 +254,8 @@ def build( clean_cache=clean_docker_cache, manifest_output=manifest_output, batch_build_metadata=batch_build_metadata, + use_image=use_image, + build_on_compute=build_on_compute, ) # Load build summary for display diff --git a/src/madengine/cli/validators.py b/src/madengine/cli/validators.py index 1f7ee001..867b4eed 100644 --- a/src/madengine/cli/validators.py +++ b/src/madengine/cli/validators.py @@ -298,6 +298,7 @@ def additional_context_needs_cli_validation( def validate_additional_context( additional_context: str, additional_context_file: Optional[str] = None, + use_image: Optional[str] = None, ) -> Dict[str, Any]: """ Validate and parse additional context. @@ -305,6 +306,7 @@ def validate_additional_context( Args: additional_context: JSON string containing additional context additional_context_file: Optional file containing additional context + use_image: Optional pre-built image to use (skips required field validation) Returns: Dict containing parsed additional context From f7af062f8263040a17ab46682e6beeba66a6e690 Mon Sep 17 00:00:00 2001 From: raviguptaamd Date: Sat, 9 May 2026 08:11:18 +0000 Subject: [PATCH 4/8] test(slurm_multi): contract tests + minimal example config - tests/unit/test_slurm_multi.py: nine tests across three classes - TestSlurmMultiRegistration: `slurm_multi` is in VALID_LAUNCHERS; SLURM_MULTI_ALIASES contains both canonical and hyphen forms. - TestNormalizeSlurmMultiAliases: canonical and hyphen alias both normalize to `slurm_multi`; unknown values still fall through to the existing `slurm -> docker` default. - TestSlurmMultiPrepareScript: end-to-end fixture that mirrors MAD-private PR #186's `pyt_sglang_disagg_qwen3-32b_short` entry verbatim. 
Asserts that prepare() takes the slurm_multi early- dispatch path, sets `_is_slurm_multi = True` for the deploy() gate, emits an SBATCH header that reflects the model card slurm block, and emits every model_info.env_vars key as `export KEY="value"` in the wrapper script. This is the explicit safety net for the Q2 decision to drop PR #86's universal docker_env_vars merge -- if slurm_multi ever stops propagating those env_vars to the workload, this test fails loudly. Existing tests in tests/unit/test_deployment.py and tests/unit/test_container_runner.py are NOT modified. - examples/slurm-configs/minimal/slurm-multi-minimal.json: small reference config alongside develop's existing *-minimal.json examples (`-f` add because *.json is gitignored, matching how existing files in this dir were originally added). Co-authored-by: Cursor --- .../minimal/slurm-multi-minimal.json | 23 ++ tests/unit/test_slurm_multi.py | 203 ++++++++++++++++++ 2 files changed, 226 insertions(+) create mode 100644 examples/slurm-configs/minimal/slurm-multi-minimal.json create mode 100644 tests/unit/test_slurm_multi.py diff --git a/examples/slurm-configs/minimal/slurm-multi-minimal.json b/examples/slurm-configs/minimal/slurm-multi-minimal.json new file mode 100644 index 00000000..03fb5fb9 --- /dev/null +++ b/examples/slurm-configs/minimal/slurm-multi-minimal.json @@ -0,0 +1,23 @@ +{ + "_comment": "Minimal slurm_multi launcher configuration - 3 nodes minimum", + "_description": "Self-managed multi-node SLURM launcher (script runs on baremetal, manages its own Docker via srun)", + "_architecture": "Wrapper SBATCH exports env_vars and runs the model's .slurm script directly on the head node; the script orchestrates per-node containers via srun", + + "gpu_vendor": "AMD", + "guest_os": "UBUNTU", + "deploy": "slurm", + + "slurm": { + "partition": "gpu", + "nodes": 3, + "gpus_per_node": 8, + "time": "04:00:00", + "exclusive": true + }, + + "distributed": { + "launcher": "slurm_multi", + "nnodes": 3, + 
"nproc_per_node": 8 + } +} diff --git a/tests/unit/test_slurm_multi.py b/tests/unit/test_slurm_multi.py new file mode 100644 index 00000000..89d5f1a6 --- /dev/null +++ b/tests/unit/test_slurm_multi.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +""" +Unit tests for the slurm_multi launcher. + +Locks in three contract points so future refactors fail loudly if they regress: +1. `slurm_multi` is registered in `VALID_LAUNCHERS`. +2. `slurm-multi` (hyphen alias) normalizes to `slurm_multi`. +3. The wrapper SBATCH script generated by `_prepare_slurm_multi_script` + exports every `model_info.env_vars` declared by the model card + (mirrors the `pyt_sglang_disagg_qwen3-32b_short` entry from MAD-private PR #186). + +Copyright (c) Advanced Micro Devices, Inc. All rights reserved. +""" + +import json +from pathlib import Path + +import pytest + +from madengine.deployment.common import VALID_LAUNCHERS, normalize_launcher +from madengine.deployment.base import DeploymentConfig +from madengine.deployment.slurm import SLURM_MULTI_ALIASES, SlurmDeployment + + +# --------------------------------------------------------------------------- +# 1. Registry membership + +class TestSlurmMultiRegistration: + """slurm_multi is registered in the launcher allowlist.""" + + def test_slurm_multi_in_valid_launchers(self): + assert "slurm_multi" in VALID_LAUNCHERS + + def test_aliases_constant(self): + assert SLURM_MULTI_ALIASES == ["slurm_multi", "slurm-multi"] + + +# --------------------------------------------------------------------------- +# 2. 
Hyphen alias normalization + +class TestNormalizeSlurmMultiAliases: + """slurm-multi (hyphen) normalizes to slurm_multi.""" + + def test_canonical(self): + assert normalize_launcher("slurm_multi", "slurm") == "slurm_multi" + + def test_hyphen_alias(self): + assert normalize_launcher("slurm-multi", "slurm") == "slurm_multi" + + def test_unknown_falls_through_to_default(self): + # Sanity: unrelated value still returns docker for slurm + assert normalize_launcher("totally-bogus", "slurm") == "docker" + + +# --------------------------------------------------------------------------- +# 3. MAD-private #186 env_vars contract + +# Verbatim from MAD-private PR #186, model name pyt_sglang_disagg_qwen3-32b_short. +PR186_MODEL_ENTRY = { + "name": "pyt_sglang_disagg_qwen3-32b_short", + "url": "", + "dockerfile": "docker/sglang_disagg_inference", + "scripts": "scripts/sglang_disagg/run_xPyD_models.slurm", + "data": "huggingface", + "n_gpus": "8", + "owner": "mad.support@amd.com", + "training_precision": "", + "multiple_results": "perf_Qwen3-32B.csv", + "tags": [ + "pyt", "sglang", "sglang_disagg", "inference", + "qwen3", "disaggregated", "short", + ], + "timeout": -1, + "args": "--model_name Qwen3-32B --model_path /shared_inference/models_blog/Qwen3-32B --tp_size 8", + "slurm": { + "partition": "amd-rccl", + "nodes": 3, + "gpus_per_node": 8, + "time": "01:00:00", + "output_dir": "./slurm_output", + "exclusive": True, + "reservation": "", + }, + "distributed": { + "launcher": "slurm_multi", + "nnodes": 3, + "nproc_per_node": 8, + "backend": "nccl", + "port": 29500, + "sglang_disagg": { + "prefill_nodes": 1, + "decode_nodes": 1, + }, + }, + "env_vars": { + "DOCKER_IMAGE_NAME": "rocm/pytorch-private:sglang_disagg_mori_20260502", + "MODEL_NAME": "Qwen3-32B", + "MODEL_PATH": "/shared_inference/models_blog/Qwen3-32B", + "SGLANG_ENABLE_FLASHINFER": "1", + "SGLANG_ENABLE_RADIX_CACHE": "1", + "SGLANG_RADIX_CACHE_SIZE": "0.9", + "xP": "1", + "yD": "1", + "BENCHMARK_CONCURRENCY": "8", + 
"BENCHMARK_COMBINATIONS": "32/32", + }, +} + + +class TestSlurmMultiPrepareScript: + """`_prepare_slurm_multi_script` emits every model_info.env_vars as `export`.""" + + @pytest.fixture + def slurm_deployment(self, tmp_path: Path) -> SlurmDeployment: + """Build a SlurmDeployment whose manifest contains the MAD-private #186 entry.""" + # _prepare_slurm_multi_script resolves model_info["scripts"] against the + # manifest's directory and bails (returns False) if the script doesn't exist. + # Create a placeholder so the existence check passes; contents are irrelevant + # because prepare() never executes it. + script_rel = PR186_MODEL_ENTRY["scripts"] + script_abs = tmp_path / script_rel + script_abs.parent.mkdir(parents=True, exist_ok=True) + script_abs.write_text("#!/bin/bash\n# placeholder slurm_multi model script for unit test\n") + + # Use the model image-name as the manifest key (matches manifest convention). + image_key = "rocm/pytorch-private:sglang_disagg_mori_20260502" + + manifest = { + "built_images": { + image_key: { + "image_name": image_key, + "docker_image": image_key, + "registry_image": image_key, + }, + }, + "built_models": { + image_key: PR186_MODEL_ENTRY, + }, + "context": { + "docker_env_vars": {}, + "docker_mounts": {}, + "docker_build_arg": {}, + "gpu_vendor": "AMD", + "guest_os": "UBUNTU", + "docker_gpus": "all", + }, + } + manifest_path = tmp_path / "build_manifest.json" + manifest_path.write_text(json.dumps(manifest)) + + # Additional context drives the slurm config: point output_dir at tmp_path + # so we can read the generated script back. 
+ additional_context = { + "deploy": "slurm", + "gpu_vendor": "AMD", + "guest_os": "UBUNTU", + "slurm": dict(PR186_MODEL_ENTRY["slurm"], output_dir=str(tmp_path / "slurm_results")), + "distributed": PR186_MODEL_ENTRY["distributed"], + } + + cfg = DeploymentConfig( + target="slurm", + manifest_file=str(manifest_path), + additional_context=additional_context, + ) + return SlurmDeployment(cfg) + + def test_prepare_dispatches_to_slurm_multi(self, slurm_deployment, tmp_path): + """prepare() must take the slurm_multi early-dispatch path and return True.""" + ok = slurm_deployment.prepare() + assert ok is True + # Wrapper script should now exist + assert slurm_deployment.script_path is not None + assert Path(slurm_deployment.script_path).exists() + + def test_wrapper_exports_all_model_env_vars(self, slurm_deployment): + """Every key in model_info.env_vars must appear as `export KEY="value"` in the wrapper.""" + slurm_deployment.prepare() + script_text = Path(slurm_deployment.script_path).read_text() + + for key, value in PR186_MODEL_ENTRY["env_vars"].items(): + # xP/yD are overridden by distributed.sglang_disagg.{prefill,decode}_nodes + # in _prepare_slurm_multi_script; values match in this fixture so the + # assertion still holds, but the source of truth is the model card. 
+ expected = f'export {key}="{value}"' + assert expected in script_text, f"missing export for {key!r}: expected {expected!r}" + + def test_wrapper_is_slurm_multi_flag_set(self, slurm_deployment): + """`_is_slurm_multi` flag must be set so deploy() can choose the bash branch.""" + slurm_deployment.prepare() + assert getattr(slurm_deployment, "_is_slurm_multi", False) is True + + def test_wrapper_sbatch_header_uses_slurm_config(self, slurm_deployment): + """SBATCH header reflects the model card's slurm block (partition, nodes, gpus, time, exclusive).""" + slurm_deployment.prepare() + script_text = Path(slurm_deployment.script_path).read_text() + assert "#SBATCH --partition=amd-rccl" in script_text + assert "#SBATCH --nodes=3" in script_text + assert "#SBATCH --gpus-per-node=8" in script_text + assert "#SBATCH --time=01:00:00" in script_text + assert "#SBATCH --exclusive" in script_text + # reservation is "" in the fixture, so no --reservation line + assert "#SBATCH --reservation=" not in script_text From e281e7eda2eebdccd6a4a5ce06bf96f6de6da96b Mon Sep 17 00:00:00 2001 From: raviguptaamd Date: Sat, 9 May 2026 22:56:24 +0000 Subject: [PATCH 5/8] fix(deployment): add skip_monitoring to DeploymentResult for slurm_multi bash branch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The earlier feat(deployment) commit ported `_run_inside_existing_allocation` verbatim from PR #86 (used by the slurm_multi bash-in-salloc branch). That method constructs `DeploymentResult(..., skip_monitoring=True)` to signal that the script ran synchronously and the monitor phase should be skipped. Develop's `DeploymentResult` dataclass did not have that field, so the construction raised `TypeError` at runtime, the failure was swallowed by the orchestrator, and `madengine run` exited 0 even when the wrapper script failed. Two minimal additive edits to deployment/base.py: - DeploymentResult: add `skip_monitoring: bool = False` field. 
- BaseDeployment.execute(): change the monitor guard from `if self.config.monitor:` to `if self.config.monitor and not result.skip_monitoring:` so the bash branch can correctly bypass the SLURM job poll (there is no SLURM job to poll — the script ran inline). Behavior for non-slurm_multi launchers is unchanged: they never construct DeploymentResult with skip_monitoring=True, the field defaults to False, and the monitor() call still fires as before. Discovered during R5.1b cluster smoke test of `pyt_sglang_disagg_qwen3-32b_short` from MAD-private PR #186. Co-authored-by: Cursor --- src/madengine/deployment/base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/madengine/deployment/base.py b/src/madengine/deployment/base.py index a032c037..8c910439 100644 --- a/src/madengine/deployment/base.py +++ b/src/madengine/deployment/base.py @@ -82,6 +82,7 @@ class DeploymentResult: metrics: Optional[Dict[str, Any]] = None logs_path: Optional[str] = None artifacts: Optional[List[str]] = None + skip_monitoring: bool = False # Set True for synchronous runs (e.g., inside salloc) @property def is_success(self) -> bool: @@ -196,7 +197,8 @@ def execute(self) -> DeploymentResult: return result # Step 4: Monitor (optional) - if self.config.monitor: + # Skip monitoring if deploy() already ran synchronously (e.g., inside salloc) + if self.config.monitor and not result.skip_monitoring: result = self._monitor_until_complete(result.deployment_id) # Step 5: Collect Results (always collect, even on failure to record failed runs) From e84506afd729cef8962b36290cb2d2d46b22af2f Mon Sep 17 00:00:00 2001 From: raviguptaamd Date: Sun, 10 May 2026 03:00:33 +0000 Subject: [PATCH 6/8] fix(slurm_multi): aggregate per-job perf.csv into cwd for dashboard reporter The slurm_multi model script (e.g., MAD-private's run_xPyD_models.slurm) writes its perf CSV to /shared_inference/$USER/$JOBID/perf.csv. 
`_collect_slurm_multi_results` correctly finds and ingests that CSV via `_collect_results_parse_perf_csv` (results['successful_runs'] is populated, manifest exit codes are right). However the CLI's post-run reporter (`display_performance_table` in cli/utils.py) reads from cwd 'perf.csv' by default, so users saw a cosmetic "Performance CSV not found: perf.csv" warning even on a successful slurm_multi run. Local and classic-SLURM flows already leave a cumulative perf.csv in cwd via update_perf_csv(); slurm_multi did not. Fix: inside `_collect_slurm_multi_results`, after the per-job CSV is located and parsed, also write its rows into the conventional cwd perf.csv path: * if cwd 'perf.csv' is absent: shutil.copy() the per-job file * if it exists (from previous runs): append data rows (skip the per-job header so the cwd file stays single-headed) Original per-job CSV at /shared_inference/$USER/$JOBID/perf.csv is not modified or deleted. Wrapped in try/except so any I/O failure degrades to a yellow warning rather than failing the run. Affects only `_collect_slurm_multi_results`, which only runs for the slurm_multi launcher dispatch added earlier on this branch. Zero effect on non-slurm_multi launchers (they still take develop's classic collect_results path which writes cwd perf.csv via update_perf_csv). Validated end-to-end: Llama-3.1-8B smoke on 3 reservation nodes (Distributed_Inference_CI), bash branch in salloc, parallel docker pull on all nodes, proxy/prefill/decode came up, benchmark concurrency=8 / 32-32 ISL/OSL ran to completion, run exit 0, cwd perf.csv now contains the per-job row (THROUGHPUT/TTFT/ITL columns match per-job source). 
Co-authored-by: Cursor --- src/madengine/deployment/slurm.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/madengine/deployment/slurm.py b/src/madengine/deployment/slurm.py index 19fb1665..4e56c605 100644 --- a/src/madengine/deployment/slurm.py +++ b/src/madengine/deployment/slurm.py @@ -2128,6 +2128,29 @@ def _collect_slurm_multi_results( if perf_csv_path: results["perf_files"] = [str(perf_csv_path)] self._collect_results_parse_perf_csv(results, session_start_row) + # Aggregate per-job perf rows into cwd perf.csv so the dashboard + # reporter (display_performance_table, report to-html, etc.) + # finds them under the conventional path. Local + classic-SLURM + # flows already leave a cumulative perf.csv in cwd via + # update_perf_csv(); slurm_multi flows did not, so this mirrors + # that convention without modifying the original per-job file. + import shutil + cwd_perf = Path("perf.csv") + try: + if cwd_perf.exists(): + with open(perf_csv_path, "r") as src, open(cwd_perf, "a") as dst: + next(src, None) # skip per-job header so cwd CSV stays single-headed + for line in src: + dst.write(line) + else: + shutil.copy(str(perf_csv_path), str(cwd_perf)) + self.console.print( + f"[green]✓ Aggregated per-job perf into {cwd_perf}[/green]" + ) + except Exception as e: + self.console.print( + f"[yellow]⚠ Could not aggregate per-job perf into cwd perf.csv: {e}[/yellow]" + ) else: self.console.print("[yellow]No perf.csv found from slurm_multi model script[/yellow]") From dc3bc486355515016c33db7c6ec64a190bfc306e Mon Sep 17 00:00:00 2001 From: raviguptaamd Date: Sun, 10 May 2026 04:18:41 +0000 Subject: [PATCH 7/8] docs(slurm_multi): CHANGELOG entry + forward-compat TODO on --use-image - CHANGELOG.md: new [Unreleased] section documenting the slurm_multi launcher, --use-image / --build-on-compute CLI flags, the slurm_multi build registry gate, the bash-in-salloc execution path, the DeploymentResult.skip_monitoring field, the SlurmNodeSelector 
reservation parameter, the new contract tests + minimal example, and the cwd perf.csv aggregation fix. - cli/commands/build.py: comment near the --use-image Typer option flagging that `is_flag=False, flag_value="auto"` is being deprecated by Typer, with the planned migration path. Behavior unchanged (matches MAD-private PR #186's documented UX). Co-authored-by: Cursor --- CHANGELOG.md | 26 ++++++++++++++++++++++++++ src/madengine/cli/commands/build.py | 6 ++++++ 2 files changed, 32 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e8044098..a609013f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,32 @@ All notable changes to madengine will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Added + +- **`slurm_multi` SLURM launcher**: New self-managed multi-node launcher for workloads that orchestrate their own per-node Docker containers via `srun` (e.g. SGLang Disaggregated proxy + prefill + decode topologies). Selected via `distributed.launcher: "slurm_multi"` (or `"slurm-multi"` alias). Generates a wrapper SBATCH script that runs the model's `.slurm` script directly on baremetal so `srun`/`scontrol` work inside it; performs parallel `srun docker pull` of the registry image on all allocated nodes when the model card sets `env_vars.DOCKER_IMAGE_NAME`. Honors model-card and `--additional-context` `slurm` fields (`partition`, `nodes`, `gpus_per_node`, `time`, `exclusive`, `reservation`, `nodelist`). + +- **`madengine build --use-image [IMAGE | auto]`**: Skip the local Docker build and use a pre-built image instead. With no value, resolves to the model card's `env_vars.DOCKER_IMAGE_NAME` automatically. Mutually exclusive with `--registry` and `--build-on-compute`. 
+ +- **`madengine build --build-on-compute`**: Build Docker images on a SLURM compute node and push to a registry, then have `madengine run` pull the image in parallel on all allocated nodes. Requires `--registry`. The resulting manifest carries `built_on_compute: true`. + +- **slurm_multi build registry gate**: When `madengine build` discovers a `slurm_multi` model and no `--registry`/`--use-image`/`--build-on-compute` is given, the orchestrator either auto-uses `env_vars.DOCKER_IMAGE_NAME` from the model card (implicit `--use-image` fallback) or raises a structured `ConfigurationError` with the four supported options listed. + +- **bash-in-salloc execution path** for slurm_multi: when `madengine run` detects `SLURM_JOB_ID` (i.e. running inside an existing `salloc`), the slurm_multi launcher runs the generated wrapper synchronously with `bash` instead of nesting another `sbatch` job. Other launchers continue to use `sbatch` even inside `salloc` (no behavior change for non-slurm_multi). + +- **`DeploymentResult.skip_monitoring`** (`deployment/base.py`): new dataclass field so synchronous deploy paths (e.g. slurm_multi's bash-in-salloc) can skip the monitor poll. + +- **`SlurmNodeSelector` `reservation` parameter**: optional reservation name forwarded to srun health/cleanup commands so node-prep srun calls run inside the reservation. + +- **`tests/unit/test_slurm_multi.py`**: contract tests for `slurm_multi` registry membership, hyphen alias normalization, and end-to-end env_vars-export contract against MAD-private PR #186's `pyt_sglang_disagg_qwen3-32b_short` model card. + +- **`examples/slurm-configs/minimal/slurm-multi-minimal.json`**: minimal reference config for the new launcher. 
+ +### Fixed + +- **slurm_multi: cwd `perf.csv` aggregation**: After a successful slurm_multi run, `madengine run` previously printed a cosmetic `Performance CSV not found: perf.csv` warning even though `_collect_slurm_multi_results` had ingested the per-job CSV from `/shared_inference/$USER/$JOBID/perf.csv`. The reporter (`display_performance_table`) reads cwd `perf.csv` by default. Now `_collect_slurm_multi_results` also writes the per-job rows into cwd `perf.csv` (copy if absent, append-data-rows if present) so reporting and HTML generation work without extra args. Local + classic-SLURM flows are unchanged. + ## [2.0.3] - 2026-05-06 ### Changed diff --git a/src/madengine/cli/commands/build.py b/src/madengine/cli/commands/build.py index a866b6d5..8133a75c 100644 --- a/src/madengine/cli/commands/build.py +++ b/src/madengine/cli/commands/build.py @@ -55,6 +55,12 @@ def build( "--batch-manifest", help="Input batch.json file for batch build mode" ), ] = None, + # NOTE: `is_flag=False, flag_value="auto"` lets `--use-image` (no value) + # mean "auto-detect from the model card's DOCKER_IMAGE_NAME", matching + # MAD-private PR #186's documented UX. Typer is deprecating this pattern + # for a future release; when removed, switch to requiring an explicit + # value (e.g. `--use-image auto` as the documented sentinel) and update + # MAD-private's docs in lockstep. use_image: Annotated[ Optional[str], typer.Option( From 68d0bf38733a54ceb6232c4493a0ad445835616a Mon Sep 17 00:00:00 2001 From: raviguptaamd Date: Mon, 11 May 2026 06:37:59 +0000 Subject: [PATCH 8/8] fix(slurm_multi): address Copilot review on PR #124 Manifest-shape correctness (Copilot C2): - _execute_with_prebuilt_image now keys built_images by model_name (one entry per model, all pointing at the same use_image) so it matches built_models and the rest of the codebase's invariant that ContainerRunner.run_models_from_manifest() relies on at built_models.get(image_name, {}). 
Without this, --use-image (and the implicit slurm_multi DOCKER_IMAGE_NAME gate) silently skipped every model in the run phase. Multi-model dedup TypeError fix (Copilot C3): - Replace tuple(sorted((m.get('distributed') or {}).items())) and the same pattern for slurm with json.dumps(..., sort_keys=True, default=str). The old pattern raised TypeError: unhashable type: 'dict' as soon as a model's distributed block contained nested dicts (sglang_disagg / vllm_disagg). slurm_multi wrapper completion-marker on failure (Copilot C6): - The wrapper script enables `set -e` at the top, so a non-zero exit from the model script terminated the wrapper before SCRIPT_EXIT_CODE was captured and the completion marker was written -- failed runs looked like hangs to monitor(). Wrap the bash invocation in `set +e` / `set -e` so the marker is always written. Self-managed launcher hygiene (Copilot C4, C5, C7): - Drop unused `import shutil` inside _run_self_managed. - Drop unused script_name local in _run_self_managed (only script_path is consumed by the function). - Redact env_vars values in the run log (`ENV: KEY=`) so model-card credentials (HF_TOKEN, MAD_DOCKERHUB_PASSWORD, etc.) don't leak. Build orchestrator hygiene (Copilot C1, C8): - Drop top-level `import shutil` (unused). - Drop the two `requires_auth = ...` assignments in _execute_build_on_compute (assigned but never read). Validators docstring honesty (Copilot C10): - validate_additional_context's `use_image` parameter is currently informational only (parameter retained for build.py call-site compatibility); the docstring previously claimed it skipped required- field validation, which the body never implemented. Tightened the docstring to match reality. New contract tests (tests/unit/test_slurm_multi.py): - test_wrapper_disables_set_e_around_model_script: locks in C6's set +e/SCRIPT_EXIT_CODE/set -e ordering around the bash invocation. 
- test_built_images_and_models_share_keys: locks in C2's invariant using a fake 2-model fixture and a stubbed BuildOrchestrator. - test_multi_model_nested_dict_distributed_does_not_raise: regression for C3 using two sglang_disagg-style cards with differing nested dicts in `distributed`; would TypeError under the old code. Skipped: Copilot C9 (broader BuildOrchestrator branch tests for --use-image / --build-on-compute / registry-gating). The slurm_multi contract suite + multi-node cluster smoke already cover the slurm_multi surface end-to-end; broader orchestrator-branch coverage is a worthwhile follow-up but out of scope for this minimal-additive PR. Tests: - pytest tests/unit/test_slurm_multi.py -v -> 12 passed (was 9). - pytest tests/unit -q -> 436 passed / 2 failed; the 2 failures are the same PermissionError environmental baseline that exists on develop (test_upload_file_to_mongodb_file_not_found and test_validate_additional_context_file_not_found, both raise PermissionError on '/nonexistent/file.json' instead of FileNotFoundError on this filesystem). Zero new failures. Co-authored-by: Cursor --- src/madengine/cli/validators.py | 5 +- src/madengine/deployment/slurm.py | 10 +- src/madengine/execution/container_runner.py | 25 +-- .../orchestration/build_orchestrator.py | 46 +++-- tests/unit/test_slurm_multi.py | 175 +++++++++++++++++- 5 files changed, 224 insertions(+), 37 deletions(-) diff --git a/src/madengine/cli/validators.py b/src/madengine/cli/validators.py index 867b4eed..68f45856 100644 --- a/src/madengine/cli/validators.py +++ b/src/madengine/cli/validators.py @@ -306,7 +306,10 @@ def validate_additional_context( Args: additional_context: JSON string containing additional context additional_context_file: Optional file containing additional context - use_image: Optional pre-built image to use (skips required field validation) + use_image: Pre-built image override forwarded by build.py for CLI signature + compatibility.
Currently informational only -- validation behavior is + unchanged when this is set; callers wanting to skip required-field + checks should adjust ``finalize_additional_context_dict`` directly. Returns: Dict containing parsed additional context diff --git a/src/madengine/deployment/slurm.py b/src/madengine/deployment/slurm.py index 4e56c605..e9bc9687 100644 --- a/src/madengine/deployment/slurm.py +++ b/src/madengine/deployment/slurm.py @@ -534,15 +534,23 @@ def _prepare_slurm_multi_script(self, model_info: Dict, docker_image_name: str = / f"madengine_{model_info['name']}_${{SLURM_JOB_ID:-local}}.complete" ) + # Disable `set -e` around the model script bash invocation below so a + # non-zero exit doesn't terminate the wrapper before SCRIPT_EXIT_CODE is + # captured and the completion marker is written. monitor() relies on the + # marker to distinguish 'failed' from 'still running'; without this, + # a failed model run would look like a hang. script_lines.extend([ "", "# Change to script directory", f"cd {model_script_path.parent}", "", - "# Run the model script directly on the host", + "# Run the model script directly on the host (with -e disabled so we", + "# can capture the exit code and write the completion marker even on failure).", f"echo 'Executing: bash {model_script_path.name} {model_args}'", + "set +e", f"bash {model_script_path.name} {model_args}", "SCRIPT_EXIT_CODE=$?", + "set -e", "", "echo ''", "echo 'Script completed.'", diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index 2f98ec85..cd5cacc0 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -802,31 +802,24 @@ def _run_self_managed( run_results: Dictionary to store run results pre_encapsulate_post_scripts: Pre/post script configuration run_env: Environment variables for the script - + Returns: Dictionary with run results """ - import shutil - self.rich_console.print(f"[dim]{'='*80}[/dim]") 
- + # Prepare script path scripts_arg = model_info["scripts"] - + # Get the current working directory (might be temp workspace) cwd = os.getcwd() print(f"📂 Current directory: {cwd}") - - if scripts_arg.endswith(".sh") or scripts_arg.endswith(".slurm"): - script_path = scripts_arg - script_name = os.path.basename(scripts_arg) - elif scripts_arg.endswith(".py"): + + if scripts_arg.endswith(".sh") or scripts_arg.endswith(".slurm") or scripts_arg.endswith(".py"): script_path = scripts_arg - script_name = os.path.basename(scripts_arg) else: # Directory specified - look for run.sh script_path = os.path.join(scripts_arg, "run.sh") - script_name = "run.sh" # If script path is relative, make it absolute from cwd if not os.path.isabs(script_path): @@ -874,12 +867,14 @@ def _run_self_managed( env = os.environ.copy() env.update(run_env) - # Add model-specific env vars from model_info + # Add model-specific env vars from model_info. + # Log keys only (not values) so credentials in env_vars (HF_TOKEN, MAD_DOCKERHUB_PASSWORD, + # CONNECT_*_TOKEN, etc.) carried via the model card don't leak into the run log. 
if "env_vars" in model_info and model_info["env_vars"]: for key, value in model_info["env_vars"].items(): env[key] = str(value) - print(f" ENV: {key}={value}") - + print(f" ENV: {key}=") + # Add env vars from additional_context if self.additional_context and "env_vars" in self.additional_context: for key, value in self.additional_context["env_vars"].items(): diff --git a/src/madengine/orchestration/build_orchestrator.py b/src/madengine/orchestration/build_orchestrator.py index 84f09c05..49299dcb 100644 --- a/src/madengine/orchestration/build_orchestrator.py +++ b/src/madengine/orchestration/build_orchestrator.py @@ -10,7 +10,6 @@ import json import os -import shutil from pathlib import Path from typing import Dict, List, Optional @@ -498,15 +497,12 @@ def _execute_with_prebuilt_image( self.rich_console.print("[bold cyan]📄 Generating manifest for pre-built image...[/bold cyan]") manifest = { - "built_images": { - use_image: { - "image_name": use_image, - "docker_image": use_image, - "dockerfile": "", - "build_time": 0, - "prebuilt": True, - } - }, + # built_images and built_models MUST share the same key set so + # ContainerRunner.run_models_from_manifest() can join them via + # `built_models.get(image_name, {})`. Key both by model_name and + # write one built_images entry per model (all pointing at the + # same pre-built use_image) so multi-model --use-image runs work. + "built_images": {}, "built_models": {}, "context": self.context.ctx if hasattr(self.context, 'ctx') else {}, "credentials_required": [], @@ -519,17 +515,22 @@ def _execute_with_prebuilt_image( }, } - # Add each discovered model with the pre-built image. - # Key by model_name (not use_image) so multiple models sharing - # the same pre-built image are all preserved in the manifest. 
for model in models: model_name = model.get("name", "unknown") model_distributed = model.get("distributed", {}) - + # Merge DOCKER_IMAGE_NAME into env_vars for parallel pull in run phase model_env_vars = model.get("env_vars", {}).copy() model_env_vars["DOCKER_IMAGE_NAME"] = use_image - + + manifest["built_images"][model_name] = { + "image_name": use_image, + "docker_image": use_image, + "dockerfile": "", + "build_time": 0, + "prebuilt": True, + } + manifest["built_models"][model_name] = { "name": model_name, "image": use_image, @@ -569,8 +570,13 @@ def _execute_with_prebuilt_image( # Merge model's distributed config from the first model. # If multiple models have differing distributed configs, warn — only the first wins here. + # Use json.dumps for the hash key so nested dicts (e.g. sglang_disagg / vllm_disagg) + # don't trigger TypeError: unhashable type: 'dict' from `tuple(sorted(items()))`. if len(models) > 1: - distinct_distributed = {tuple(sorted((m.get("distributed") or {}).items())) for m in models} + distinct_distributed = { + json.dumps(m.get("distributed") or {}, sort_keys=True, default=str) + for m in models + } if len(distinct_distributed) > 1: self.rich_console.print( "[yellow]Warning: discovered models have differing distributed configs; " @@ -589,8 +595,12 @@ def _execute_with_prebuilt_image( # Merge model's slurm config into deployment_config.slurm from the first model. # This enables run phase to auto-detect SLURM deployment without --additional-context. # Warn when multiple models have differing slurm configs (only the first wins here). + # json.dumps key for the same unhashable-nested-dict reason as above. 
if len(models) > 1: - distinct_slurm = {tuple(sorted((m.get("slurm") or {}).items())) for m in models} + distinct_slurm = { + json.dumps(m.get("slurm") or {}, sort_keys=True, default=str) + for m in models + } if len(distinct_slurm) > 1: self.rich_console.print( "[yellow]Warning: discovered models have differing slurm configs; " @@ -905,7 +915,6 @@ def _execute_build_on_compute( self.rich_console.print(f" Credentials: Found in environment (MAD_DOCKERHUB_USER)") # Determine if registry requires authentication - requires_auth = True public_registries = ["docker.io", "ghcr.io", "gcr.io", "quay.io", "nvcr.io"] registry_lower = registry.lower() if registry else "" @@ -958,7 +967,6 @@ def _execute_build_on_compute( else: # Private/internal registry - may not need auth self.rich_console.print(f" Auth: Private registry (auth may not be required)") - requires_auth = dockerhub_user and dockerhub_password self.rich_console.print("") diff --git a/tests/unit/test_slurm_multi.py b/tests/unit/test_slurm_multi.py index 89d5f1a6..80e558b0 100644 --- a/tests/unit/test_slurm_multi.py +++ b/tests/unit/test_slurm_multi.py @@ -2,18 +2,25 @@ """ Unit tests for the slurm_multi launcher. -Locks in three contract points so future refactors fail loudly if they regress: +Locks in contract points so future refactors fail loudly if they regress: 1. `slurm_multi` is registered in `VALID_LAUNCHERS`. 2. `slurm-multi` (hyphen alias) normalizes to `slurm_multi`. 3. The wrapper SBATCH script generated by `_prepare_slurm_multi_script` exports every `model_info.env_vars` declared by the model card (mirrors the `pyt_sglang_disagg_qwen3-32b_short` entry from MAD-private PR #186). +4. The wrapper bash invocation is wrapped in `set +e` / `set -e` so a non-zero + exit from the model script does not skip the completion-marker write. +5. 
`_execute_with_prebuilt_image` produces a manifest where `built_images.keys() + == built_models.keys()` so `ContainerRunner.run_models_from_manifest()` joins + them correctly (also exercises multi-model + nested-dict distributed configs + to guard against the `tuple(sorted(items()))` unhashable regression). Copyright (c) Advanced Micro Devices, Inc. All rights reserved. """ import json from pathlib import Path +from unittest.mock import MagicMock, patch import pytest @@ -201,3 +208,169 @@ def test_wrapper_sbatch_header_uses_slurm_config(self, slurm_deployment): assert "#SBATCH --exclusive" in script_text # reservation is "" in the fixture, so no --reservation line assert "#SBATCH --reservation=" not in script_text + + def test_wrapper_disables_set_e_around_model_script(self, slurm_deployment): + """Regression for Copilot C6: `set -e` at the top of the wrapper would + terminate the script before SCRIPT_EXIT_CODE is captured and the + completion marker is written if the model script exits non-zero. + The wrapper must `set +e` immediately before the bash invocation, + capture $?, then re-enable `set -e` before continuing.""" + slurm_deployment.prepare() + script_text = Path(slurm_deployment.script_path).read_text() + + # Find the index of the bash line and assert the + # surrounding set +e / SCRIPT_EXIT_CODE / set -e ordering. 
+ lines = script_text.splitlines() + bash_line_idx = next( + (i for i, line in enumerate(lines) if line.strip().startswith("bash run_xPyD_models.slurm")), + None, + ) + assert bash_line_idx is not None, "wrapper must invoke the model script via bash" + + # The two preceding non-empty/non-comment lines must include `set +e` + prelude = [l.strip() for l in lines[max(0, bash_line_idx - 5):bash_line_idx] if l.strip() and not l.strip().startswith("#")] + assert "set +e" in prelude, ( + f"`set +e` must appear immediately before bash invocation, got prelude {prelude!r}" + ) + + # SCRIPT_EXIT_CODE must be captured immediately after, and -e re-enabled before exit + suffix = [l.strip() for l in lines[bash_line_idx + 1:bash_line_idx + 5] if l.strip() and not l.strip().startswith("#")] + assert suffix[0] == "SCRIPT_EXIT_CODE=$?", f"first non-comment line after bash must capture $?, got {suffix!r}" + assert "set -e" in suffix, f"`set -e` must be re-enabled after capture, got {suffix!r}" + + # And the final exit must use the captured exit code + assert "exit $SCRIPT_EXIT_CODE" in script_text + + +# --------------------------------------------------------------------------- +# 4. _execute_with_prebuilt_image manifest-shape contract (Copilot C2 + C3) + +class TestPrebuiltImageManifestShape: + """Regression tests for the manifest produced by `_execute_with_prebuilt_image`. + + Two invariants the rest of the codebase relies on: + - C2: `built_images.keys() == built_models.keys()` so + `ContainerRunner.run_models_from_manifest()` can join them via + `built_models.get(image_name, {})`. + - C3: discovering >1 model whose `distributed` block contains nested dicts + (e.g. `sglang_disagg`, `vllm_disagg`) must not raise + `TypeError: unhashable type: 'dict'` when comparing distinct configs. + """ + + @pytest.fixture + def fake_models_two_sglang_disagg(self): + """Two sglang_disagg model cards. 
`distributed` includes a nested dict + (sglang_disagg.{prefill_nodes, decode_nodes}) so `tuple(sorted(items()))` + on this would TypeError -- exactly what C3 protects against.""" + common_distributed = { + "launcher": "slurm_multi", + "nnodes": 3, + "nproc_per_node": 8, + "backend": "nccl", + "port": 29500, + "sglang_disagg": {"prefill_nodes": 1, "decode_nodes": 1}, + } + common_slurm = { + "partition": "amd-rccl", + "nodes": 3, + "gpus_per_node": 8, + "time": "01:00:00", + "exclusive": True, + "reservation": "", + } + return [ + { + "name": "pyt_fake_a", + "dockerfile": "docker/sglang_disagg_inference", + "scripts": "scripts/fake/run.sh", + "n_gpus": "8", + "owner": "test@amd.com", + "tags": ["pyt", "fake", "a"], + "args": "--model_a", + "slurm": common_slurm, + "distributed": common_distributed, + "env_vars": {"MODEL_NAME": "Fake-A"}, + }, + { + "name": "pyt_fake_b", + "dockerfile": "docker/sglang_disagg_inference", + "scripts": "scripts/fake/run.sh", + "n_gpus": "8", + "owner": "test@amd.com", + "tags": ["pyt", "fake", "b"], + "args": "--model_b", + "slurm": common_slurm, + # Differing nested-dict to trigger the warning + dedup path + "distributed": {**common_distributed, "sglang_disagg": {"prefill_nodes": 2, "decode_nodes": 1}}, + "env_vars": {"MODEL_NAME": "Fake-B"}, + }, + ] + + def _build_orchestrator_stub(self, tmp_path: Path): + """Construct a BuildOrchestrator instance with `__new__` so we skip the + real constructor (which initializes Context, ConfigLoader, credentials, + etc.). 
We populate only the attributes `_execute_with_prebuilt_image` + and `_save_deployment_config` actually read.""" + from madengine.orchestration.build_orchestrator import BuildOrchestrator + orch = BuildOrchestrator.__new__(BuildOrchestrator) + # Required attributes + orch.args = MagicMock() + orch.console = MagicMock() + orch.rich_console = MagicMock() + orch.context = MagicMock() + orch.context.ctx = {"docker_env_vars": {}, "docker_mounts": {}} + orch.additional_context = {} + orch._original_user_slurm_keys = set() + orch.credentials = {} + return orch + + def test_built_images_and_models_share_keys(self, tmp_path, fake_models_two_sglang_disagg): + """Copilot C2: built_images and built_models must use the same keys + (model_name) so ContainerRunner.run_models_from_manifest() finds + each model_info via `built_models.get(image_name, {})`.""" + orch = self._build_orchestrator_stub(tmp_path) + manifest_path = tmp_path / "build_manifest.json" + use_image = "rocm/pytorch-private:fake-tag" + + with patch("madengine.orchestration.build_orchestrator.DiscoverModels") as mock_dm: + mock_dm.return_value.run.return_value = fake_models_two_sglang_disagg + # _save_deployment_config writes a sidecar file we don't care about; + # patch it to a no-op so we don't depend on its filesystem behavior. 
+ with patch.object(orch, "_save_deployment_config", return_value=None): + orch._execute_with_prebuilt_image(use_image=use_image, manifest_output=str(manifest_path)) + + manifest = json.loads(manifest_path.read_text()) + + assert set(manifest["built_images"].keys()) == set(manifest["built_models"].keys()), ( + "built_images and built_models must share the same key set" + ) + assert set(manifest["built_models"].keys()) == {"pyt_fake_a", "pyt_fake_b"} + + for model_name in ("pyt_fake_a", "pyt_fake_b"): + assert manifest["built_images"][model_name]["docker_image"] == use_image + assert manifest["built_images"][model_name]["prebuilt"] is True + assert manifest["built_models"][model_name]["env_vars"]["DOCKER_IMAGE_NAME"] == use_image + + def test_multi_model_nested_dict_distributed_does_not_raise( + self, tmp_path, fake_models_two_sglang_disagg + ): + """Copilot C3: `distinct_distributed` / `distinct_slurm` dedupe must not + TypeError when distributed/slurm blocks contain nested dicts. Before + the fix, `tuple(sorted((m.get('distributed') or {}).items()))` raised + `TypeError: unhashable type: 'dict'` because the dict values aren't + hashable when stuffed into a set.""" + orch = self._build_orchestrator_stub(tmp_path) + manifest_path = tmp_path / "build_manifest.json" + + with patch("madengine.orchestration.build_orchestrator.DiscoverModels") as mock_dm: + mock_dm.return_value.run.return_value = fake_models_two_sglang_disagg + with patch.object(orch, "_save_deployment_config", return_value=None): + # Must not raise TypeError + orch._execute_with_prebuilt_image( + use_image="rocm/pytorch-private:fake-tag", + manifest_output=str(manifest_path), + ) + + # Manifest should still be valid JSON with both models present + manifest = json.loads(manifest_path.read_text()) + assert len(manifest["built_models"]) == 2