diff --git a/CHANGELOG.md b/CHANGELOG.md index e8044098..944479a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - **Profiling**: `rocm_trace_lite` now sets `RTL_MODE=lite` explicitly; added tool `rocm_trace_lite_default` with `RTL_MODE=default` for A/B overhead comparison. `rtl_trace_wrapper.sh` passes `rtl trace --mode …` when `RTL_MODE` is set. +- **Multi-node barrier**: The TCP image-ready barrier in `ContainerRunner` now binds to `MASTER_ADDR`'s resolved IP when possible (falling back to `0.0.0.0`) so the listener is not gratuitously exposed on other interfaces of the master host. + +### Added + +- **`MAD_BARRIER_TOKEN`** env var: opt-in secret for the multi-node TCP image-ready barrier in `ContainerRunner._tcp_image_ready_barrier`. When set, replaces the predictable `JOB<job_id>` token so other processes on the master node cannot spoof `READY`/`GO` messages. Defaults to the previous `JOB<job_id>` value for backward compatibility. - **Kubernetes deployment refactor**: Decomposed the monolithic `kubernetes.py` (~2800 lines) into focused mixin modules — `k8s_pvc.py` (PVC lifecycle), `k8s_results.py` (log/artifact collection and performance aggregation), `k8s_scripts.py` (script extraction and ConfigMap building), and `k8s_template_context.py` (Jinja2 template context assembly). `KubernetesDeployment` now composes these mixins; no functional changes. 
diff --git a/src/madengine/core/constants.py b/src/madengine/core/constants.py index d1afa4c9..1a8b4fc3 100644 --- a/src/madengine/core/constants.py +++ b/src/madengine/core/constants.py @@ -26,17 +26,28 @@ import os import json import logging +import sys # Utility function for optional verbose logging of configuration def _log_config_info(message: str, force_print: bool = False): """Log configuration information either to logger or print if specified.""" + # Keep --version/--help output clean even if MAD_VERBOSE_CONFIG=true. + if any(arg in {"--version", "-V", "--help", "-h"} for arg in sys.argv[1:]): + logging.debug(message) + return if force_print or os.environ.get("MAD_VERBOSE_CONFIG", "").lower() == "true": print(message) else: logging.debug(message) +def _is_lightweight_cli_invocation() -> bool: + """Return True for metadata/help invocations that should avoid side effects.""" + lightweight_flags = {"--version", "-V", "--help", "-h"} + return any(arg in lightweight_flags for arg in sys.argv[1:]) + + # third-party modules from madengine.core.console import Console @@ -65,9 +76,12 @@ def _setup_model_dir(): _log_config_info(f"Model dir: {MODEL_DIR} copied to current dir: {cwd_abs}") -# Only setup model directory if explicitly requested (when not just importing for constants) +# Only setup model directory if explicitly requested and invocation is not metadata-only. 
if os.environ.get("MAD_SETUP_MODEL_DIR", "").lower() == "true": - _setup_model_dir() + if _is_lightweight_cli_invocation(): + _log_config_info("Skipping MODEL_DIR setup for lightweight CLI invocation (--version/--help).") + else: + _setup_model_dir() # madengine credentials configuration CRED_FILE = "credential.json" diff --git a/src/madengine/deployment/slurm.py b/src/madengine/deployment/slurm.py index b5eefbd4..2eb4f2e6 100644 --- a/src/madengine/deployment/slurm.py +++ b/src/madengine/deployment/slurm.py @@ -1297,18 +1297,24 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: model_key, {} ) if model_key else {} - # Multiple results path: resolve CSV from job_dir/node_*, then cwd/run_directory + # Multiple results path: resolve CSV from job_dir/node_*, then cwd/run_directory. + # In multi-node runs, different nodes may produce the CSV with different levels + # of completeness (e.g. only one node observes the final throughput numbers and + # populates the "performance" column). Prefer the candidate with the most + # non-empty "performance" rows so aggregation does not silently pick an empty one. 
mult_res = model_info_for_entry.get("multiple_results") if mult_res: resolved_csv: Optional[Path] = None + candidates: List[Path] = [] if (job_dir / mult_res).is_file(): - resolved_csv = job_dir / mult_res - else: - for i in range(self.nodes): - candidate = job_dir / f"node_{i}" / mult_res - if candidate.is_file(): - resolved_csv = candidate - break + candidates.append(job_dir / mult_res) + for i in range(self.nodes): + per_node_candidate = job_dir / f"node_{i}" / mult_res + if per_node_candidate.is_file(): + candidates.append(per_node_candidate) + + if candidates: + resolved_csv = self._select_best_multiple_results_csv(candidates) if not resolved_csv and Path(mult_res).is_file(): resolved_csv = Path(mult_res) if not resolved_csv and Path("run_directory", mult_res).is_file(): @@ -1519,6 +1525,85 @@ def collect_results(self, deployment_id: str) -> Dict[str, Any]: ) return results + def _select_best_multiple_results_csv( + self, candidates: List[Path] + ) -> Optional[Path]: + """Pick the CSV with the most non-empty ``performance`` entries. + + In multi-node SLURM runs, every node copies its local copy of the + workload's multi-results CSV into ``job_dir/node_<rank>/``. Only + some nodes will observe the final throughput numbers and therefore + populate the ``performance`` column; others may have the file but + with empty values. Ranking candidates by the number of non-empty + ``performance`` rows lets downstream aggregation use the richest + available data without depending on node-0 winning every race. + + Selection order: + + 1. Highest count of non-empty ``performance`` rows wins. Candidates + without a ``performance`` column score ``0`` (effectively tied + at the bottom). + 2. Ties are broken by total row count — when no candidate has any + performance data, this still surfaces the most-populated CSV + rather than always falling back to ``candidates[0]``, so + downstream diagnostics see real rows. + 3. 
``candidates[0]`` is only used as the ultimate fallback when + every read raises (``best_candidate`` remained ``None``). + """ + if not candidates: + return None + if len(candidates) == 1: + return candidates[0] + + import csv as _csv + + best_candidate: Optional[Path] = None + best_score = -1 + best_rows = -1 + for candidate in candidates: + non_empty_perf = 0 + total_rows = 0 + has_perf_column = False + try: + with open(candidate, "r", encoding="utf-8", errors="ignore") as f: + reader = _csv.DictReader(f) + fieldnames = reader.fieldnames or [] + stripped_fields = [fn.strip() for fn in fieldnames] + has_perf_column = "performance" in stripped_fields + for row in reader: + total_rows += 1 + if has_perf_column: + # DictReader keys preserve any whitespace from the + # CSV header (e.g. " performance"). Normalize keys + # so the stripped-field detection above matches. + normalized_row = { + (k.strip() if isinstance(k, str) else k): v + for k, v in row.items() + } + value = (normalized_row.get("performance") or "").strip() + if value: + non_empty_perf += 1 + except Exception: + continue + + score = non_empty_perf if has_perf_column else 0 + if ( + score > best_score + or (score == best_score and total_rows > best_rows) + ): + best_score = score + best_rows = total_rows + best_candidate = candidate + + if best_candidate is None: + return candidates[0] + if best_score > 0: + self.console.print( + f"[dim] Selected multiple_results CSV with {best_score} " + f"non-empty performance rows: {best_candidate}[/dim]" + ) + return best_candidate + def _collect_results_parse_perf_csv( self, results: Dict[str, Any], session_start_row: Optional[int] ) -> None: diff --git a/src/madengine/deployment/templates/slurm/job.sh.j2 b/src/madengine/deployment/templates/slurm/job.sh.j2 index 5f8e8266..56115251 100644 --- a/src/madengine/deployment/templates/slurm/job.sh.j2 +++ b/src/madengine/deployment/templates/slurm/job.sh.j2 @@ -213,7 +213,8 @@ fi echo "" echo "Verifying madengine 
availability..." if command -v madengine >/dev/null 2>&1; then - MAD_CLI_VERSION=$(madengine --version 2>&1 | head -n1 || echo "unknown") + # MODEL_DIR can trigger side effects in madengine startup; unset it for preflight probes only. + MAD_CLI_VERSION=$(env -u MODEL_DIR madengine --version 2>&1 | head -n1 || echo "unknown") MAD_CLI_PATH=$(which madengine 2>/dev/null || echo "unknown") echo " ✓ madengine available" @@ -221,7 +222,7 @@ if command -v madengine >/dev/null 2>&1; then echo " Path: $MAD_CLI_PATH" # Verify it's executable - if madengine --help >/dev/null 2>&1; then + if env -u MODEL_DIR madengine --help >/dev/null 2>&1; then export MAD_CLI_COMMAND="madengine" else echo " ❌ ERROR: madengine found but not functional!" @@ -488,7 +489,11 @@ trap 'ec=$?; echo "[DEBUG] $(date -Iseconds) Node ${SLURM_PROCID} ($(hostname)): echo "Verifying madengine availability..." if command -v madengine >/dev/null 2>&1; then - MAD_CLI_VERSION=$(madengine --version 2>&1 | head -n1 || echo "unknown") + # MODEL_DIR can trigger side effects in madengine startup; isolate it for preflight probes. + set +e + MAD_VERSION_RAW_SANITIZED=$(env -u MODEL_DIR madengine --version 2>&1) + set -e + MAD_CLI_VERSION=$(printf "%s" "$MAD_VERSION_RAW_SANITIZED" | head -n1 || echo "unknown") MAD_CLI_PATH=$(which madengine 2>/dev/null || echo "unknown") echo "✓ madengine available" @@ -496,7 +501,12 @@ if command -v madengine >/dev/null 2>&1; then echo " Path: $MAD_CLI_PATH" # Verify it's executable - if madengine --help >/dev/null 2>&1; then + set +e + MAD_HELP_RAW_SANITIZED=$(env -u MODEL_DIR madengine --help 2>&1) + MAD_HELP_EXIT_SANITIZED=$? 
+ set -e + + if [ "${MAD_HELP_EXIT_SANITIZED}" -eq 0 ]; then echo " ✓ Verified: madengine is functional" MAD_CLI_COMMAND="madengine" else diff --git a/src/madengine/execution/container_runner.py b/src/madengine/execution/container_runner.py index 2ffc8a31..b1e33c36 100644 --- a/src/madengine/execution/container_runner.py +++ b/src/madengine/execution/container_runner.py @@ -10,6 +10,7 @@ import os import re import shlex +import socket import subprocess import time import json @@ -679,9 +680,579 @@ def get_env_arg(self, run_env: typing.Dict) -> str: return env_args - def get_mount_arg(self, mount_datapaths: typing.List) -> str: + def _get_build_args(self) -> str: + """Build ``docker build --build-arg`` string from ``docker_build_arg`` context. + + Values are passed to ``Console.sh`` (``shell=True``); the key and the + value of each ``--build-arg`` are wrapped with :func:`shlex.quote` + individually (matching :meth:`DockerBuilder.get_build_arg` style) so + quotes / whitespace / shell metacharacters in either component cannot + break the build command or be injected when ``docker_build_arg`` comes + from manifests or user context. + """ + docker_build_arg = self.context.ctx.get("docker_build_arg", {}) if self.context else {} + if not docker_build_arg: + return "" + build_args = "" + for key, value in docker_build_arg.items(): + build_args += ( + "--build-arg " + + shlex.quote(str(key)) + + "=" + + shlex.quote(str(value)) + + " " + ) + return build_args + + def _get_node_rank(self) -> int: + """Return the current node rank for distributed runs. + + Raises: + RuntimeError: when ``NODE_RANK`` / ``RANK`` is set but cannot be + parsed as an integer. Treating a malformed rank as ``0`` + would let a worker incorrectly take the primary code path + (image build, tar save) and deadlock the multi-node barrier + which already validates rank strictly. 
+ """ + node_rank_raw = os.environ.get("NODE_RANK") or os.environ.get("RANK") or "0" + try: + return int(node_rank_raw) + except (TypeError, ValueError) as e: + raise RuntimeError( + f"Invalid NODE_RANK/RANK env value {node_rank_raw!r}: {e}" + ) + + def _local_image_exists(self, run_image: str) -> bool: + """Check whether a Docker image already exists locally.""" + try: + self.console.sh( + f"docker image inspect {shlex.quote(run_image)} > /dev/null 2>&1" + ) + return True + except (subprocess.CalledProcessError, RuntimeError): + return False + + def _get_local_image_tar_path(self, run_image: str) -> typing.Optional[str]: + """Resolve the shared tar path for a local image, if configured. + + When ``MAD_DOCKER_BUILDS`` points at a shared directory (e.g. a network + filesystem visible to all nodes), this path is used to stage a + ``docker save`` tar of the pre-built local image so that worker nodes + can ``docker load`` it instead of rebuilding or pulling. + """ + builds_dir = (os.environ.get("MAD_DOCKER_BUILDS") or "").strip() + if not builds_dir: + return None + + safe_image_name = re.sub(r"[^A-Za-z0-9_.-]+", "_", run_image).strip("._") + if not safe_image_name: + safe_image_name = "docker_image" + return os.path.join(builds_dir, f"{safe_image_name}.tar") + + def _load_local_image_from_tar(self, run_image: str, tar_path: str) -> None: + """Load a Docker image from a previously saved tar archive.""" + if not os.path.exists(tar_path): + raise RuntimeError(f"Image tar not found for {run_image}: {tar_path}") + + self.rich_console.print( + f"[yellow]📦 Loading local image tar:[/yellow] {tar_path}" + ) + self.console.sh(f"docker load -i {shlex.quote(tar_path)}", timeout=None) + self.console.sh( + f"docker image inspect {shlex.quote(run_image)} > /dev/null 2>&1" + ) + self.rich_console.print( + f"[green]✅ Loaded local image from tar:[/green] {run_image}" + ) + + def _save_local_image_to_tar(self, run_image: str, tar_path: str) -> None: + """Persist a local Docker image 
into the shared tar cache. + + Written atomically: ``docker save`` streams into a sibling ``.tmp`` + file which is ``rename``\\ d into place only on success. This avoids + peers loading a half-written tar when a worker crashes the primary + mid-save (the rename is atomic on the same filesystem). + """ + tar_dir = os.path.dirname(tar_path) + if tar_dir: + os.makedirs(tar_dir, exist_ok=True) + + # Use a sibling tmp path so the rename stays on the same filesystem + # (POSIX rename(2) is atomic only within a single filesystem). + tmp_path = f"{tar_path}.tmp.{os.getpid()}" + + self.rich_console.print( + f"[yellow]💾 Saving local image tar:[/yellow] {tar_path}" + ) + try: + self.console.sh( + f"docker save -o {shlex.quote(tmp_path)} {shlex.quote(run_image)}", + timeout=None, + ) + os.replace(tmp_path, tar_path) + except Exception: + # Best-effort cleanup of the partial tmp file so retries do not + # accumulate stale half-saves in MAD_DOCKER_BUILDS. + try: + if os.path.exists(tmp_path): + os.remove(tmp_path) + except Exception: + pass + raise + self.rich_console.print( + f"[green]✅ Saved local image tar:[/green] {tar_path}" + ) + + def _build_or_pull_local_image( + self, run_image: str, build_info: typing.Dict, model_info: typing.Dict + ) -> None: + """Ensure the local image exists by building it first and pulling as fallback.""" + self.rich_console.print( + f"[yellow]⚠️ Image {run_image} not found on this node.[/yellow]" + ) + try: + self._build_local_image_from_manifest( + run_image=run_image, + build_info=build_info, + model_info=model_info, + ) + except Exception as build_error: + self.rich_console.print( + "[yellow]⚠️ Local build failed, attempting pull as fallback...[/yellow]" + ) + try: + self.pull_image(run_image) + except Exception as pull_error: + raise RuntimeError( + f"Failed to build or pull local image {run_image}: " + f"build_error={build_error}; pull_error={pull_error}" + ) + + def _ensure_local_image_available( + self, run_image: str, build_info: 
typing.Dict, model_info: typing.Dict + ) -> None: + """Prepare a local image, optionally using a shared tar cache. + + Multi-node invariant: + Every rank reaches the same barrier (``_sync_after_local_image_ready``) + exactly once, regardless of whether the tar cache existed at start. + This avoids deadlocks when nodes disagree on tar visibility (e.g. + NFS lag making ``os.path.exists(tar_path)`` flap across ranks). + + Behavior by node and cache state: + + * Primary node (rank 0): ensures the image (and, if ``MAD_DOCKER_BUILDS`` + is configured, the tar) is present before crossing the barrier. + * Worker nodes (rank > 0): wait at the barrier; once it releases, + load the tar (when configured) or build/pull independently. + + Callers must therefore NOT call :meth:`_sync_after_local_image_ready` + again after this method returns — the synchronization is already done. + """ + tar_path = self._get_local_image_tar_path(run_image) + node_rank = self._get_node_rank() + is_primary_node = node_rank == 0 + image_exists = self._local_image_exists(run_image) + tar_exists = bool(tar_path) and os.path.exists(tar_path) + + # Phase 1: primary makes the image (and tar, if cache configured) + # available before any worker tries to consume it. Workers skip this + # phase and wait at the barrier below. + if is_primary_node: + if not image_exists: + if tar_path and tar_exists: + self._load_local_image_from_tar(run_image, tar_path) + image_exists = True + else: + self._build_or_pull_local_image( + run_image=run_image, + build_info=build_info, + model_info=model_info, + ) + image_exists = True + if tar_path and not tar_exists: + self._save_local_image_to_tar(run_image, tar_path) + tar_exists = True + + # Phase 2: barrier. _sync_after_local_image_ready is a no-op for + # single-node runs, so this is unconditional. + self._sync_after_local_image_ready(run_image=run_image) + + # Phase 3: workers consume what the primary produced. 
+ if not is_primary_node and not image_exists: + if tar_path: + if not os.path.exists(tar_path): + raise RuntimeError( + f"Node 0 did not produce image tar for {run_image}: {tar_path}" + ) + self._load_local_image_from_tar(run_image, tar_path) + image_exists = True + else: + # No shared tar cache configured: each node builds/pulls on its own. + self._build_or_pull_local_image( + run_image=run_image, + build_info=build_info, + model_info=model_info, + ) + image_exists = True + + def _build_local_image_from_manifest( + self, run_image: str, build_info: typing.Dict, model_info: typing.Dict + ) -> None: + """Build ``run_image`` on the current compute node using its manifest dockerfile. + + Used by ``run --manifest-file`` in distributed mode when the local image + is not present on a compute node and pulling is not desired or possible. + """ + dockerfile = build_info.get("dockerfile", "") + if not dockerfile or dockerfile == "N/A (local image mode)": + raise RuntimeError( + f"Cannot build image {run_image}: dockerfile is missing in manifest" + ) + + if not os.path.exists(dockerfile): + raise RuntimeError( + f"Cannot build image {run_image}: dockerfile not found at '{dockerfile}'" + ) + + docker_context = model_info.get("dockercontext", "") or "./docker" + if not os.path.exists(docker_context): + # Fallback to dockerfile directory if the provided context path is unavailable + # on the compute node (e.g. workspace layout differs from submission node). + docker_context = os.path.dirname(dockerfile) or "." + + build_args = self._get_build_args() + # Console.sh runs with shell=True; quote all path / image components + # so manifest values containing spaces or shell metacharacters cannot + # break the build command or inject additional tokens. 
+ build_command = ( + f"docker build --network=host -t {shlex.quote(run_image)} --pull " + f"-f {shlex.quote(dockerfile)} {build_args}{shlex.quote(docker_context)}" + ) + + self.rich_console.print( + f"[yellow]🔨 Building missing local image on this node:[/yellow] {run_image}" + ) + self.rich_console.print(f"[dim] Dockerfile: {dockerfile}[/dim]") + self.rich_console.print(f"[dim] Context: {docker_context}[/dim]") + self.console.sh(build_command, timeout=None) + self.console.sh( + f"docker image inspect {shlex.quote(run_image)} > /dev/null 2>&1" + ) + self.rich_console.print( + f"[green]✅ Built local image on this node:[/green] {run_image}" + ) + + def _sync_after_local_image_ready( + self, run_image: str, timeout_s: int = 1800 + ) -> None: + """Barrier for multi-node local-image runs so all nodes continue together. + + Relies on a TCP rendezvous between ``NODE_RANK=0`` and worker nodes so + that no shared filesystem visibility is required. No-op for single-node + runs (``NNODES<=1``). + """ + nnodes_raw = os.environ.get("NNODES") or os.environ.get("WORLD_SIZE") or "1" + node_rank = os.environ.get("NODE_RANK") or os.environ.get("RANK") or "0" + # Mirror the strict policy applied to NODE_RANK / RANK: a malformed + # NNODES / WORLD_SIZE silently falling back to 1 would skip the + # multi-node barrier entirely and let nodes diverge (worker + # nodes proceeding before primary finished saving the tar). + try: + nnodes = int(nnodes_raw) + except (TypeError, ValueError) as e: + raise RuntimeError( + f"Invalid NNODES/WORLD_SIZE env value {nnodes_raw!r}: {e}" + ) + if nnodes <= 1: + return + + self._tcp_image_ready_barrier( + nnodes=nnodes, + node_rank=node_rank, + timeout_s=timeout_s, + ) + + @staticmethod + def _recv_line(sock: socket.socket, max_len: int = 128) -> str: + """Read one newline-terminated line from ``sock``. + + ``socket.recv`` is allowed to return a partial read (TCP is a byte + stream, not a datagram channel). 
Using it directly to parse a + protocol line like ``READY <token> <rank>\\n`` can therefore reject + valid peers when the line is split across two reads, manifesting + as flaky multi-node barrier timeouts. + + This helper loops on ``recv`` until it sees ``\\n`` or until + ``max_len`` bytes have been buffered, honoring the socket's + existing timeout. The trailing ``\\n`` and surrounding whitespace + are stripped from the returned string. An empty string is + returned on EOF before any data. + """ + buf = bytearray() + while len(buf) < max_len: + chunk = sock.recv(max_len - len(buf)) + if not chunk: + break + buf.extend(chunk) + nl = buf.find(b"\n") + if nl != -1: + buf = buf[:nl] + break + return buf.decode("utf-8", errors="ignore").strip() + + def _tcp_image_ready_barrier( + self, nnodes: int, node_rank: str, timeout_s: int + ) -> None: + """TCP rendezvous barrier that does not require shared filesystem visibility. + + Node 0 listens on one of ``candidate_ports`` derived from ``MASTER_PORT`` + and ``SLURM_JOB_ID``; workers send ``READY <token> <rank>`` and wait for + ``GO <token> <rank>``. The port range and token defend against multiple + concurrent jobs reusing the same master host. + + Security hardening: + + * Token defaults to ``JOB<job_id>`` (predictable). Callers running + on shared clusters can set ``MAD_BARRIER_TOKEN`` to an opaque secret + (e.g. generated once in the SLURM submit script and exported to all + ranks) so other processes on the master node cannot spoof + ``READY``/``GO`` messages. + * The listener tries to bind to ``MASTER_ADDR``'s resolved IP first and + only falls back to ``0.0.0.0`` if that binding fails. This limits + accidental exposure on multi-homed master hosts while keeping the + single-node / test path functional. + """ + master_addr = os.environ.get("MASTER_ADDR", "127.0.0.1") + job_id_raw = os.environ.get("SLURM_JOB_ID", "0") + try: + job_id = int(job_id_raw) + except Exception: + job_id = 0 + # Allow an explicit secret token to defeat spoofing on shared clusters. 
+ # Fall back to JOB for backward compatibility with existing SLURM + # submit scripts that do not export MAD_BARRIER_TOKEN. + token = (os.environ.get("MAD_BARRIER_TOKEN") or "").strip() or f"JOB{job_id}" + master_port_raw = os.environ.get("MASTER_PORT", "29500") + try: + master_port = int(master_port_raw) + except Exception: + master_port = 29500 + base_port = 43000 + ((master_port + job_id) % 1000) + candidate_ports = [base_port + i for i in range(0, 16)] + deadline = time.time() + timeout_s + # NODE_RANK / RANK come from the SLURM environment. A malformed value + # would otherwise raise an opaque ValueError mid-barrier; fall back to + # a clear RuntimeError instead of silently treating the worker as + # rank 0, which would make it bind a listener and deadlock peers. + try: + rank_int = int(node_rank) + except (TypeError, ValueError) as e: + raise RuntimeError( + f"TCP barrier: invalid NODE_RANK/RANK value {node_rank!r}: {e}" + ) + + if rank_int == 0: + accepted = 0 + # ``peers`` is keyed by ``worker_rank`` so a reconnecting worker + # (e.g. one that hit its connect timeout before the master called + # ``accept()`` and looped back) replaces its previous entry + # instead of duplicating it. This keeps the diagnostic log + # honest about how many distinct peers actually joined. + peers: typing.Dict[int, str] = {} + waiting: typing.Dict[int, socket.socket] = {} + server = None + port = None + # Prefer binding to MASTER_ADDR (workers connect there anyway) so + # the listener is not gratuitously exposed on other interfaces of + # the master host. Fall back to 0.0.0.0 if MASTER_ADDR cannot be + # bound on this node (e.g. it is a hostname resolved elsewhere). 
+ try: + master_ip = socket.gethostbyname(master_addr) + except Exception: + master_ip = "" + bind_hosts: typing.List[str] = [] + if master_ip and master_ip not in ("0.0.0.0", "::"): + bind_hosts.append(master_ip) + if "0.0.0.0" not in bind_hosts: + bind_hosts.append("0.0.0.0") + try: + bind_errors = [] + for bind_host in bind_hosts: + for candidate in candidate_ports: + trial = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + trial.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + trial.bind((bind_host, candidate)) + server = trial + port = candidate + break + except Exception as e: + bind_errors.append( + {"host": bind_host, "port": candidate, "error": str(e)} + ) + try: + trial.close() + except Exception: + pass + if server is not None: + break + if server is None or port is None: + raise RuntimeError( + f"TCP barrier bind failed on all candidate ports: {bind_errors}" + ) + server.listen(max(1, nnodes - 1)) + server.settimeout(2.0) + while accepted < max(0, nnodes - 1) and time.time() < deadline: + try: + conn, addr = server.accept() + conn.settimeout(2.0) + # Read until newline; ``recv`` alone can return a + # partial line and reject otherwise valid peers. + payload = self._recv_line(conn) + parts = payload.split() + if len(parts) != 3 or parts[0] != "READY" or parts[1] != token: + conn.close() + continue + try: + worker_rank = int(parts[2]) + except Exception: + conn.close() + continue + if worker_rank <= 0 or worker_rank >= nnodes: + conn.close() + continue + if worker_rank in waiting: + try: + waiting[worker_rank].close() + except Exception: + pass + waiting[worker_rank] = conn + peers[worker_rank] = f"{addr[0]}:r{worker_rank}" + accepted = len(waiting) + except socket.timeout: + continue + if accepted < max(0, nnodes - 1): + # Close any worker sockets that did manage to connect so + # they get an EOF instead of a hanging half-open + # connection on top of the timeout. 
for conn in waiting.values(): + try: + conn.close() + except Exception: + pass + raise RuntimeError( + f"TCP barrier timeout on master: accepted={accepted}/" + f"{max(0, nnodes - 1)} port={port}" + ) + for worker_rank, conn in waiting.items(): + try: + conn.sendall(f"GO {token} {worker_rank}\n".encode("utf-8")) + finally: + try: + conn.close() + except Exception: + pass + # Diagnostic: log which peers joined so multi-node deadlocks / + # missing-rank issues can be debugged from the master log alone. + # Iterate by sorted rank so the log line is deterministic and + # easy to diff between runs. + if peers: + pretty = ", ".join(peers[r] for r in sorted(peers)) + self.rich_console.print( + f"[dim]TCP barrier master: released {accepted} peer(s): " + f"{pretty} (port={port})[/dim]" + ) + return + finally: + try: + if server is not None: + server.close() + except Exception: + pass + + # Use the normalized integer rank (``rank_int``) when talking to the + # master so leading-zero values like ``NODE_RANK="01"`` still match + # the master's ACK, which is built from ``int(parts[2])``. + expected_ack = f"GO {token} {rank_int}" + last_error = "" + connect_attempts = 0 + while time.time() < deadline: + for candidate in candidate_ports: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + connect_attempts += 1 + try: + sock.settimeout(1.5) + sock.connect((master_addr, candidate)) + sock.sendall(f"READY {token} {rank_int}\n".encode("utf-8")) + remaining_s = max(1.0, deadline - time.time()) + sock.settimeout(remaining_s) + # Same partial-read concern as the master path: read the + # ``GO <token> <rank>\n`` line by newline, not by length. 
+ ack = self._recv_line(sock) + if ack == expected_ack: + return + last_error = f"unexpected_ack={ack!r} port={candidate}" + except Exception as e: + last_error = f"{e} port={candidate}" + finally: + try: + sock.close() + except Exception: + pass + time.sleep(1) + + raise RuntimeError( + f"TCP barrier timeout on worker rank={rank_int} master={master_addr} " + f"ports={candidate_ports} attempts={connect_attempts} last_error={last_error}" + ) + + def _extract_additional_mount_targets(self, additional_opts: str) -> typing.Set[str]: + """Extract container-side mount targets from free-form docker run options. + + Parses ``-v`` / ``--volume`` tokens from ``additional_docker_run_options`` + and returns the set of container paths already being mounted, so that + :meth:`get_mount_arg` can skip duplicates that would otherwise cause + docker to reject the run ("Duplicate mount point"). + """ + targets: typing.Set[str] = set() + if not additional_opts: + return targets + try: + tokens = shlex.split(additional_opts) + except Exception: + return targets + + i = 0 + while i < len(tokens): + token = tokens[i] + if token in ("-v", "--volume") and i + 1 < len(tokens): + spec = tokens[i + 1] + i += 2 + elif token.startswith("-v") and len(token) > 2: + spec = token[2:] + i += 1 + elif token.startswith("--volume="): + spec = token.split("=", 1)[1] + i += 1 + else: + i += 1 + continue + + # spec format: /host:/container[:mode] + parts = spec.split(":") + if len(parts) >= 2: + targets.add(parts[1]) + return targets + + def get_mount_arg( + self, + mount_datapaths: typing.List, + excluded_container_targets: typing.Optional[typing.Set[str]] = None, + ) -> str: """Get the mount arguments for docker run.""" mount_args = "" + excluded_container_targets = excluded_container_targets or set() # Mount data paths if mount_datapaths: @@ -701,6 +1272,10 @@ def get_mount_arg(self, mount_datapaths: typing.List) -> str: # Mount context paths if "docker_mounts" in self.context.ctx: for mount_arg in 
self.context.ctx["docker_mounts"].keys(): + # Avoid duplicate mount points when additional_docker_run_options + # already mounts the same container target. + if mount_arg in excluded_container_targets: + continue mount_args += ( f"-v {self.context.ctx['docker_mounts'][mount_arg]}:{mount_arg} " ) @@ -976,6 +1551,12 @@ def run_container( 'PRIMUS_CONFIG_PATH', 'PRIMUS_CLI_EXTRA', # Rendezvous timeout so all nodes can join after pull 'TORCH_ELASTIC_RDZV_TIMEOUT', + # Workload-level settings commonly provided via deployment_config.env_vars + # (required for disaggregated launchers like vLLM / SGLang disagg) + 'MODEL_NAME', 'MODEL_DIR', 'xP', 'yD', 'PD_SYNC_ROOT', 'PD_RUN_ID', + 'PROXY_TYPE', 'ROUTER_PORT', 'BENCHMARK_PORT', 'SLURM_JOB_ID', + 'OUTPUT_DIR', 'BARRIER_TIMEOUT_S', 'PROXY_CLOSE_TIMEOUT_S', + 'REQUIRE_RDMA', 'KV_UCX_TLS', 'KV_UCX_SOCKADDR_TLS_PRIORITY', # GPU visibility variables for Ray-based launchers (vLLM, SGLang) # CRITICAL: These must be passed to Docker for proper GPU device mapping 'HIP_VISIBLE_DEVICES', 'ROCR_VISIBLE_DEVICES', 'CUDA_VISIBLE_DEVICES' @@ -1080,9 +1661,14 @@ def run_container( self.context.ctx["docker_env_vars"]["MIOPEN_USER_DB_PATH"] = os.environ["MIOPEN_USER_DB_PATH"] print(f"ℹ️ Added MIOPEN_USER_DB_PATH to docker_env_vars: {os.environ['MIOPEN_USER_DB_PATH']}") + additional_docker_run_options = model_info.get("additional_docker_run_options", "") + additional_mount_targets = self._extract_additional_mount_targets(additional_docker_run_options) docker_options += self.get_env_arg(run_env) - docker_options += self.get_mount_arg(mount_datapaths) - docker_options += f" {model_info.get('additional_docker_run_options', '')}" + docker_options += self.get_mount_arg( + mount_datapaths, + excluded_container_targets=additional_mount_targets, + ) + docker_options += f" {additional_docker_run_options}" # Generate container name base_container_name = "container_" + re.sub( @@ -1287,10 +1873,62 @@ def run_container( ) # Use the container timeout 
(default 7200s) for script execution # to prevent indefinite hangs - model_output = model_docker.sh( - f"cd {model_dir} && {script_name} {model_args}", - timeout=timeout, - ) + try: + model_output = model_docker.sh( + f"cd {model_dir} && {script_name} {model_args}", + timeout=timeout, + ) + except RuntimeError as run_err: + # On script failure, collect lightweight diagnostics from the + # running container (process table, listening ports, log tails). + # These are printed via Console.sh so they land in the run log + # alongside the failure. Failures here are non-fatal. + run_err_str = str(run_err) + container_id_match = re.search( + r"docker exec\s+([a-f0-9]+)\s+bash", + run_err_str, + ) + failed_container_id = ( + container_id_match.group(1) + if container_id_match + else None + ) + if failed_container_id: + try: + self.console.sh( + f"docker exec {failed_container_id} bash -lc " + f"\"ps -eo pid,ppid,stat,etime,cmd | sed -n '1,160p'\"", + timeout=20, + ) + except Exception: + pass + try: + self.console.sh( + f"docker exec {failed_container_id} bash -lc " + f"\"(ss -lntp 2>/dev/null || netstat -lntp 2>/dev/null " + f"|| lsof -nP -iTCP -sTCP:LISTEN 2>/dev/null || true) " + f"| sed -n '1,200p'\"", + timeout=20, + ) + except Exception: + pass + try: + _md_q = _bash_quote_path(model_dir) + self.console.sh( + f"docker exec {failed_container_id} bash -lc " + f"\"for d in /run_logs /run_logs/${{SLURM_JOB_ID:-}} " + f"/myworkspace/{_md_q}; do " + f"if [ -d \\\"$d\\\" ]; then echo ===DIR:$d===; " + f"ls -lah \\\"$d\\\" | sed -n '1,80p'; fi; done; " + f"for f in /run_logs/*.log /run_logs/${{SLURM_JOB_ID:-}}/*.log " + f"/myworkspace/{_md_q}/*.log; do " + f"if [ -f \\\"$f\\\" ]; then echo ===$f===; " + f"tail -n 80 \\\"$f\\\"; fi; done\"", + timeout=30, + ) + except Exception: + pass + raise # When live_output is True, Console.sh() already streamed the output; avoid duplicate print. 
if not self.live_output: print(model_output) @@ -1363,8 +2001,27 @@ def run_container( break if not has_valid_perf: - run_results["performance"] = None - print("Error: Performance metric is empty in all rows of multiple results file.") + nnodes_env = os.environ.get("NNODES", "1") + try: + nnodes = int(nnodes_env) + except (TypeError, ValueError): + nnodes = 1 + + if nnodes > 1: + # In multi-node runs the performance CSV on this + # node may legitimately lack values (metrics are + # only populated on the rank that parses the final + # output). Keep the path so downstream per-node + # aggregation on the login node can pick the best + # candidate across all nodes. + print( + "Warning: Performance metric is currently empty in " + "multiple results file during multi-node run; " + "deferring final decision to aggregation step." + ) + else: + run_results["performance"] = None + print("Error: Performance metric is empty in all rows of multiple results file.") except Exception as e: self.rich_console.print( f"[yellow]Warning: Could not validate multiple results file: {e}[/yellow]" @@ -1447,6 +2104,10 @@ def run_container( "RpcError: Running out of retries to initialize the metrics agent", "Metrics will not be exported", "FutureWarning", + # rocEnvTool pre-script can timeout rocm-smi without affecting run correctness. + "RuntimeError: Console script timeout", + "rocEnvTool/console.py", + "rocEnvTool/rocenv_tool.py", "Opened result file:", "SQLite3 generation ::", "rocpd_op:", @@ -1683,6 +2344,30 @@ def run_container( model_docker.sh( _cp_model_dir_file_to_cwd_cmd(model_dir, "library_trace.csv") ) + # Additionally stage workload-level perf/benchmark artifacts that + # some frameworks (e.g. vLLM/SGLang disagg) write under workdir or + # /run_logs, so SLURM per-node result collection can find them. 
+ _md_q = _bash_quote_path(_md) + _cwd_q = _bash_quote_path(".") + model_docker.sh(f"cp -- {_md_q}/perf_*.csv {_cwd_q} 2>/dev/null || true") + model_docker.sh(f"cp -- {_md_q}/perf-*.csv {_cwd_q} 2>/dev/null || true") + model_docker.sh( + f"cp -- {_md_q}/benchmark_*_CONCURRENCY.log {_cwd_q} 2>/dev/null || true" + ) + model_docker.sh(f"cp -- {_md_q}/workdir/perf_*.csv {_cwd_q} 2>/dev/null || true") + model_docker.sh(f"cp -- {_md_q}/workdir/perf-*.csv {_cwd_q} 2>/dev/null || true") + model_docker.sh( + f"cp -- {_md_q}/workdir/benchmark_*_CONCURRENCY.log {_cwd_q} 2>/dev/null || true" + ) + slurm_job_id = os.environ.get("SLURM_JOB_ID") + if slurm_job_id: + job_dir = shlex.quote(slurm_job_id) + else: + job_dir = "*" + model_docker.sh( + f"cp -- /run_logs/{job_dir}/benchmark_*_CONCURRENCY.log " + f"{_cwd_q} 2>/dev/null || true" + ) except Exception as e: # Ignore errors if no profiler/trace output files exist pass @@ -1849,16 +2534,16 @@ def run_models_from_manifest( # Local image mode (MAD_CONTAINER_IMAGE): Use the provided image directly run_image = build_info.get("docker_image") self.rich_console.print(f"[yellow]🏠 Using local image: {run_image}[/yellow]") - - # Verify image exists - try: - self.console.sh(f"docker image inspect {run_image} > /dev/null 2>&1") - except (subprocess.CalledProcessError, RuntimeError) as e: - self.rich_console.print(f"[yellow]⚠️ Image {run_image} not found, attempting to pull...[/yellow]") - try: - self.pull_image(run_image) - except Exception as e: - raise RuntimeError(f"Failed to find or pull local image {run_image}: {e}") + + # _ensure_local_image_available() guarantees that every + # rank crosses the multi-node barrier exactly once, so no + # additional _sync_after_local_image_ready() call is needed + # here. 
+ self._ensure_local_image_available( + run_image=run_image, + build_info=build_info, + model_info=model_info, + ) elif build_info.get("registry_image"): # Registry image: Pull from registry diff --git a/src/madengine/execution/docker_builder.py b/src/madengine/execution/docker_builder.py index 56f33d6d..f5d4bcff 100644 --- a/src/madengine/execution/docker_builder.py +++ b/src/madengine/execution/docker_builder.py @@ -70,6 +70,12 @@ def get_context_path(self, info: typing.Dict) -> str: def get_build_arg(self, run_build_arg: typing.Optional[typing.Dict] = None) -> str: """Get the build arguments. + Each ``KEY=VALUE`` pair is wrapped with :func:`shlex.quote` because + the resulting string is interpolated into a ``docker build`` command + executed via ``Console.sh`` (``shell=True``). Manual single-quote + wrapping is unsafe when values contain quotes, whitespace, ``$`` or + other shell metacharacters. + Args: run_build_arg: The run build arguments. diff --git a/src/madengine/orchestration/run_orchestrator.py b/src/madengine/orchestration/run_orchestrator.py index 6742b2a5..863bc05f 100644 --- a/src/madengine/orchestration/run_orchestrator.py +++ b/src/madengine/orchestration/run_orchestrator.py @@ -563,6 +563,27 @@ def _execute_local(self, manifest_file: str, timeout: int) -> Dict: # Restore context from manifest if present if "context" in manifest: manifest_context = manifest["context"] + # Restore host-level runtime context fields from manifest. + # Keep runtime-detected values as priority; bring missing keys from manifest + # (especially docker_mounts for host path visibility on compute nodes). 
+ if "docker_mounts" in manifest_context: + if "docker_mounts" not in self.context.ctx: + self.context.ctx["docker_mounts"] = {} + for container_path, host_path in manifest_context["docker_mounts"].items(): + if container_path not in self.context.ctx["docker_mounts"]: + self.context.ctx["docker_mounts"][container_path] = host_path + if "docker_build_arg" in manifest_context: + if "docker_build_arg" not in self.context.ctx: + self.context.ctx["docker_build_arg"] = {} + for key, value in manifest_context["docker_build_arg"].items(): + if key not in self.context.ctx["docker_build_arg"]: + self.context.ctx["docker_build_arg"][key] = value + if "docker_gpus" in manifest_context and "docker_gpus" not in self.context.ctx: + self.context.ctx["docker_gpus"] = manifest_context["docker_gpus"] + if "gpu_vendor" in manifest_context and "gpu_vendor" not in self.context.ctx: + self.context.ctx["gpu_vendor"] = manifest_context["gpu_vendor"] + if "guest_os" in manifest_context and "guest_os" not in self.context.ctx: + self.context.ctx["guest_os"] = manifest_context["guest_os"] if "tools" in manifest_context: self.context.ctx["tools"] = manifest_context["tools"] if "pre_scripts" in manifest_context: @@ -571,12 +592,16 @@ def _execute_local(self, manifest_file: str, timeout: int) -> Dict: self.context.ctx["post_scripts"] = manifest_context["post_scripts"] if "encapsulate_script" in manifest_context: self.context.ctx["encapsulate_script"] = manifest_context["encapsulate_script"] - # Restore docker_env_vars from build context (e.g. MAD_SECRET_HFTOKEN for Primus HF-backed configs) + # Restore docker_env_vars from build context (e.g. MAD_SECRETS_HFTOKEN for Primus HF-backed configs). + # Keep runtime-detected values as priority (consistent with docker_mounts / docker_build_arg): + # values already populated by Context (e.g. MAD_SECRETS_* read from os.environ) must not be + # overwritten by manifest entries that may still contain unexpanded "${VAR}" placeholders. 
if "docker_env_vars" in manifest_context and manifest_context["docker_env_vars"]: if "docker_env_vars" not in self.context.ctx: self.context.ctx["docker_env_vars"] = {} for k, v in manifest_context["docker_env_vars"].items(): - self.context.ctx["docker_env_vars"][k] = v + if k not in self.context.ctx["docker_env_vars"]: + self.context.ctx["docker_env_vars"][k] = v # Merge runtime additional_context (takes precedence over manifest) # This allows users to override tools/scripts at runtime diff --git a/tests/integration/test_container_execution.py b/tests/integration/test_container_execution.py index b0d11b3b..44661869 100644 --- a/tests/integration/test_container_execution.py +++ b/tests/integration/test_container_execution.py @@ -73,6 +73,106 @@ def test_load_build_manifest(self): assert "images" in result assert "model1" in result["images"] + @patch.dict(os.environ, {"MAD_DOCKER_BUILDS": "/shared/builds", "NODE_RANK": "0"}, clear=False) + @patch.object(ContainerRunner, "_sync_after_local_image_ready") + @patch.object(ContainerRunner, "_save_local_image_to_tar") + @patch.object(ContainerRunner, "_build_or_pull_local_image") + @patch.object(ContainerRunner, "_local_image_exists", return_value=True) + @patch("os.path.exists", return_value=False) + def test_ensure_local_image_available_saves_tar_on_primary_node( + self, + mock_exists, + mock_local_image_exists, + mock_build_or_pull, + mock_save_to_tar, + mock_sync, + ): + """Primary node should save a tar when image exists but cache file is missing.""" + runner = ContainerRunner() + + runner._ensure_local_image_available( + run_image="rocm/pyt_mlperf_training:full-tefix", + build_info={}, + model_info={}, + ) + + mock_build_or_pull.assert_not_called() + mock_save_to_tar.assert_called_once_with( + "rocm/pyt_mlperf_training:full-tefix", + "/shared/builds/rocm_pyt_mlperf_training_full-tefix.tar", + ) + assert mock_sync.call_count == 1 + + @patch.dict(os.environ, {"MAD_DOCKER_BUILDS": "/shared/builds", "NODE_RANK": "0"}, 
clear=False) + @patch.object(ContainerRunner, "_save_local_image_to_tar") + @patch.object(ContainerRunner, "_build_or_pull_local_image") + @patch.object(ContainerRunner, "_load_local_image_from_tar") + @patch.object(ContainerRunner, "_local_image_exists", return_value=False) + @patch("os.path.exists", return_value=True) + def test_ensure_local_image_available_loads_existing_tar( + self, + mock_exists, + mock_local_image_exists, + mock_load_from_tar, + mock_build_or_pull, + mock_save_to_tar, + ): + """Existing tar cache should be loaded instead of rebuilding.""" + runner = ContainerRunner() + + runner._ensure_local_image_available( + run_image="rocm/pyt_mlperf_training:full-tefix", + build_info={}, + model_info={}, + ) + + mock_load_from_tar.assert_called_once_with( + "rocm/pyt_mlperf_training:full-tefix", + "/shared/builds/rocm_pyt_mlperf_training_full-tefix.tar", + ) + mock_build_or_pull.assert_not_called() + mock_save_to_tar.assert_not_called() + + @patch.dict(os.environ, {"MAD_DOCKER_BUILDS": "/shared/builds", "NODE_RANK": "1"}, clear=False) + @patch.object(ContainerRunner, "_save_local_image_to_tar") + @patch.object(ContainerRunner, "_build_or_pull_local_image") + @patch.object(ContainerRunner, "_load_local_image_from_tar") + @patch.object(ContainerRunner, "_sync_after_local_image_ready") + @patch.object(ContainerRunner, "_local_image_exists", return_value=False) + @patch("os.path.exists", return_value=False) + def test_ensure_local_image_available_waits_for_primary_tar_on_worker( + self, + mock_exists, + mock_local_image_exists, + mock_sync, + mock_load_from_tar, + mock_build_or_pull, + mock_save_to_tar, + ): + """Worker nodes should wait for node 0 and then load the shared tar.""" + runner = ContainerRunner() + + def exists_side_effect(path): + if path == "/shared/builds/rocm_pyt_mlperf_training_full-tefix.tar": + return mock_sync.call_count > 0 + return False + + mock_exists.side_effect = exists_side_effect + + runner._ensure_local_image_available( + 
run_image="rocm/pyt_mlperf_training:full-tefix", + build_info={}, + model_info={}, + ) + + mock_sync.assert_called_once_with(run_image="rocm/pyt_mlperf_training:full-tefix") + mock_load_from_tar.assert_called_once_with( + "rocm/pyt_mlperf_training:full-tefix", + "/shared/builds/rocm_pyt_mlperf_training_full-tefix.tar", + ) + mock_build_or_pull.assert_not_called() + mock_save_to_tar.assert_not_called() + @patch.object(Console, "sh") def test_pull_image(self, mock_sh): """Test pulling image from registry."""