From 867d0e7952e85fac338566c7dcfcd49751e47f05 Mon Sep 17 00:00:00 2001 From: hlin99 Date: Mon, 6 Apr 2026 18:48:23 +0800 Subject: [PATCH] feat: Deployment Readiness Report (M119) - ReadinessAssessor class in readiness.py - ReadinessConfig, ReadinessCheck, ReadinessVerdict, ReadinessReport Pydantic models - Combines quality gate, SLA risk score, SLA headroom, cost efficiency, rate limit headroom - Per-check pass/warn/fail with configurable thresholds - Overall verdict: READY (all pass), CAUTION (any warn), NOT_READY (any fail) - CLI readiness subcommand with table + JSON output - Programmatic assess_readiness() API - 26 tests Closes #261 --- ROADMAP.md | 14 ++ docs/iterations/current.md | 5 +- src/xpyd_plan/cli/_main.py | 6 + src/xpyd_plan/cli/_readiness.py | 146 +++++++++++ src/xpyd_plan/readiness.py | 430 ++++++++++++++++++++++++++++++++ tests/test_readiness.py | 353 ++++++++++++++++++++++++++ 6 files changed, 952 insertions(+), 2 deletions(-) create mode 100644 src/xpyd_plan/cli/_readiness.py create mode 100644 src/xpyd_plan/readiness.py create mode 100644 tests/test_readiness.py diff --git a/ROADMAP.md b/ROADMAP.md index 6196afe..cf2e90b 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -1596,3 +1596,17 @@ Help users find the **optimal Prefill:Decode instance ratio** based on **real be - CLI `sla-risk` subcommand with `--benchmark`, `--sla-ttft`, `--sla-tpot`, `--sla-total`, table + JSON output - Programmatic `assess_sla_risk()` API - ~22 new tests + +### M119 🔄 Deployment Readiness Report + +*In progress* + +- `ReadinessAssessor` class in `readiness.py` +- `ReadinessConfig`, `ReadinessCheck`, `ReadinessVerdict`, `ReadinessReport` Pydantic models +- Unified go/no-go deployment readiness assessment combining: quality gate result, SLA risk score, SLA headroom safety level, cost efficiency ratio, and rate limit headroom +- Per-check pass/warn/fail with configurable thresholds (risk score < 50, headroom > 10%, quality gate PASS, cost efficiency > 0.7, rate limit headroom 
> 15%) +- Overall verdict: READY (all pass), CAUTION (any warn, no fail), NOT_READY (any fail) +- Actionable deployment recommendations and blockers list +- CLI `readiness` subcommand with `--benchmark`, `--sla-ttft`, `--sla-tpot`, `--sla-total`, `--cost-model`, table + JSON output +- Programmatic `assess_readiness()` API +- ~24 new tests diff --git a/docs/iterations/current.md b/docs/iterations/current.md index 0ca3402..fbc8b4d 100644 --- a/docs/iterations/current.md +++ b/docs/iterations/current.md @@ -69,5 +69,6 @@ The project has completed **110 milestones**, covering the full feature chain fr | 8 | 2026-04-06 | M114 Multi-Backend Comparison Report | ✅ merged | PR #252, both bots approved | | 9 | 2026-04-06 | M115 Workload Mix Optimizer | ✅ merged | PR #254, both bots approved | | 10 | 2026-04-06 | M116 GPU Hour Calculator | ✅ merged | PR #256, both bots approved | -| 11 | 2026-04-06 | M117 Benchmark Quality Gate | ⏳ pending review | PR TBD | -| 12 | 2026-04-06 | M118 SLA Risk Score | ⏳ pending review | PR TBD | +| 11 | 2026-04-06 | M117 Benchmark Quality Gate | ✅ merged | PR #258 | +| 12 | 2026-04-06 | M118 SLA Risk Score | ✅ merged | PR #260, both bots approved | +| 13 | 2026-04-06 | M119 Deployment Readiness Report | ⏳ pending review | PR TBD | diff --git a/src/xpyd_plan/cli/_main.py b/src/xpyd_plan/cli/_main.py index bdb9186..4f72a75 100644 --- a/src/xpyd_plan/cli/_main.py +++ b/src/xpyd_plan/cli/_main.py @@ -69,6 +69,7 @@ from xpyd_plan.cli._ranking import _cmd_ranking, add_ranking_parser from xpyd_plan.cli._rate_limit import add_rate_limit_parser from xpyd_plan.cli._ratio_compare import add_ratio_compare_parser +from xpyd_plan.cli._readiness import register as register_readiness from xpyd_plan.cli._recommend import _cmd_recommend from xpyd_plan.cli._regression import _cmd_regression, add_regression_parser from xpyd_plan.cli._replay import add_replay_parser @@ -974,6 +975,7 @@ def main(argv: list[str] | None = None) -> None: register_gpu_hours(subparsers) 
def _render_readiness_table(console, report) -> None:
    """Print the verdict banner, per-check table, and recommendation via rich."""
    verdict_style = {
        "ready": "[bold green]READY[/bold green]",
        "caution": "[bold yellow]CAUTION[/bold yellow]",
        "not_ready": "[bold red]NOT READY[/bold red]",
    }
    status_style = {
        "pass": "[green]PASS[/green]",
        "warn": "[yellow]WARN[/yellow]",
        "fail": "[red]FAIL[/red]",
    }

    # Unknown verdict/status values fall back to their raw string.
    styled = verdict_style.get(report.verdict.value, report.verdict.value)
    console.print(f"\nDeployment Readiness: {styled}")
    console.print()

    table = Table(title="Readiness Checks")
    table.add_column("Check", justify="left")
    table.add_column("Status", justify="center")
    table.add_column("Value", justify="right")
    table.add_column("Threshold", justify="left")
    table.add_column("Detail", justify="left")
    for check in report.checks:
        table.add_row(
            check.name,
            status_style.get(check.status.value, check.status.value),
            check.value,
            check.threshold,
            check.detail,
        )
    console.print(table)
    console.print()

    if report.blockers:
        console.print(f"[bold red]Blockers:[/bold red] {', '.join(report.blockers)}")
    if report.warnings:
        console.print(f"[yellow]Warnings:[/yellow] {', '.join(report.warnings)}")

    console.print(f"\n[bold]{report.recommendation}[/bold]")


