diff --git a/AGENTS.md b/AGENTS.md index 4d75f36..dd488d7 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -91,7 +91,8 @@ your response. read-only dependency unless coordinating with maintainers. - `examples/` — Runnable training recipes grouped by task type (`math/`, `code/`, `math-multi-agent/`, `code-multi-agent/`, - `alfworld/`, `webshop/`, `search/`, `math-efficient-data/`), plus + `terminal-bench/`, `alfworld/`, `webshop/`, `search/`, + `math-efficient-data/`), plus shared helpers in `_common/` and `launch_trainer.py`. Each recipe ships a `yaml/` directory of configs and a `scripts/` directory of numbered launch scripts. diff --git a/README.md b/README.md index 58b7c6f..a758a0a 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,7 @@ AstraFlow currently supports the following recipes. Check the [documentation](ht | [`math-efficient-data/`](examples/math-efficient-data/) | Composable data algorithms — GRESO, dynamic sampling, buffer replay | | [`code/`](examples/code/) | Code-generation RL — Qwen3-8B, M2PO | | [`code-multi-agent/`](examples/code-multi-agent/) | Codegen + verifier competitive coding | +| [`terminal-bench/`](examples/terminal-bench/) | Harbor-backed Terminal-Bench eval and RL recipes | | [`search/`](examples/search/) | Search-augmented agent training with local retrieval | | [`alfworld/`](examples/alfworld/) | ALFWorld embodied household agent | | [`webshop/`](examples/webshop/) | WebShop web-navigation shopping agent | diff --git a/astraflow/core/workflow/__init__.py b/astraflow/core/workflow/__init__.py index 3c95304..73c14b3 100644 --- a/astraflow/core/workflow/__init__.py +++ b/astraflow/core/workflow/__init__.py @@ -21,6 +21,7 @@ import astraflow.core.workflow.impl.solve_and_check import astraflow.core.workflow.impl.sep_solve_and_check import astraflow.core.workflow.impl.solve_and_verify +import astraflow.core.workflow.impl.terminal_bench_harbor import astraflow.core.workflow.impl.actor_and_verify import astraflow.core.workflow.impl.rlvr import astraflow.core.workflow.impl.sm_lg_router diff --git a/astraflow/core/workflow/impl/terminal_bench_harbor.py b/astraflow/core/workflow/impl/terminal_bench_harbor.py new file mode 100644 index 0000000..aed7225 --- /dev/null +++ b/astraflow/core/workflow/impl/terminal_bench_harbor.py @@ -0,0 +1,622 @@ +"""Harbor-backed Terminal-Bench workflows.""" + +from __future__ import annotations + +import asyncio +import json +import os +import shlex +import uuid +from pathlib import Path +from typing import Any + +import torch + +from astraflow.core.workflow.api.cli_args import GenerationHyperparameters +from astraflow.core.workflow.api.engine_api import EngineGroup, InferenceEngine +from astraflow.core.workflow.api.workflow_api import RolloutWorkflow +from astraflow.core.workflow.registry import register_workflow +from astraflow.core.workflow.utils import logging, stats_tracker +from astraflow.core.workflow.utils.data import resolve_prompt_id, results_to_structured + +logger = logging.getLogger("TerminalBenchHarborWorkflow") + + +def _coerce_float(value: Any) -> float | None: + if isinstance(value, bool): + return 1.0 if value else 0.0 + if isinstance(value, int | float): + return float(value) + if isinstance(value, str): + try: + return float(value) + except ValueError: + return None + return None + + +def _extract_reward_from_result(result: Any) -> float | None: + if not isinstance(result, dict): + return None + + verifier_result = result.get("verifier_result") + if isinstance(verifier_result, dict): + rewards = verifier_result.get("rewards") + if isinstance(rewards, dict): + return _coerce_float(rewards.get("reward")) + return None + + +def _collect_harbor_rewards(job_root: Path) -> list[float]: + result_path, result = _load_harbor_trial_result(job_root) + reward = _extract_reward_from_result(result) + if reward is None: + raise RuntimeError( + f"Harbor trial result has no verifier reward: {result_path}" + ) + return [reward] + + +def _load_json_file(path: Path) -> Any | None: + try: + return json.loads(path.read_text()) + except (OSError, json.JSONDecodeError): + return None + + +def _load_harbor_trial_result(job_root: Path) -> tuple[Path, dict[str, Any]]: + timestamp_dirs = [path for path in job_root.iterdir() if path.is_dir()] + if len(timestamp_dirs) != 1: + raise RuntimeError( + f"Expected exactly one Harbor run directory under {job_root}, " + f"found {len(timestamp_dirs)}." + ) + + trial_result_paths = sorted(timestamp_dirs[0].glob("*/result.json")) + if len(trial_result_paths) != 1: + raise RuntimeError( + f"Expected exactly one Harbor trial result under {timestamp_dirs[0]}, " + f"found {len(trial_result_paths)}." + ) + + result_path = trial_result_paths[0] + result = _load_json_file(result_path) + if not isinstance(result, dict): + raise RuntimeError(f"Could not read Harbor trial result: {result_path}") + if not isinstance(result.get("agent_result"), dict) or not isinstance( + result.get("verifier_result"), dict + ): + raise RuntimeError( + f"Harbor trial result missing agent_result/verifier_result: {result_path}" + ) + return result_path, result + + +def _extract_rollout_details(result: dict[str, Any]) -> list[dict[str, Any]]: + agent_result = result.get("agent_result") + if isinstance(agent_result, dict): + rollout_details = agent_result.get("rollout_details") + if isinstance(rollout_details, list): + return [ + detail + for detail in rollout_details + if isinstance(detail, dict) + ] + return [] + + +def _as_turn_token_lists(value: Any, field_name: str) -> list[list[int]]: + if not isinstance(value, list): + raise ValueError(f"Harbor rollout detail missing list field {field_name!r}.") + turns: list[list[int]] = [] + for turn_idx, turn in enumerate(value): + if not isinstance(turn, list): + raise ValueError( + f"Harbor rollout detail field {field_name!r} turn {turn_idx} " + f"is {type(turn).__name__}, expected list." + ) + turns.append([int(token) for token in turn]) + return turns + + +def _as_turn_float_lists(value: Any, field_name: str) -> list[list[float]]: + if not isinstance(value, list): + raise ValueError(f"Harbor rollout detail missing list field {field_name!r}.") + turns: list[list[float]] = [] + for turn_idx, turn in enumerate(value): + if not isinstance(turn, list): + raise ValueError( + f"Harbor rollout detail field {field_name!r} turn {turn_idx} " + f"is {type(turn).__name__}, expected list." + ) + turns.append([float(logprob) for logprob in turn]) + return turns + + +def _prompt_suffix_for_accumulated_sequence( + prompt_token_ids: list[int], + accumulated_sequence: list[int], +) -> list[int]: + if not accumulated_sequence: + return prompt_token_ids + if prompt_token_ids[: len(accumulated_sequence)] == accumulated_sequence: + return prompt_token_ids[len(accumulated_sequence) :] + return prompt_token_ids + + +def _harbor_result_to_training_sequence( + result: dict[str, Any], + reward: float, + version: int, + rollout_detail_index: int = 0, +) -> dict[str, torch.Tensor]: + rollout_details = _extract_rollout_details(result) + if not rollout_details: + raise ValueError( + "Harbor result has no rollout_details. Set Terminus-2 " + "agent_kwarg collect_rollout_details=true for RL training." + ) + if rollout_detail_index >= len(rollout_details): + raise ValueError( + f"Harbor result has {len(rollout_details)} rollout detail(s), " + f"but rollout_detail_index={rollout_detail_index}." + ) + + detail = rollout_details[rollout_detail_index] + prompt_turns = _as_turn_token_lists( + detail.get("prompt_token_ids"), "prompt_token_ids" + ) + completion_turns = _as_turn_token_lists( + detail.get("completion_token_ids"), "completion_token_ids" + ) + logprob_turns = _as_turn_float_lists(detail.get("logprobs"), "logprobs") + + n_turns = len(completion_turns) + if len(prompt_turns) != n_turns or len(logprob_turns) != n_turns: + raise ValueError( + "Harbor rollout detail has inconsistent turn counts: " + f"prompt={len(prompt_turns)}, completion={len(completion_turns)}, " + f"logprobs={len(logprob_turns)}." + ) + + seq: list[int] = [] + logprobs: list[float] = [] + loss_mask: list[int] = [] + versions: list[int] = [] + n_completion_tokens = 0 + + for turn_idx, (prompt_tokens, completion_tokens, turn_logprobs) in enumerate( + zip(prompt_turns, completion_turns, logprob_turns) + ): + if len(completion_tokens) != len(turn_logprobs): + raise ValueError( + f"Harbor rollout detail turn {turn_idx} has " + f"{len(completion_tokens)} completion token(s) but " + f"{len(turn_logprobs)} logprob(s)." + ) + prompt_delta = _prompt_suffix_for_accumulated_sequence(prompt_tokens, seq) + seq += prompt_delta + completion_tokens + logprobs += [0.0] * len(prompt_delta) + turn_logprobs + loss_mask += [0] * len(prompt_delta) + [1] * len(completion_tokens) + versions += [-1] * len(prompt_delta) + [version] * len(completion_tokens) + n_completion_tokens += len(completion_tokens) + + if not seq: + raise ValueError("Harbor rollout detail produced an empty token sequence.") + if n_completion_tokens == 0: + raise ValueError("Harbor rollout detail has no trainable completion tokens.") + + res = { + "input_ids": torch.tensor(seq, dtype=torch.int32), + "loss_mask": torch.tensor(loss_mask, dtype=torch.int32), + "logprobs": torch.tensor(logprobs, dtype=torch.float32), + "versions": torch.tensor(versions, dtype=torch.int32), + "attention_mask": torch.ones(len(seq), dtype=torch.bool), + "rewards": torch.tensor(float(reward), dtype=torch.float32), + } + return {key: value.unsqueeze(0) for key, value in res.items()} + + +def _format_agent_kwarg_value(value: Any) -> str: + if isinstance(value, bool): + return "true" if value else "false" + return str(value) + + +def _tail(text: str, limit: int = 4000) -> str: + return text[-limit:] if len(text) > limit else text + + +def _engine_addresses(engine: InferenceEngine) -> list[str]: + engines: list[Any] + if isinstance(engine, EngineGroup): + engines = [engine[key] for key in engine.keys()] + else: + engines = [engine] + + addresses: list[str] = [] + seen: set[str] = set() + for one_engine in engines: + candidates = [ + one_engine, + getattr(one_engine, "_engine", None), + getattr(getattr(one_engine, "default", None), "_engine", None), + ] + for candidate in candidates: + for address in getattr(candidate, "addresses", None) or []: + address = str(address) + if address not in seen: + seen.add(address) + addresses.append(address) + return addresses + + +def _append_api_base_suffix(api_base: str, suffix: str) -> str: + base = ( + api_base + if api_base.startswith(("http://", "https://")) + else f"http://{api_base}" + ) + return f"{base.rstrip('/')}{suffix}" if suffix else base.rstrip("/") + + +def _as_api_base_list(api_base: str | list[str] | tuple[str, ...] | None) -> list[str]: + if api_base is None: + return [] + if isinstance(api_base, str): + values = [api_base] + else: + values = list(api_base) + return [str(value).strip() for value in values if str(value).strip()] + + +@register_workflow("terminal_bench_harbor") +class TerminalBenchHarborWorkflow(RolloutWorkflow): + """Run Terminal-Bench through Harbor and return AstraFlow eval rewards.""" + + def __init__( + self, + gconfig: GenerationHyperparameters, + tokenizer, + dataset: str = "terminal-bench@2.0", + dataset_path: str | None = None, + harbor_binary: str = "harbor", + harbor_command: list[str] | None = None, + agent_name: str = "terminus-2", + model_name: str = "openai/local-model", + api_base: str | list[str] | tuple[str, ...] | None = None, + api_base_suffix: str = "/v1", + api_key: str = "EMPTY", + api_key_env: str = "OPENAI_API_KEY", + environment: str | None = None, + jobs_dir: str | None = None, + timeout: float = 7200.0, + rollout_stat_scope: str = "eval-rollout", + n_concurrent_trials: int = 1, + max_parallel_jobs: int = 1, + task_name_arg: str = "--include-task-name", + auto_confirm: bool = True, + agent_kwargs: dict[str, Any] | None = None, + agent_env: dict[str, str] | None = None, + extra_args: list[str] | None = None, + dump_dir: str | None = None, + ): + del tokenizer + del dump_dir + self.gconfig = gconfig + self.dataset = dataset + self.dataset_path = dataset_path + self.harbor_binary = harbor_binary + self.harbor_command = list(harbor_command) if harbor_command else None + self.agent_name = agent_name + self.model_name = model_name + self.api_base = api_base + self.api_base_suffix = api_base_suffix + self.api_key = api_key + self.api_key_env = api_key_env + self.environment = environment + self.jobs_dir = jobs_dir + self.timeout = timeout + self.rollout_stat_scope = rollout_stat_scope + self.n_concurrent_trials = n_concurrent_trials + self.max_parallel_jobs = max(1, int(max_parallel_jobs)) + self._semaphore = asyncio.Semaphore(self.max_parallel_jobs) + self.task_name_arg = task_name_arg + self.auto_confirm = auto_confirm + self.agent_kwargs = dict(agent_kwargs or {}) + self.agent_env = dict(agent_env or {}) + self.extra_args = list(extra_args or []) + self._api_base_next_idx = 0 + + async def arun_episode( + self, + engine: InferenceEngine, + data: dict[str, Any], + ) -> dict[str, Any]: + # EvalManager already implements repeated sampling with the dataset + # `repeat`/`k` setting. Keep one Harbor subprocess per eval item so + # expensive Docker-backed jobs do not multiply unexpectedly. + configured_samples = int(getattr(self.gconfig, "n_samples", 1)) + if configured_samples != 1: + logger.warning( + "Ignoring gconfig.n_samples=%s for Harbor eval; use eval " + "dataset repeat/k for pass@k.", + configured_samples, + ) + results = [await self._run_one_harbor_trial(engine, data)] + rewards = torch.tensor([r["reward"] for r in results], dtype=torch.float32) + eval_correct = torch.tensor( + [1.0 if r["reward"] > 0.0 else 0.0 for r in results], + dtype=torch.float32, + ) + stats_tracker.get(self.rollout_stat_scope).scalar( + reward=float(rewards.mean().item()), + success=float(eval_correct.mean().item()), + ) + output: dict[str, Any] = { + "rewards": rewards, + "eval_correct": eval_correct, + "n_trajs": 1, + "harbor": results, + } + prompt_id = resolve_prompt_id(data) + if prompt_id is not None: + output["prompt_id"] = prompt_id + return output + + async def _run_one_harbor_trial( + self, + engine: InferenceEngine, + data: dict[str, Any], + ) -> dict[str, Any]: + async with self._semaphore: + return await self._run_one_harbor_trial_unlocked(engine, data) + + async def _run_one_harbor_trial_unlocked( + self, + engine: InferenceEngine, + data: dict[str, Any], + ) -> dict[str, Any]: + task_path = data.get("task_path") + if task_path is None and isinstance(data.get("prompt"), str): + prompt_path = Path(data["prompt"]).expanduser() + if (prompt_path / "instruction.md").is_file(): + task_path = str(prompt_path) + task_name = data.get("task_name") or data.get("task_id") + run_id = uuid.uuid4().hex + root = Path(self.jobs_dir or "./data-harbor-jobs") + index_value = data.get("index") or task_name + if index_value is None and task_path is not None: + index_value = Path(str(task_path)).expanduser().name + index = str(index_value or "task").replace("/", "_") + run_jobs_dir = root / f"{index}-{run_id}" + run_jobs_dir.mkdir(parents=True, exist_ok=True) + + cmd = self._build_command(engine, task_name, run_jobs_dir, task_path=task_path) + env = os.environ.copy() + if self.api_key_env and self.api_key_env not in env: + env[self.api_key_env] = self.api_key + env.update({str(k): str(v) for k, v in self.agent_env.items()}) + + command_text = shlex.join(cmd) + (run_jobs_dir / "harbor.command.txt").write_text(command_text + "\n") + logger.info("Running Harbor command: %s", command_text) + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + ) + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), self.timeout) + except asyncio.CancelledError: + proc.kill() + await proc.communicate() + raise + except asyncio.TimeoutError: + proc.kill() + await proc.communicate() + raise TimeoutError( + f"Harbor timed out after {self.timeout}s for task {task_name!r}" + ) + + stdout_text = stdout.decode(errors="replace") + stderr_text = stderr.decode(errors="replace") + (run_jobs_dir / "harbor.stdout.log").write_text(stdout_text) + (run_jobs_dir / "harbor.stderr.log").write_text(stderr_text) + if proc.returncode != 0: + logger.error( + "Harbor command failed for task %r with returncode=%s. " + "stdout/stderr logs are under %s. stderr tail:\n%s", + task_name, + proc.returncode, + run_jobs_dir, + _tail(stderr_text, 2000), + ) + raise RuntimeError( + "Harbor command failed " + f"(returncode={proc.returncode}, task={task_name!r}).\n" + f"logs: {run_jobs_dir}\n" + f"stdout:\n{_tail(stdout_text)}\n" + f"stderr:\n{_tail(stderr_text)}" + ) + + rewards = _collect_harbor_rewards(run_jobs_dir) + if not rewards: + logger.error( + "Could not find Harbor reward for task %r under %s. " + "stdout tail:\n%s\nstderr tail:\n%s", + task_name, + run_jobs_dir, + _tail(stdout_text, 1000), + _tail(stderr_text, 1000), + ) + raise RuntimeError( + f"Could not find Harbor reward under {run_jobs_dir}. " + f"stdout tail:\n{_tail(stdout_text, 2000)}\n" + f"stderr tail:\n{_tail(stderr_text, 2000)}" + ) + reward = float(sum(rewards) / len(rewards)) + return { + "task_name": task_name, + "task_path": str(task_path) if task_path is not None else None, + "reward": reward, + "num_harbor_rewards": len(rewards), + "jobs_dir": str(run_jobs_dir), + } + + def _build_command( + self, + engine: InferenceEngine, + task_name: Any, + run_jobs_dir: Path, + task_path: Any | None = None, + ) -> list[str]: + cmd = list(self.harbor_command or [self.harbor_binary]) + ["run"] + if task_path: + cmd.extend(["--path", str(Path(str(task_path)).expanduser())]) + elif self.dataset_path: + cmd.extend(["--path", self.dataset_path]) + else: + cmd.extend(["--dataset", self.dataset]) + cmd.extend(["--agent", self.agent_name, "--model", self.model_name]) + cmd.extend(["--n-concurrent", str(self.n_concurrent_trials)]) + cmd.extend(["--jobs-dir", str(run_jobs_dir)]) + if self.auto_confirm: + cmd.append("--yes") + + if task_name and not task_path: + cmd.extend([self.task_name_arg, str(task_name)]) + if self.environment: + cmd.extend(["--env", self.environment]) + + agent_kwargs = dict(self.agent_kwargs) + api_base = self._allocate_api_base(engine) + if not api_base: + raise RuntimeError( + "TerminalBenchHarborWorkflow could not find a model API base " + "URL from configured `api_base` or the RaaS eval engine." + ) + agent_kwargs.setdefault("api_base", api_base) + for key, value in agent_kwargs.items(): + formatted_value = _format_agent_kwarg_value(value) + cmd.extend(["--agent-kwarg", f"{key}={formatted_value}"]) + + cmd.extend(self.extra_args) + return cmd + + def _configured_api_bases(self) -> list[str]: + return _as_api_base_list(self.api_base) + + def _infer_api_base(self, engine: InferenceEngine) -> list[str]: + suffix = self.api_base_suffix or "" + return [ + _append_api_base_suffix(address, suffix) + for address in _engine_addresses(engine) + ] + + def _available_api_bases(self, engine: InferenceEngine) -> list[str]: + return self._configured_api_bases() or self._infer_api_base(engine) + + def _allocate_api_base(self, engine: InferenceEngine) -> str | None: + api_bases = self._available_api_bases(engine) + if not api_bases: + return None + api_base = api_bases[self._api_base_next_idx % len(api_bases)] + self._api_base_next_idx += 1 + return api_base + + +@register_workflow("terminal_bench_harbor_rl") +class TerminalBenchHarborRLWorkflow(TerminalBenchHarborWorkflow): + """Run Terminal-Bench through Harbor and return AstraFlow RL tensors.""" + + def __init__( + self, + *args, + rollout_detail_index: int = 0, + **kwargs, + ): + agent_kwargs = dict(kwargs.pop("agent_kwargs", {}) or {}) + agent_kwargs.setdefault("collect_rollout_details", True) + agent_kwargs.setdefault("enable_summarize", False) + kwargs["agent_kwargs"] = agent_kwargs + kwargs.setdefault("rollout_stat_scope", "rollout") + + # AstraFlow samples should be represented as separate trajectories so + # group-level reward normalization/filtering sees each attempt. + n_concurrent_trials = int(kwargs.get("n_concurrent_trials", 1)) + if n_concurrent_trials != 1: + logger.warning( + "terminal_bench_harbor_rl uses gconfig.n_samples for repeated " + "training trajectories; overriding n_concurrent_trials=%s to 1.", + n_concurrent_trials, + ) + kwargs["n_concurrent_trials"] = 1 + + super().__init__(*args, **kwargs) + self.rollout_detail_index = int(rollout_detail_index) + + async def arun_episode( + self, + engine: InferenceEngine, + data: dict[str, Any], + ) -> dict[str, Any]: + n_samples = max(1, int(getattr(self.gconfig, "n_samples", 1))) + sample_results = await asyncio.gather( + *[ + self._run_one_harbor_training_trial(engine, data) + for _ in range(n_samples) + ] + ) + rewards = [ + float(res["rewards"].flatten()[0].item()) + for res in sample_results + ] + if rewards: + successes = [1.0 if reward > 0.0 else 0.0 for reward in rewards] + stats_tracker.get(self.rollout_stat_scope).scalar( + reward=float(sum(rewards) / len(rewards)), + success=float(sum(successes) / len(successes)), + ) + return results_to_structured( + sample_results, + prompt_id=resolve_prompt_id(data), + ) + + async def _run_one_harbor_training_trial( + self, + engine: InferenceEngine, + data: dict[str, Any], + ) -> dict[str, torch.Tensor]: + harbor_summary = await self._run_one_harbor_trial(engine, data) + job_root = Path(harbor_summary["jobs_dir"]) + result_path, result = _load_harbor_trial_result(job_root) + if not _extract_rollout_details(result): + raise RuntimeError( + "Could not find Harbor rollout_details in trial result. " + "Check that agent_kwargs.collect_rollout_details=true and the " + "OpenAI-compatible backend returns token IDs/logprobs. " + f"Result: {result_path}" + ) + + reward = _extract_reward_from_result(result) + if reward is None: + reward = float(harbor_summary["reward"]) + try: + version = int(engine.get_version()) + except Exception: + logger.exception("Could not read engine version; using version=0.") + version = 0 + + try: + return _harbor_result_to_training_sequence( + result, + reward=float(reward), + version=version, + rollout_detail_index=self.rollout_detail_index, + ) + except ValueError as exc: + raise RuntimeError( + f"Could not convert Harbor result to RL tensors: {result_path}" + ) from exc diff --git a/astraflow/core/workflow/impl/tests/test_terminal_bench_harbor.py b/astraflow/core/workflow/impl/tests/test_terminal_bench_harbor.py new file mode 100644 index 0000000..03aff65 --- /dev/null +++ b/astraflow/core/workflow/impl/tests/test_terminal_bench_harbor.py @@ -0,0 +1,236 @@ +from __future__ import annotations + +import json + +import pytest + +from astraflow.dataflow.dataset.terminal_bench import get_harbor_task_path_dataset +from astraflow.core.workflow.impl.terminal_bench_harbor import ( + TerminalBenchHarborRLWorkflow, + TerminalBenchHarborWorkflow, + _collect_harbor_rewards, + _extract_reward_from_result, + _harbor_result_to_training_sequence, + _load_harbor_trial_result, +) + + +def test_extract_reward_from_harbor_trial_result(): + result = { + "verifier_result": { + "rewards": { + "reward": 1.0, + } + } + } + + assert _extract_reward_from_result(result) == pytest.approx(1.0) + + +def test_collect_harbor_rewards_reads_trial_result_json(tmp_path): + run_dir = tmp_path / "job" / "2026-05-15__17-17-54" + result_file = run_dir / "trial-a" / "result.json" + result_file.parent.mkdir(parents=True) + result_file.write_text( + json.dumps({"verifier_result": {"rewards": {"reward": 0.0}}}) + ) + + with pytest.raises(RuntimeError, match="missing agent_result/verifier_result"): + _collect_harbor_rewards(tmp_path / "job") + + result_file.write_text( + json.dumps( + { + "agent_result": {}, + "verifier_result": {"rewards": {"reward": 1.0}}, + } + ) + ) + + assert _collect_harbor_rewards(tmp_path / "job") == [pytest.approx(1.0)] + + +def test_load_harbor_trial_result_uses_single_trial_result(tmp_path): + aggregate = tmp_path / "job" / "2026-05-15__17-17-54" / "result.json" + aggregate.parent.mkdir(parents=True) + aggregate.write_text(json.dumps({"n_total_trials": 1, "stats": {}})) + + trial = aggregate.parent / "trial-a" / "result.json" + trial.parent.mkdir(parents=True) + trial.write_text( + json.dumps( + { + "agent_result": {"rollout_details": []}, + "verifier_result": {"rewards": {"reward": 1.0}}, + } + ) + ) + + assert _load_harbor_trial_result(tmp_path / "job") == ( + trial, + json.loads(trial.read_text()), + ) + + +def test_harbor_task_path_dataset_loads_skyrl_layout(tmp_path, monkeypatch): + root = tmp_path / "CodeContests" + task = root / "task-a" + task.mkdir(parents=True) + (task / "instruction.md").write_text("do task\n") + monkeypatch.setenv("HARBOR_DATA", str(root)) + + dataset = get_harbor_task_path_dataset( + path="$HARBOR_DATA", + dataset_name="test_harbor_tasks", + ) + + assert len(dataset) == 1 + assert dataset[0]["task_path"] == str(task) + assert dataset[0]["prompt"] == str(task) + assert dataset[0]["task_name"] == "task-a" + + +def test_harbor_result_to_training_sequence_uses_rollout_details(): + result = { + "agent_result": { + "rollout_details": [ + { + "prompt_token_ids": [[10, 11], [10, 11, 12, 20, 30]], + "completion_token_ids": [[12, 20], [31]], + "logprobs": [[-0.1, -0.2], [-0.3]], + } + ] + }, + "verifier_result": {"rewards": {"reward": 0.5}}, + } + + seq = _harbor_result_to_training_sequence(result, reward=0.5, version=7) + + assert seq["input_ids"].tolist() == [[10, 11, 12, 20, 30, 31]] + assert seq["loss_mask"].tolist() == [[0, 0, 1, 1, 0, 1]] + assert seq["logprobs"].tolist()[0] == pytest.approx( + [0.0, 0.0, -0.1, -0.2, 0.0, -0.3] + ) + assert seq["versions"].tolist() == [[-1, -1, 7, 7, -1, 7]] + assert seq["attention_mask"].tolist() == [[True, True, True, True, True, True]] + assert seq["rewards"].tolist() == [pytest.approx(0.5)] + + +def test_harbor_result_to_training_sequence_requires_rollout_details(): + with pytest.raises(ValueError, match="collect_rollout_details=true"): + _harbor_result_to_training_sequence( + {"agent_result": {"rollout_details": []}}, + reward=0.0, + version=1, + ) + + +def test_build_command_supports_conda_wrapped_harbor(tmp_path): + class DummyEngine: + addresses = ["127.0.0.1:12345"] + + workflow = TerminalBenchHarborWorkflow( + gconfig=object(), + tokenizer=None, + harbor_command=[ + "conda", + "run", + "--no-capture-output", + "-n", + "harbor-tb2", + "harbor", + ], + ) + + cmd = workflow._build_command(DummyEngine(), "build-pmars", tmp_path) + + assert cmd[:7] == [ + "conda", + "run", + "--no-capture-output", + "-n", + "harbor-tb2", + "harbor", + "run", + ] + assert "--include-task-name" in cmd + assert "--yes" in cmd + assert "build-pmars" in cmd + assert "api_base=http://127.0.0.1:12345/v1" in cmd + + +def test_build_command_supports_harbor_task_path(tmp_path): + class DummyEngine: + addresses = ["127.0.0.1:12345"] + + task_dir = tmp_path / "task-a" + task_dir.mkdir() + workflow = TerminalBenchHarborWorkflow( + gconfig=object(), + tokenizer=None, + ) + + cmd = workflow._build_command( + DummyEngine(), + "task-a", + tmp_path / "job", + task_path=task_dir, + ) + + assert "--path" in cmd + assert str(task_dir) in cmd + assert "--include-task-name" not in cmd + assert "task-a" not in cmd + + +def test_build_command_round_robins_inferred_api_bases(tmp_path): + class DummyEngine: + addresses = ["127.0.0.1:12345", "127.0.0.1:12346"] + + workflow = TerminalBenchHarborWorkflow( + gconfig=object(), + tokenizer=None, + ) + + cmd0 = workflow._build_command(DummyEngine(), "task-a", tmp_path / "a") + cmd1 = workflow._build_command(DummyEngine(), "task-b", tmp_path / "b") + cmd2 = workflow._build_command(DummyEngine(), "task-c", tmp_path / "c") + + assert "api_base=http://127.0.0.1:12345/v1" in cmd0 + assert "api_base=http://127.0.0.1:12346/v1" in cmd1 + assert "api_base=http://127.0.0.1:12345/v1" in cmd2 + + +def test_build_command_round_robins_configured_api_bases(tmp_path): + class DummyEngine: + addresses = ["127.0.0.1:12345"] + + workflow = TerminalBenchHarborWorkflow( + gconfig=object(), + tokenizer=None, + api_base=[ + "http://127.0.0.1:20001/v1", + "http://127.0.0.1:20002/v1", + ], + ) + + cmd0 = workflow._build_command(DummyEngine(), "task-a", tmp_path / "a") + cmd1 = workflow._build_command(DummyEngine(), "task-b", tmp_path / "b") + + assert "api_base=http://127.0.0.1:20001/v1" in cmd0 + assert "api_base=http://127.0.0.1:20002/v1" in cmd1 + + +def test_rl_workflow_enables_rollout_details_and_disables_summarize(tmp_path): + class DummyEngine: + addresses = ["127.0.0.1:12345"] + + workflow = TerminalBenchHarborRLWorkflow( + gconfig=object(), + tokenizer=None, + ) + + cmd = workflow._build_command(DummyEngine(), "task-a", tmp_path) + + assert "collect_rollout_details=true" in cmd + assert "enable_summarize=false" in cmd diff --git a/astraflow/dataflow/dataset/__init__.py b/astraflow/dataflow/dataset/__init__.py index c50b2c5..5be7221 100644 --- a/astraflow/dataflow/dataset/__init__.py +++ b/astraflow/dataflow/dataset/__init__.py @@ -22,6 +22,10 @@ from .math500 import get_math500_test_dataset from .minervamath import get_minerva_math_test_dataset from .olympiadbench import get_olympiad_bench_test_dataset +from .terminal_bench import ( + get_harbor_task_path_dataset, + get_terminal_bench_2_test_dataset, +) __all__ = [ "get_aime_2024x4_test_dataset", @@ -38,4 +42,6 @@ "get_math500_test_dataset", "get_minerva_math_test_dataset", "get_olympiad_bench_test_dataset", + "get_harbor_task_path_dataset", + "get_terminal_bench_2_test_dataset", ] diff --git a/astraflow/dataflow/dataset/scripts/prepare_harbor_dataset.py b/astraflow/dataflow/dataset/scripts/prepare_harbor_dataset.py new file mode 100644 index 0000000..1f0c75a --- /dev/null +++ b/astraflow/dataflow/dataset/scripts/prepare_harbor_dataset.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python3 +"""Prepare local Harbor task datasets from Hugging Face Hub. + +Some Harbor training datasets, such as ``open-thoughts/CodeContests``, are +published as parquet rows with: + + path: relative task directory + task_binary: tar archive bytes for that task directory + +This script downloads the dataset snapshot and extracts those task archives into +a local directory that AstraFlow can load with +``get_harbor_task_path_dataset``. By default the output is repo-local: + + ./data-data/harbor/ + +Example: + + python astraflow/dataflow/dataset/scripts/prepare_harbor_dataset.py \ + --dataset open-thoughts/CodeContests +""" + +from __future__ import annotations + +import argparse +import io +import os +import shutil +import tarfile +from concurrent.futures import ProcessPoolExecutor +from pathlib import Path, PurePosixPath +from typing import Any + +import pyarrow.parquet as pq +from huggingface_hub import snapshot_download + + +def _is_within(base: Path, target: Path) -> bool: + try: + return ( + os.path.commonpath([str(base.resolve()), str(target.resolve())]) + == str(base.resolve()) + ) + except Exception: + return False + + +def _sanitize_tar_member_name(name: str) -> str: + path = PurePosixPath(name) + parts = [part for part in path.parts if part not in ("..", ".", "", "/")] + return str(PurePosixPath(*parts)) if parts else "" + + +def _safe_extract_tar(archive_bytes: bytes, dest_dir: Path) -> None: + dest_dir.mkdir(parents=True, exist_ok=True) + with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:*") as tar: + for member in tar.getmembers(): + member_name = _sanitize_tar_member_name(member.name) + if not member_name: + continue + if ".snapshot" in PurePosixPath(member_name).parts: + continue + + target = (dest_dir / member_name).resolve() + if not _is_within(dest_dir, target): + raise RuntimeError(f"Unsafe path in archive: {member.name}") + + if member.isdir(): + target.mkdir(parents=True, exist_ok=True) + continue + + if not member.isfile(): + continue + + target.parent.mkdir(parents=True, exist_ok=True) + src = tar.extractfile(member) + if src is None: + continue + with src, open(target, "wb") as dst: + shutil.copyfileobj(src, dst) + + +def _safe_relative_path(path: str) -> Path: + posix_path = PurePosixPath(path) + parts = [part for part in posix_path.parts if part not in ("..", ".", "", "/")] + return Path(*parts) if parts else Path("task_unknown") + + +def _extract_one(args: tuple[str, bytes | bytearray | memoryview, str]) -> bool: + rel_path, archive_data, output_dir_str = args + if not isinstance(rel_path, str): + return False + if not isinstance(archive_data, bytes | bytearray | memoryview): + return False + + output_dir = Path(output_dir_str).resolve() + target_dir = (output_dir / _safe_relative_path(rel_path)).resolve() + if not _is_within(output_dir, target_dir): + return False + + if target_dir.exists() and (target_dir / "instruction.md").is_file(): + return True + + try: + _safe_extract_tar(bytes(archive_data), target_dir) + except Exception as exc: + print(f"Warning: failed to extract {rel_path}: {exc}", flush=True) + return False + + return (target_dir / "instruction.md").is_file() + + +def _find_task_parquet_files(snapshot_dir: Path) -> list[Path]: + parquet_files: list[Path] = [] + for parquet_path in snapshot_dir.glob("**/*.parquet"): + try: + schema = pq.read_schema(parquet_path) + except Exception as exc: + print(f"Warning: could not read schema from {parquet_path}: {exc}") + continue + if "path" in schema.names and "task_binary" in schema.names: + parquet_files.append(parquet_path) + return parquet_files + + +def _extract_parquet( + parquet_path: Path, + output_dir: Path, + workers: int, +) -> int: + table = pq.read_table(parquet_path, columns=["path", "task_binary"]) + paths = table.column("path").to_pylist() + archives = table.column("task_binary").to_pylist() + output_dir.mkdir(parents=True, exist_ok=True) + + tasks = [(path, archive, str(output_dir)) for path, archive in zip(paths, archives)] + if workers <= 1: + return sum(_extract_one(task) for task in tasks) + + with ProcessPoolExecutor(max_workers=workers) as pool: + return sum(pool.map(_extract_one, tasks, chunksize=64)) + + +def _repo_name(dataset: str) -> str: + return dataset.rstrip("/").split("/")[-1] + + +def _default_output_dir(dataset: str) -> Path: + return Path("./data-data/harbor") / _repo_name(dataset) + + +def _replace_path_with_symlink(target: Path, source: Path) -> None: + target.parent.mkdir(parents=True, exist_ok=True) + if target.is_symlink() or target.is_file(): + target.unlink() + elif target.exists(): + shutil.rmtree(target) + target.symlink_to(source, target_is_directory=True) + + +def prepare( + dataset: str, + output_dir: str | None = None, + workers: int = 8, + direct_mode: str = "symlink", +) -> str: + output_path = Path( + os.path.expandvars(output_dir) if output_dir else _default_output_dir(dataset) + ).expanduser().resolve() + + print(f"Downloading {dataset}...") + snapshot_dir = Path(snapshot_download(repo_id=dataset, repo_type="dataset")) + print(f"Downloaded snapshot to {snapshot_dir}") + + parquet_files = _find_task_parquet_files(snapshot_dir) + if not parquet_files: + print("No Harbor task parquet files found.") + if direct_mode == "copy": + print(f"Copying snapshot to {output_path}...") + if output_path.exists() or output_path.is_symlink(): + if output_path.is_symlink() or output_path.is_file(): + output_path.unlink() + else: + shutil.rmtree(output_path) + shutil.copytree(snapshot_dir, output_path) + else: + print(f"Symlinking {output_path} -> {snapshot_dir}") + _replace_path_with_symlink(output_path, snapshot_dir) + return str(output_path) + + total = 0 + for parquet_path in parquet_files: + print(f"Extracting {parquet_path.name}...") + total += _extract_parquet(parquet_path, output_path, workers=workers) + + print(f"Done. Extracted {total} Harbor task(s) to {output_path}") + return str(output_path) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Prepare a Harbor task dataset from Hugging Face Hub." + ) + parser.add_argument( + "--dataset", + required=True, + help="Hugging Face dataset repo, e.g. open-thoughts/CodeContests.", + ) + parser.add_argument( + "--output-dir", + "--output_dir", + default=None, + help="Output directory. Defaults to ./data-data/harbor/.", + ) + parser.add_argument( + "--workers", + type=int, + default=8, + help="Parallel extraction workers for parquet task archives.", + ) + parser.add_argument( + "--direct-mode", + choices=("symlink", "copy"), + default="symlink", + help=( + "When no path/task_binary parquet files are found, either symlink " + "or copy the downloaded snapshot." + ), + ) + args = parser.parse_args() + prepare( + dataset=args.dataset, + output_dir=args.output_dir, + workers=max(1, args.workers), + direct_mode=args.direct_mode, + ) + + +if __name__ == "__main__": + main() diff --git a/astraflow/dataflow/dataset/terminal_bench.py b/astraflow/dataflow/dataset/terminal_bench.py new file mode 100644 index 0000000..740268f --- /dev/null +++ b/astraflow/dataflow/dataset/terminal_bench.py @@ -0,0 +1,229 @@ +"""Dataset loaders for Terminal-Bench tasks run through Harbor.""" + +from __future__ import annotations + +import os +from pathlib import Path +from typing import Any + +from datasets import Dataset, load_dataset + +from astraflow.dataflow.dataset.utils import attach_query_ids + + +TERMINAL_BENCH_2_TASKS: tuple[str, ...] = ( + "adaptive-rejection-sampler", + "bn-fit-modify", + "break-filter-js-from-html", + "build-cython-ext", + "build-pmars", + "build-pov-ray", + "caffe-cifar-10", + "cancel-async-tasks", + "chess-best-move", + "circuit-fibsqrt", + "cobol-modernization", + "code-from-image", + "compile-compcert", + "configure-git-webserver", + "constraints-scheduling", + "count-dataset-tokens", + "crack-7z-hash", + "custom-memory-heap-crash", + "db-wal-recovery", + "distribution-search", + "dna-assembly", + "dna-insert", + "extract-elf", + "extract-moves-from-video", + "feal-differential-cryptanalysis", + "feal-linear-cryptanalysis", + "filter-js-from-html", + "financial-document-processor", + "fix-code-vulnerability", + "fix-git", + "fix-ocaml-gc", + "gcode-to-text", + "git-leak-recovery", + "git-multibranch", + "gpt2-codegolf", + "headless-terminal", + "hf-model-inference", + "install-windows-3.11", + "kv-store-grpc", + "large-scale-text-editing", + "largest-eigenval", + "llm-inference-batching-scheduler", + "log-summary-date-ranges", + "mailman", + "make-doom-for-mips", + "make-mips-interpreter", + "mcmc-sampling-stan", + "merge-diff-arc-agi-task", + "model-extraction-relu-logits", + "modernize-scientific-stack", + "mteb-leaderboard", + "mteb-retrieve", + "multi-source-data-merger", + "nginx-request-logging", + "openssl-selfsigned-cert", + "overfull-hbox", + "password-recovery", + "path-tracing", + "path-tracing-reverse", + "polyglot-c-py", + "polyglot-rust-c", + "portfolio-optimization", + "protein-assembly", + "prove-plus-comm", + "pypi-server", + "pytorch-model-cli", + "pytorch-model-recovery", + "qemu-alpine-ssh", + "qemu-startup", + "query-optimize", + "raman-fitting", + "regex-chess", + "regex-log", + "reshard-c4-data", + "rstan-to-pystan", + "sam-cell-seg", + "sanitize-git-repo", + "schemelike-metacircular-eval", + "sparql-university", + "sqlite-db-truncate", + "sqlite-with-gcov", + "torch-pipeline-parallelism", + "torch-tensor-parallelism", + "train-fasttext", + "tune-mjcf", + "video-processing", + "vulnerable-secret", + "winning-avg-corewars", + "write-compressor", +) + + +def _normalise_task_row(sample: dict[str, Any], idx: int) -> dict[str, Any]: + task_name = ( + sample.get("task_name") + or sample.get("name") + or sample.get("task_id") + or sample.get("index") + ) + row = { + **sample, + "index": sample.get("index", task_name if task_name is not None else idx), + "task_name": task_name, + "messages": sample.get("messages", []), + "source": sample.get("source", "terminal_bench"), + } + if row["task_name"] is not None: + row["task_name"] = str(row["task_name"]) + return row + + +def _is_harbor_task_dir(path: Path) -> bool: + return path.is_dir() and (path / "instruction.md").is_file() + + +def _find_harbor_task_dirs(path: str) -> list[Path]: + root = Path(os.path.expandvars(path)).expanduser() + if not root.exists(): + raise FileNotFoundError(f"Harbor task dataset path does not exist: {path}") + + if _is_harbor_task_dir(root): + return [root] + + direct_children = [ + child for child in sorted(root.iterdir()) if _is_harbor_task_dir(child) + ] + if direct_children: + return direct_children + + return sorted( + child + for child in root.rglob("*") + if _is_harbor_task_dir(child) + ) + + +def get_harbor_task_path_dataset( + path: str, + split: str = "train", + tokenizer=None, + max_length: int | None = None, + max_samples: int | None = None, + dataset_name: str = "harbor_tasks", +): + """Create a dataset of local Harbor task directories. + + This matches SkyRL's Harbor dataset shape: a prepared directory such as + ``~/data/harbor/CodeContests`` contains task subdirectories, each with an + ``instruction.md`` file. The Harbor workflow runs each sample via + ``harbor run --path ``. + """ + del split, tokenizer, max_length + + task_dirs = _find_harbor_task_dirs(path) + if max_samples is not None: + task_dirs = task_dirs[: int(max_samples)] + + dataset = Dataset.from_list( + [ + { + "task_path": str(task_dir), + "prompt": str(task_dir), + "task_name": task_dir.name, + "index": task_dir.name, + "messages": [], + "source": "harbor_task_path", + } + for task_dir in task_dirs + ] + ) + return attach_query_ids(dataset, dataset_name) + + +def get_terminal_bench_2_test_dataset( + path: str | None = None, + split: str = "test", + tokenizer=None, + max_length: int | None = None, + max_samples: int | None = None, + dataset_name: str = "terminal_bench_2", + tasks: list[str] | None = None, +): + """Create an eval dataset for Terminal-Bench 2.0 via Harbor. + + By default this returns the 89 Terminal-Bench 2.0 task names currently + published in Harbor's registry, so AstraFlow submits one Harbor trial per + benchmark task. ``tasks`` or ``path`` can be supplied to evaluate a subset + or a locally maintained task-name manifest. + """ + del tokenizer, max_length + + if tasks is not None: + dataset = Dataset.from_list( + [ + {"task_name": str(task), "index": str(task), "messages": []} + for task in tasks + ] + ) + elif path is not None: + dataset = load_dataset("json", data_files=path, split="train") + else: + dataset = Dataset.from_list( + [ + {"task_name": task, "index": task, "messages": []} + for task in TERMINAL_BENCH_2_TASKS + ] + ) + + dataset = attach_query_ids(dataset, dataset_name) + dataset = dataset.map(_normalise_task_row, with_indices=True) + + if max_samples is not None: + dataset = dataset.select(range(min(int(max_samples), len(dataset)))) + + return dataset diff --git a/astraflow/raas/patch/__init__.py b/astraflow/raas/patch/__init__.py index e291e04..cb3aacf 100644 --- a/astraflow/raas/patch/__init__.py +++ b/astraflow/raas/patch/__init__.py @@ -83,12 +83,14 @@ def _validate_patch_results(results: Dict[str, bool], strict: bool) -> None: def _run_sglang_patches(strict: bool) -> bool: from astraflow.raas.patch.sglang import ( HttpServerPatch, + OpenAIReturnTokenIdsPatch, ServerArgsPatch, ) manager = PatchManager() manager.register(ServerArgsPatch()) manager.register(HttpServerPatch()) + manager.register(OpenAIReturnTokenIdsPatch()) results = manager.apply_all() _log_patch_results(results) diff --git a/astraflow/raas/patch/sglang.py b/astraflow/raas/patch/sglang.py index 03481ec..ca7449c 100644 --- a/astraflow/raas/patch/sglang.py +++ b/astraflow/raas/patch/sglang.py @@ -7,6 +7,8 @@ can register with RaaS at startup. 2. HttpServerPatch — register SGLang instance with the rollout manager during ``launch_server``. +3. OpenAIReturnTokenIdsPatch — preserve token IDs in the OpenAI chat response + when clients request ``return_token_ids``. """ import logging @@ -16,6 +18,34 @@ logger = logging.getLogger(__name__) +def _requested_return_token_ids(body) -> bool: + if not isinstance(body, dict): + return False + if body.get("return_token_ids") is True: + return True + extra_body = body.get("extra_body") + return isinstance(extra_body, dict) and extra_body.get("return_token_ids") is True + + +def _as_token_id_list(value): + if not isinstance(value, list): + return None + if not value: + return [] + if all(isinstance(token_id, int) for token_id in value): + return value + return None + + +def _first_token_id_list(value): + token_ids = _as_token_id_list(value) + if token_ids is not None: + return token_ids + if isinstance(value, list) and value: + return _as_token_id_list(value[0]) + return None + + class ServerArgsPatch(BasePatch): """Add ``--rollout-manager-address`` to SGLang's ServerArgs.""" @@ -94,3 +124,126 @@ def patched_launch_server(server_args, *args, **kwargs): traceback.print_exc() return False + + +class OpenAIReturnTokenIdsPatch(BasePatch): + """Return token IDs through SGLang's OpenAI chat endpoint when requested. + + Harbor enables rollout-detail collection by sending ``logprobs=true`` and + ``extra_body.return_token_ids=true`` through LiteLLM. SGLang already has the + prompt IDs before dispatch and the generated IDs in ``ret[*]["output_ids"]``; + the OpenAI response builder simply drops both. This patch only changes + non-streaming chat responses for requests that explicitly ask for token IDs. + """ + + def apply(self) -> bool: + try: + from fastapi.responses import ORJSONResponse + from sglang.srt.entrypoints.openai.serving_chat import ( + OpenAIServingChat, + ) + + original_handle_request = OpenAIServingChat.handle_request + original_convert = OpenAIServingChat._convert_to_internal_request + original_build_response = OpenAIServingChat._build_chat_response + + if self._is_patched( + original_build_response, "openai_return_token_ids" + ): + return True + + async def patched_handle_request( + self_chat, request, raw_request, *args, **kwargs + ): + try: + body = await raw_request.json() + except Exception: + body = {} + + if _requested_return_token_ids(body): + object.__setattr__( + request, "_astraflow_return_token_ids", True + ) + + return await original_handle_request( + self_chat, request, raw_request, *args, **kwargs + ) + + def patched_convert_to_internal_request( + self_chat, request, *args, **kwargs + ): + adapted_request, processed_request = original_convert( + self_chat, request, *args, **kwargs + ) + + if getattr(request, "_astraflow_return_token_ids", False): + prompt_token_ids = _first_token_id_list( + getattr(adapted_request, "input_ids", None) + ) + object.__setattr__( + processed_request, "_astraflow_return_token_ids", True + ) + if prompt_token_ids is not None: + object.__setattr__( + processed_request, + "_astraflow_prompt_token_ids", + prompt_token_ids, + ) + + return adapted_request, processed_request + + def patched_build_chat_response( + self_chat, request, ret, created, *args, **kwargs + ): + response = original_build_response( + self_chat, request, ret, created, *args, **kwargs + ) + + if not getattr(request, "_astraflow_return_token_ids", False): + return response + if not hasattr(response, "model_dump"): + return response + + data = response.model_dump() + + prompt_token_ids = getattr( + request, "_astraflow_prompt_token_ids", None + ) + if prompt_token_ids is not None: + data["prompt_token_ids"] = prompt_token_ids + + choices = data.get("choices") + if isinstance(choices, list): + for idx, choice in enumerate(choices): + if idx >= len(ret) or not isinstance(choice, dict): + continue + token_ids = _as_token_id_list( + ret[idx].get("output_ids") + ) + if token_ids is not None: + choice["token_ids"] = token_ids + + return ORJSONResponse(content=data) + + self._mark_as_patched( + patched_handle_request, "openai_return_token_ids" + ) + self._mark_as_patched( + patched_convert_to_internal_request, + "openai_return_token_ids", + ) + self._mark_as_patched( + patched_build_chat_response, "openai_return_token_ids" + ) + + OpenAIServingChat.handle_request = patched_handle_request + OpenAIServingChat._convert_to_internal_request = ( + patched_convert_to_internal_request + ) + OpenAIServingChat._build_chat_response = patched_build_chat_response + + return True + except Exception as e: + logger.error(f"OpenAIReturnTokenIdsPatch failed: {e}") + return False + diff --git a/astraflow/raas/tests/test_sglang_patch.py b/astraflow/raas/tests/test_sglang_patch.py new file mode 100644 index 0000000..02b97a6 --- /dev/null +++ b/astraflow/raas/tests/test_sglang_patch.py @@ -0,0 +1,80 @@ +import asyncio +import json +import sys +import types +from types import SimpleNamespace + +from astraflow.raas.patch.sglang import OpenAIReturnTokenIdsPatch + + +class _RawRequest: + async def json(self): + return {"extra_body": {"return_token_ids": True}} + + +def _install_fake_sglang(monkeypatch): + module_names = [ + "sglang", + "sglang.srt", + "sglang.srt.entrypoints", + "sglang.srt.entrypoints.openai", + "sglang.srt.entrypoints.openai.serving_chat", + ] + for name in module_names: + monkeypatch.setitem(sys.modules, name, types.ModuleType(name)) + + serving_chat = sys.modules["sglang.srt.entrypoints.openai.serving_chat"] + + class OpenAIServingChat: + async def handle_request(self, request, raw_request): + adapted_request, processed_request = ( + self._convert_to_internal_request(request, raw_request) + ) + assert adapted_request.input_ids == [1, 2, 3] + return self._build_chat_response( + processed_request, + [{"output_ids": [4, 5]}], + 123, + ) + + def _convert_to_internal_request(self, request, raw_request): + assert isinstance(raw_request, _RawRequest) + return SimpleNamespace(input_ids=[1, 2, 3]), request + + def _build_chat_response(self, request, ret, created): + return SimpleNamespace( + model_dump=lambda: { + "id": "chatcmpl-test", + "object": "chat.completion", + "created": created, + "model": "test-model", + "choices": [ + { + "index": 0, + "message": { + "role": "assistant", + "content": "ok", + }, + "finish_reason": "stop", + } + ], + "usage": {}, + } + ) + + serving_chat.OpenAIServingChat = OpenAIServingChat + return OpenAIServingChat + + +def test_openai_return_token_ids_patch_adds_requested_token_ids(monkeypatch): + OpenAIServingChat = _install_fake_sglang(monkeypatch) + + assert OpenAIReturnTokenIdsPatch().apply() + + response = asyncio.run( + OpenAIServingChat().handle_request(SimpleNamespace(), _RawRequest()) + ) + body = json.loads(response.body) + + assert body["prompt_token_ids"] == [1, 2, 3] + assert body["choices"][0]["token_ids"] == [4, 5] diff --git a/docs/en/recipes/code.md b/docs/en/recipes/code.md index fa194ad..b304bc4 100644 --- a/docs/en/recipes/code.md +++ b/docs/en/recipes/code.md @@ -2,17 +2,17 @@ Reinforcement learning for code-generation reasoning with RLVR, rewarded by test-case execution. -**Code recipes**: [`examples/code/`](https://github.com/Infini-AI-Lab/astraflow/tree/main/examples/code) and [`examples/code-multi-agent/`](https://github.com/Infini-AI-Lab/astraflow/tree/main/examples/code-multi-agent) +**Code recipes**: [`examples/code/`](https://github.com/Infini-AI-Lab/astraflow/tree/main/examples/code), [`examples/code-multi-agent/`](https://github.com/Infini-AI-Lab/astraflow/tree/main/examples/code-multi-agent), and [`examples/terminal-bench/`](https://github.com/Infini-AI-Lab/astraflow/tree/main/examples/terminal-bench) Each recipe ships an all-in-one launch script under `scripts/` and its config under `yaml/`. -LiveCodeBench v5 eval data needs a one-time manual download — see [Eval Dataset Setup](#eval-dataset-setup) below. +LiveCodeBench v5 eval data needs a one-time manual download — see [Eval Dataset Setup](#eval-dataset-setup) below. Terminal-Bench 2 recipes also need Harbor setup and, for Harbor RL, LiveCodeBench v6 eval data. ## Eval Dataset Setup -The recipes evaluate on HumanEval, LiveCodeBench v5, and DeepCoder Codeforces. Only -LiveCodeBench v5 needs a one-time manual download before launching — otherwise the -periodic eval steps fail. +The standard code recipes evaluate on HumanEval, LiveCodeBench v5, and DeepCoder Codeforces. Only +LiveCodeBench v5 needs a one-time manual download for those recipes — otherwise the +periodic eval steps fail. Terminal-Bench Harbor RL also needs LiveCodeBench v6, covered below. Download the AReaL-boba-2-RL-Code dataset from the repo root: @@ -32,6 +32,201 @@ The training dataset (DeepCoder-Preview, `primeintellect` subset) is fetched fro Hugging Face automatically on first run, so LiveCodeBench v5 above is the only manual download needed to run a recipe end to end. +## LiveCodeBench v6 Setup + +The Terminal-Bench Harbor RL recipe evaluates on LiveCodeBench v6. Prepare it from the repo root: + +```bash +mkdir -p ./data-data/lcb_v6_raw ./data-data/lcb_v6 + +huggingface-cli download livecodebench/code_generation_lite test6.jsonl \ + --repo-type dataset \ + --local-dir ./data-data/lcb_v6_raw + +python astraflow/dataflow/dataset/scripts/convert_livecodebench_lite.py \ + ./data-data/lcb_v6_raw/test6.jsonl \ + ./data-data/lcb_v6/test.jsonl +``` + +This produces `./data-data/lcb_v6/test.jsonl`. + +## Terminal-Bench 2 + Harbor Setup + +AstraFlow has two Harbor-backed Terminal-Bench workflows: + +- `terminal_bench_harbor` runs `terminal-bench@2.0` eval tasks through Harbor and reports eval rewards. +- `terminal_bench_harbor_rl` runs local Harbor task directories and converts Harbor rollout details into RL tensors. + +The Terminal-Bench recipes live under `examples/terminal-bench/`. They invoke Harbor from a separate conda env so the main AstraFlow training env does not need to import Harbor directly. + +### Create the Harbor Env + +Harbor uses containerized task sandboxes. Make sure Docker or Podman is usable on the host first: + +```bash +docker info +# or, for Podman-backed recipes +podman info +``` + +Create the conda env expected by the recipes: + +```bash +conda create -n harbor-tb2 python=3.12 -y +conda activate harbor-tb2 +pip install harbor +``` + +Sanity-check Harbor against Terminal-Bench 2: + +```bash +harbor run -d terminal-bench@2.0 -a oracle --yes +``` + +The recipe configs call Harbor with: + +```yaml +harbor_command: + - conda + - run + - "--no-capture-output" + - "-n" + - harbor-tb2 + - harbor +``` + +### Terminal-Bench 2 Eval Recipe + +The eval recipe trains on DeepCoder Preview and periodically evaluates with Harbor + Terminus-2 on Terminal-Bench 2: + +```bash +bash examples/terminal-bench/terminal-bench-2-qwen3-8b/scripts/run_terminal-bench-2-qwen3-8b.sh +``` + +Important config pieces: + +```yaml +dataflow: + eval_workflows: + terminal_bench_2: + workflow_cls: "terminal_bench_harbor" + dataset: "terminal-bench@2.0" + agent_name: "terminus-2" + model_name: "openai/Qwen/Qwen3-8B" + max_parallel_jobs: 4 + agent_kwargs: + temperature: 0.6 + max_turns: 10 + enable_summarize: true + + eval_datasets: + terminal_bench_2: + dataset_fn: "astraflow.dataflow.dataset.terminal_bench:get_terminal_bench_2_test_dataset" + split: "test" + repeat: 1 + eval_workflow: terminal_bench_2 +``` + +`max_parallel_jobs` controls how many Harbor subprocesses AstraFlow launches at once. Lower it if Docker or Podman reports resource pressure or stale container conflicts. + +The corresponding RaaS config must keep tokenizer initialization enabled: + +```yaml +sglang: + skip_tokenizer_init: false +``` + +Terminus-2 talks to SGLang through the OpenAI-compatible chat endpoint, and SGLang needs its tokenizer for chat-template application. + +### Harbor RL Dataset Prep + +The Harbor RL recipe expects local task directories, each containing an `instruction.md` file and the task assets/tests: + +```text +./data-data/harbor/CodeContests/ + task-a/ + instruction.md + environment/ + tests/ + task-b/ + instruction.md + environment/ + tests/ +``` + +Prepare the default dataset with the helper script: + +```bash +python astraflow/dataflow/dataset/scripts/prepare_harbor_dataset.py \ + --dataset open-thoughts/CodeContests \ + --output-dir ./data-data/harbor/CodeContests +``` + +The helper downloads the Hugging Face dataset snapshot, finds parquet files with `path` and `task_binary` columns, and safely extracts each archived Harbor task into the output directory. + +The launcher defaults to this path: + +```bash +export HARBOR_TRAIN_DATA="${HARBOR_TRAIN_DATA:-./data-data/harbor/CodeContests}" +``` + +Override it when launching if your tasks live elsewhere: + +```bash +HARBOR_TRAIN_DATA=/path/to/harbor/tasks \ + bash examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/scripts/run_terminal-bench-rl-qwen3-14b-podman-test.sh +``` + +### Harbor RL Recipe + +The Harbor RL recipe trains Qwen3-14B on local Harbor tasks, uses the Podman custom environment, and evaluates on LiveCodeBench v6: + +```bash +bash examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/scripts/run_terminal-bench-rl-qwen3-14b-podman-test.sh +``` + +Key workflow block: + +```yaml +dataflow: + rollout_dataset: + dataset_fn: "astraflow.dataflow.dataset.terminal_bench:get_harbor_task_path_dataset" + path: "$HARBOR_TRAIN_DATA" + split: "train" + dataset_name: "skyrl_codecontests" + + workflow_spec: + workflow_cls: "terminal_bench_harbor_rl" + extra_args: + - "--environment-import-path" + - "examples.terminal-bench.harbor_podman_env:PodmanEnvironment" + agent_name: "terminus-2" + model_name: "openai/Qwen/Qwen3-14B" + max_parallel_jobs: 16 + agent_kwargs: + temperature: 1.0 + max_turns: 1 + suppress_max_turns_warning: true + enable_summarize: false + collect_rollout_details: true +``` + +`collect_rollout_details: true` is required for RL. Harbor/Terminus-2 asks the OpenAI-compatible backend for token IDs and logprobs, and AstraFlow converts those rollout details into trainable trajectories. `enable_summarize: false` keeps conversation history linear for conservative token/logprob alignment. + +The RL recipe evaluates LiveCodeBench v6 through the standard code workflow: + +```yaml +dataflow: + eval_datasets: + lcb_v6: + dataset_fn: "astraflow.dataflow.dataset.livecodebench:get_livecodebench_single_turn_test_dataset" + path: "./data-data/lcb_v6/test.jsonl" + split: "test" + max_length: 6000 + repeat: 1 + eval_workflow: code_eval +``` + ## Qwen3-8B — 8 GPUs (single-agent) Single-agent code-generation RL on one 8-GPU node — 4 GPUs for inference, 4 for training. It comes in two variants that differ **only** in weight transfer mode: diff --git a/examples/README.md b/examples/README.md index 3a3a4af..45297a6 100644 --- a/examples/README.md +++ b/examples/README.md @@ -19,6 +19,7 @@ Browse task-specific recipes in their own subfolders: - `examples/math-efficient-data/`: composable data-algorithm math recipes (GRESO, dynamic sampling, buffer replay). - `examples/code/`: code generation training recipes. - `examples/code-multi-agent/`: codegen/verifier and multi-agent code workflows. +- `examples/terminal-bench/`: Harbor-backed Terminal-Bench eval and RL recipes. - `examples/search/`: search-augmented agent training with local retrieval. - `examples/alfworld/`: AgentBench ALFWorld interactive task recipes. - `examples/webshop/`: AgentBench WebShop interactive task recipes. diff --git a/examples/terminal-bench/README.md b/examples/terminal-bench/README.md new file mode 100644 index 0000000..9bf779f --- /dev/null +++ b/examples/terminal-bench/README.md @@ -0,0 +1,25 @@ +# Terminal-Bench Recipes + +Harbor-backed Terminal-Bench eval and RL recipes. + +Run the Terminal-Bench 2 eval recipe from the repo root: + +```bash +bash examples/terminal-bench/terminal-bench-2-qwen3-8b/scripts/run_terminal-bench-2-qwen3-8b.sh +``` + +Run the Harbor RL Podman recipe from the repo root: + +```bash +bash examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/scripts/run_terminal-bench-rl-qwen3-14b-podman-test.sh +``` + +The Podman Harbor environment helper is `examples/terminal-bench/harbor_podman_env.py`. + +Complete guidance: [`docs/en/recipes/code.md`](../../docs/en/recipes/code.md#terminal-bench-2--harbor-setup). + +--- +**GPU Resources** + +The Terminal-Bench 2 eval recipe defaults to one 8xH100 node. The Harbor RL +Podman test recipe defaults to 2 GPUs for inference and 2 GPUs for training. diff --git a/examples/terminal-bench/harbor_podman_env.py b/examples/terminal-bench/harbor_podman_env.py new file mode 100644 index 0000000..781fc4e --- /dev/null +++ b/examples/terminal-bench/harbor_podman_env.py @@ -0,0 +1,344 @@ +"""Harbor custom environment backed by rootless Podman. + +This backend is intentionally small and targets the Harbor CodeContests layout +used by the AstraFlow t-bench RL recipe: one Linux Dockerfile per task and no +Docker Compose overlays. It is loaded by Harbor via +``--environment-import-path examples.terminal-bench.harbor_podman_env:PodmanEnvironment``. +""" + +from __future__ import annotations + +import asyncio +import os +import pwd +import re +import shlex +import shutil +from pathlib import Path + +from harbor.environments.base import BaseEnvironment, ExecResult +from harbor.environments.capabilities import ( + EnvironmentCapabilities, + EnvironmentResourceCapabilities, +) +from harbor.models.task.config import TaskOS + + +def _podman_bin() -> str: + return os.environ.get("HARBOR_PODMAN_BIN", "podman") + + +def _user_has_subid_mapping(path: str, username: str) -> bool: + try: + with open(path, "r", encoding="utf-8") as handle: + for line in handle: + parts = line.strip().split(":") + if len(parts) >= 3 and parts[0] == username and int(parts[2]) > 1: + return True + except (OSError, ValueError): + return False + return False + + +def _ignore_chown_errors_enabled() -> bool: + override = os.environ.get("HARBOR_PODMAN_IGNORE_CHOWN_ERRORS") + if override is not None: + return override.lower() not in {"", "0", "false", "no", "off"} + try: + username = pwd.getpwuid(os.getuid()).pw_name + except KeyError: + username = os.environ.get("USER", "") + return not ( + username + and _user_has_subid_mapping("/etc/subuid", username) + and _user_has_subid_mapping("/etc/subgid", username) + ) + + +def _podman_cmd() -> list[str]: + cmd = [_podman_bin()] + if _ignore_chown_errors_enabled(): + cmd.extend(["--storage-opt", "ignore_chown_errors=true"]) + return cmd + + +def _podman_network_mode() -> str | None: + override = os.environ.get("HARBOR_PODMAN_NETWORK") + if override is not None: + return override or None + if shutil.which("pasta") is None and shutil.which("slirp4netns"): + return "slirp4netns" + return None + +def _sanitize_image_name(name: str) -> str: + name = name.lower() + if not re.match(r"^[a-z0-9]", name): + name = "0" + name + return re.sub(r"[^a-z0-9._-]", "-", name) + + +def _sanitize_container_name(name: str) -> str: + name = name.lower() + if not re.match(r"^[a-z0-9]", name): + name = "0" + name + return re.sub(r"[^a-z0-9_.-]", "-", name)[:250] + + +class PodmanEnvironment(BaseEnvironment): + """Dockerfile-only Harbor environment implemented with ``podman``.""" + + _image_build_locks: dict[str, asyncio.Lock] = {} + + def __init__(self, *args, **kwargs): + self._container_name: str | None = None + self._image_name: str | None = None + super().__init__(*args, **kwargs) + + @staticmethod + def type() -> str: + return "podman" + + @classmethod + def preflight(cls) -> None: + if not shutil.which(_podman_bin()): + raise SystemExit( + "Podman is not installed or not on PATH. Install podman in the " + "Harbor environment before using PodmanEnvironment." + ) + try: + subprocess = asyncio.run(cls._preflight_info()) + except Exception as exc: # pragma: no cover - defensive CLI error path + raise SystemExit(f"Podman preflight failed: {exc}") from exc + if subprocess.return_code != 0: + output = subprocess.stderr or subprocess.stdout or "no output" + raise SystemExit(f"Podman is not usable: {output}") + + @staticmethod + async def _preflight_info() -> ExecResult: + return await PodmanEnvironment._run_host([*_podman_cmd(), "info"], check=False, timeout_sec=20) + + @classmethod + def resource_capabilities(cls) -> EnvironmentResourceCapabilities: + return EnvironmentResourceCapabilities(cpu_limit=True, memory_limit=True) + + @property + def capabilities(self) -> EnvironmentCapabilities: + return EnvironmentCapabilities( + disable_internet=True, + mounted=True, + docker_compose=False, + windows=False, + gpus=False, + ) + + @property + def _dockerfile_path(self) -> Path: + return self.environment_dir / "Dockerfile" + + @property + def _compose_path(self) -> Path: + return self.environment_dir / "docker-compose.yaml" + + def _validate_definition(self) -> None: + if self.task_env_config.os == TaskOS.WINDOWS: + raise RuntimeError("PodmanEnvironment only supports Linux tasks.") + if self._compose_path.exists() or self.extra_docker_compose_paths: + raise RuntimeError( + "PodmanEnvironment only supports Dockerfile-only Harbor tasks; " + "docker-compose.yaml and --extra-docker-compose are unsupported." + ) + if not self.task_env_config.docker_image and not self._dockerfile_path.is_file(): + raise FileNotFoundError(f"Dockerfile not found: {self._dockerfile_path}") + + @staticmethod + async def _run_host( + args: list[str], + *, + check: bool = True, + timeout_sec: int | None = None, + ) -> ExecResult: + process = await asyncio.create_subprocess_exec( + *args, + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + if timeout_sec: + stdout_bytes, stderr_bytes = await asyncio.wait_for( + process.communicate(), timeout=timeout_sec + ) + else: + stdout_bytes, stderr_bytes = await process.communicate() + except asyncio.TimeoutError: + process.terminate() + try: + stdout_bytes, stderr_bytes = await asyncio.wait_for( + process.communicate(), timeout=5 + ) + except asyncio.TimeoutError: + process.kill() + stdout_bytes, stderr_bytes = await process.communicate() + raise RuntimeError(f"Command timed out after {timeout_sec} seconds: {shlex.join(args)}") + + result = ExecResult( + stdout=stdout_bytes.decode(errors="replace") if stdout_bytes else None, + stderr=stderr_bytes.decode(errors="replace") if stderr_bytes else None, + return_code=process.returncode or 0, + ) + if check and result.return_code != 0: + output = result.stderr or result.stdout or "no output" + raise RuntimeError( + f"Podman command failed (rc={result.return_code}): " + f"{shlex.join(args)}\n{output}" + ) + return result + + async def _image_exists(self, image_name: str) -> bool: + result = await self._run_host([*_podman_cmd(), "image", "exists", image_name], check=False) + return result.return_code == 0 + + async def _ensure_image(self, force_build: bool) -> str: + if self.task_env_config.docker_image: + image_name = self.task_env_config.docker_image + if force_build or not await self._image_exists(image_name): + await self._run_host([*_podman_cmd(), "pull", image_name]) + return image_name + + image_name = f"hb__{_sanitize_image_name(self.environment_name)}" + lock = self._image_build_locks.setdefault(image_name, asyncio.Lock()) + async with lock: + await self._run_host( + [ + *_podman_cmd(), + "build", + *(["--network", _podman_network_mode()] if _podman_network_mode() else []), + "--tag", + image_name, + "--file", + str(self._dockerfile_path.resolve()), + str(self.environment_dir.resolve()), + ] + ) + return image_name + + def _volume_args(self) -> list[str]: + args: list[str] = [] + for mount in self._mounts: + if mount.get("type") != "bind": + raise RuntimeError( + "PodmanEnvironment currently supports bind mounts only; " + f"got mount type {mount.get('type')!r}." + ) + source = Path(str(mount["source"])).expanduser() + if mount.get("bind", {}).get("create_host_path") is not False: + source.mkdir(parents=True, exist_ok=True) + mode = "ro" if mount.get("read_only") else "rw" + args.extend(["--volume", f"{source.resolve()}:{mount['target']}:{mode}"]) + return args + + def _resource_args(self) -> list[str]: + args: list[str] = [] + if self._effective_cpus: + args.extend(["--cpus", str(self._effective_cpus)]) + if self._effective_memory_mb: + args.extend(["--memory", f"{self._effective_memory_mb}m"]) + return args + + async def start(self, force_build: bool) -> None: + image_name = await self._ensure_image(force_build=force_build) + container_name = _sanitize_container_name(f"harbor-{self.session_id}") + self._image_name = image_name + self._container_name = container_name + + await self._run_host([*_podman_cmd(), "rm", "--force", container_name], check=False) + + cmd = [ + *_podman_cmd(), + "run", + "--detach", + "--name", + container_name, + *self._volume_args(), + *self._resource_args(), + ] + if not self.task_env_config.allow_internet: + cmd.extend(["--network", "none"]) + elif _podman_network_mode(): + cmd.extend(["--network", _podman_network_mode()]) + for key, value in self._persistent_env.items(): + cmd.extend(["--env", f"{key}={value}"]) + cmd.extend([image_name, "sh", "-c", "sleep infinity"]) + await self._run_host(cmd) + await self.ensure_dirs(self._mount_targets(writable_only=True)) + + async def stop(self, delete: bool): + if not self._container_name: + return + try: + await self.prepare_logs_for_host() + if delete: + await self._run_host([*_podman_cmd(), "rm", "--force", self._container_name], check=False) + else: + await self._run_host([*_podman_cmd(), "stop", self._container_name], check=False) + finally: + self._container_name = None + + async def prepare_logs_for_host(self) -> None: + try: + for target in self._mount_targets(writable_only=True): + await self._chown_to_host_user(target, recursive=True) + except Exception as exc: + self.logger.warning(f"Failed to chown mounted Harbor paths: {exc}") + + async def _chown_to_host_user(self, path: str, recursive: bool = False) -> None: + flag = "-R " if recursive else "" + await self.exec(f"chown {flag}{os.getuid()}:{os.getgid()} {shlex.quote(path)}", user="root") + + async def upload_file(self, source_path: Path | str, target_path: str): + if not self._container_name: + raise RuntimeError("Podman container is not started") + await self._run_host([*_podman_cmd(), "cp", str(source_path), f"{self._container_name}:{target_path}"]) + + async def upload_dir(self, source_dir: Path | str, target_dir: str): + if not self._container_name: + raise RuntimeError("Podman container is not started") + await self.ensure_dirs([target_dir], chmod=False) + await self._run_host([*_podman_cmd(), "cp", f"{source_dir}/.", f"{self._container_name}:{target_dir}"]) + + async def download_file(self, source_path: str, target_path: Path | str): + if not self._container_name: + raise RuntimeError("Podman container is not started") + await self._chown_to_host_user(source_path) + await self._run_host([*_podman_cmd(), "cp", f"{self._container_name}:{source_path}", str(target_path)]) + + async def download_dir(self, source_dir: str, target_dir: Path | str): + if not self._container_name: + raise RuntimeError("Podman container is not started") + await self._chown_to_host_user(source_dir, recursive=True) + Path(target_dir).mkdir(parents=True, exist_ok=True) + await self._run_host([*_podman_cmd(), "cp", f"{self._container_name}:{source_dir}/.", str(target_dir)]) + + async def exec( + self, + command: str, + cwd: str | None = None, + env: dict[str, str] | None = None, + timeout_sec: int | None = None, + user: str | int | None = None, + ) -> ExecResult: + if not self._container_name: + raise RuntimeError("Podman container is not started") + + cmd = [*_podman_cmd(), "exec"] + resolved_user = self._resolve_user(user) + if resolved_user is not None: + cmd.extend(["--user", str(resolved_user)]) + if cwd is not None: + cmd.extend(["--workdir", cwd]) + merged_env = self._merge_env(env) + if merged_env: + for key, value in merged_env.items(): + cmd.extend(["--env", f"{key}={value}"]) + cmd.extend([self._container_name, "bash", "-c", command]) + return await self._run_host(cmd, check=False, timeout_sec=timeout_sec) diff --git a/examples/terminal-bench/terminal-bench-2-qwen3-8b/scripts/run_terminal-bench-2-qwen3-8b.sh b/examples/terminal-bench/terminal-bench-2-qwen3-8b/scripts/run_terminal-bench-2-qwen3-8b.sh new file mode 100755 index 0000000..1e83237 --- /dev/null +++ b/examples/terminal-bench/terminal-bench-2-qwen3-8b/scripts/run_terminal-bench-2-qwen3-8b.sh @@ -0,0 +1,124 @@ +#!/bin/bash +set -euo pipefail +# All-in-one launcher for Qwen3-8B training with Terminal-Bench 2.0 eval. +# Harbor is invoked from the "harbor-tb2" conda environment by the workflow. +# +# GPU layout: 4+4 by default on H100 80GB +# RaaS : 0,1,2,3 +# Trainer : 4,5,6,7 +# +# Usage: +# bash examples/terminal-bench/terminal-bench-2-qwen3-8b/scripts/run_terminal-bench-2-qwen3-8b.sh + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../.." && pwd)" +cd "${REPO_ROOT}" +export PYTHONPATH="${REPO_ROOT}${PYTHONPATH:+:${PYTHONPATH}}" + +YAML_DIR="${SCRIPT_DIR}/yaml" +export EXPERIMENT_CONFIG="${EXPERIMENT_CONFIG:-${YAML_DIR}/experiment.yaml}" +export RAAS_CONFIG="${RAAS_CONFIG:-${YAML_DIR}/raas.yaml}" +source "${REPO_ROOT}/examples/_common/utils.sh" +astraflow_load_experiment_env + +export SERVICE_CUDA_VISIBLE_DEVICES="${SERVICE_CUDA_VISIBLE_DEVICES:-0,1,2,3}" +export TRAINER_GPUS="${TRAINER_GPUS:-4,5,6,7}" +TRAINER_NPROC="$(echo "${TRAINER_GPUS}" | awk -F',' '{print NF}')" + +RAAS_HOST="${RAAS_HOST:-0.0.0.0}" +RAAS_PORT="${RAAS_PORT:-19190}" +ASTRAFLOW_HOST="${ASTRAFLOW_HOST:-0.0.0.0}" +ASTRAFLOW_PORT="${ASTRAFLOW_PORT:-8000}" +ASTRAFLOW_URL="http://127.0.0.1:${ASTRAFLOW_PORT}" + +export WEIGHT_TRANSFER_HTTP_PORT="${WEIGHT_TRANSFER_HTTP_PORT:-19862}" +export XDG_CACHE_HOME="${XDG_CACHE_HOME:-/tmp/${USER:-$(whoami)}/xdg-cache}" +export FLASHINFER_WORKSPACE_BASE="${FLASHINFER_WORKSPACE_BASE:-/tmp/${USER:-$(whoami)}/flashinfer-workspace}" +mkdir -p "${XDG_CACHE_HOME}" "${FLASHINFER_WORKSPACE_BASE}" + +export NCCL_CUMEM_ENABLE=0 + +if [[ -z "${WANDB_API_KEY:-}" && -z "${WANDB_MODE:-}" && -z "${WANDB_DISABLED:-}" ]]; then + export WANDB_MODE="offline" + export WANDB_DISABLED="true" +fi + +LOG_DIR="data-log/${EXP_NAME}/${TRIAL_NAME}" +mkdir -p "${LOG_DIR}" +export ASTRAFLOW_VERIFY_WORK_ROOT="${ASTRAFLOW_VERIFY_WORK_ROOT:-${LOG_DIR}/verify-tmp}" +mkdir -p "${ASTRAFLOW_VERIFY_WORK_ROOT}" + +echo "=== AstraFlow v2 + Terminal-Bench 2.0 eval (Qwen3-8B, H100 80GB 4+4) ===" +echo "Experiment config : ${EXPERIMENT_CONFIG}" +echo "RaaS config : ${RAAS_CONFIG}" +echo "RaaS GPUs : ${SERVICE_CUDA_VISIBLE_DEVICES} (model0 dp=4)" +echo "Trainer GPUs : ${TRAINER_GPUS} (FSDP dp${TRAINER_NPROC})" +echo "RaaS port : ${RAAS_PORT}" +echo "AstraFlow port : ${ASTRAFLOW_PORT}" +echo "Harbor env : harbor-tb2" +echo "Sender HTTP : ${WEIGHT_TRANSFER_HTTP_PORT}" +echo "WANDB mode : ${WANDB_MODE:-online}" +echo "=========================================================================" + +cleanup() { + trap - EXIT INT TERM + echo "Shutting down..." + kill -- -$$ 2>/dev/null || true + pkill -9 -f astraflow.raas.server 2>/dev/null || true + pkill -9 -f astraflow.raas.entrypoint 2>/dev/null || true + for port in ${WEIGHT_TRANSFER_HTTP_PORT} 21000; do + lsof -i :"$port" 2>/dev/null | awk 'NR>1{print $2}' | sort -u | xargs kill -9 2>/dev/null || true + done + pkill -9 -f "multiprocessing-fork" 2>/dev/null || true + rm -f /dev/shm/areal_buffer_* /dev/shm/astraflow_* 2>/dev/null || true + wait 2>/dev/null + exit 0 +} +trap cleanup EXIT INT TERM + +echo "Cleaning up stale processes..." +pkill -9 -f astraflow.raas.server 2>/dev/null || true +pkill -9 -f astraflow.raas.entrypoint 2>/dev/null || true +pkill -9 -f "sglang::scheduler" 2>/dev/null || true +for port in ${WEIGHT_TRANSFER_HTTP_PORT} 21000; do + lsof -i :"$port" 2>/dev/null | awk 'NR>1{print $2}' | sort -u | xargs kill -9 2>/dev/null || true +done +pkill -9 -f "multiprocessing-fork" 2>/dev/null || true +pkill -9 -f "compile_worker" 2>/dev/null || true +rm -f /dev/shm/areal_buffer_* /dev/shm/astraflow_* 2>/dev/null || true +sleep 2 + +echo "[1/3] Starting RaaS inference server..." +CUDA_VISIBLE_DEVICES="${SERVICE_CUDA_VISIBLE_DEVICES}" \ + python3 -u -m astraflow.raas.server \ + --host "${RAAS_HOST}" \ + --port "${RAAS_PORT}" \ + --config "${EXPERIMENT_CONFIG}" \ + --config "${RAAS_CONFIG}" \ + --engine-id "${ENGINE_ID:-default}" \ + --astraflow-url "${ASTRAFLOW_URL}" \ + 2>&1 | tee "${LOG_DIR}/raas.log" & +sleep 20 + +echo "[2/3] Starting AstraFlow HTTP service..." +CUDA_VISIBLE_DEVICES="" \ + python3 -u -m astraflow \ + --config "${EXPERIMENT_CONFIG}" \ + --port "${ASTRAFLOW_PORT}" \ + --host "${ASTRAFLOW_HOST}" \ + 2>&1 | tee "${LOG_DIR}/astraflow.log" & +sleep 5 + +export ASTRAFLOW_URL="http://127.0.0.1:${ASTRAFLOW_PORT}" +export ASTRAFLOW_RAAS_URL="http://127.0.0.1:${RAAS_PORT}" + +echo "[3/3] Starting trainer..." +CUDA_VISIBLE_DEVICES="${TRAINER_GPUS}" \ +ASTRAFLOW_CUDA_VISIBLE_DEVICES="${TRAINER_GPUS}" \ +WEIGHT_TRANSFER_HTTP_PORT="${WEIGHT_TRANSFER_HTTP_PORT}" \ + torchrun --nnodes 1 --nproc-per-node "${TRAINER_NPROC}" \ + --master-addr "${MASTER_ADDR:-127.0.0.1}" --master-port "${MASTER_PORT:-29544}" \ + examples/launch_trainer.py \ + --config "${EXPERIMENT_CONFIG}" \ + --trainer trainer_model0 \ + "$@" 2>&1 | tee "${LOG_DIR}/trainer.log" diff --git a/examples/terminal-bench/terminal-bench-2-qwen3-8b/yaml/experiment.yaml b/examples/terminal-bench/terminal-bench-2-qwen3-8b/yaml/experiment.yaml new file mode 100644 index 0000000..bd12726 --- /dev/null +++ b/examples/terminal-bench/terminal-bench-2-qwen3-8b/yaml/experiment.yaml @@ -0,0 +1,143 @@ +# ============================================================================ +# Experiment config — AstraFlow service + Trainer + Terminal-Bench 2.0 eval +# Experiment: astraflow-code / terminal-bench-2-qwen3-8b +# +# Training dataset: HF agentica-org/DeepCoder-Preview-Dataset (primeintellect) +# Eval dataset: Terminal-Bench 2.0 through Harbor, using Terminus-2 by default. +# Harbor is invoked through the conda environment "harbor-tb2". +# Hardware target: H100 80GB, 4 GPUs for RaaS + 4 GPUs for trainer. +# ============================================================================ + +experiment: + experiment_name: astraflow-code + trial_name: terminal-bench-2-qwen3-8b + fileroot: ./data-experiments/${experiment.experiment_name}/${experiment.trial_name} + + model_path: "Qwen/Qwen3-8B" + tokenizer_path: "Qwen/Qwen3-8B" + seed: 1 + dtype: bfloat16 + weight_transfer_mode: tcp + weight_transfer_strategies: delta + +cluster: + name_resolve: + type: nfs + nfs_record_root: ./data-experiments/astraflow-code/terminal-bench-2-qwen3-8b/name_resolve + +raas: + models: + model0: + backend: sglang + gconfig: + n_samples: 8 + temperature: 1.0 + max_new_tokens: 4096 + min_new_tokens: 0 + delta_full_sync_interval: 10 + +dataflow: + host: "0.0.0.0" + port: 8000 + dump_dir: ./data-experiments/${experiment.experiment_name}/${experiment.trial_name}/rollout_dumps + + buffer: + size: 10000 + replay_size: 10000 + replay_ratio: 0 + max_staleness: 8 + filter_function: filter_zero_adv + + rollout_dataset: + dataset_fn: "astraflow.dataflow.dataset.deepcoder_preview:get_deepcoder_preview_primeintellect_rl_dataset" + hf_path: "agentica-org/DeepCoder-Preview-Dataset" + subset: "primeintellect" + split: "train" + max_length: 6000 + + workflow_spec: + workflow_cls: "livecodebench_single_turn" + reward_fn: "livecodebench_reward" + + eval_workflows: + terminal_bench_2: + workflow_cls: "terminal_bench_harbor" + dataset: "terminal-bench@2.0" + harbor_command: + - conda + - run + - "--no-capture-output" + - "-n" + - harbor-tb2 + - harbor + agent_name: "terminus-2" + model_name: "openai/Qwen/Qwen3-8B" + jobs_dir: ./data-experiments/astraflow-code/terminal-bench-2-qwen3-8b/harbor_jobs + timeout: 7200.0 + n_concurrent_trials: 1 + max_parallel_jobs: 4 + agent_kwargs: + temperature: 0.6 + max_turns: 10 + enable_summarize: true + gconfig_overrides: + n_samples: 1 + + eval_datasets: + terminal_bench_2: + dataset_fn: "astraflow.dataflow.dataset.terminal_bench:get_terminal_bench_2_test_dataset" + split: "test" + repeat: 1 + eval_workflow: terminal_bench_2 + +trainer_base: + total_train_steps: 1200 + train_batch_size: 128 + n_samples: 8 + engine: + backend: fsdp + data_parallel_size: 4 + + actor: + gradient_checkpointing: true + mb_spec: + max_tokens_per_mb: 22000 + optimizer: + type: adam + lr: 3e-6 + weight_decay: 0.01 + beta1: 0.9 + beta2: 0.999 + eps: 1e-8 + lr_scheduler_type: constant + gradient_clipping: 1.0 + m2_threshold: 0.01 + eps_clip: 100.0 + eps_clip_higher: 100.0 + reward_scaling: 1 + reward_bias: 0 + kl_ctl: 0.00 + kl_penalty_coef: 0.001 + ppo_n_minibatches: 2 + reward_norm: { mean_level: group, std_level: group } + adv_norm: { mean_level: batch, std_level: batch } + + ref: + mb_spec: + max_tokens_per_mb: 22000 + + recover: + mode: auto + freq_steps: 25 + + evaluator: + freq_steps: 25 + + stats_logger: + wandb: + mode: online + id_suffix: "uid" + tags: ["m2po", "code", "terminal-bench-2", "harbor", "terminus-2", "qwen3-8b", "tcp", "delta", "h100-80gb"] + +trainer_model0: + model_id: model0 diff --git a/examples/terminal-bench/terminal-bench-2-qwen3-8b/yaml/raas.yaml b/examples/terminal-bench/terminal-bench-2-qwen3-8b/yaml/raas.yaml new file mode 100644 index 0000000..305b6e0 --- /dev/null +++ b/examples/terminal-bench/terminal-bench-2-qwen3-8b/yaml/raas.yaml @@ -0,0 +1,28 @@ +# ============================================================================ +# RaaS config — Inference serving instance for Terminal-Bench 2.0 eval recipe +# Experiment: astraflow-code / terminal-bench-2-qwen3-8b +# +# Hardware: H100 80GB, 4x GPU for RaaS, TP=1, DP=4 by default. +# ============================================================================ + +rollout: + max_concurrent_rollouts: 128 + max_concurrent_evals: 4 + pause_grace_period: 3 + enable_adaptive_availability: true + target_waiting_queue_per_dp: 4 + adaptive_step_size: 4 + load_cache_ttl_ms: 100 + +engine: + model0: + backend: sglang + data_parallel_size: 4 + +sglang: + context_length: 20480 + mem_fraction_static: 0.8 + max_running_requests: 32 + # Harbor/Terminus-2 uses SGLang's OpenAI chat endpoint, which needs the + # server-side tokenizer to apply the chat template. + skip_tokenizer_init: false diff --git a/examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/scripts/run_terminal-bench-rl-qwen3-14b-podman-test.sh b/examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/scripts/run_terminal-bench-rl-qwen3-14b-podman-test.sh new file mode 100755 index 0000000..ea9cb69 --- /dev/null +++ b/examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/scripts/run_terminal-bench-rl-qwen3-14b-podman-test.sh @@ -0,0 +1,130 @@ +#!/bin/bash +set -euo pipefail +# All-in-one launcher for Qwen3-14B Harbor RL training (Podman test). +# Training uses local Harbor task directories prepared in SkyRL's layout: +# ./data-data/harbor/CodeContests +# +# Usage: +# bash examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/scripts/run_terminal-bench-rl-qwen3-14b-podman-test.sh + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../.." && pwd)" +cd "${REPO_ROOT}" +export PYTHONPATH="${REPO_ROOT}${PYTHONPATH:+:${PYTHONPATH}}" + +YAML_DIR="${SCRIPT_DIR}/yaml" +export EXPERIMENT_CONFIG="${EXPERIMENT_CONFIG:-${YAML_DIR}/experiment.yaml}" +export RAAS_CONFIG="${RAAS_CONFIG:-${YAML_DIR}/raas.yaml}" +source "${REPO_ROOT}/examples/_common/utils.sh" +astraflow_load_experiment_env + +export HARBOR_TRAIN_DATA="${HARBOR_TRAIN_DATA:-./data-data/harbor/CodeContests}" +if [[ ! -d "${HARBOR_TRAIN_DATA}" ]]; then + echo "WARNING: Harbor train data not found: ${HARBOR_TRAIN_DATA}" + echo "Prepare the Harbor dataset first, for example:" + echo " python astraflow/dataflow/dataset/scripts/prepare_harbor_dataset.py --dataset open-thoughts/CodeContests --output-dir ./data-data/harbor/CodeContests" +fi + +# GPU assignments (default: 2 B200 GPUs for inference, 2 B200 GPUs for training) +export SERVICE_CUDA_VISIBLE_DEVICES="${SERVICE_CUDA_VISIBLE_DEVICES:-5,7}" +export TRAINER_GPUS="${TRAINER_GPUS:-0,4}" +TRAINER_NPROC="$(echo "${TRAINER_GPUS}" | awk -F',' '{print NF}')" + +RAAS_HOST="${RAAS_HOST:-0.0.0.0}" +RAAS_PORT="${RAAS_PORT:-19190}" +ASTRAFLOW_HOST="${ASTRAFLOW_HOST:-0.0.0.0}" +ASTRAFLOW_PORT="${ASTRAFLOW_PORT:-8005}" +ASTRAFLOW_URL="http://127.0.0.1:${ASTRAFLOW_PORT}" + +export WEIGHT_TRANSFER_HTTP_PORT="${WEIGHT_TRANSFER_HTTP_PORT:-19863}" +export XDG_CACHE_HOME="${XDG_CACHE_HOME:-/tmp/${USER:-$(whoami)}/xdg-cache}" +export FLASHINFER_WORKSPACE_BASE="${FLASHINFER_WORKSPACE_BASE:-/tmp/${USER:-$(whoami)}/flashinfer-workspace}" +mkdir -p "${XDG_CACHE_HOME}" "${FLASHINFER_WORKSPACE_BASE}" + +export NCCL_CUMEM_ENABLE=0 + +if [[ -z "${WANDB_API_KEY:-}" && -z "${WANDB_MODE:-}" && -z "${WANDB_DISABLED:-}" ]]; then + export WANDB_MODE="offline" + export WANDB_DISABLED="true" +fi + +LOG_DIR="data-log/${EXP_NAME}/${TRIAL_NAME}" +mkdir -p "${LOG_DIR}" +export ASTRAFLOW_VERIFY_WORK_ROOT="${ASTRAFLOW_VERIFY_WORK_ROOT:-${LOG_DIR}/verify-tmp}" +mkdir -p "${ASTRAFLOW_VERIFY_WORK_ROOT}" + +echo "=== AstraFlow v2 + Harbor RL (Qwen3-14B, B200 2+2, Podman test) ===" +echo "Experiment config : ${EXPERIMENT_CONFIG}" +echo "RaaS config : ${RAAS_CONFIG}" +echo "Harbor train data : ${HARBOR_TRAIN_DATA}" +echo "RaaS GPUs : ${SERVICE_CUDA_VISIBLE_DEVICES} (model0 dp=2)" +echo "Trainer GPUs : ${TRAINER_GPUS} (FSDP dp${TRAINER_NPROC})" +echo "RaaS port : ${RAAS_PORT}" +echo "AstraFlow port : ${ASTRAFLOW_PORT}" +echo "Harbor env : harbor-tb2 (PodmanEnvironment)" +echo "Sender HTTP : ${WEIGHT_TRANSFER_HTTP_PORT}" +echo "WANDB mode : ${WANDB_MODE:-online}" +echo "==============================================" + +cleanup() { + trap - EXIT INT TERM + echo "Shutting down..." + kill -- -$$ 2>/dev/null || true + pkill -9 -f astraflow.raas.server 2>/dev/null || true + pkill -9 -f astraflow.raas.entrypoint 2>/dev/null || true + for port in ${WEIGHT_TRANSFER_HTTP_PORT} 21000; do + lsof -i :"$port" 2>/dev/null | awk 'NR>1{print $2}' | sort -u | xargs kill -9 2>/dev/null || true + done + pkill -9 -f "multiprocessing-fork" 2>/dev/null || true + rm -f /dev/shm/areal_buffer_* /dev/shm/astraflow_* 2>/dev/null || true + wait 2>/dev/null + exit 0 +} +trap cleanup EXIT INT TERM + +echo "Cleaning up stale processes..." +pkill -9 -f astraflow.raas.server 2>/dev/null || true +pkill -9 -f astraflow.raas.entrypoint 2>/dev/null || true +pkill -9 -f "sglang::scheduler" 2>/dev/null || true +for port in ${WEIGHT_TRANSFER_HTTP_PORT} 21000; do + lsof -i :"$port" 2>/dev/null | awk 'NR>1{print $2}' | sort -u | xargs kill -9 2>/dev/null || true +done +pkill -9 -f "multiprocessing-fork" 2>/dev/null || true +pkill -9 -f "compile_worker" 2>/dev/null || true +rm -f /dev/shm/areal_buffer_* /dev/shm/astraflow_* 2>/dev/null || true +sleep 2 + +echo "[1/3] Starting RaaS inference server..." +CUDA_VISIBLE_DEVICES="${SERVICE_CUDA_VISIBLE_DEVICES}" \ + python3 -u -m astraflow.raas.server \ + --host "${RAAS_HOST}" \ + --port "${RAAS_PORT}" \ + --config "${EXPERIMENT_CONFIG}" \ + --config "${RAAS_CONFIG}" \ + --engine-id "${ENGINE_ID:-default}" \ + --astraflow-url "${ASTRAFLOW_URL}" \ + 2>&1 | tee "${LOG_DIR}/raas.log" & +sleep 15 + +echo "[2/3] Starting AstraFlow HTTP service..." +CUDA_VISIBLE_DEVICES="" \ + python3 -u -m astraflow \ + --config "${EXPERIMENT_CONFIG}" \ + --port "${ASTRAFLOW_PORT}" \ + --host "${ASTRAFLOW_HOST}" \ + 2>&1 | tee "${LOG_DIR}/astraflow.log" & +sleep 5 + +export ASTRAFLOW_URL="http://127.0.0.1:${ASTRAFLOW_PORT}" +export ASTRAFLOW_RAAS_URL="http://127.0.0.1:${RAAS_PORT}" + +echo "[3/3] Starting trainer..." +CUDA_VISIBLE_DEVICES="${TRAINER_GPUS}" \ +ASTRAFLOW_CUDA_VISIBLE_DEVICES="${TRAINER_GPUS}" \ +WEIGHT_TRANSFER_HTTP_PORT="${WEIGHT_TRANSFER_HTTP_PORT}" \ + torchrun --nnodes 1 --nproc-per-node "${TRAINER_NPROC}" \ + --master-addr "${MASTER_ADDR:-127.0.0.1}" --master-port "${MASTER_PORT:-29545}" \ + examples/launch_trainer.py \ + --config "${EXPERIMENT_CONFIG}" \ + --trainer trainer_model0 \ + "$@" 2>&1 | tee "${LOG_DIR}/trainer.log" diff --git a/examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/yaml/experiment.yaml b/examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/yaml/experiment.yaml new file mode 100644 index 0000000..5425b50 --- /dev/null +++ b/examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/yaml/experiment.yaml @@ -0,0 +1,149 @@ +# ============================================================================ +# Experiment config — AstraFlow service + Trainer + Harbor RL +# Experiment: astraflow-code / terminal-bench-rl-qwen3-14b-podman-test +# +# Training dataset: SkyRL Harbor dataset layout, defaulting to the prepared +# open-thoughts/CodeContests tasks under ./data-data/harbor/CodeContests. +# Training workflow: terminal_bench_harbor_rl through Harbor + Terminus-2. +# Eval dataset: LiveCodeBench v6 only. +# ============================================================================ + +experiment: + experiment_name: astraflow-code + trial_name: terminal-bench-rl-qwen3-14b-podman-test + fileroot: ./data-experiments/${experiment.experiment_name}/${experiment.trial_name} + + model_path: "Qwen/Qwen3-14B" + tokenizer_path: "Qwen/Qwen3-14B" + seed: 1 + dtype: bfloat16 + weight_transfer_mode: tcp + weight_transfer_strategies: delta + +cluster: + name_resolve: + type: nfs + nfs_record_root: ./data-experiments/astraflow-code/terminal-bench-rl-qwen3-14b-podman-test/name_resolve + +raas: + models: + model0: + backend: sglang + gconfig: + n_samples: 8 + temperature: 1.0 + max_new_tokens: 8192 + min_new_tokens: 0 + delta_full_sync_interval: 10 + +dataflow: + host: "0.0.0.0" + port: 8005 + dump_dir: ./data-experiments/${experiment.experiment_name}/${experiment.trial_name}/rollout_dumps + + buffer: + size: 10000 + replay_size: 10000 + replay_ratio: 0 + max_staleness: 8 + filter_function: filter_zero_adv + + rollout_dataset: + dataset_fn: "astraflow.dataflow.dataset.terminal_bench:get_harbor_task_path_dataset" + path: "$HARBOR_TRAIN_DATA" + split: "train" + dataset_name: "skyrl_codecontests" + + workflow_spec: + workflow_cls: "terminal_bench_harbor_rl" + harbor_command: + - conda + - run + - "--no-capture-output" + - "-n" + - harbor-tb2 + - harbor + extra_args: + - "--environment-import-path" + - "examples.terminal-bench.harbor_podman_env:PodmanEnvironment" + agent_name: "terminus-2" + model_name: "openai/Qwen/Qwen3-14B" + jobs_dir: ./data-experiments/astraflow-code/terminal-bench-rl-qwen3-14b-podman-test/harbor_jobs + timeout: 7200.0 + max_parallel_jobs: 16 + rollout_detail_index: 0 + agent_kwargs: + temperature: 1.0 + max_turns: 1 + suppress_max_turns_warning: true + enable_summarize: false + collect_rollout_details: true + + eval_workflows: + code_eval: + workflow_cls: "livecodebench_single_turn" + reward_fn: "livecodebench_reward" + gconfig_overrides: + temperature: 0.6 + n_samples: 1 + + eval_datasets: + lcb_v6: + dataset_fn: "astraflow.dataflow.dataset.livecodebench:get_livecodebench_single_turn_test_dataset" + path: "./data-data/lcb_v6/test.jsonl" + split: "test" + max_length: 6000 + repeat: 1 + eval_workflow: code_eval + +trainer_base: + total_train_steps: 1200 + train_batch_size: 32 + n_samples: 8 + engine: + backend: fsdp + data_parallel_size: 2 + + actor: + gradient_checkpointing: true + mb_spec: + max_tokens_per_mb: 22000 + optimizer: + type: adam + lr: 3e-6 + weight_decay: 0.01 + beta1: 0.9 + beta2: 0.999 + eps: 1e-8 + lr_scheduler_type: constant + gradient_clipping: 1.0 + m2_threshold: 0.01 + eps_clip: 100.0 + eps_clip_higher: 100.0 + reward_scaling: 1 + reward_bias: 0 + kl_ctl: 0.00 + kl_penalty_coef: 0.001 + ppo_n_minibatches: 2 + reward_norm: { mean_level: group, std_level: group } + adv_norm: { mean_level: batch, std_level: batch } + + ref: + mb_spec: + max_tokens_per_mb: 22000 + + recover: + mode: auto + freq_steps: 25 + + evaluator: + freq_steps: 25 + + stats_logger: + wandb: + mode: online + id_suffix: "uid" + tags: ["m2po", "code", "harbor-rl", "skyrl-codecontests", "terminus-2", "lcb-v6-eval", "qwen3-14b", "tcp", "delta", "b200", "podman"] + +trainer_model0: + model_id: model0 diff --git a/examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/yaml/raas.yaml b/examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/yaml/raas.yaml new file mode 100644 index 0000000..2d916ba --- /dev/null +++ b/examples/terminal-bench/terminal-bench-rl-qwen3-14b-podman-test/yaml/raas.yaml @@ -0,0 +1,28 @@ +# ============================================================================ +# RaaS config — Inference serving instance for Harbor RL recipe +# +# Hardware layout: 2 GPUs for RaaS + 2 GPUs for trainer by default. +# Harbor/Terminus-2 uses SGLang's OpenAI chat endpoint, so tokenizer init must +# stay enabled. +# ============================================================================ + +rollout: + max_concurrent_rollouts: 64 + max_concurrent_evals: 64 + pause_grace_period: 3 + enable_adaptive_availability: true + target_waiting_queue_per_dp: 4 + adaptive_step_size: 4 + load_cache_ttl_ms: 100 + +engine: + model0: + backend: sglang + data_parallel_size: 2 + +sglang: + context_length: 12288 + mem_fraction_static: 0.8 + max_running_requests: 48 + skip_tokenizer_init: false + attention_backend: flashinfer