diff --git a/docs/iterations/current.md b/docs/iterations/current.md index 2fd08d7..0ca3402 100644 --- a/docs/iterations/current.md +++ b/docs/iterations/current.md @@ -70,3 +70,4 @@ The project has completed **110 milestones**, covering the full feature chain fr | 9 | 2026-04-06 | M115 Workload Mix Optimizer | ✅ merged | PR #254, both bots approved | | 10 | 2026-04-06 | M116 GPU Hour Calculator | ✅ merged | PR #256, both bots approved | | 11 | 2026-04-06 | M117 Benchmark Quality Gate | ⏳ pending review | PR TBD | +| 12 | 2026-04-06 | M118 SLA Risk Score | ⏳ pending review | PR TBD | diff --git a/src/xpyd_plan/__init__.py b/src/xpyd_plan/__init__.py index 158a9ff..4950045 100644 --- a/src/xpyd_plan/__init__.py +++ b/src/xpyd_plan/__init__.py @@ -1573,3 +1573,21 @@ "evaluate_quality_gate", "load_gate_config", ] + +from xpyd_plan.sla_risk import ( # noqa: E402 + RiskFactor, + RiskLevel, + RiskScore, + SLARiskReport, + SLARiskScorer, + assess_sla_risk, +) + +__all__ += [ + "RiskFactor", + "RiskLevel", + "RiskScore", + "SLARiskReport", + "SLARiskScorer", + "assess_sla_risk", +] diff --git a/src/xpyd_plan/cli/_main.py b/src/xpyd_plan/cli/_main.py index 330409a..bdb9186 100644 --- a/src/xpyd_plan/cli/_main.py +++ b/src/xpyd_plan/cli/_main.py @@ -86,6 +86,7 @@ from xpyd_plan.cli._sglang_commands import register_sglang_commands from xpyd_plan.cli._size_distribution import _cmd_size_distribution, add_size_distribution_parser from xpyd_plan.cli._sla_headroom import add_sla_headroom_parser +from xpyd_plan.cli._sla_risk import register as register_sla_risk from xpyd_plan.cli._sla_tier import add_sla_tier_parser from xpyd_plan.cli._spike import add_spike_parser from xpyd_plan.cli._sqlite_export import add_sqlite_export_parser @@ -972,6 +973,7 @@ def main(argv: list[str] | None = None) -> None: register_compare_backends(subparsers) register_gpu_hours(subparsers) register_quality_gate(subparsers) + register_sla_risk(subparsers) register_workload_mix(subparsers) 
def _cmd_sla_risk(args: argparse.Namespace) -> None:
    """Handle the 'sla-risk' subcommand.

    Loads the benchmark at ``args.benchmark``, scores it with
    ``SLARiskScorer`` against whichever ``--sla-*`` thresholds were supplied,
    and emits the report either as JSON on ``sys.stdout`` or as Rich output.

    Args:
        args: Parsed CLI namespace. Reads ``benchmark``, ``sla_ttft``,
            ``sla_tpot``, ``sla_total`` and, when present, ``output_format``.
    """
    report = SLARiskScorer().assess(
        load_benchmark_auto(args.benchmark),
        sla_ttft_ms=args.sla_ttft,
        sla_tpot_ms=args.sla_tpot,
        sla_total_ms=args.sla_total,
    )

    # A namespace without ``output_format`` falls back to the table renderer.
    if getattr(args, "output_format", "table") == "json":
        # mode="json" asks pydantic for JSON-native values only (the risk
        # level enum becomes its plain string), so serialisation does not
        # depend on how the json encoder happens to treat enum members.
        json.dump(report.model_dump(mode="json"), sys.stdout, indent=2)
        sys.stdout.write("\n")
        return

    # The console is only needed for the human-readable path.
    console = Console()

    # Risk score summary
    level_markup = {
        "low": "[bold green]LOW[/bold green]",
        "moderate": "[yellow]MODERATE[/yellow]",
        "high": "[bold yellow]HIGH[/bold yellow]",
        "critical": "[bold red]CRITICAL[/bold red]",
    }
    level = report.risk_score.risk_level.value
    console.print(
        f"\nSLA Risk Score: [bold]{report.risk_score.total_score:.1f}"
        f"[/bold] / 100 — {level_markup.get(level, level)}"
    )
    console.print()

    # Factor table
    table = Table(title="Risk Factor Breakdown")
    columns = (
        ("Factor", "left"),
        ("Score", "right"),
        ("Weight", "right"),
        ("Weighted", "right"),
        ("Detail", "left"),
    )
    for heading, align in columns:
        table.add_column(heading, justify=align)

    for factor in report.factors:
        table.add_row(
            factor.name,
            f"{factor.score:.1f}",
            f"{factor.weight:.0%}",
            f"{factor.weighted_score:.1f}",
            factor.detail,
        )

    console.print(table)
    console.print()
    console.print(f"[bold]{report.recommendation}[/bold]")