def _cmd_readiness(args: argparse.Namespace) -> None:
    """Handle the 'readiness' subcommand.

    Loads the benchmark at ``args.benchmark``, runs the readiness assessment
    with the default ``ReadinessConfig`` thresholds, and writes the report to
    stdout as JSON or as a rich table depending on ``args.output_format``.

    Exits with status 1 when the verdict is ``not_ready`` in *both* output
    formats, so the command works as a go/no-go gate in scripts that consume
    the JSON output as well as for interactive use.
    """
    data = load_benchmark_auto(args.benchmark)
    assessor = ReadinessAssessor(config=ReadinessConfig())
    report = assessor.assess(
        data,
        sla_ttft_ms=args.sla_ttft,
        sla_tpot_ms=args.sla_tpot,
        sla_total_ms=args.sla_total,
        cost_per_request=args.cost_per_request,
        optimal_cost_per_request=args.optimal_cost,
        measured_qps=args.measured_qps,
        max_safe_qps=args.max_safe_qps,
    )

    if getattr(args, "output_format", "table") == "json":
        # mode="json" makes pydantic emit plain JSON primitives (enum values
        # as strings) instead of relying on the encoder to cope with them.
        json.dump(report.model_dump(mode="json"), sys.stdout, indent=2)
        sys.stdout.write("\n")
    else:
        _render_readiness_table(Console(), report)

    # The exit status must not depend on the chosen output format.
    if report.verdict.value == "not_ready":
        sys.exit(1)


def register(subparsers: argparse._SubParsersAction) -> None:
    """Register the readiness subcommand and its options on *subparsers*.

    Only ``--benchmark`` is required. Every SLA, cost, and QPS option
    defaults to ``None``.
    """
    parser = subparsers.add_parser(
        "readiness",
        help="Unified deployment readiness assessment (go/no-go)",
    )
    parser.add_argument(
        "--benchmark",
        required=True,
        help="Path to benchmark JSON file",
    )

    # All remaining numeric options share the same shape: optional float.
    optional_floats = (
        ("--sla-ttft", "TTFT SLA threshold in ms"),
        ("--sla-tpot", "TPOT SLA threshold in ms"),
        ("--sla-total", "Total latency SLA threshold in ms"),
        ("--cost-per-request", "Actual cost per request"),
        ("--optimal-cost", "Optimal cost per request baseline"),
        ("--measured-qps", "Current measured QPS"),
        ("--max-safe-qps", "Maximum safe QPS from rate limit analysis"),
    )
    for flag, help_text in optional_floats:
        parser.add_argument(flag, type=float, default=None, help=help_text)

    parser.add_argument(
        "--output-format",
        choices=["table", "json"],
        default="table",
        help="Output format (default: table)",
    )
