Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def _validate_artifact(path: Path, fmt: str, artifact_id: str = "") -> tuple[str
if artifact_id == "candidate_claims":
return _validate_candidate_claims_payload(payload)
if artifact_id == "screened_candidates":
return _validate_screened_candidates_payload(payload)
return _validate_screened_candidates_payload(payload, artifact_path=path)
if artifact_id == "input_classification":
return _validate_input_classification_payload(payload, artifact_path=path)
if artifact_id == "source_evidence_pack_manifest":
Expand Down Expand Up @@ -324,11 +324,24 @@ def _non_empty_scalar(value: Any) -> bool:
)


def _validate_screened_candidates_payload(
    payload: Any,
    *,
    artifact_path: Path | None = None,
) -> tuple[str, str]:
    """Validate a parsed ``screened_candidates`` artifact.

    A list payload is handed to the legacy validator. A dict payload goes
    through the contract validator and, when that succeeds, through the
    candidate-universe cross-check. Any other payload type is rejected.

    Args:
        payload: The decoded JSON content of the artifact.
        artifact_path: Location of the artifact on disk; forwarded to the
            candidate-universe cross-check. Defaults to ``None``.

    Returns:
        A ``(status, validation_result)`` pair.
    """
    if isinstance(payload, list):
        return _validate_legacy_screened_candidates(payload)
    if not isinstance(payload, dict):
        return ARTIFACT_INVALID, "screened_candidates_schema_error:not_list_or_object"

    verdict, detail = _validate_contract_screened_candidates(payload)
    if verdict != ARTIFACT_VALID:
        # Contract-level failures take precedence over the universe check.
        return verdict, detail

    mismatch = _screened_candidates_candidate_universe_error(
        payload,
        artifact_path=artifact_path,
    )
    if mismatch:
        return ARTIFACT_INVALID, f"screened_candidates_schema_error:{mismatch}"
    return verdict, detail


Expand Down Expand Up @@ -545,6 +558,71 @@ def _screened_candidates_discard_count(payload: dict[str, Any]) -> int:
return count


def _screened_candidates_candidate_universe_error(
    payload: dict[str, Any],
    *,
    artifact_path: Path | None,
) -> str | None:
    """Cross-check screened candidates against the sibling ``candidate_claims.json``.

    The check is skipped (returns ``None``) whenever the comparison cannot be
    made: no ``artifact_path``, no dict-shaped ``screening_policy``, no usable
    declared total, an unreadable / non-list / invalid ``candidate_claims.json``,
    or candidate claims that do not all carry a non-empty ``candidate_id``.

    Args:
        payload: The dict-shaped screened-candidates artifact.
        artifact_path: Path of the screened-candidates file; the candidate
            claims are read from ``candidate_claims.json`` next to it.

    Returns:
        ``None`` when the check passes or is skipped, otherwise one of:
        ``candidate_universe_count_mismatch``,
        ``<bucket>[<idx>].missing_candidate_id``,
        ``<bucket>[<idx>].unknown_candidate_id:<id>``,
        ``duplicate_screened_candidate_id:<id>``, or
        ``candidate_universe_id_coverage_mismatch``.
    """
    if artifact_path is None:
        return None
    screening_policy = payload.get("screening_policy")
    if not isinstance(screening_policy, dict):
        return None
    declared_total, total_error = _screened_candidates_total(payload, screening_policy)
    if total_error or declared_total is None:
        return None

    candidate_payload = _read_json_payload(artifact_path.with_name("candidate_claims.json"))
    if not isinstance(candidate_payload, list):
        return None
    candidate_status, _ = _validate_candidate_claims_payload(candidate_payload)
    if candidate_status != ARTIFACT_VALID:
        return None

    if declared_total != len(candidate_payload):
        return "candidate_universe_count_mismatch"

    candidate_ids = _candidate_claim_ids(candidate_payload)
    if candidate_ids is None:
        # The candidate claims lack stable IDs, so no ID-level comparison is possible.
        return None

    screened_ids: set[str] = set()
    for bucket in ("selected", "excluded", "deprioritized"):
        entries = payload.get(bucket)
        if not isinstance(entries, list):
            continue
        for idx, candidate in enumerate(entries):
            if not isinstance(candidate, dict):
                continue
            candidate_id = candidate.get("candidate_id")
            if not _non_empty_string(candidate_id):
                # Every candidate claim has a stable ID at this point, so an
                # ID-less screened entry cannot be attributed to any candidate.
                # Tolerating it used to disable the coverage comparison below,
                # which let a candidate silently drop out of the screened
                # universe while the artifact still validated.
                return f"{bucket}[{idx}].missing_candidate_id"
            normalized_id = candidate_id.strip()
            if normalized_id not in candidate_ids:
                return f"{bucket}[{idx}].unknown_candidate_id:{normalized_id}"
            if normalized_id in screened_ids:
                return f"duplicate_screened_candidate_id:{normalized_id}"
            screened_ids.add(normalized_id)

    # Every candidate must appear in exactly one bucket.
    if screened_ids != candidate_ids:
        return "candidate_universe_id_coverage_mismatch"

    return None


