diff --git a/inferedgelab/commands/compare.py b/inferedgelab/commands/compare.py
index 8db97f1..1dfbd99 100644
--- a/inferedgelab/commands/compare.py
+++ b/inferedgelab/commands/compare.py
@@ -10,6 +10,7 @@
from inferedgelab.compare.comparator import build_runtime_compare_report, compare_group, render_runtime_compare_markdown
from inferedgelab.result.loader import load_results_grouped_by_compare_key
from inferedgelab.services.compare_service import build_compare_bundle
+from inferedgelab.services.guard_analysis import guard_status, guard_verdict
def _fmt_num(v):
@@ -47,15 +48,19 @@ def _render_guard_analysis(guard_analysis: dict | None) -> None:
if not guard_analysis:
return
- if guard_analysis.get("status") == "skipped":
+ normalized_status = guard_status(guard_analysis)
+ if normalized_status == "skipped":
rprint("[yellow]Warning[/yellow]: InferEdgeAIGuard is not installed. Guard analysis skipped.")
return
rprint("[bold]Guard Analysis[/bold]")
- rprint(f"- status: {guard_analysis.get('status')}")
+ rprint(f"- status: {normalized_status}")
+ rprint(f"- guard_verdict: {guard_verdict(guard_analysis)}")
+ if guard_analysis.get("primary_reason"):
+ rprint(f"- primary_reason: {guard_analysis.get('primary_reason')}")
rprint(f"- confidence: {guard_analysis.get('confidence')}")
- for field in ("anomalies", "suspected_causes", "recommendations"):
+ for field in ("anomalies", "evidence", "suspected_causes", "recommendations"):
rprint(f"- {field}:")
values = guard_analysis.get(field) or []
if values:
diff --git a/inferedgelab/commands/compare_latest.py b/inferedgelab/commands/compare_latest.py
index cdea88b..e3c58fa 100644
--- a/inferedgelab/commands/compare_latest.py
+++ b/inferedgelab/commands/compare_latest.py
@@ -5,6 +5,7 @@
from rich.table import Table
from inferedgelab.services.compare_service import build_compare_latest_bundle
+from inferedgelab.services.guard_analysis import guard_status, guard_verdict
def _handle_error_or_warning(message: str, strict: bool) -> None:
@@ -38,15 +39,19 @@ def _render_guard_analysis(guard_analysis: dict | None) -> None:
if not guard_analysis:
return
- if guard_analysis.get("status") == "skipped":
+ normalized_status = guard_status(guard_analysis)
+ if normalized_status == "skipped":
rprint("[yellow]Warning[/yellow]: InferEdgeAIGuard is not installed. Guard analysis skipped.")
return
rprint("[bold]Guard Analysis[/bold]")
- rprint(f"- status: {guard_analysis.get('status')}")
+ rprint(f"- status: {normalized_status}")
+ rprint(f"- guard_verdict: {guard_verdict(guard_analysis)}")
+ if guard_analysis.get("primary_reason"):
+ rprint(f"- primary_reason: {guard_analysis.get('primary_reason')}")
rprint(f"- confidence: {guard_analysis.get('confidence')}")
- for field in ("anomalies", "suspected_causes", "recommendations"):
+ for field in ("anomalies", "evidence", "suspected_causes", "recommendations"):
rprint(f"- {field}:")
values = guard_analysis.get(field) or []
if values:
diff --git a/inferedgelab/report/html_generator.py b/inferedgelab/report/html_generator.py
index 0a5b967..10a0d16 100644
--- a/inferedgelab/report/html_generator.py
+++ b/inferedgelab/report/html_generator.py
@@ -3,6 +3,8 @@
from html import escape
from typing import Any, Dict, Optional
+from inferedgelab.services.guard_analysis import guard_primary_reason, guard_status, guard_verdict
+
def _fmt_num(v: Optional[float]) -> str:
if v is None:
@@ -149,15 +151,101 @@ def _guard_values_to_html(values: Any) -> str:
    return "\n".join(f"<li>{escape(str(value))}</li>" for value in values)