class ReadinessVerdict(str, Enum):
    """Overall deployment readiness verdict."""

    READY = "ready"
    CAUTION = "caution"
    NOT_READY = "not_ready"


class CheckStatus(str, Enum):
    """Status of a single readiness check."""

    PASS = "pass"
    WARN = "warn"
    FAIL = "fail"


class ReadinessCheck(BaseModel):
    """Result of a single readiness check."""

    name: str = Field(..., description="Check name")
    status: CheckStatus = Field(..., description="Check status")
    detail: str = Field(..., description="Human-readable explanation")
    value: str = Field(..., description="Measured value")
    threshold: str = Field(..., description="Threshold applied")


class ReadinessConfig(BaseModel):
    """Configuration for readiness assessment thresholds.

    Each numeric check has a hard (fail) threshold and a softer warn
    threshold. The two are not cross-validated, so a warn threshold set on the
    wrong side of its fail threshold simply makes WARN unreachable.
    """

    max_risk_score: float = Field(
        default=50.0, ge=0, le=100, description="Max acceptable SLA risk score"
    )
    risk_warn_score: float = Field(
        default=35.0, ge=0, le=100, description="Risk score warn threshold"
    )
    min_headroom_pct: float = Field(
        default=10.0, ge=0, description="Minimum SLA headroom percentage"
    )
    headroom_warn_pct: float = Field(
        default=20.0, ge=0, description="Headroom warn threshold percentage"
    )
    require_quality_gate: bool = Field(
        default=True, description="Require quality gate pass"
    )
    min_cost_efficiency: float = Field(
        default=0.7, ge=0.0, le=1.0, description="Minimum cost efficiency ratio"
    )
    cost_efficiency_warn: float = Field(
        default=0.8, ge=0.0, le=1.0, description="Cost efficiency warn threshold"
    )
    min_rate_headroom_pct: float = Field(
        default=15.0, ge=0, description="Minimum rate limit headroom percentage"
    )
    rate_headroom_warn_pct: float = Field(
        default=25.0, ge=0, description="Rate headroom warn threshold percentage"
    )


def _derive_verdict(checks: list[ReadinessCheck]) -> ReadinessVerdict:
    """Derive the overall verdict from individual check statuses.

    Any FAIL yields NOT_READY; otherwise any WARN yields CAUTION; otherwise
    (including an empty list) the verdict is READY.
    """
    statuses = {c.status for c in checks}
    if CheckStatus.FAIL in statuses:
        return ReadinessVerdict.NOT_READY
    if CheckStatus.WARN in statuses:
        return ReadinessVerdict.CAUTION
    return ReadinessVerdict.READY


class ReadinessReport(BaseModel):
    """Complete deployment readiness report."""

    verdict: ReadinessVerdict = Field(..., description="Overall deployment readiness")
    checks: list[ReadinessCheck] = Field(
        ..., description="Individual check results"
    )
    blockers: list[str] = Field(
        default_factory=list, description="Failing checks that block deployment"
    )
    warnings: list[str] = Field(
        default_factory=list, description="Checks in warn status"
    )
    recommendation: str = Field(..., description="Actionable deployment recommendation")
    total_requests: int = Field(..., description="Requests in benchmark data")