def _candidate_claim_ids(payload: list[Any]) -> set[str] | None:
    """Collect the stripped ``candidate_id`` of every candidate claim.

    Returns:
        The set of IDs, or ``None`` as soon as one entry is not a dict or does
        not carry a non-empty string ``candidate_id``. Because the result is a
        set, repeated IDs collapse into a single member.
    """
    collected: set[str] = set()
    for entry in payload:
        has_stable_id = isinstance(entry, dict) and _non_empty_string(entry.get("candidate_id"))
        if not has_stable_id:
            return None
        collected.add(entry.get("candidate_id").strip())
    return collected


def _validate_input_classification_payload(payload: Any, *, artifact_path: Path) -> tuple[str, str]:
if not isinstance(payload, dict):
return ARTIFACT_INVALID, "input_classification_schema_error:not_object"
Expand Down Expand Up @@ -1108,6 +1186,13 @@ def _build_artifact_registry(
}


def _read_json_payload(path: Path) -> Any:
    """Best-effort load of a UTF-8 JSON file.

    Returns the decoded JSON value, or ``None`` when the file cannot be read,
    is not valid UTF-8, or is not valid JSON. A file whose content is the JSON
    literal ``null`` also yields ``None``, so callers cannot tell the two
    cases apart.
    """
    try:
        raw_text = path.read_text(encoding="utf-8")
        decoded = json.loads(raw_text)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        return None
    return decoded