def register(subparsers: argparse._SubParsersAction) -> None:
    """Register the sla-risk subcommand.

    Args:
        subparsers: The sub-parser collection of the top-level CLI parser; a
            ``sla-risk`` parser is added to it.
    """
    parser = subparsers.add_parser(
        "sla-risk",
        help=(
            "Composite SLA risk score combining headroom, tail, jitter, "
            "convergence, burn rate"
        ),
    )
    parser.add_argument(
        "--benchmark",
        required=True,
        help="Path to benchmark JSON file",
    )
    # The three thresholds share type/default and differ only in label.
    thresholds = (
        ("--sla-ttft", "TTFT"),
        ("--sla-tpot", "TPOT"),
        ("--sla-total", "Total latency"),
    )
    for flag, label in thresholds:
        parser.add_argument(
            flag,
            type=float,
            default=None,
            help=f"{label} SLA threshold in ms",
        )
    parser.add_argument(
        "--output-format",
        choices=["table", "json"],
        default="table",
        help="Output format (default: table)",
    )
class RiskLevel(str, Enum):
    """Risk level classification, listed from least to most severe."""

    LOW = "low"
    MODERATE = "moderate"
    HIGH = "high"
    CRITICAL = "critical"


class RiskFactor(BaseModel):
    """Individual risk dimension score."""

    name: str = Field(..., description="Risk factor name")
    score: float = Field(..., description="Factor score 0-100")
    weight: float = Field(..., description="Weight in composite score")
    weighted_score: float = Field(..., description="score * weight")
    detail: str = Field(..., description="Human-readable explanation")


class RiskScore(BaseModel):
    """Composite risk score."""

    total_score: float = Field(..., description="Composite risk score 0-100")
    risk_level: RiskLevel = Field(..., description="Overall risk classification")


class SLARiskReport(BaseModel):
    """Complete SLA risk assessment report."""

    risk_score: RiskScore = Field(..., description="Composite risk result")
    factors: list[RiskFactor] = Field(..., description="Per-factor breakdown")
    total_requests: int = Field(..., description="Requests analyzed")
    recommendation: str = Field(..., description="Actionable recommendation")


def _percentile_value(values: list[float], pct: float) -> float:
    """Compute a percentile via linear interpolation between closest ranks.

    Args:
        values: Samples in any order; the list is not modified.
        pct: Percentile to compute, in the closed range 0-100.

    Returns:
        The interpolated percentile, or ``0.0`` when ``values`` is empty.

    Raises:
        ValueError: If ``pct`` lies outside 0-100 (including NaN). Without
            this guard a negative ``pct`` would silently index from the end
            of the sorted list and a ``pct`` above 100 would fail with a
            bare ``IndexError``.
    """
    if not 0.0 <= pct <= 100.0:
        raise ValueError(f"pct must be within [0, 100], got {pct!r}")
    if not values:
        return 0.0
    ordered = sorted(values)
    # Fractional rank in [0, n - 1]; its neighbours are the two samples blended.
    rank = (pct / 100.0) * (len(ordered) - 1)
    lower = math.floor(rank)
    upper = math.ceil(rank)
    if lower == upper:
        return ordered[lower]
    return ordered[lower] * (upper - rank) + ordered[upper] * (rank - lower)