class ReadinessAssessor:
    """Unified deployment readiness assessor.

    Runs up to five checks (quality gate, SLA risk score, SLA headroom, cost
    efficiency, rate limit headroom) and folds them into a single verdict.
    """

    def __init__(self, config: ReadinessConfig | None = None) -> None:
        self._config = config or ReadinessConfig()

    @property
    def config(self) -> ReadinessConfig:
        """Thresholds used by this assessor."""
        return self._config

    def assess(
        self,
        data: BenchmarkData,
        *,
        sla_ttft_ms: float | None = None,
        sla_tpot_ms: float | None = None,
        sla_total_ms: float | None = None,
        cost_per_request: float | None = None,
        optimal_cost_per_request: float | None = None,
        measured_qps: float | None = None,
        max_safe_qps: float | None = None,
    ) -> ReadinessReport:
        """Assess deployment readiness from benchmark data and optional context.

        The quality gate always runs. The two SLA checks run when at least one
        SLA threshold is given; the cost and rate checks each run only when
        both of their inputs are given.

        Args:
            data: Benchmark data to analyze.
            sla_ttft_ms: TTFT SLA threshold in ms.
            sla_tpot_ms: TPOT SLA threshold in ms.
            sla_total_ms: Total latency SLA threshold in ms.
            cost_per_request: Actual cost per request (for cost efficiency check).
            optimal_cost_per_request: Optimal cost per request baseline.
            measured_qps: Current measured QPS.
            max_safe_qps: Maximum safe QPS from rate limit analysis.

        Returns:
            ReadinessReport with verdict and per-check details.
        """
        # 1. Quality gate check
        checks: list[ReadinessCheck] = [self._check_quality_gate(data)]

        sla_kwargs = {
            "sla_ttft_ms": sla_ttft_ms,
            "sla_tpot_ms": sla_tpot_ms,
            "sla_total_ms": sla_total_ms,
        }
        if any(limit is not None for limit in sla_kwargs.values()):
            # 2. SLA risk score check
            checks.append(self._check_sla_risk(data, **sla_kwargs))
            # 3. SLA headroom check
            checks.append(self._check_sla_headroom(data, **sla_kwargs))

        # 4. Cost efficiency check
        if cost_per_request is not None and optimal_cost_per_request is not None:
            checks.append(
                self._check_cost_efficiency(cost_per_request, optimal_cost_per_request)
            )

        # 5. Rate limit headroom check
        if measured_qps is not None and max_safe_qps is not None:
            checks.append(self._check_rate_headroom(measured_qps, max_safe_qps))

        verdict = _derive_verdict(checks)
        blockers = [c.name for c in checks if c.status == CheckStatus.FAIL]
        warnings = [c.name for c in checks if c.status == CheckStatus.WARN]

        return ReadinessReport(
            verdict=verdict,
            checks=checks,
            blockers=blockers,
            warnings=warnings,
            recommendation=self._generate_recommendation(verdict, blockers, warnings),
            total_requests=len(data.requests),
        )

    def _check_quality_gate(self, data: BenchmarkData) -> ReadinessCheck:
        """Run the quality gate and translate its verdict into a readiness check.

        When ``require_quality_gate`` is False the gate is advisory: a failing
        gate is reported as WARN instead of FAIL so it cannot block deployment.
        """
        result = QualityGate().evaluate(data)
        verdict_val = result.verdict.value
        passed_count = sum(1 for c in result.checks if c.verdict == GateVerdict.PASS)
        total_count = len(result.checks)
        required = self._config.require_quality_gate

        detail = (
            f"Quality gate: {verdict_val} "
            f"({passed_count}/{total_count} checks passed)"
        )
        if verdict_val == GateVerdict.PASS.value:
            status = CheckStatus.PASS
        elif verdict_val == GateVerdict.WARN.value:
            status = CheckStatus.WARN
        elif required:
            status = CheckStatus.FAIL
        else:
            # Advisory mode: keep the failure visible without making it a blocker.
            status = CheckStatus.WARN
            detail += " — advisory, not blocking"

        return ReadinessCheck(
            name="quality_gate",
            status=status,
            detail=detail,
            value=verdict_val,
            threshold="pass required" if required else "advisory",
        )

    def _check_sla_risk(
        self,
        data: BenchmarkData,
        *,
        sla_ttft_ms: float | None,
        sla_tpot_ms: float | None,
        sla_total_ms: float | None,
    ) -> ReadinessCheck:
        """Score SLA risk; FAIL above ``max_risk_score``, WARN above ``risk_warn_score``."""
        report = SLARiskScorer().assess(
            data,
            sla_ttft_ms=sla_ttft_ms,
            sla_tpot_ms=sla_tpot_ms,
            sla_total_ms=sla_total_ms,
        )
        score = report.risk_score.total_score
        cfg = self._config
        if score > cfg.max_risk_score:
            status = CheckStatus.FAIL
        elif score > cfg.risk_warn_score:
            status = CheckStatus.WARN
        else:
            status = CheckStatus.PASS
        return ReadinessCheck(
            name="sla_risk_score",
            status=status,
            detail=f"SLA risk score: {score:.1f}/100 ({report.risk_score.risk_level.value})",
            value=f"{score:.1f}",
            threshold=f"fail>{cfg.max_risk_score}, warn>{cfg.risk_warn_score}",
        )

    def _check_sla_headroom(
        self,
        data: BenchmarkData,
        *,
        sla_ttft_ms: float | None,
        sla_tpot_ms: float | None,
        sla_total_ms: float | None,
    ) -> ReadinessCheck:
        """Check SLA headroom using the metric with the smallest headroom.

        FAIL when that metric violates its SLA or is below
        ``min_headroom_pct``; WARN below ``headroom_warn_pct``. If the
        calculator returns no metrics the check passes with value "N/A".
        """
        report = SLAHeadroomCalculator().calculate(
            data,
            sla_ttft_ms=sla_ttft_ms,
            sla_tpot_ms=sla_tpot_ms,
            sla_total_ms=sla_total_ms,
        )
        cfg = self._config
        if not report.metrics:
            return ReadinessCheck(
                name="sla_headroom",
                status=CheckStatus.PASS,
                detail="No SLA metrics configured for headroom analysis",
                value="N/A",
                threshold=f"min {cfg.min_headroom_pct}%",
            )

        # The tightest metric decides the outcome.
        tightest = min(report.metrics, key=lambda m: m.headroom_pct)
        pct = tightest.headroom_pct

        if not tightest.passes_sla or pct < cfg.min_headroom_pct:
            status = CheckStatus.FAIL
        elif pct < cfg.headroom_warn_pct:
            status = CheckStatus.WARN
        else:
            status = CheckStatus.PASS

        detail = (
            f"Tightest headroom: {tightest.metric} at P{tightest.percentile:.0f} "
            f"— {pct:.1f}% ({tightest.safety_level.value})"
        )
        return ReadinessCheck(
            name="sla_headroom",
            status=status,
            detail=detail,
            value=f"{pct:.1f}%",
            threshold=f"fail<{cfg.min_headroom_pct}%, warn<{cfg.headroom_warn_pct}%",
        )

    def _check_cost_efficiency(
        self,
        cost_per_request: float,
        optimal_cost: float,
    ) -> ReadinessCheck:
        """Check the cost efficiency ratio (optimal / actual), capped at 1.0.

        A non-positive actual cost cannot produce a meaningful ratio, so the
        check is reported as a skipped PASS in that case.
        """
        if cost_per_request <= 0:
            return ReadinessCheck(
                name="cost_efficiency",
                status=CheckStatus.PASS,
                detail="Cost per request is zero or negative — skipped",
                value="N/A",
                threshold="N/A",
            )
        # Running cheaper than the "optimal" baseline is not rewarded beyond 1.0.
        ratio = min(optimal_cost / cost_per_request, 1.0)
        cfg = self._config
        if ratio < cfg.min_cost_efficiency:
            status = CheckStatus.FAIL
        elif ratio < cfg.cost_efficiency_warn:
            status = CheckStatus.WARN
        else:
            status = CheckStatus.PASS
        return ReadinessCheck(
            name="cost_efficiency",
            status=status,
            detail=f"Cost efficiency: {ratio:.2f} (optimal/actual cost ratio)",
            value=f"{ratio:.2f}",
            threshold=f"fail<{cfg.min_cost_efficiency}, warn<{cfg.cost_efficiency_warn}",
        )

    def _check_rate_headroom(
        self,
        measured_qps: float,
        max_safe_qps: float,
    ) -> ReadinessCheck:
        """Check rate limit headroom: ``(max_safe - measured) / max_safe`` as a percent.

        A non-positive ``max_safe_qps`` is an immediate FAIL (and avoids a
        division by zero).
        """
        cfg = self._config
        if max_safe_qps <= 0:
            return ReadinessCheck(
                name="rate_headroom",
                status=CheckStatus.FAIL,
                detail="Max safe QPS is zero — no headroom",
                value="0.0%",
                threshold=f"min {cfg.min_rate_headroom_pct}%",
            )
        headroom_pct = ((max_safe_qps - measured_qps) / max_safe_qps) * 100
        if headroom_pct < cfg.min_rate_headroom_pct:
            status = CheckStatus.FAIL
        elif headroom_pct < cfg.rate_headroom_warn_pct:
            status = CheckStatus.WARN
        else:
            status = CheckStatus.PASS
        return ReadinessCheck(
            name="rate_headroom",
            status=status,
            detail=(
                f"Rate headroom: {headroom_pct:.1f}% "
                f"(measured {measured_qps:.1f} vs max safe {max_safe_qps:.1f} QPS)"
            ),
            value=f"{headroom_pct:.1f}%",
            threshold=f"fail<{cfg.min_rate_headroom_pct}%, warn<{cfg.rate_headroom_warn_pct}%",
        )

    @staticmethod
    def _generate_recommendation(
        verdict: ReadinessVerdict,
        blockers: list[str],
        warnings: list[str],
    ) -> str:
        """Generate an actionable recommendation sentence for *verdict*."""
        if verdict == ReadinessVerdict.READY:
            return "All checks passed. System is ready for production deployment."
        if verdict == ReadinessVerdict.NOT_READY:
            blocker_str = ", ".join(blockers)
            return (
                f"Deployment blocked by failing checks: {blocker_str}. "
                "Address these issues before deploying to production."
            )
        # CAUTION
        warn_str = ", ".join(warnings)
        return (
            f"Deployment possible with caution — warnings on: {warn_str}. "
            "Consider addressing these before high-traffic deployment."
        )


