diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index b62164d..12ea400 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -383,6 +383,27 @@ The same executor model is used by `vocab` (with [`vocab.py`](src/xbrain/vocab.p --- +## Snapshot diffing + +`xbrain diff [snap-b]` (default `snap-b` = live `data/`) compares two snapshot data directories and answers one question: **what moved between these two states?** Built on top of the snapshot lifecycle from issue #17 — without snapshots there is nothing to diff. + +The module ([`src/xbrain/diff.py`](src/xbrain/diff.py)) is a **pure orchestrator**: the only I/O is the three loader calls at entry (`load_store`, `load_vocab`, `load_topic_pages`); everything else is in-memory pydantic dataclasses. The CLI is the only thing that touches `typer.echo` — `diff_snapshots` returns a `DiffReport` and lets the caller render it. + +The report has four sections, each pinning a different axis of change: + +- **Items** — how many items were reassigned (`primary_topic` differs between A and B, both sides enriched), top N most-frequent transitions (`ai-coding → software-engineering: 12 items`), `None → topic` rows when an item gained enrichment between the two snapshots. +- **Topics** — per-topic membership delta (added / removed / unchanged item ids), plus an overview-drift classification (`identical` / `similar` / `different` / `not_comparable`) using a pure-Python TF cosine over the two topic-page overview texts. +- **Vocab** — slugs added, slugs removed, count of unchanged slugs. Rename detection is out of scope for v1 (a `delta` of `+1` added and `+1` removed is the user's cue). +- **Summary** — top-level counts (items in both, enriched in both, reassigned, reassigned_pct, vocab churn, topic-page counts) — same fields the JSON-format consumers anchor on. + +**Pure-Python TF cosine**, not embeddings, not TF-IDF. Two reasons: (1) zero new dependencies (no scikit-learn, no sentence-transformers); (2) IDF degenerates on N=2 documents anyway, so plain TF gives the same `identical / similar / different` bucketing without the noise. The tokenizer covers Latin-1 accented characters (`à-ÿ`) so Spanish / French overviews compare correctly. Topics with fewer than 5 members never trigger a growth flag — a 2→3 jump is 50% growth but statistically meaningless on a tiny topic. + +**Output:** `--format text` (default, human-readable section blocks) or `--format json` (pydantic `model_dump_json`, stable schema for downstream consumers — the WS3 eval harness in issue #8 will read this). + +`xbrain diff` is also the foundation for **drift monitoring** between runs: take a snapshot, re-enrich, diff. A jump in `reassigned_pct` on a small corpus change is a signal that the prompt or model output is unstable; that is the eval-by-comparison question WS3 will formalise. + +--- + ## Invariants These are the rules the rest of the architecture rests on. Breaking any of them produces silent data corruption or makes the system unreproducible. diff --git a/src/xbrain/executors/api.py b/src/xbrain/executors/api.py index c6175e8..2b18138 100644 --- a/src/xbrain/executors/api.py +++ b/src/xbrain/executors/api.py @@ -8,6 +8,7 @@ from __future__ import annotations +import json import sys from xbrain.executors.base import EnrichmentJudgment @@ -18,6 +19,26 @@ _MAX_TOKENS = 600 +def _recoverable_errors() -> tuple[type[Exception], ...]: + """Exception classes a per-item failure should swallow + log + continue on. + + `anthropic.APIError` covers auth, rate-limit, server-side and network + errors the SDK normalises. `ValueError` covers validator rejections and + `pydantic.ValidationError` (a `ValueError` subclass in pydantic v2). + `json.JSONDecodeError` covers a malformed LLM response. `KeyError` covers + a response missing an expected field. + + Lazy-imported because `anthropic` is an optional dependency in the test + environment (the client is faked). + """ + try: + from anthropic import APIError + + return (APIError, ValueError, json.JSONDecodeError, KeyError) + except ImportError: + return (ValueError, json.JSONDecodeError, KeyError) + + def _vocab_block(vocab: list[Topic]) -> str: return "\n".join(f"- {t.slug}: {t.description}" for t in vocab) @@ -82,7 +103,9 @@ def __init__(self, model: str, output_language: str, client=None): def enrich_items(self, items: list[Item], vocab: list[Topic]) -> list[EnrichmentJudgment]: system = _system_prompt(self._output_language) + recoverable = _recoverable_errors() results: list[EnrichmentJudgment] = [] + failures = 0 for item in items: try: response = self._client.messages.create( @@ -105,12 +128,27 @@ def enrich_items(self, items: list[Item], vocab: list[Topic]) -> list[Enrichment topics=list(judgment["topics"]), ) ) - except Exception as exc: # noqa: BLE001 + except recoverable as exc: # One transient/malformed response must not abort the batch: - # the item stays pending and is retried on the next run. + # the item stays pending and is retried on the next run. Note: + # programmer bugs (`AttributeError`, …) and `KeyboardInterrupt` + # are NOT in `recoverable` — they propagate so the developer + # sees the traceback and Ctrl-C still works. + failures += 1 print( f"warn: enrichment failed for item {item.id}: {exc}", file=sys.stderr, ) continue + if items and not results and failures > 0: + raise RuntimeError( + f"All {failures} items failed enrichment; see warnings above for details." + ) + if failures > 0: + # SUMMARY prefix so the line is distinguishable from the per-item + # `warn:` lines that precede it in a partial-failure batch. + print( + f"SUMMARY: enriched: {len(results)}, failed: {failures}", + file=sys.stderr, + ) return results diff --git a/src/xbrain/topic_synth.py b/src/xbrain/topic_synth.py index 0546ec2..94a7b41 100644 --- a/src/xbrain/topic_synth.py +++ b/src/xbrain/topic_synth.py @@ -77,19 +77,49 @@ def _judgment_from_response(response, slug: str) -> OverviewJudgment: ) +def _recoverable_errors() -> tuple[type[Exception], ...]: + """Exception classes a per-topic failure should swallow + log + continue on. + + Mirrors `xbrain.executors.api._recoverable_errors`. See that helper for + rationale. `anthropic.APIError` is lazy-imported because `anthropic` is + optional in the test environment (the client is faked). + """ + try: + import json as _json + from anthropic import APIError + + return (APIError, ValueError, _json.JSONDecodeError, KeyError) + except ImportError: + import json as _json + + return (ValueError, _json.JSONDecodeError, KeyError) + + def synthesize_overviews_api( inputs: list[TopicInput], model: str, output_language: str, client=None, ) -> list[OverviewJudgment]: - """Synthesize topic overviews via the Anthropic API — one call per topic.""" + """Synthesize topic overviews via the Anthropic API — one call per topic. + + A `RuntimeError` is raised when EVERY input topic fails — exposed as a + non-zero exit through the CLI's `_handle_cli_errors` wrapper. Partial + success (some topics synthesise, some fail) returns the successes and + prints a summary line to stderr. + + Programmer bugs (`AttributeError`, ...) and `KeyboardInterrupt` are not + in the recoverable set — they propagate so the developer sees the + traceback and Ctrl-C still works. + """ if client is None: from anthropic import Anthropic # lazy: tests inject a fake client = Anthropic() # reads ANTHROPIC_API_KEY from the environment system = _system_prompt(output_language) + recoverable = _recoverable_errors() results: list[OverviewJudgment] = [] + failures = 0 for topic in inputs: try: response = client.messages.create( @@ -99,14 +129,26 @@ def synthesize_overviews_api( messages=[{"role": "user", "content": _user_prompt(topic)}], ) results.append(_judgment_from_response(response, topic.slug)) - except Exception as exc: # noqa: BLE001 - # One failed topic must not abort the batch — it stays unsynthesized - # and is retried on the next run. + except recoverable as exc: + # One failed topic must not abort the batch — it stays + # unsynthesized and is retried on the next run. + failures += 1 print( f"warn: topic synthesis failed for {topic.slug}: {exc}", file=sys.stderr, ) continue + if inputs and not results and failures > 0: + raise RuntimeError( + f"All {failures} topics failed synthesis; see warnings above for details." + ) + if failures > 0: + # SUMMARY prefix so the line stands out from the per-topic `warn:` + # lines that precede it in a partial-failure batch. + print( + f"SUMMARY: synthesized: {len(results)}, failed: {failures}", + file=sys.stderr, + ) return results diff --git a/tests/test_executors_api.py b/tests/test_executors_api.py index 63c8f31..c022aad 100644 --- a/tests/test_executors_api.py +++ b/tests/test_executors_api.py @@ -101,13 +101,96 @@ def test_api_executor_skips_wrong_shape_response(capsys): def test_api_executor_skips_item_on_api_failure(capsys): # A transient API failure on one item must not abort the whole batch. + from anthropic import APIError + client = FakeAnthropic( [ - RuntimeError("503 service unavailable"), + APIError("503 service unavailable", request=None, body=None), {"summary": "r", "primary_topic": "misc", "topics": ["misc"]}, ] ) ex = ApiExecutor(model="m", output_language="English", client=client) out = ex.enrich_items([_item("1"), _item("2")], VOCAB) assert {j.item_id for j in out} == {"2"} - assert "enrichment failed for item 1" in capsys.readouterr().err + captured = capsys.readouterr().err + assert "enrichment failed for item 1" in captured + # Partial-failure summary line is visible on stderr + assert "SUMMARY: enriched: 1, failed: 1" in captured + + +def test_api_executor_raises_when_all_items_fail(): + """An API key revocation / total outage must surface as non-zero exit, not + silent empty result. The CLI's _handle_cli_errors catches RuntimeError.""" + import pytest + from anthropic import APIError + + client = FakeAnthropic( + [ + APIError("401 unauthorized", request=None, body=None), + APIError("401 unauthorized", request=None, body=None), + ] + ) + ex = ApiExecutor(model="m", output_language="English", client=client) + with pytest.raises(RuntimeError, match="All 2 items failed enrichment"): + ex.enrich_items([_item("1"), _item("2")], VOCAB) + + +def test_api_executor_propagates_programmer_bugs(): + """`AttributeError` and other non-recoverable exceptions must NOT be + swallowed — the developer needs to see the traceback. + """ + import pytest + + class _Boom: + class messages: # noqa: N801 + @staticmethod + def create(*_args, **_kwargs): + raise AttributeError("programmer bug — undefined attribute") + + ex = ApiExecutor(model="m", output_language="English", client=_Boom()) + with pytest.raises(AttributeError, match="programmer bug"): + ex.enrich_items([_item("1")], VOCAB) + + +def test_api_executor_propagates_keyboard_interrupt(): + """Ctrl-C must NOT be swallowed by the recoverable-errors tuple. The + narrow catch uses Exception subclasses; KeyboardInterrupt inherits from + BaseException and falls through. This is the property of Python, but + pin it as a regression test — a future refactor that switches the + tuple to BaseException would silently break Ctrl-C without any failing + test. + """ + import pytest + + class _CtrlC: + class messages: # noqa: N801 + @staticmethod + def create(*_args, **_kwargs): + raise KeyboardInterrupt + + ex = ApiExecutor(model="m", output_language="English", client=_CtrlC()) + with pytest.raises(KeyboardInterrupt): + ex.enrich_items([_item("1")], VOCAB) + + +def test_api_executor_emits_no_summary_on_total_failure(capsys): + """The all-failed branch raises before printing the summary — there is + no `SUMMARY: enriched: 0, ...` line on a total-failure run. The raised + RuntimeError is the signal.""" + import pytest + from anthropic import APIError + + client = FakeAnthropic([APIError("503", request=None, body=None)]) + ex = ApiExecutor(model="m", output_language="English", client=client) + with pytest.raises(RuntimeError): + ex.enrich_items([_item("1")], VOCAB) + assert "SUMMARY:" not in capsys.readouterr().err + + +def test_api_executor_emits_no_summary_when_all_succeed(capsys): + """No failures, no noise: a clean batch stays silent on stderr.""" + payload = {"summary": "r", "primary_topic": "misc", "topics": ["misc"]} + client = FakeAnthropic([payload]) + ex = ApiExecutor(model="m", output_language="English", client=client) + ex.enrich_items([_item("1")], VOCAB) + assert "SUMMARY:" not in capsys.readouterr().err diff --git a/tests/test_topic_synth.py b/tests/test_topic_synth.py index d2e0a40..67d4fdd 100644 --- a/tests/test_topic_synth.py +++ b/tests/test_topic_synth.py @@ -54,14 +54,64 @@ def test_synthesize_overviews_api_skips_an_invalid_judgment(): assert [r.slug for r in results] == ["good"] -def test_synthesize_overviews_api_isolates_an_api_error(): - client = FakeAnthropic([RuntimeError("API caída"), {"overview": "ok", "notes": []}]) +def test_synthesize_overviews_api_isolates_an_api_error(capsys): + from anthropic import APIError + + client = FakeAnthropic( + [ + APIError("API caída", request=None, body=None), + {"overview": "ok", "notes": []}, + ] + ) inputs = [ TopicInput(slug="first", description="d", summaries=["s"]), TopicInput(slug="second", description="d", summaries=["s"]), ] results = synthesize_overviews_api(inputs, model="m", output_language="English", client=client) assert [r.slug for r in results] == ["second"] + # Partial-failure summary line is visible on stderr + assert "SUMMARY: synthesized: 1, failed: 1" in capsys.readouterr().err + + +def test_synthesize_overviews_api_raises_when_all_topics_fail(): + """Total failure (every API call raises) must surface non-zero exit, not + silent empty result. The CLI's _handle_cli_errors catches RuntimeError.""" + import pytest + from anthropic import APIError + + client = FakeAnthropic( + [ + APIError("401 unauthorized", request=None, body=None), + APIError("401 unauthorized", request=None, body=None), + ] + ) + inputs = [ + TopicInput(slug="a", description="d", summaries=["s"]), + TopicInput(slug="b", description="d", summaries=["s"]), + ] + with pytest.raises(RuntimeError, match="All 2 topics failed synthesis"): + synthesize_overviews_api(inputs, model="m", output_language="English", client=client) + + +def test_synthesize_overviews_api_propagates_programmer_bugs(): + """`AttributeError` and other non-recoverable exceptions must NOT be + swallowed — the developer needs to see the traceback. + """ + import pytest + + class _Boom: + class messages: # noqa: N801 + @staticmethod + def create(*_args, **_kwargs): + raise AttributeError("programmer bug — undefined attribute") + + with pytest.raises(AttributeError, match="programmer bug"): + synthesize_overviews_api( + [TopicInput(slug="x", description="d", summaries=["s"])], + model="m", + output_language="English", + client=_Boom(), + ) def test_overview_judgment_is_a_model(): @@ -129,3 +179,53 @@ def test_apply_overview_judgments_rejects_a_non_dict_entry(): valid, invalid = apply_overview_judgments(judgments) assert [j.slug for j in valid] == ["good"] assert any("not a JSON object" in e for e in invalid[0][1]) + + +def test_synthesize_overviews_api_propagates_keyboard_interrupt(): + """Ctrl-C must propagate — falls through the narrow catch because + KeyboardInterrupt inherits from BaseException, not Exception. Regression + test against a future refactor that widens the catch. + """ + import pytest + + class _CtrlC: + class messages: # noqa: N801 + @staticmethod + def create(*_args, **_kwargs): + raise KeyboardInterrupt + + with pytest.raises(KeyboardInterrupt): + synthesize_overviews_api( + [TopicInput(slug="x", description="d", summaries=["s"])], + model="m", + output_language="English", + client=_CtrlC(), + ) + + +def test_synthesize_overviews_api_emits_no_summary_on_total_failure(capsys): + """The all-failed branch raises before printing the summary.""" + import pytest + from anthropic import APIError + + client = FakeAnthropic([APIError("503", request=None, body=None)]) + with pytest.raises(RuntimeError): + synthesize_overviews_api( + [TopicInput(slug="x", description="d", summaries=["s"])], + model="m", + output_language="English", + client=client, + ) + assert "SUMMARY:" not in capsys.readouterr().err + + +def test_synthesize_overviews_api_emits_no_summary_when_all_succeed(capsys): + """No failures, no noise: a clean batch stays silent on stderr.""" + client = FakeAnthropic([{"overview": "ok", "notes": []}]) + synthesize_overviews_api( + [TopicInput(slug="x", description="d", summaries=["s"])], + model="m", + output_language="English", + client=client, + ) + assert "SUMMARY:" not in capsys.readouterr().err