+def _guard_source_to_html(source: Any) -> str:
+ if not isinstance(source, dict) or not source:
+ return ""
+    items = "\n".join(
+        f"<li>{escape(str(key))}: {escape(str(value))}</li>"
+        for key, value in source.items()
+    )
+    return f"<details><summary>source</summary><ul>{items}</ul></details>"
+
+
+def _guard_evidence_to_html(evidence: Any) -> str:
+ if not isinstance(evidence, list) or not evidence:
+ return ""
+ rows: list[str] = []
+ details: list[str] = []
+ for item in evidence:
+ if not isinstance(item, dict):
+ continue
+ rows.append(
+ f"""
+
+ | {escape(str(item.get("type", "-")))} |
+ {escape(str(item.get("metric_name", "-")))} |
+ {escape(str(item.get("observed_value", "-")))} |
+ {escape(str(item.get("baseline_value", "-")))} |
+ {escape(str(item.get("threshold", "-")))} |
+ {escape(str(item.get("status", "-")))} |
+ {escape(str(item.get("severity", "-")))} |
+
+ """
+ )
+ explanation = item.get("explanation")
+ recommendation = item.get("recommendation")
+ if explanation:
+ details.append(
+ ""
+ f"{escape(str(item.get('metric_name', 'evidence')))}: "
+ f"{escape(str(explanation))}"
+ + (
+                    f"<br/>recommendation: {escape(str(recommendation))}"
+ if recommendation
+ else ""
+ )
+ + ""
+ )
+ if not rows:
+ return ""
+ return f"""
+ Guard Evidence
+
+
+
+ | type |
+ metric |
+ observed |
+ baseline |
+ threshold |
+ status |
+ severity |
+
+
+ {''.join(rows)}
+
+
+ """
+
+
def _guard_analysis_to_html(guard_analysis: Dict[str, Any] | None) -> str:
if guard_analysis is None:
return ""
- if guard_analysis.get("status") == "skipped":
+ normalized_status = guard_status(guard_analysis)
+ normalized_verdict = guard_verdict(guard_analysis)
+ verdict_html = (
+        f'<li>guard_verdict: {escape(str(normalized_verdict))}</li>'
+ if normalized_verdict is not None
+ else ""
+ )
+ severity_html = (
+ f'severity: {escape(str(guard_analysis.get("severity")))}
'
+ if guard_analysis.get("severity") is not None
+ else ""
+ )
+ primary_reason = guard_primary_reason(guard_analysis)
+ primary_reason_html = (
+ f"primary_reason: {escape(str(primary_reason))}
"
+ if primary_reason
+ else ""
+ )
+
+ if normalized_status == "skipped":
return f"""
Guard Analysis
"""
@@ -165,14 +253,19 @@ def _guard_analysis_to_html(guard_analysis: Dict[str, Any] | None) -> str:
return f"""
Guard Analysis
"""
diff --git a/inferedgelab/report/markdown_generator.py b/inferedgelab/report/markdown_generator.py
index 672411d..309c03c 100644
--- a/inferedgelab/report/markdown_generator.py
+++ b/inferedgelab/report/markdown_generator.py
@@ -2,6 +2,8 @@
from typing import Any, Dict, Optional
+from inferedgelab.services.guard_analysis import guard_primary_reason, guard_status, guard_verdict
+
def _fmt_num(v: Optional[float]) -> str:
if v is None:
@@ -42,14 +44,29 @@ def _sorted_accuracy_metric_items(accuracy: Dict[str, Any]) -> list[tuple[str, D
def _append_guard_analysis(lines: list[str], guard_analysis: Dict[str, Any]) -> None:
lines.append("## Guard Analysis")
lines.append("")
- lines.append(f"- status: {guard_analysis.get('status')}")
-
- if guard_analysis.get("status") == "skipped":
+ normalized_status = guard_status(guard_analysis)
+ normalized_verdict = guard_verdict(guard_analysis)
+ lines.append(f"- status: {normalized_status}")
+ if normalized_verdict is not None:
+ lines.append(f"- guard_verdict: {normalized_verdict}")
+ if guard_analysis.get("severity") is not None:
+ lines.append(f"- severity: {guard_analysis.get('severity')}")
+
+ if normalized_status == "skipped":
lines.append(f"- reason: {guard_analysis.get('reason')}")
lines.append("")
return
lines.append(f"- confidence: {guard_analysis.get('confidence')}")
+ primary_reason = guard_primary_reason(guard_analysis)
+ if primary_reason:
+ lines.append(f"- primary_reason: {primary_reason}")
+
+ source = guard_analysis.get("source")
+ if isinstance(source, dict) and source:
+ lines.append("- source:")
+ for key, value in source.items():
+ lines.append(f" - {key}: `{value}`")
for field in ("anomalies", "suspected_causes", "recommendations"):
lines.append(f"- {field}:")
@@ -59,6 +76,35 @@ def _append_guard_analysis(lines: list[str], guard_analysis: Dict[str, Any]) ->
lines.append(f" - {value}")
else:
lines.append(" - -")
+ evidence = guard_analysis.get("evidence")
+ if isinstance(evidence, list) and evidence:
+ lines.append("")
+ lines.append("### Guard Evidence")
+ lines.append("")
+ lines.append("| type | metric | observed | baseline | threshold | status | severity |")
+ lines.append("| --- | --- | ---: | ---: | ---: | --- | --- |")
+ for item in evidence:
+ if not isinstance(item, dict):
+ continue
+ lines.append(
+ "| "
+ f"{item.get('type', '-')} | "
+ f"{item.get('metric_name', '-')} | "
+ f"{item.get('observed_value', '-')} | "
+ f"{item.get('baseline_value', '-')} | "
+ f"{item.get('threshold', '-')} | "
+ f"{item.get('status', '-')} | "
+ f"{item.get('severity', '-')} |"
+ )
+ for item in evidence:
+ if not isinstance(item, dict):
+ continue
+ explanation = item.get("explanation")
+ recommendation = item.get("recommendation")
+ if explanation:
+ lines.append(f"- {item.get('metric_name', 'evidence')}: {explanation}")
+ if recommendation:
+ lines.append(f" - recommendation: {recommendation}")
lines.append("")
diff --git a/inferedgelab/services/__init__.py b/inferedgelab/services/__init__.py
index 5329b5a..470e897 100644
--- a/inferedgelab/services/__init__.py
+++ b/inferedgelab/services/__init__.py
@@ -1,3 +1,8 @@
-from .compare_service import build_compare_bundle, select_latest_compare_pair
+"""Service-layer helpers for InferEdgeLab.
-__all__ = ["build_compare_bundle", "select_latest_compare_pair"]
+Keep this package initializer light. Several services are intentionally allowed
+to import report/rendering modules, so importing compare_service here can create
+cycles during direct module imports.
+"""
+
+__all__: list[str] = []
diff --git a/inferedgelab/services/api_response_contract.py b/inferedgelab/services/api_response_contract.py
index 00d3e91..db56745 100644
--- a/inferedgelab/services/api_response_contract.py
+++ b/inferedgelab/services/api_response_contract.py
@@ -2,6 +2,8 @@
from typing import Any
+from inferedgelab.services.guard_analysis import guard_status, guard_verdict
+
def build_api_response_bundle(
bundle: dict[str, Any],
@@ -80,7 +82,8 @@ def _build_summary(
or precision.get("comparison_mode"),
"precision_pair": judgement.get("precision_pair") or precision.get("pair"),
"deployment_decision": deployment_decision.get("decision"),
- "guard_status": (guard_analysis or {}).get("status"),
+ "guard_status": guard_status(guard_analysis),
+ "guard_verdict": guard_verdict(guard_analysis),
}
diff --git a/inferedgelab/services/compare_service.py b/inferedgelab/services/compare_service.py
index 444712e..684a78e 100644
--- a/inferedgelab/services/compare_service.py
+++ b/inferedgelab/services/compare_service.py
@@ -23,7 +23,12 @@
analyze_compare_result = None
-def _build_guard_compare_input(result: dict[str, Any], judgement: dict[str, Any]) -> dict[str, Any]:
+def _build_guard_compare_input(
+ result: dict[str, Any],
+ judgement: dict[str, Any],
+ *,
+ source: dict[str, Any] | None = None,
+) -> dict[str, Any]:
accuracy = result.get("accuracy") or {}
primary_metric = accuracy.get("metric_name")
primary_accuracy = (accuracy.get("metrics") or {}).get(primary_metric) or {}
@@ -52,20 +57,65 @@ def _build_guard_compare_input(result: dict[str, Any], judgement: dict[str, Any]
"runtime_provenance": result.get("runtime_provenance"),
"run_config_diff": result.get("run_config_diff"),
"shape_context": result.get("shape_context"),
+ "source": source or {},
}
-def _run_guard_compare_reasoning(result: dict[str, Any], judgement: dict[str, Any]) -> dict[str, Any]:
+def _run_guard_compare_reasoning(
+ result: dict[str, Any],
+ judgement: dict[str, Any],
+ *,
+ source: dict[str, Any] | None = None,
+) -> dict[str, Any]:
if analyze_compare_result is None:
return {
"status": "skipped",
"reason": "inferedge_aiguard is not installed",
}
- guard_input = _build_guard_compare_input(result, judgement)
+ guard_input = _build_guard_compare_input(result, judgement, source=source)
return analyze_compare_result(guard_input)
+def _build_guard_source(
+ *,
+ base_path: str,
+ new_path: str,
+ base: dict[str, Any],
+ new: dict[str, Any],
+) -> dict[str, Any]:
+ return {
+ "baseline_profile_path": base_path,
+ "candidate_result_path": new_path,
+ "runtime_result_path": new_path,
+ "base_runtime_result_path": base_path,
+ "new_runtime_result_path": new_path,
+ "evaluation_report_path": _first_non_empty(
+ new,
+ ("evaluation_report_path", "report_path"),
+ ),
+ "model_contract_path": _first_non_empty(
+ new,
+ ("model_contract_path", "contract_path"),
+ ),
+ "lab_result_path": "inferedgelab.compare_bundle",
+ }
+
+
+def _first_non_empty(data: dict[str, Any], fields: tuple[str, ...]) -> Any:
+ for field in fields:
+ value = data.get(field)
+ if value:
+ return value
+ extra = data.get("extra")
+ if isinstance(extra, dict):
+ for field in fields:
+ value = extra.get(field)
+ if value:
+ return value
+ return None
+
+
def build_compare_bundle(
*,
base_path: str,
@@ -110,7 +160,17 @@ def build_compare_bundle(
tradeoff_severe_threshold=thresholds["tradeoff_severe_threshold"],
)
- guard_analysis = _run_guard_compare_reasoning(result, judgement) if with_guard else None
+ guard_source = _build_guard_source(
+ base_path=base_path,
+ new_path=new_path,
+ base=base,
+ new=new,
+ )
+ guard_analysis = (
+ _run_guard_compare_reasoning(result, judgement, source=guard_source)
+ if with_guard
+ else None
+ )
deployment_decision = build_deployment_decision(judgement, guard_analysis=guard_analysis)
markdown = generate_compare_markdown(
result,
diff --git a/inferedgelab/services/deployment_decision.py b/inferedgelab/services/deployment_decision.py
index 0d7b390..35511a4 100644
--- a/inferedgelab/services/deployment_decision.py
+++ b/inferedgelab/services/deployment_decision.py
@@ -2,6 +2,8 @@
from typing import Any
+from inferedgelab.services.guard_analysis import guard_status, guard_verdict
+
REVIEW_TRADEOFF_RISKS = {"risky_tradeoff", "severe_tradeoff", "not_beneficial"}
@@ -12,6 +14,7 @@ def _decision_payload(
reason: str,
lab_overall: Any,
guard_status: Any,
+ guard_verdict_value: Any,
recommended_action: str,
) -> dict[str, Any]:
return {
@@ -19,39 +22,42 @@ def _decision_payload(
"reason": reason,
"lab_overall": lab_overall,
"guard_status": guard_status,
+ "guard_verdict": guard_verdict_value,
"recommended_action": recommended_action,
}
def build_deployment_decision(judgement: dict, guard_analysis: dict | None = None) -> dict[str, Any]:
- guard_status = (guard_analysis or {}).get("status")
+ normalized_guard_status = guard_status(guard_analysis)
+ normalized_guard_verdict = guard_verdict(guard_analysis)
lab_overall = judgement.get("overall")
shape_match = judgement.get("shape_match")
system_match = judgement.get("system_match")
tradeoff_risk = judgement.get("tradeoff_risk")
- if guard_status == "error":
+ if normalized_guard_status == "error":
return _decision_payload(
decision="blocked",
reason="Guard analysis reported an error-level validation issue.",
lab_overall=lab_overall,
- guard_status=guard_status,
+ guard_status=normalized_guard_status,
+ guard_verdict_value=normalized_guard_verdict,
recommended_action="Do not deploy until the Guard anomalies are resolved.",
)
- if guard_status == "warning":
+ if normalized_guard_status == "warning":
decision = "review_required"
reason = "Guard analysis reported warning-level validation risks."
recommended_action = "Review Guard anomalies, suspected causes, and accuracy/provenance evidence before deployment."
- elif guard_status == "skipped":
+ elif normalized_guard_status == "skipped":
decision = "unknown"
reason = "Guard analysis was skipped."
recommended_action = "Install InferEdgeAIGuard or run validation reasoning before deployment."
- elif guard_status is None:
+ elif normalized_guard_status is None:
decision = "unknown"
reason = "Guard analysis is unavailable."
recommended_action = "Run compare with --with-guard before deployment decision."
- elif guard_status == "ok":
+ elif normalized_guard_status == "ok":
if lab_overall in {"improvement", "tradeoff_faster"}:
decision = "deployable"
reason = "Lab judgement is favorable and Guard analysis passed."
@@ -97,6 +103,7 @@ def build_deployment_decision(judgement: dict, guard_analysis: dict | None = Non
decision=decision,
reason=reason,
lab_overall=lab_overall,
- guard_status=guard_status,
+ guard_status=normalized_guard_status,
+ guard_verdict_value=normalized_guard_verdict,
recommended_action=recommended_action,
)
diff --git a/inferedgelab/services/guard_analysis.py b/inferedgelab/services/guard_analysis.py
new file mode 100644
index 0000000..fbad0b3
--- /dev/null
+++ b/inferedgelab/services/guard_analysis.py
@@ -0,0 +1,72 @@
+from __future__ import annotations
+
+from typing import Any
+
+
+LEGACY_STATUS_TO_VERDICT = {
+ "ok": "pass",
+ "warning": "review_required",
+ "error": "blocked",
+ "skipped": "skipped",
+}
+
+VERDICT_TO_LEGACY_STATUS = {
+ "pass": "ok",
+ "suspicious": "warning",
+ "review_required": "warning",
+ "blocked": "error",
+ "skipped": "skipped",
+}
+
+
+def guard_verdict(guard_analysis: dict[str, Any] | None) -> str | None:
+ """Return the AIGuard diagnosis verdict when available.
+
+ InferEdgeLab accepts both the older Guard reasoning shape
+ (``status: ok/warning/error``) and the newer diagnosis report contract
+ (``guard_verdict: pass/review_required/blocked``). This helper keeps Lab as
+ the final decision owner while preserving both optional evidence contracts.
+ """
+
+ if not isinstance(guard_analysis, dict):
+ return None
+ verdict = guard_analysis.get("guard_verdict")
+ if isinstance(verdict, str) and verdict:
+ return verdict
+ status = guard_analysis.get("status")
+ if isinstance(status, str):
+ return LEGACY_STATUS_TO_VERDICT.get(status, status)
+ return None
+
+
+def guard_status(guard_analysis: dict[str, Any] | None) -> str | None:
+ """Return a legacy-compatible Guard status for existing Lab/API clients."""
+
+ if not isinstance(guard_analysis, dict):
+ return None
+ status = guard_analysis.get("status")
+ if isinstance(status, str) and status:
+ return status
+ verdict = guard_analysis.get("guard_verdict")
+ if isinstance(verdict, str):
+ return VERDICT_TO_LEGACY_STATUS.get(verdict, verdict)
+ return None
+
+
+def guard_primary_reason(guard_analysis: dict[str, Any] | None) -> str | None:
+ if not isinstance(guard_analysis, dict):
+ return None
+ reason = guard_analysis.get("primary_reason") or guard_analysis.get("reason")
+ return str(reason) if reason else None
+
+
+def guard_evidence_items(guard_analysis: dict[str, Any] | None) -> list[Any]:
+ if not isinstance(guard_analysis, dict):
+ return []
+ evidence = guard_analysis.get("evidence")
+ if isinstance(evidence, list):
+ return evidence
+ anomalies = guard_analysis.get("anomalies")
+ if isinstance(anomalies, list):
+ return anomalies
+ return []
diff --git a/inferedgelab/services/worker_contract.py b/inferedgelab/services/worker_contract.py
index fa5c5e5..bcafcae 100644
--- a/inferedgelab/services/worker_contract.py
+++ b/inferedgelab/services/worker_contract.py
@@ -5,6 +5,7 @@
from inferedgelab.services.api_job_contract import ApiJobContractError
from inferedgelab.services.api_job_contract import build_api_job_response
from inferedgelab.services.api_job_contract import validate_api_job_response
+from inferedgelab.services.guard_analysis import guard_status, guard_verdict
WORKER_RESPONSE_STATUSES = {"completed", "failed"}
@@ -197,11 +198,13 @@ def _build_completed_job_result(worker_response: dict[str, Any]) -> dict[str, An
runtime_result = worker_response["runtime_result"]
guard_analysis = worker_response.get("guard_analysis")
+ normalized_guard_status = guard_status(guard_analysis)
deployment_decision = {
"decision": "unknown",
"reason": "Worker response has not been compared by Lab yet.",
"lab_overall": None,
- "guard_status": (guard_analysis or {}).get("status"),
+ "guard_status": normalized_guard_status,
+ "guard_verdict": guard_verdict(guard_analysis),
"recommended_action": "Run Lab compare/report before deployment decision.",
}
result = {
@@ -211,7 +214,8 @@ def _build_completed_job_result(worker_response: dict[str, Any]) -> dict[str, An
"comparison_mode": None,
"precision_pair": None,
"deployment_decision": deployment_decision["decision"],
- "guard_status": deployment_decision["guard_status"],
+ "guard_status": normalized_guard_status,
+ "guard_verdict": deployment_decision["guard_verdict"],
},
"comparison": {
"result": {"runtime_result": runtime_result},
diff --git a/tests/test_api_response_contract.py b/tests/test_api_response_contract.py
index 7584674..688302c 100644
--- a/tests/test_api_response_contract.py
+++ b/tests/test_api_response_contract.py
@@ -62,8 +62,17 @@ def assert_api_response_contract(response: dict[str, Any], *, guard_expected: bo
if guard_expected:
assert "guard_analysis" in response
assert isinstance(response["guard_analysis"], dict)
- assert response["guard_analysis"]["status"] in {"ok", "warning", "error", "skipped"}
- assert response["summary"]["guard_status"] == response["guard_analysis"]["status"]
+ if "status" in response["guard_analysis"]:
+ assert response["guard_analysis"]["status"] in {"ok", "warning", "error", "skipped"}
+ assert response["summary"]["guard_status"] == response["guard_analysis"]["status"]
+ else:
+ assert response["guard_analysis"]["guard_verdict"] in {
+ "pass",
+ "suspicious",
+ "review_required",
+ "blocked",
+ }
+ assert response["summary"]["guard_status"] in {"ok", "warning", "error"}
else:
assert "guard_analysis" not in response
assert response["summary"]["guard_status"] is None
@@ -152,6 +161,7 @@ def test_build_api_response_bundle_wraps_compare_bundle_with_guard():
"precision_pair": "fp32_vs_fp32",
"deployment_decision": "deployable",
"guard_status": "ok",
+ "guard_verdict": "pass",
}
assert response["comparison"]["result"] == bundle["result"]
assert response["comparison"]["judgement"] == bundle["judgement"]
@@ -193,6 +203,45 @@ def test_build_api_response_bundle_omits_guard_when_absent():
assert_api_response_contract(response, guard_expected=False)
assert response["deployment_decision"]["decision"] == "unknown"
assert response["summary"]["guard_status"] is None
+ assert response["summary"]["guard_verdict"] is None
+
+
+def test_build_api_response_bundle_summarizes_diagnosis_guard_contract():
+ guard_analysis = {
+ "schema_version": "inferedge-aiguard-diagnosis-v1",
+ "guard_verdict": "blocked",
+ "severity": "high",
+ "primary_reason": "Zero-detection frames exceed threshold.",
+ "evidence": [],
+ "created_at": "2026-05-02T00:00:00Z",
+ }
+ bundle = {
+ "result": {
+ "precision": {
+ "comparison_mode": "same_precision",
+ "pair": "fp32_vs_fp32",
+ }
+ },
+ "judgement": {
+ "overall": "improvement",
+ },
+ "deployment_decision": {
+ "decision": "blocked",
+ "reason": "Guard analysis reported an error-level validation issue.",
+ "lab_overall": "improvement",
+ "guard_status": "error",
+ "guard_verdict": "blocked",
+ "recommended_action": "Do not deploy until the Guard anomalies are resolved.",
+ },
+ "guard_analysis": guard_analysis,
+ }
+
+ response = build_api_response_bundle(bundle)
+
+ assert_api_response_contract(response, guard_expected=True)
+ assert response["summary"]["guard_status"] == "error"
+ assert response["summary"]["guard_verdict"] == "blocked"
+ assert response["guard_analysis"] == guard_analysis
def test_build_api_response_bundle_preserves_worker_provenance_guard_evidence():
diff --git a/tests/test_compare_service.py b/tests/test_compare_service.py
index de6e408..2938c46 100644
--- a/tests/test_compare_service.py
+++ b/tests/test_compare_service.py
@@ -175,6 +175,9 @@ def test_build_compare_bundle_with_guard_runs_optional_reasoning(tmp_path, monke
def fake_analyze_compare_result(guard_input):
assert guard_input["comparison_mode"] == "same_precision"
assert guard_input["precision_pair"] == "fp32_vs_fp32"
+ assert guard_input["source"]["baseline_profile_path"]
+ assert guard_input["source"]["candidate_result_path"]
+ assert guard_input["source"]["runtime_result_path"] == guard_input["source"]["candidate_result_path"]
assert guard_input["latency_delta_pct"] == pytest.approx(-10.0)
assert guard_input["base_precision"] == "fp32"
assert guard_input["candidate_precision"] == "fp32"
@@ -227,6 +230,47 @@ def fake_analyze_compare_result(guard_input):
assert bundle["deployment_decision"]["decision"] == "deployable"
+def test_build_compare_bundle_accepts_diagnosis_guard_contract(tmp_path, monkeypatch):
+ def fake_analyze_compare_result(guard_input):
+ return {
+ "schema_version": "inferedge-aiguard-diagnosis-v1",
+ "source": guard_input["source"],
+ "guard_verdict": "review_required",
+ "severity": "medium",
+ "confidence": 0.88,
+ "primary_reason": "Temporal consistency should be reviewed before deployment.",
+ "evidence": [
+ {
+ "type": "temporal_consistency",
+ "metric_name": "frame_to_frame_detection_count_cv",
+ "observed_value": 1.25,
+ "baseline_value": None,
+ "threshold": 1.0,
+ "severity": "medium",
+ "status": "warning",
+ "explanation": "Detection count variance exceeds review threshold.",
+ "recommendation": "Review frame sequence output before deployment.",
+ }
+ ],
+ "suspected_causes": ["Temporal instability"],
+ "recommendations": ["Review adjacent-frame output."],
+ }
+
+ monkeypatch.setattr(compare_service, "analyze_compare_result", fake_analyze_compare_result)
+ base_path = write_result(tmp_path, "base.json", timestamp="2026-04-13T09:00:00Z", precision="fp32")
+ new_path = write_result(tmp_path, "new.json", timestamp="2026-04-13T10:00:00Z", precision="fp32")
+
+ bundle = build_compare_bundle(base_path=base_path, new_path=new_path, with_guard=True)
+
+ assert bundle["guard_analysis"]["guard_verdict"] == "review_required"
+ assert bundle["guard_analysis"]["source"]["runtime_result_path"] == new_path
+ assert bundle["deployment_decision"]["decision"] == "review_required"
+ assert bundle["deployment_decision"]["guard_status"] == "warning"
+ assert bundle["deployment_decision"]["guard_verdict"] == "review_required"
+ assert "frame_to_frame_detection_count_cv" in bundle["markdown"]
+ assert "Temporal consistency should be reviewed before deployment." in bundle["html"]
+
+
def test_build_compare_bundle_with_guard_skips_when_aiguard_missing(tmp_path, monkeypatch):
monkeypatch.setattr(compare_service, "analyze_compare_result", None)
base_path = write_result(tmp_path, "base.json", timestamp="2026-04-13T09:00:00Z", precision="fp32")
diff --git a/tests/test_deployment_decision.py b/tests/test_deployment_decision.py
index dea4c69..8551512 100644
--- a/tests/test_deployment_decision.py
+++ b/tests/test_deployment_decision.py
@@ -46,6 +46,7 @@ def test_guard_ok_with_improvement_is_deployable():
assert decision["decision"] == "deployable"
assert decision["lab_overall"] == "improvement"
assert decision["guard_status"] == "ok"
+ assert decision["guard_verdict"] == "pass"
def test_guard_ok_with_neutral_is_deployable_with_note():
@@ -75,3 +76,37 @@ def test_risky_tradeoff_requires_review():
)
assert decision["decision"] == "review_required"
+
+
+def test_diagnosis_guard_verdict_blocked_blocks_deployment():
+ decision = build_deployment_decision(
+ make_judgement(overall="improvement"),
+ {
+ "schema_version": "inferedge-aiguard-diagnosis-v1",
+ "guard_verdict": "blocked",
+ "severity": "high",
+ "primary_reason": "Temporal consistency evidence indicates deployment risk.",
+ "evidence": [],
+ },
+ )
+
+ assert decision["decision"] == "blocked"
+ assert decision["guard_status"] == "error"
+ assert decision["guard_verdict"] == "blocked"
+
+
+def test_diagnosis_guard_verdict_review_requires_lab_review():
+ decision = build_deployment_decision(
+ make_judgement(overall="improvement"),
+ {
+ "schema_version": "inferedge-aiguard-diagnosis-v1",
+ "guard_verdict": "review_required",
+ "severity": "medium",
+ "primary_reason": "Temporal consistency should be reviewed before deployment.",
+ "evidence": [],
+ },
+ )
+
+ assert decision["decision"] == "review_required"
+ assert decision["guard_status"] == "warning"
+ assert decision["guard_verdict"] == "review_required"
diff --git a/tests/test_report_generators.py b/tests/test_report_generators.py
index 4695bf2..f9219c9 100644
--- a/tests/test_report_generators.py
+++ b/tests/test_report_generators.py
@@ -303,6 +303,47 @@ def test_generate_compare_markdown_includes_deployment_decision_section():
assert "- guard_status: ok" in text
+def test_generate_compare_markdown_includes_diagnosis_guard_evidence():
+ compare_result = make_compare_result()
+ judgement = make_judgement()
+ guard_analysis = {
+ "schema_version": "inferedge-aiguard-diagnosis-v1",
+ "source": {
+ "runtime_result_path": "results/candidate.json",
+ "model_contract_path": "model_contract.json",
+ },
+ "guard_verdict": "review_required",
+ "severity": "medium",
+ "confidence": 0.88,
+ "primary_reason": "Temporal consistency should be reviewed before deployment.",
+ "evidence": [
+ {
+ "type": "temporal_consistency",
+ "metric_name": "frame_to_frame_detection_count_cv",
+ "observed_value": 1.25,
+ "baseline_value": None,
+ "threshold": 1.0,
+ "severity": "medium",
+ "status": "warning",
+ "explanation": "Detection count variance exceeds review threshold.",
+ "recommendation": "Review frame sequence output before deployment.",
+ }
+ ],
+ "suspected_causes": ["Temporal instability"],
+ "recommendations": ["Review adjacent-frame output."],
+ }
+
+ text = generate_compare_markdown(compare_result, judgement, guard_analysis=guard_analysis)
+
+ assert "- status: warning" in text
+ assert "- guard_verdict: review_required" in text
+ assert "- primary_reason: Temporal consistency should be reviewed before deployment." in text
+ assert "runtime_result_path: `results/candidate.json`" in text
+ assert "### Guard Evidence" in text
+ assert "frame_to_frame_detection_count_cv" in text
+ assert "Detection count variance exceeds review threshold." in text
+
+
def test_generate_compare_html_includes_primary_metric_summary_and_thresholds():
compare_result = make_compare_result()
judgement = make_judgement()
@@ -403,3 +444,43 @@ def test_generate_compare_html_includes_deployment_decision_section():
assert "Deployment Decision" in html
assert "deployable" in html
assert "Deployment can proceed with normal rollout monitoring." in html
+
+
+def test_generate_compare_html_includes_diagnosis_guard_evidence():
+ compare_result = make_compare_result()
+ judgement = make_judgement()
+ guard_analysis = {
+ "schema_version": "inferedge-aiguard-diagnosis-v1",
+ "source": {
+ "runtime_result_path": "results/candidate.json",
+ "model_contract_path": "model_contract.json",
+ },
+ "guard_verdict": "blocked",
+ "severity": "high",
+ "confidence": 0.91,
+ "primary_reason": "Zero-detection frames exceed threshold.",
+ "evidence": [
+ {
+ "type": "temporal_consistency",
+ "metric_name": "zero_detection_frame_ratio",
+ "observed_value": 0.5,
+ "baseline_value": None,
+ "threshold": 0.3,
+ "severity": "high",
+ "status": "failed",
+ "explanation": "Zero-detection frame ratio exceeds blocked threshold.",
+ "recommendation": "Do not deploy until disappearance is explained.",
+ }
+ ],
+ "suspected_causes": ["Detection disappearance"],
+ "recommendations": ["Review frame sequence."],
+ }
+
+ html = generate_compare_html(compare_result, judgement, guard_analysis=guard_analysis)
+
+ assert "guard_verdict" in html
+ assert "blocked" in html
+ assert "runtime_result_path" in html
+ assert "Guard Evidence" in html
+ assert "zero_detection_frame_ratio" in html
+ assert "Zero-detection frame ratio exceeds blocked threshold." in html
diff --git a/tests/test_worker_contract.py b/tests/test_worker_contract.py
index cb9056f..23c99b2 100644
--- a/tests/test_worker_contract.py
+++ b/tests/test_worker_contract.py
@@ -206,6 +206,28 @@ def test_completed_worker_response_preserves_optional_guard_analysis():
assert completed["result"]["guard_analysis"] == worker_response["guard_analysis"]
assert completed["result"]["summary"]["guard_status"] == "ok"
+ assert completed["result"]["summary"]["guard_verdict"] == "pass"
+
+
+def test_completed_worker_response_preserves_diagnosis_guard_contract():
+ job = _make_queued_analyze_job(job_id="job_worker_smoke")
+ worker_response = load_fixture("worker_completed_response.json")
+ worker_response["guard_analysis"] = {
+ "schema_version": "inferedge-aiguard-diagnosis-v1",
+ "guard_verdict": "review_required",
+ "severity": "medium",
+ "primary_reason": "Temporal consistency should be reviewed before deployment.",
+ "evidence": [],
+ "created_at": "2026-05-02T00:00:00Z",
+ }
+
+ completed = apply_worker_response_to_job(job, worker_response)
+
+ assert completed["result"]["guard_analysis"] == worker_response["guard_analysis"]
+ assert completed["result"]["summary"]["guard_status"] == "warning"
+ assert completed["result"]["summary"]["guard_verdict"] == "review_required"
+ assert completed["result"]["deployment_decision"]["guard_status"] == "warning"
+ assert completed["result"]["deployment_decision"]["guard_verdict"] == "review_required"
def test_completed_worker_response_allows_guard_analysis_absent():
@@ -217,6 +239,7 @@ def test_completed_worker_response_allows_guard_analysis_absent():
assert "guard_analysis" not in completed["result"]
assert completed["result"]["summary"]["guard_status"] is None
+ assert completed["result"]["summary"]["guard_verdict"] is None
def test_worker_failed_response_fixture_satisfies_contract():