def assess_readiness(
    benchmark_path: str,
    *,
    sla_ttft_ms: float | None = None,
    sla_tpot_ms: float | None = None,
    sla_total_ms: float | None = None,
    cost_per_request: float | None = None,
    optimal_cost_per_request: float | None = None,
    measured_qps: float | None = None,
    max_safe_qps: float | None = None,
    config: ReadinessConfig | None = None,
) -> ReadinessReport:
    """Convenience function: load benchmark and assess readiness.

    Args:
        benchmark_path: Path to benchmark JSON file.
        sla_ttft_ms: TTFT SLA threshold in ms.
        sla_tpot_ms: TPOT SLA threshold in ms.
        sla_total_ms: Total latency SLA threshold in ms.
        cost_per_request: Actual cost per request.
        optimal_cost_per_request: Optimal cost per request baseline.
        measured_qps: Current measured QPS.
        max_safe_qps: Maximum safe QPS from rate limit analysis.
        config: Optional readiness config overrides.

    Returns:
        ReadinessReport with verdict, checks, and recommendation.
    """
    # Imported lazily, as in the original; presumably to keep module import
    # light or avoid an import cycle — verify before hoisting.
    from .bench_adapter import load_benchmark_auto

    data = load_benchmark_auto(benchmark_path)
    return ReadinessAssessor(config=config).assess(
        data,
        sla_ttft_ms=sla_ttft_ms,
        sla_tpot_ms=sla_tpot_ms,
        sla_total_ms=sla_total_ms,
        cost_per_request=cost_per_request,
        optimal_cost_per_request=optimal_cost_per_request,
        measured_qps=measured_qps,
        max_safe_qps=max_safe_qps,
    )