def _classify_risk(score: float) -> RiskLevel:
    """Map a composite score to a level: >=75 critical, >=50 high, >=25 moderate."""
    bands = (
        (75.0, RiskLevel.CRITICAL),
        (50.0, RiskLevel.HIGH),
        (25.0, RiskLevel.MODERATE),
    )
    # Bands are ordered highest floor first, so the first match wins.
    for floor, level in bands:
        if score >= floor:
            return level
    return RiskLevel.LOW
def _headroom_score(
    values: list[float], threshold: float, percentile: float = 95.0,
) -> float:
    """Score 0-100 for headroom tightness. 100 = SLA violated, 0 = lots of margin.

    Args:
        values: Observed latencies for one metric.
        threshold: SLA limit for that metric; a non-positive limit scores 0.
        percentile: Percentile of ``values`` compared against ``threshold``.

    Returns:
        100 when the percentile is at or over the limit, 0 when at least half
        of the limit is still free, and a linear blend in between.
    """
    if threshold <= 0:
        return 0.0
    actual = _percentile_value(values, percentile)
    headroom_pct = (threshold - actual) / threshold * 100.0
    if headroom_pct <= 0:
        return 100.0  # violated
    if headroom_pct >= 50:
        return 0.0
    # Linear: 0% headroom → 100, 50% headroom → 0
    return 100.0 - headroom_pct * 2.0


def _tail_score(values: list[float]) -> float:
    """Score 0-100 for tail heaviness based on the P99/P50 ratio.

    Returns 0 for fewer than two samples or a non-positive median.
    """
    if len(values) < 2:
        return 0.0
    p50 = _percentile_value(values, 50.0)
    if p50 <= 0:
        return 0.0
    ratio = _percentile_value(values, 99.0) / p50
    # ratio <= 2 → 0, ratio >= 10 → 100, linear in between
    if ratio <= 2.0:
        return 0.0
    if ratio >= 10.0:
        return 100.0
    return (ratio - 2.0) / 8.0 * 100.0


def _jitter_score(values: list[float]) -> float:
    """Score 0-100 for latency jitter based on the coefficient of variation.

    Returns 0 for fewer than two samples or a non-positive mean.
    """
    if len(values) < 2:
        return 0.0
    mean = statistics.mean(values)
    if mean <= 0:
        return 0.0
    cv = statistics.stdev(values) / mean
    # CV <= 0.3 → 0, CV >= 0.7 → 100, linear in between
    if cv <= 0.3:
        return 0.0
    if cv >= 0.7:
        return 100.0
    return (cv - 0.3) / 0.4 * 100.0