def interpret_frozen_artifact_integrity(
*,
old_registry: dict[str, Any] | None,
Expand Down
249 changes: 249 additions & 0 deletions tests/test_runtime_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -1958,6 +1958,7 @@ def test_state_check_accepts_object_shaped_screened_candidates(tmp_path):
{
"selected": [
{
"candidate_id": "CAND-001",
"statement": "ExampleCo opened a demo facility.",
"evidence_text": "ExampleCo opened a demo facility in June.",
"source_id": "SRC-001",
Expand Down Expand Up @@ -1996,6 +1997,7 @@ def test_state_check_accepts_legacy_object_screened_candidates_reason_only(tmp_p
{
"selected": [
{
"candidate_id": "CAND-001",
"statement": "ExampleCo opened a demo facility.",
"evidence_text": "ExampleCo opened a demo facility in June.",
"source_id": "SRC-001",
Expand Down Expand Up @@ -2424,6 +2426,197 @@ def test_state_check_accepts_screened_candidates_complete_discard_audit(tmp_path
assert record["validation_result"] == "valid_screened_candidates_schema"


def test_state_check_rejects_screened_candidates_total_below_candidate_universe(tmp_path):
    """A declared total of 1 against two candidate claims is a count mismatch."""
    ws = _write_workspace(tmp_path)
    initialize_runtime_state(workspace=ws, repo_workdir=ROOT)

    # Two candidate claims on disk.
    candidate_claims = [
        {"candidate_id": "CAND-001", "claim": "ExampleCo opened a demo facility.", "source_id": "SRC-001"},
        {"candidate_id": "CAND-002", "claim": "ExampleCo expanded production.", "source_id": "SRC-002"},
    ]
    # The screening policy only declares one candidate.
    screened_candidates = {
        "selected": [
            {
                "statement": "ExampleCo opened a demo facility.",
                "evidence_text": "ExampleCo opened a demo facility in June.",
                "source_id": "SRC-001",
                "published_at": "2026-06-01",
            }
        ],
        "excluded": [],
        "screening_policy": {"total_candidates": 1, "max_items": 8},
    }
    _write_json_artifact(ws, "candidate_claims.json", json.dumps(candidate_claims) + "\n")
    _write_json_artifact(ws, "screened_candidates.json", json.dumps(screened_candidates) + "\n")

    runtime_state = check_runtime_state(workspace=ws, repo_workdir=ROOT)
    screened_record = runtime_state["artifact_registry"]["artifacts"]["screened_candidates"]

    assert screened_record["status"] == "invalid"
    expected_result = "screened_candidates_schema_error:candidate_universe_count_mismatch"
    assert screened_record["validation_result"] == expected_result


def test_state_check_rejects_screened_candidates_unknown_discard_id(tmp_path):
    """An excluded entry whose ID is absent from candidate_claims.json is invalid."""
    ws = _write_workspace(tmp_path)
    initialize_runtime_state(workspace=ws, repo_workdir=ROOT)

    candidate_claims = [
        {"candidate_id": "CAND-001", "claim": "ExampleCo opened a demo facility.", "source_id": "SRC-001"},
        {"candidate_id": "CAND-002", "claim": "ExampleCo expanded production.", "source_id": "SRC-002"},
    ]
    selected_entry = {
        "candidate_id": "CAND-001",
        "statement": "ExampleCo opened a demo facility.",
        "evidence_text": "ExampleCo opened a demo facility in June.",
        "source_id": "SRC-001",
        "published_at": "2026-06-01",
    }
    # CAND-999 does not exist among the candidate claims.
    excluded_entry = {
        "candidate_id": "CAND-999",
        "reason": "capacity_capped",
        "reason_code": "capacity_capped",
        "explanation": "Dropped because section capacity was already filled.",
    }
    screened_candidates = {
        "selected": [selected_entry],
        "excluded": [excluded_entry],
        "screening_policy": {"total_candidates": 2, "max_items": 8},
    }
    _write_json_artifact(ws, "candidate_claims.json", json.dumps(candidate_claims) + "\n")
    _write_json_artifact(ws, "screened_candidates.json", json.dumps(screened_candidates) + "\n")

    runtime_state = check_runtime_state(workspace=ws, repo_workdir=ROOT)
    screened_record = runtime_state["artifact_registry"]["artifacts"]["screened_candidates"]

    assert screened_record["status"] == "invalid"
    expected_result = "screened_candidates_schema_error:excluded[0].unknown_candidate_id:CAND-999"
    assert screened_record["validation_result"] == expected_result


def test_state_check_rejects_screened_candidates_duplicate_screened_id(tmp_path):
    """The same candidate ID appearing in two buckets is reported as a duplicate."""
    ws = _write_workspace(tmp_path)
    initialize_runtime_state(workspace=ws, repo_workdir=ROOT)

    candidate_claims = [
        {"candidate_id": "CAND-001", "claim": "ExampleCo opened a demo facility.", "source_id": "SRC-001"},
        {"candidate_id": "CAND-002", "claim": "ExampleCo expanded production.", "source_id": "SRC-002"},
    ]
    selected_entry = {
        "candidate_id": "CAND-001",
        "statement": "ExampleCo opened a demo facility.",
        "evidence_text": "ExampleCo opened a demo facility in June.",
        "source_id": "SRC-001",
        "published_at": "2026-06-01",
    }
    # CAND-001 is listed again here although it is already selected.
    excluded_entry = {
        "candidate_id": "CAND-001",
        "reason": "capacity_capped",
        "reason_code": "capacity_capped",
        "explanation": "Dropped because section capacity was already filled.",
    }
    screened_candidates = {
        "selected": [selected_entry],
        "excluded": [excluded_entry],
        "screening_policy": {"total_candidates": 2, "max_items": 8},
    }
    _write_json_artifact(ws, "candidate_claims.json", json.dumps(candidate_claims) + "\n")
    _write_json_artifact(ws, "screened_candidates.json", json.dumps(screened_candidates) + "\n")

    runtime_state = check_runtime_state(workspace=ws, repo_workdir=ROOT)
    screened_record = runtime_state["artifact_registry"]["artifacts"]["screened_candidates"]

    assert screened_record["status"] == "invalid"
    expected_result = "screened_candidates_schema_error:duplicate_screened_candidate_id:CAND-001"
    assert screened_record["validation_result"] == expected_result


def test_state_check_accepts_screened_candidates_total_matching_candidate_universe(tmp_path):
    """Screened buckets that cover both candidate IDs exactly once validate."""
    ws = _write_workspace(tmp_path)
    initialize_runtime_state(workspace=ws, repo_workdir=ROOT)

    candidate_claims = [
        {"candidate_id": "CAND-001", "claim": "ExampleCo opened a demo facility.", "source_id": "SRC-001"},
        {"candidate_id": "CAND-002", "claim": "ExampleCo expanded production.", "source_id": "SRC-002"},
    ]
    selected_entry = {
        "candidate_id": "CAND-001",
        "statement": "ExampleCo opened a demo facility.",
        "evidence_text": "ExampleCo opened a demo facility in June.",
        "source_id": "SRC-001",
        "published_at": "2026-06-01",
    }
    excluded_entry = {
        "candidate_id": "CAND-002",
        "reason": "capacity_capped",
        "reason_code": "capacity_capped",
        "explanation": "Dropped because section capacity was already filled.",
    }
    # One selected plus one excluded entry accounts for the declared total of 2.
    screened_candidates = {
        "selected": [selected_entry],
        "excluded": [excluded_entry],
        "screening_policy": {"total_candidates": 2, "max_items": 8},
    }
    _write_json_artifact(ws, "candidate_claims.json", json.dumps(candidate_claims) + "\n")
    _write_json_artifact(ws, "screened_candidates.json", json.dumps(screened_candidates) + "\n")

    runtime_state = check_runtime_state(workspace=ws, repo_workdir=ROOT)
    screened_record = runtime_state["artifact_registry"]["artifacts"]["screened_candidates"]

    assert screened_record["status"] == "valid"
    assert screened_record["validation_result"] == "valid_screened_candidates_schema"


def test_state_check_marks_invalid_screening_status_invalid(tmp_path):
ws = _write_workspace(tmp_path)
initialize_runtime_state(workspace=ws, repo_workdir=ROOT)
Expand Down Expand Up @@ -2855,6 +3048,62 @@ def test_default_topology_scout_completion_requires_screened_candidates(tmp_path
assert _event_records(ws) == before_events


def test_default_topology_scout_completion_rejects_screened_candidate_universe_mismatch(tmp_path):
    """Completing the scout stage fails, and leaves state untouched, on a universe mismatch."""
    repo = _repo_with_role_topology(tmp_path, "default")
    ws = _write_workspace(tmp_path)
    initialize_runtime_state(workspace=ws, repo_workdir=repo)
    _set_current_stage(ws, "scout")

    candidate_claims = [
        {"candidate_id": "CAND-001", "claim": "ExampleCo opened a demo facility.", "source_id": "SRC-001"},
        {"candidate_id": "CAND-002", "claim": "ExampleCo expanded production.", "source_id": "SRC-002"},
    ]
    # Declares one candidate although two candidate claims exist.
    screened_candidates = {
        "selected": [
            {
                "candidate_id": "CAND-001",
                "statement": "ExampleCo opened a demo facility.",
                "evidence_text": "ExampleCo opened a demo facility in June.",
                "source_id": "SRC-001",
                "published_at": "2026-06-01",
            }
        ],
        "excluded": [],
        "screening_policy": {"total_candidates": 1, "max_items": 8},
    }
    _write_json_artifact(ws, "candidate_claims.json", json.dumps(candidate_claims) + "\n")
    _write_json_artifact(ws, "screened_candidates.json", json.dumps(screened_candidates) + "\n")

    # Snapshot the workflow state and event log before the attempted completion.
    workflow_text_before = _state_file(ws, "workflow_state").read_text(encoding="utf-8")
    before_workflow = json.loads(workflow_text_before)
    before_events = _event_records(ws)

    with pytest.raises(RuntimeStateError) as excinfo:
        complete_stage_transaction(
            workspace=ws,
            repo_workdir=repo,
            stage_id="scout",
            reason="scout complete",
        )

    raised = excinfo.value
    assert raised.error_code == "E_ARTIFACT_INVALID"
    assert "candidate_universe_count_mismatch" in str(raised)
    # The failed completion must not have changed persisted state or emitted events.
    workflow_text_after = _state_file(ws, "workflow_state").read_text(encoding="utf-8")
    assert json.loads(workflow_text_after) == before_workflow
    assert _event_records(ws) == before_events


def test_default_topology_scout_completion_satisfies_screener(tmp_path):
repo = _repo_with_role_topology(
tmp_path,
Expand Down
Loading