def _make_unhealthy_data(n: int = 200) -> BenchmarkData:
    """Generate data with high jitter and spikes.

    Every fifth request draws a wide latency multiplier (0.5x-8x); the rest
    stay within 0.8x-1.5x. A private seeded generator keeps the data
    reproducible without touching the global ``random`` state.
    """
    import random

    rng = random.Random(99)
    requests = []
    for i in range(n):
        mult = rng.uniform(0.5, 8.0) if i % 5 == 0 else rng.uniform(0.8, 1.5)
        requests.append(
            BenchmarkRequest(
                request_id=f"req-{i:04d}",
                prompt_tokens=100,
                output_tokens=50,
                ttft_ms=50.0 * mult,
                tpot_ms=10.0 * mult,
                total_latency_ms=200.0 * mult,
                timestamp=1000.0 + i,
            )
        )
    return BenchmarkData(
        metadata=BenchmarkMetadata(
            num_prefill_instances=2,
            num_decode_instances=2,
            total_instances=4,
            measured_qps=10.0,
        ),
        requests=requests,
    )


def _check(name: str, status: CheckStatus, detail: str = "ok") -> ReadinessCheck:
    """Build a minimal ReadinessCheck with placeholder value/threshold."""
    return ReadinessCheck(
        name=name, status=status, detail=detail, value="1", threshold="1",
    )