def _window_ends(n: int, steps: int) -> list[int]:
    """Return evenly spaced, strictly increasing prefix lengths ending at ``n``."""
    steps = max(1, min(steps, n))
    # Integer ceil(i * n / steps): avoids float error and guarantees that the
    # final window covers every sample.
    return [(i * n + steps - 1) // steps for i in range(1, steps + 1)]


def _convergence_score(values: list[float], steps: int = 10) -> float:
    """Score 0-100 for convergence adequacy. High = unstable percentiles.

    P95 is computed over progressively longer prefixes of ``values`` (in their
    given order) and the spread of those estimates is scored.

    Args:
        values: Observed latencies for one metric.
        steps: Number of progressive windows; capped at ``len(values)``.

    Returns:
        100 when fewer than 10 samples are available, otherwise a score that
        ramps from 0 (CV of the running P95 <= 0.05) to 100 (CV >= 0.25).
    """
    n = len(values)
    if n < 10:
        return 100.0  # too few samples
    # Running P95 at progressive windows; the last window is the full sample,
    # so trailing requests are never left out of the estimate.
    p95_vals = [
        _percentile_value(values[:end], 95.0) for end in _window_ends(n, steps)
    ]

    if len(p95_vals) < 2:
        return 0.0
    mean_p95 = statistics.mean(p95_vals)
    if mean_p95 <= 0:
        return 0.0
    cv = statistics.stdev(p95_vals) / mean_p95
    # CV <= 0.05 → 0 (well converged), CV >= 0.25 → 100
    if cv <= 0.05:
        return 0.0
    if cv >= 0.25:
        return 100.0
    return (cv - 0.05) / 0.20 * 100.0


def _burn_rate_score(
    data: BenchmarkData,
    sla_ttft_ms: float | None,
    sla_tpot_ms: float | None,
    sla_total_ms: float | None,
) -> float:
    """Score 0-100 for error budget burn rate.

    A request counts as one violation when it exceeds any configured
    threshold, however many thresholds it breaks.

    Args:
        data: Benchmark data; only ``data.requests`` is read.
        sla_ttft_ms: TTFT limit in ms, or ``None`` to ignore TTFT.
        sla_tpot_ms: TPOT limit in ms, or ``None`` to ignore TPOT.
        sla_total_ms: Total latency limit in ms, or ``None`` to ignore it.

    Returns:
        0 with no violations (or no requests / no thresholds), 100 at a
        violation rate of 10% or more, linear in between.
    """
    if not data.requests:
        return 0.0
    limits = [
        (field, limit)
        for field, limit in (
            ("ttft_ms", sla_ttft_ms),
            ("tpot_ms", sla_tpot_ms),
            ("total_latency_ms", sla_total_ms),
        )
        if limit is not None
    ]
    violations = sum(
        1
        for request in data.requests
        if any(getattr(request, field) > limit for field, limit in limits)
    )
    error_rate = violations / len(data.requests)
    # 0% → 0, >=10% → 100
    if error_rate <= 0:
        return 0.0
    if error_rate >= 0.10:
        return 100.0
    return error_rate / 0.10 * 100.0
class SLARiskScorer:
    """Composite SLA risk scorer combining multiple signal dimensions."""

    def assess(
        self,
        data: BenchmarkData,
        *,
        sla_ttft_ms: float | None = None,
        sla_tpot_ms: float | None = None,
        sla_total_ms: float | None = None,
        weight_headroom: float = 0.30,
        weight_tail: float = 0.20,
        weight_jitter: float = 0.20,
        weight_convergence: float = 0.15,
        weight_burn_rate: float = 0.15,
    ) -> SLARiskReport:
        """Assess SLA risk from benchmark data.

        Args:
            data: Benchmark data to analyze.
            sla_ttft_ms: TTFT SLA threshold in ms.
            sla_tpot_ms: TPOT SLA threshold in ms.
            sla_total_ms: Total latency SLA threshold in ms.
            weight_headroom: Weight for headroom factor (default 0.30).
            weight_tail: Weight for tail heaviness factor (default 0.20).
            weight_jitter: Weight for jitter factor (default 0.20).
            weight_convergence: Weight for convergence factor (default 0.15).
            weight_burn_rate: Weight for burn rate factor (default 0.15).

        Returns:
            SLARiskReport with composite score and per-factor breakdown.
        """
        ttft_vals = [r.ttft_ms for r in data.requests]
        tpot_vals = [r.tpot_ms for r in data.requests]
        total_vals = [r.total_latency_ms for r in data.requests]
        series = [ttft_vals, tpot_vals, total_vals]
        populated = [vals for vals in series if vals]

        # 1. Headroom tightness — worst across configured metrics
        h_score = self._worst(
            _headroom_score(vals, limit)
            for vals, limit in (
                (ttft_vals, sla_ttft_ms),
                (tpot_vals, sla_tpot_ms),
                (total_vals, sla_total_ms),
            )
            if limit is not None
        )

        # 2. Tail heaviness — worst across all latency fields
        t_score = self._worst(_tail_score(vals) for vals in populated)

        # 3. Jitter — worst across all latency fields
        j_score = self._worst(_jitter_score(vals) for vals in populated)

        # 4. Convergence — worst across all fields. Empty series are scored
        # too, so a benchmark with no requests is not rated as better
        # converged than one with a handful of requests.
        c_score = self._worst(_convergence_score(vals) for vals in series)

        # 5. Burn rate
        b_score = _burn_rate_score(data, sla_ttft_ms, sla_tpot_ms, sla_total_ms)

        specs = (
            ("headroom_tightness", h_score, weight_headroom, self._headroom_detail),
            ("tail_heaviness", t_score, weight_tail, self._tail_detail),
            ("latency_jitter", j_score, weight_jitter, self._jitter_detail),
            (
                "convergence_adequacy",
                c_score,
                weight_convergence,
                self._convergence_detail,
            ),
            (
                "error_budget_burn_rate",
                b_score,
                weight_burn_rate,
                self._burn_rate_detail,
            ),
        )
        factors = [
            RiskFactor(
                name=name,
                score=round(score, 2),
                weight=weight,
                weighted_score=round(score * weight, 2),
                detail=describe(score),
            )
            for name, score, weight, describe in specs
        ]

        total = self._composite_total(
            (score, weight) for _, score, weight, _ in specs
        )
        risk_level = _classify_risk(total)

        return SLARiskReport(
            risk_score=RiskScore(total_score=total, risk_level=risk_level),
            factors=factors,
            total_requests=len(data.requests),
            recommendation=self._generate_recommendation(factors, risk_level),
        )

    @staticmethod
    def _worst(scores) -> float:
        """Return the largest score in ``scores``, or 0.0 when there are none."""
        return max(scores, default=0.0)

    @staticmethod
    def _composite_total(scored_weights) -> float:
        """Combine ``(score, weight)`` pairs into a total clamped to 0-100.

        The products are summed at full precision and rounded once, so the
        per-factor display rounding does not leak into the total.
        """
        raw = sum(score * weight for score, weight in scored_weights)
        return min(100.0, max(0.0, round(raw, 2)))

    @staticmethod
    def _tiered(score: float, severe: str, elevated: str, normal: str) -> str:
        """Pick a message by band: >=80 severe, >=40 elevated, else normal."""
        if score >= 80:
            return severe
        if score >= 40:
            return elevated
        return normal

    @staticmethod
    def _headroom_detail(score: float) -> str:
        """Describe the headroom factor score."""
        return SLARiskScorer._tiered(
            score,
            "SLA threshold nearly or already breached",
            "Limited headroom — monitor closely",
            "Comfortable SLA margin",
        )

    @staticmethod
    def _tail_detail(score: float) -> str:
        """Describe the tail heaviness factor score."""
        return SLARiskScorer._tiered(
            score,
            "Extreme tail latency (P99/P50 ratio very high)",
            "Moderate tail heaviness",
            "Light tail — latency distribution is tight",
        )

    @staticmethod
    def _jitter_detail(score: float) -> str:
        """Describe the jitter factor score."""
        return SLARiskScorer._tiered(
            score,
            "High latency variability (unstable)",
            "Moderate latency variability",
            "Stable latency",
        )

    @staticmethod
    def _convergence_detail(score: float) -> str:
        """Describe the convergence factor score."""
        return SLARiskScorer._tiered(
            score,
            "Percentile estimates unreliable — insufficient data",
            "Marginal sample size — percentiles may shift",
            "Sufficient data for reliable percentile estimates",
        )

    @staticmethod
    def _burn_rate_detail(score: float) -> str:
        """Describe the burn rate factor score."""
        return SLARiskScorer._tiered(
            score,
            "High SLA violation rate — error budget depleting fast",
            "Noticeable SLA violations",
            "Low or zero SLA violations",
        )

    @staticmethod
    def _generate_recommendation(
        factors: list[RiskFactor], risk_level: RiskLevel,
    ) -> str:
        """Build the recommendation text for a report.

        Low risk gets a fixed message; otherwise the factor with the largest
        weighted score is named together with advice specific to it.
        """
        if risk_level == RiskLevel.LOW:
            return "Low risk. Current configuration is operating within safe margins."

        # Find dominant factor
        dominant = max(factors, key=lambda f: f.weighted_score)
        recs = {
            "headroom_tightness": (
                "Scale up instances or relax SLA thresholds to increase headroom."
            ),
            "tail_heaviness": (
                "Investigate tail latency causes (large requests, cold starts). "
                "Consider request size limits or prefill optimization."
            ),
            "latency_jitter": (
                "Investigate latency instability. Check for load spikes, "
                "resource contention, or thermal throttling."
            ),
            "convergence_adequacy": (
                "Collect more benchmark data. Current sample size is insufficient "
                "for reliable percentile estimates."
            ),
            "error_budget_burn_rate": (
                "SLA violations are consuming error budget. "
                "Scale up or rebalance P:D ratio to reduce violations."
            ),
        }

        prefix = {
            RiskLevel.MODERATE: "Moderate risk.",
            RiskLevel.HIGH: "High risk — action recommended.",
            RiskLevel.CRITICAL: "CRITICAL risk — immediate attention required.",
        }

        return (
            f"{prefix.get(risk_level, 'Risk detected.')} "
            f"Dominant factor: {dominant.name} (score {dominant.score:.0f}). "
            f"{recs.get(dominant.name, 'Review risk factors.')}"
        )


def assess_sla_risk(
    benchmark_path: str,
    *,
    sla_ttft_ms: float | None = None,
    sla_tpot_ms: float | None = None,
    sla_total_ms: float | None = None,
    weight_headroom: float = 0.30,
    weight_tail: float = 0.20,
    weight_jitter: float = 0.20,
    weight_convergence: float = 0.15,
    weight_burn_rate: float = 0.15,
) -> dict:
    """Programmatic API for SLA risk assessment.

    Args:
        benchmark_path: Path to benchmark JSON file.
        sla_ttft_ms: TTFT SLA threshold in ms.
        sla_tpot_ms: TPOT SLA threshold in ms.
        sla_total_ms: Total latency SLA threshold in ms.
        weight_headroom: Weight for headroom factor.
        weight_tail: Weight for tail heaviness factor.
        weight_jitter: Weight for jitter factor.
        weight_convergence: Weight for convergence factor.
        weight_burn_rate: Weight for burn rate factor.

    Returns:
        Dict representation of SLARiskReport.
    """
    # Imported at call time, as in the original implementation.
    from .bench_adapter import load_benchmark_auto

    report = SLARiskScorer().assess(
        load_benchmark_auto(benchmark_path),
        sla_ttft_ms=sla_ttft_ms,
        sla_tpot_ms=sla_tpot_ms,
        sla_total_ms=sla_total_ms,
        weight_headroom=weight_headroom,
        weight_tail=weight_tail,
        weight_jitter=weight_jitter,
        weight_convergence=weight_convergence,
        weight_burn_rate=weight_burn_rate,
    )
    return report.model_dump()
def _metadata(prefill: int = 2, decode: int = 2, qps: float = 10.0) -> BenchmarkMetadata:
    """Build benchmark metadata for a prefill/decode instance split."""
    return BenchmarkMetadata(
        num_prefill_instances=prefill,
        num_decode_instances=decode,
        total_instances=prefill + decode,
        measured_qps=qps,
    )


def _request(i: int, ttft: float, tpot: float, total: float) -> BenchmarkRequest:
    """Build the i-th synthetic request with the given latencies (ms)."""
    return BenchmarkRequest(
        request_id=f"req-{i:04d}",
        prompt_tokens=100,
        output_tokens=50,
        ttft_ms=ttft,
        tpot_ms=tpot,
        total_latency_ms=total,
        timestamp=1000.0 + i,
    )


def _make_data(
    n: int = 200,
    ttft_base: float = 50.0,
    tpot_base: float = 10.0,
    total_base: float = 200.0,
    spread: float = 0.5,
) -> BenchmarkData:
    """Generate benchmark data with controllable latency distributions.

    Latencies ramp linearly from ``base`` (first request) to
    ``base * (1 + spread)`` (last request).
    """
    last = max(n - 1, 1)
    requests = []
    for i in range(n):
        frac = i / last
        requests.append(
            _request(
                i,
                ttft_base + frac * spread * ttft_base,
                tpot_base + frac * spread * tpot_base,
                total_base + frac * spread * total_base,
            )
        )
    return BenchmarkData(metadata=_metadata(), requests=requests)


def _make_high_risk_data(n: int = 200) -> BenchmarkData:
    """Generate data that triggers high risk scores."""
    import random

    # A private generator keeps the data reproducible without reseeding the
    # process-wide RNG that other tests may rely on.
    rng = random.Random(42)
    requests = []
    for i in range(n):
        # High jitter, heavy tails, some violations: every 10th request spikes.
        mult = rng.uniform(5.0, 15.0) if i % 10 == 0 else rng.uniform(0.5, 2.0)
        requests.append(_request(i, 50.0 * mult, 10.0 * mult, 200.0 * mult))
    return BenchmarkData(metadata=_metadata(), requests=requests)


class TestClassifyRisk:
    def test_low(self):
        assert _classify_risk(0.0) == RiskLevel.LOW
        assert _classify_risk(24.9) == RiskLevel.LOW

    def test_moderate(self):
        assert _classify_risk(25.0) == RiskLevel.MODERATE
        assert _classify_risk(49.9) == RiskLevel.MODERATE

    def test_high(self):
        assert _classify_risk(50.0) == RiskLevel.HIGH
        assert _classify_risk(74.9) == RiskLevel.HIGH

    def test_critical(self):
        assert _classify_risk(75.0) == RiskLevel.CRITICAL
        assert _classify_risk(100.0) == RiskLevel.CRITICAL


class TestHeadroomScore:
    def test_comfortable_headroom(self):
        # P95 = 10 against a limit of 100 → 90% headroom
        assert _headroom_score([10.0] * 100, 100.0) == 0.0

    def test_no_headroom(self):
        # P95 = 100 against a limit of 50 → violated
        assert _headroom_score([100.0] * 100, 50.0) == 100.0

    def test_tight_headroom(self):
        # P95 of 1..100 is 95.05 → 4.95% headroom → 100 - 2 * 4.95 = 90.1
        score = _headroom_score(list(range(1, 101)), 100.0)
        assert abs(score - 90.1) < 1e-6

    def test_zero_threshold(self):
        assert _headroom_score([1.0], 0.0) == 0.0


class TestTailScore:
    def test_light_tail(self):
        # All same → ratio = 1
        assert _tail_score([10.0] * 100) == 0.0

    def test_heavy_tail(self):
        # 90 values at 10, 10 at 200 → P50 = 10, P99 = 200 → ratio 20 (>= 10)
        assert _tail_score([10.0] * 90 + [200.0] * 10) == 100.0

    def test_single_value(self):
        assert _tail_score([10.0]) == 0.0


class TestJitterScore:
    def test_stable(self):
        assert _jitter_score([10.0] * 100) == 0.0

    def test_high_jitter(self):
        import random

        rng = random.Random(123)
        values = [rng.uniform(1, 100) for _ in range(200)]
        assert _jitter_score(values) > 0

    def test_single_value(self):
        assert _jitter_score([10.0]) == 0.0


class TestConvergenceScore:
    def test_well_converged(self):
        values = [10.0 + i * 0.001 for i in range(1000)]
        assert _convergence_score(values) < 50

    def test_too_few_samples(self):
        assert _convergence_score([1.0] * 5) == 100.0


class TestBurnRateScore:
    def test_no_violations(self):
        data = _make_data(100, spread=0.1)
        score = _burn_rate_score(
            data, sla_ttft_ms=1000.0, sla_tpot_ms=None, sla_total_ms=None,
        )
        assert score == 0.0

    def test_all_violations(self):
        data = _make_data(100, ttft_base=100.0)
        score = _burn_rate_score(
            data, sla_ttft_ms=1.0, sla_tpot_ms=None, sla_total_ms=None,
        )
        assert score == 100.0

    def test_no_thresholds(self):
        assert _burn_rate_score(_make_data(100), None, None, None) == 0.0


class TestSLARiskScorer:
    @staticmethod
    def _factor(report: SLARiskReport, name: str) -> RiskFactor:
        """Return the factor called ``name`` from ``report``."""
        return next(f for f in report.factors if f.name == name)

    def test_low_risk(self):
        data = _make_data(200, spread=0.1)  # tight distribution
        report = SLARiskScorer().assess(
            data, sla_ttft_ms=200.0, sla_tpot_ms=50.0, sla_total_ms=1000.0,
        )
        assert isinstance(report, SLARiskReport)
        assert report.risk_score.risk_level == RiskLevel.LOW
        assert report.risk_score.total_score < 25
        assert len(report.factors) == 5
        assert report.total_requests == 200

    def test_high_risk(self):
        report = SLARiskScorer().assess(
            _make_high_risk_data(200),
            sla_ttft_ms=100.0,
            sla_tpot_ms=20.0,
            sla_total_ms=400.0,
        )
        assert report.risk_score.total_score > 25  # at least moderate

    def test_no_sla_thresholds(self):
        report = SLARiskScorer().assess(_make_data(100))
        # headroom and burn rate should be 0 without thresholds
        assert self._factor(report, "headroom_tightness").score == 0.0
        assert self._factor(report, "error_budget_burn_rate").score == 0.0

    def test_custom_weights(self):
        report = SLARiskScorer().assess(
            _make_data(200, spread=0.5),
            sla_ttft_ms=100.0,
            weight_headroom=0.50,
            weight_tail=0.10,
            weight_jitter=0.10,
            weight_convergence=0.15,
            weight_burn_rate=0.15,
        )
        assert self._factor(report, "headroom_tightness").weight == 0.50

    def test_factors_have_required_fields(self):
        report = SLARiskScorer().assess(_make_data(100), sla_ttft_ms=200.0)
        for f in report.factors:
            assert isinstance(f, RiskFactor)
            assert f.name
            assert 0.0 <= f.score <= 100.0
            assert f.weight > 0
            assert f.detail

    def test_recommendation_present(self):
        report = SLARiskScorer().assess(_make_data(100), sla_ttft_ms=200.0)
        assert report.recommendation
        assert isinstance(report.recommendation, str)

    def test_recommendation_prefix_matches_level(self):
        # Checks the prefix for whichever level the data lands on, so the
        # assertion can never be skipped.
        report = SLARiskScorer().assess(
            _make_high_risk_data(200),
            sla_ttft_ms=50.0,
            sla_tpot_ms=10.0,
            sla_total_ms=200.0,
        )
        expected_prefix = {
            RiskLevel.LOW: "Low risk.",
            RiskLevel.MODERATE: "Moderate risk.",
            RiskLevel.HIGH: "High risk",
            RiskLevel.CRITICAL: "CRITICAL risk",
        }[report.risk_score.risk_level]
        assert report.recommendation.startswith(expected_prefix)

    def test_model_dump(self):
        report = SLARiskScorer().assess(_make_data(100), sla_ttft_ms=200.0)
        d = report.model_dump()
        assert "risk_score" in d
        assert "factors" in d
        assert d["risk_score"]["risk_level"] == report.risk_score.risk_level.value

    def test_score_bounded(self):
        report = SLARiskScorer().assess(_make_data(100), sla_ttft_ms=200.0)
        assert 0.0 <= report.risk_score.total_score <= 100.0


class TestAssessSLARiskAPI:
    def test_programmatic_api(self):
        data = _make_data(100)
        # The directory context manager removes the benchmark file even when
        # an assertion fails.
        with tempfile.TemporaryDirectory() as tmp_dir:
            path = os.path.join(tmp_dir, "benchmark.json")
            with open(path, "w", encoding="utf-8") as handle:
                handle.write(data.model_dump_json())

            result = assess_sla_risk(
                path,
                sla_ttft_ms=200.0,
                sla_tpot_ms=50.0,
                sla_total_ms=1000.0,
            )

        assert isinstance(result, dict)
        assert "risk_score" in result
        assert "factors" in result
        assert result["risk_score"]["risk_level"] in [
            "low",
            "moderate",
            "high",
            "critical",
        ]


class TestEdgeCases:
    def test_single_request(self):
        data = BenchmarkData(
            metadata=_metadata(prefill=1, decode=1, qps=1.0),
            requests=[_request(0, 50.0, 10.0, 200.0)],
        )
        report = SLARiskScorer().assess(data, sla_ttft_ms=200.0)
        assert report.total_requests == 1