From 6afa0b19a4f32bff83456f7d33ed1aa31262d6fa Mon Sep 17 00:00:00 2001 From: yihong guo Date: Thu, 2 Jul 2026 13:54:31 +0700 Subject: [PATCH] test: guard screened discard universe counts --- .../runtime_state/artifact_registry.py | 91 ++++++- tests/test_runtime_state.py | 249 ++++++++++++++++++ 2 files changed, 337 insertions(+), 3 deletions(-) diff --git a/src/multi_agent_brief/orchestrator/runtime_state/artifact_registry.py b/src/multi_agent_brief/orchestrator/runtime_state/artifact_registry.py index 54309b6d..c44d165e 100644 --- a/src/multi_agent_brief/orchestrator/runtime_state/artifact_registry.py +++ b/src/multi_agent_brief/orchestrator/runtime_state/artifact_registry.py @@ -194,7 +194,7 @@ def _validate_artifact(path: Path, fmt: str, artifact_id: str = "") -> tuple[str if artifact_id == "candidate_claims": return _validate_candidate_claims_payload(payload) if artifact_id == "screened_candidates": - return _validate_screened_candidates_payload(payload) + return _validate_screened_candidates_payload(payload, artifact_path=path) if artifact_id == "input_classification": return _validate_input_classification_payload(payload, artifact_path=path) if artifact_id == "source_evidence_pack_manifest": @@ -324,11 +324,24 @@ def _non_empty_scalar(value: Any) -> bool: ) -def _validate_screened_candidates_payload(payload: Any) -> tuple[str, str]: +def _validate_screened_candidates_payload( + payload: Any, + *, + artifact_path: Path | None = None, +) -> tuple[str, str]: if isinstance(payload, list): return _validate_legacy_screened_candidates(payload) if isinstance(payload, dict): - return _validate_contract_screened_candidates(payload) + status, result = _validate_contract_screened_candidates(payload) + if status != ARTIFACT_VALID: + return status, result + universe_error = _screened_candidates_candidate_universe_error( + payload, + artifact_path=artifact_path, + ) + if universe_error: + return ARTIFACT_INVALID, f"screened_candidates_schema_error:{universe_error}" + return status, result return ARTIFACT_INVALID, "screened_candidates_schema_error:not_list_or_object" @@ -545,6 +558,71 @@ def _screened_candidates_discard_count(payload: dict[str, Any]) -> int: return count +def _screened_candidates_candidate_universe_error( + payload: dict[str, Any], + *, + artifact_path: Path | None, +) -> str | None: + if artifact_path is None: + return None + screening_policy = payload.get("screening_policy") + if not isinstance(screening_policy, dict): + return None + declared_total, total_error = _screened_candidates_total(payload, screening_policy) + if total_error or declared_total is None: + return None + + candidate_payload = _read_json_payload(artifact_path.with_name("candidate_claims.json")) + if not isinstance(candidate_payload, list): + return None + candidate_status, _ = _validate_candidate_claims_payload(candidate_payload) + if candidate_status != ARTIFACT_VALID: + return None + + if declared_total != len(candidate_payload): + return "candidate_universe_count_mismatch" + + candidate_ids = _candidate_claim_ids(candidate_payload) + if candidate_ids is None: + return None + + screened_ids: set[str] = set() + missing_screened_id = False + for bucket in ("selected", "excluded", "deprioritized"): + entries = payload.get(bucket) + if not isinstance(entries, list): + continue + for idx, candidate in enumerate(entries): + if not isinstance(candidate, dict): + continue + candidate_id = candidate.get("candidate_id") + if not _non_empty_string(candidate_id): + missing_screened_id = True + continue + normalized_id = candidate_id.strip() + if normalized_id not in candidate_ids: + return f"{bucket}[{idx}].unknown_candidate_id:{normalized_id}" + if normalized_id in screened_ids: + return f"duplicate_screened_candidate_id:{normalized_id}" + screened_ids.add(normalized_id) + + if not missing_screened_id and screened_ids != candidate_ids: + return "candidate_universe_id_coverage_mismatch" + return None + + +def _candidate_claim_ids(payload: list[Any]) -> set[str] | None: + ids: set[str] = set() + for candidate in payload: + if not isinstance(candidate, dict): + return None + candidate_id = candidate.get("candidate_id") + if not _non_empty_string(candidate_id): + return None + ids.add(candidate_id.strip()) + return ids + + def _validate_input_classification_payload(payload: Any, *, artifact_path: Path) -> tuple[str, str]: if not isinstance(payload, dict): return ARTIFACT_INVALID, "input_classification_schema_error:not_object" @@ -1108,6 +1186,13 @@ def _build_artifact_registry( } +def _read_json_payload(path: Path) -> Any: + try: + return json.loads(path.read_text(encoding="utf-8")) + except (OSError, UnicodeDecodeError, json.JSONDecodeError): + return None + + def interpret_frozen_artifact_integrity( *, old_registry: dict[str, Any] | None, diff --git a/tests/test_runtime_state.py b/tests/test_runtime_state.py index 08e9a3c9..bd5a747e 100644 --- a/tests/test_runtime_state.py +++ b/tests/test_runtime_state.py @@ -1958,6 +1958,7 @@ def test_state_check_accepts_object_shaped_screened_candidates(tmp_path): { "selected": [ { + "candidate_id": "CAND-001", "statement": "ExampleCo opened a demo facility.", "evidence_text": "ExampleCo opened a demo facility in June.", "source_id": "SRC-001", @@ -1996,6 +1997,7 @@ def test_state_check_accepts_legacy_object_screened_candidates_reason_only(tmp_p { "selected": [ { + "candidate_id": "CAND-001", "statement": "ExampleCo opened a demo facility.", "evidence_text": "ExampleCo opened a demo facility in June.", "source_id": "SRC-001", @@ -2424,6 +2426,197 @@ def test_state_check_accepts_screened_candidates_complete_discard_audit(tmp_path assert record["validation_result"] == "valid_screened_candidates_schema" +def test_state_check_rejects_screened_candidates_total_below_candidate_universe(tmp_path): + ws = _write_workspace(tmp_path) + initialize_runtime_state(workspace=ws, repo_workdir=ROOT) + _write_json_artifact( + ws, + "candidate_claims.json", + json.dumps( + [ + {"candidate_id": "CAND-001", "claim": "ExampleCo opened a demo facility.", "source_id": "SRC-001"}, + {"candidate_id": "CAND-002", "claim": "ExampleCo expanded production.", "source_id": "SRC-002"}, + ] + ) + + "\n", + ) + _write_json_artifact( + ws, + "screened_candidates.json", + json.dumps( + { + "selected": [ + { + "statement": "ExampleCo opened a demo facility.", + "evidence_text": "ExampleCo opened a demo facility in June.", + "source_id": "SRC-001", + "published_at": "2026-06-01", + } + ], + "excluded": [], + "screening_policy": {"total_candidates": 1, "max_items": 8}, + } + ) + + "\n", + ) + + state = check_runtime_state(workspace=ws, repo_workdir=ROOT) + record = state["artifact_registry"]["artifacts"]["screened_candidates"] + + assert record["status"] == "invalid" + assert record["validation_result"] == "screened_candidates_schema_error:candidate_universe_count_mismatch" + + +def test_state_check_rejects_screened_candidates_unknown_discard_id(tmp_path): + ws = _write_workspace(tmp_path) + initialize_runtime_state(workspace=ws, repo_workdir=ROOT) + _write_json_artifact( + ws, + "candidate_claims.json", + json.dumps( + [ + {"candidate_id": "CAND-001", "claim": "ExampleCo opened a demo facility.", "source_id": "SRC-001"}, + {"candidate_id": "CAND-002", "claim": "ExampleCo expanded production.", "source_id": "SRC-002"}, + ] + ) + + "\n", + ) + _write_json_artifact( + ws, + "screened_candidates.json", + json.dumps( + { + "selected": [ + { + "candidate_id": "CAND-001", + "statement": "ExampleCo opened a demo facility.", + "evidence_text": "ExampleCo opened a demo facility in June.", + "source_id": "SRC-001", + "published_at": "2026-06-01", + } + ], + "excluded": [ + { + "candidate_id": "CAND-999", + "reason": "capacity_capped", + "reason_code": "capacity_capped", + "explanation": "Dropped because section capacity was already filled.", + } + ], + "screening_policy": {"total_candidates": 2, "max_items": 8}, + } + ) + + "\n", + ) + + state = check_runtime_state(workspace=ws, repo_workdir=ROOT) + record = state["artifact_registry"]["artifacts"]["screened_candidates"] + + assert record["status"] == "invalid" + assert ( + record["validation_result"] + == "screened_candidates_schema_error:excluded[0].unknown_candidate_id:CAND-999" + ) + + +def test_state_check_rejects_screened_candidates_duplicate_screened_id(tmp_path): + ws = _write_workspace(tmp_path) + initialize_runtime_state(workspace=ws, repo_workdir=ROOT) + _write_json_artifact( + ws, + "candidate_claims.json", + json.dumps( + [ + {"candidate_id": "CAND-001", "claim": "ExampleCo opened a demo facility.", "source_id": "SRC-001"}, + {"candidate_id": "CAND-002", "claim": "ExampleCo expanded production.", "source_id": "SRC-002"}, + ] + ) + + "\n", + ) + _write_json_artifact( + ws, + "screened_candidates.json", + json.dumps( + { + "selected": [ + { + "candidate_id": "CAND-001", + "statement": "ExampleCo opened a demo facility.", + "evidence_text": "ExampleCo opened a demo facility in June.", + "source_id": "SRC-001", + "published_at": "2026-06-01", + } + ], + "excluded": [ + { + "candidate_id": "CAND-001", + "reason": "capacity_capped", + "reason_code": "capacity_capped", + "explanation": "Dropped because section capacity was already filled.", + } + ], + "screening_policy": {"total_candidates": 2, "max_items": 8}, + } + ) + + "\n", + ) + + state = check_runtime_state(workspace=ws, repo_workdir=ROOT) + record = state["artifact_registry"]["artifacts"]["screened_candidates"] + + assert record["status"] == "invalid" + assert record["validation_result"] == "screened_candidates_schema_error:duplicate_screened_candidate_id:CAND-001" + + +def test_state_check_accepts_screened_candidates_total_matching_candidate_universe(tmp_path): + ws = _write_workspace(tmp_path) + initialize_runtime_state(workspace=ws, repo_workdir=ROOT) + _write_json_artifact( + ws, + "candidate_claims.json", + json.dumps( + [ + {"candidate_id": "CAND-001", "claim": "ExampleCo opened a demo facility.", "source_id": "SRC-001"}, + {"candidate_id": "CAND-002", "claim": "ExampleCo expanded production.", "source_id": "SRC-002"}, + ] + ) + + "\n", + ) + _write_json_artifact( + ws, + "screened_candidates.json", + json.dumps( + { + "selected": [ + { + "candidate_id": "CAND-001", + "statement": "ExampleCo opened a demo facility.", + "evidence_text": "ExampleCo opened a demo facility in June.", + "source_id": "SRC-001", + "published_at": "2026-06-01", + } + ], + "excluded": [ + { + "candidate_id": "CAND-002", + "reason": "capacity_capped", + "reason_code": "capacity_capped", + "explanation": "Dropped because section capacity was already filled.", + } + ], + "screening_policy": {"total_candidates": 2, "max_items": 8}, + } + ) + + "\n", + ) + + state = check_runtime_state(workspace=ws, repo_workdir=ROOT) + record = state["artifact_registry"]["artifacts"]["screened_candidates"] + + assert record["status"] == "valid" + assert record["validation_result"] == "valid_screened_candidates_schema" + + def test_state_check_marks_invalid_screening_status_invalid(tmp_path): ws = _write_workspace(tmp_path) initialize_runtime_state(workspace=ws, repo_workdir=ROOT) @@ -2855,6 +3048,62 @@ def test_default_topology_scout_completion_requires_screened_candidates(tmp_path assert _event_records(ws) == before_events +def test_default_topology_scout_completion_rejects_screened_candidate_universe_mismatch(tmp_path): + repo = _repo_with_role_topology( + tmp_path, + "default", + ) + ws = _write_workspace(tmp_path) + initialize_runtime_state(workspace=ws, repo_workdir=repo) + _set_current_stage(ws, "scout") + _write_json_artifact( + ws, + "candidate_claims.json", + json.dumps( + [ + {"candidate_id": "CAND-001", "claim": "ExampleCo opened a demo facility.", "source_id": "SRC-001"}, + {"candidate_id": "CAND-002", "claim": "ExampleCo expanded production.", "source_id": "SRC-002"}, + ] + ) + + "\n", + ) + _write_json_artifact( + ws, + "screened_candidates.json", + json.dumps( + { + "selected": [ + { + "candidate_id": "CAND-001", + "statement": "ExampleCo opened a demo facility.", + "evidence_text": "ExampleCo opened a demo facility in June.", + "source_id": "SRC-001", + "published_at": "2026-06-01", + } + ], + "excluded": [], + "screening_policy": {"total_candidates": 1, "max_items": 8}, + } + ) + + "\n", + ) + before_workflow = json.loads(_state_file(ws, "workflow_state").read_text(encoding="utf-8")) + before_events = _event_records(ws) + + with pytest.raises(RuntimeStateError) as excinfo: + complete_stage_transaction( + workspace=ws, + repo_workdir=repo, + stage_id="scout", + reason="scout complete", + ) + + assert excinfo.value.error_code == "E_ARTIFACT_INVALID" + assert "candidate_universe_count_mismatch" in str(excinfo.value) + assert json.loads(_state_file(ws, "workflow_state").read_text(encoding="utf-8")) == before_workflow + assert _event_records(ws) == before_events + + def test_default_topology_scout_completion_satisfies_screener(tmp_path): repo = _repo_with_role_topology( tmp_path,