def _named_check(report: ReadinessReport, name: str) -> ReadinessCheck:
    """Return the first check called *name* from *report*."""
    return next(c for c in report.checks if c.name == name)


def _dump_benchmark(data: BenchmarkData, directory: str) -> str:
    """Write *data* as JSON inside *directory* and return the file path."""
    from pathlib import Path

    target = Path(directory) / "benchmark.json"
    target.write_text(data.model_dump_json(), encoding="utf-8")
    return str(target)


class TestDeriveVerdict:
    def test_all_pass(self):
        checks = [_check("a", CheckStatus.PASS), _check("b", CheckStatus.PASS)]
        assert _derive_verdict(checks) == ReadinessVerdict.READY

    def test_any_warn(self):
        checks = [_check("a", CheckStatus.PASS), _check("b", CheckStatus.WARN, "meh")]
        assert _derive_verdict(checks) == ReadinessVerdict.CAUTION

    def test_any_fail(self):
        checks = [_check("a", CheckStatus.PASS), _check("b", CheckStatus.FAIL, "bad")]
        assert _derive_verdict(checks) == ReadinessVerdict.NOT_READY

    def test_fail_overrides_warn(self):
        checks = [_check("a", CheckStatus.WARN, "meh"), _check("b", CheckStatus.FAIL, "bad")]
        assert _derive_verdict(checks) == ReadinessVerdict.NOT_READY


class TestReadinessConfig:
    def test_defaults(self):
        cfg = ReadinessConfig()
        assert cfg.max_risk_score == 50.0
        assert cfg.min_headroom_pct == 10.0
        assert cfg.min_cost_efficiency == 0.7
        assert cfg.min_rate_headroom_pct == 15.0

    def test_custom(self):
        cfg = ReadinessConfig(max_risk_score=30.0, min_headroom_pct=20.0)
        assert cfg.max_risk_score == 30.0
        assert cfg.min_headroom_pct == 20.0


class TestReadinessAssessor:
    def test_basic_quality_gate_only(self):
        report = ReadinessAssessor().assess(_make_data(n=200))
        assert isinstance(report, ReadinessReport)
        assert report.total_requests == 200
        assert len(report.checks) >= 1
        # quality gate should be first check
        assert report.checks[0].name == "quality_gate"

    def test_ready_with_sla(self):
        report = ReadinessAssessor().assess(
            _make_data(n=200, spread=0.1),
            sla_ttft_ms=200.0,  # very generous
            sla_tpot_ms=50.0,
            sla_total_ms=500.0,
        )
        # Should have quality_gate + sla_risk_score + sla_headroom
        names = [c.name for c in report.checks]
        assert "quality_gate" in names
        assert "sla_risk_score" in names
        assert "sla_headroom" in names

    def test_no_sla_checks_without_sla(self):
        report = ReadinessAssessor().assess(_make_data(n=200))
        names = [c.name for c in report.checks]
        assert "sla_risk_score" not in names
        assert "sla_headroom" not in names

    def test_cost_efficiency_pass(self):
        report = ReadinessAssessor().assess(
            _make_data(n=200),
            cost_per_request=0.01,
            optimal_cost_per_request=0.009,
        )
        assert "cost_efficiency" in [c.name for c in report.checks]
        assert _named_check(report, "cost_efficiency").status == CheckStatus.PASS

    def test_cost_efficiency_fail(self):
        report = ReadinessAssessor().assess(
            _make_data(n=200),
            cost_per_request=0.10,
            optimal_cost_per_request=0.02,
        )
        assert _named_check(report, "cost_efficiency").status == CheckStatus.FAIL

    def test_cost_efficiency_warn(self):
        report = ReadinessAssessor().assess(
            _make_data(n=200),
            cost_per_request=0.10,
            optimal_cost_per_request=0.075,
        )
        assert _named_check(report, "cost_efficiency").status == CheckStatus.WARN

    def test_rate_headroom_pass(self):
        report = ReadinessAssessor().assess(
            _make_data(n=200),
            measured_qps=50.0,
            max_safe_qps=100.0,
        )
        assert _named_check(report, "rate_headroom").status == CheckStatus.PASS

    def test_rate_headroom_fail(self):
        report = ReadinessAssessor().assess(
            _make_data(n=200),
            measured_qps=95.0,
            max_safe_qps=100.0,
        )
        assert _named_check(report, "rate_headroom").status == CheckStatus.FAIL

    def test_rate_headroom_warn(self):
        report = ReadinessAssessor().assess(
            _make_data(n=200),
            measured_qps=78.0,
            max_safe_qps=100.0,
        )
        assert _named_check(report, "rate_headroom").status == CheckStatus.WARN

    def test_rate_headroom_zero_max(self):
        report = ReadinessAssessor().assess(
            _make_data(n=200),
            measured_qps=10.0,
            max_safe_qps=0.0,
        )
        assert _named_check(report, "rate_headroom").status == CheckStatus.FAIL

    def test_verdict_ready(self):
        report = ReadinessAssessor().assess(_make_data(n=200))
        # Only quality gate, should pass on clean data
        assert report.verdict in (ReadinessVerdict.READY, ReadinessVerdict.CAUTION)

    def test_recommendation_ready(self):
        report = ReadinessAssessor().assess(_make_data(n=200))
        assert report.recommendation  # non-empty

    def test_blockers_populated_on_fail(self):
        report = ReadinessAssessor().assess(
            _make_data(n=200),
            measured_qps=99.0,
            max_safe_qps=100.0,
        )
        # 1% rate headroom is below the 15% default minimum, so this check
        # fails regardless of how the other checks turn out.
        assert report.verdict == ReadinessVerdict.NOT_READY
        assert "rate_headroom" in report.blockers

    def test_custom_config(self):
        cfg = ReadinessConfig(max_risk_score=10.0)
        assessor = ReadinessAssessor(config=cfg)
        assert assessor.config.max_risk_score == 10.0


class TestReadinessReport:
    def test_model_serialization(self):
        report = ReadinessReport(
            verdict=ReadinessVerdict.READY,
            checks=[
                ReadinessCheck(
                    name="test",
                    status=CheckStatus.PASS,
                    detail="ok",
                    value="1",
                    threshold="1",
                )
            ],
            blockers=[],
            warnings=[],
            recommendation="Ship it.",
            total_requests=100,
        )
        d = report.model_dump()
        assert d["verdict"] == "ready"
        assert len(d["checks"]) == 1

    def test_model_roundtrip(self):
        report = ReadinessReport(
            verdict=ReadinessVerdict.CAUTION,
            checks=[
                ReadinessCheck(
                    name="x",
                    status=CheckStatus.WARN,
                    detail="meh",
                    value="42",
                    threshold="50",
                )
            ],
            blockers=[],
            warnings=["x"],
            recommendation="Proceed with care.",
            total_requests=50,
        )
        json_str = report.model_dump_json()
        loaded = ReadinessReport.model_validate_json(json_str)
        assert loaded.verdict == ReadinessVerdict.CAUTION


class TestAssessReadinessConvenience:
    # A TemporaryDirectory removes the benchmark file after each test instead
    # of leaking one NamedTemporaryFile(delete=False) per run.

    def test_from_file(self):
        with tempfile.TemporaryDirectory() as tmp:
            report = assess_readiness(_dump_benchmark(_make_data(n=150), tmp))
        assert isinstance(report, ReadinessReport)
        assert report.total_requests == 150

    def test_from_file_with_sla(self):
        with tempfile.TemporaryDirectory() as tmp:
            report = assess_readiness(
                _dump_benchmark(_make_data(n=150), tmp),
                sla_ttft_ms=200.0,
                sla_tpot_ms=50.0,
                sla_total_ms=500.0,
            )
        assert len(report.checks) >= 3


class TestCheckStatusEnum:
    def test_values(self):
        assert CheckStatus.PASS.value == "pass"
        assert CheckStatus.WARN.value == "warn"
        assert CheckStatus.FAIL.value == "fail"


class TestReadinessVerdictEnum:
    def test_values(self):
        assert ReadinessVerdict.READY.value == "ready"
        assert ReadinessVerdict.CAUTION.value == "caution"
        assert ReadinessVerdict.NOT_READY.value == "not_ready"