Merged
21 changes: 21 additions & 0 deletions ARCHITECTURE.md
@@ -383,6 +383,27 @@ The same executor model is used by `vocab` (with [`vocab.py`](src/xbrain/vocab.p

---

## Snapshot diffing

`xbrain diff <snap-a> [snap-b]` (default `snap-b` = live `data/`) compares two snapshot data directories and answers one question: **what moved between these two states?** Built on top of the snapshot lifecycle from issue #17 — without snapshots there is nothing to diff.

The module ([`src/xbrain/diff.py`](src/xbrain/diff.py)) is a **pure orchestrator**: the only I/O is the three loader calls at entry (`load_store`, `load_vocab`, `load_topic_pages`); everything else is in-memory pydantic dataclasses. The CLI is the only thing that touches `typer.echo` — `diff_snapshots` returns a `DiffReport` and lets the caller render it.

The report has four sections, each pinning a different axis of change:

- **Items**: the number of reassigned items (`primary_topic` differs between A and B, with both sides enriched), the top N most frequent transitions (e.g. `ai-coding → software-engineering: 12 items`), and `None → topic` rows for items that gained enrichment between the two snapshots.
- **Topics**: per-topic membership delta (added / removed / unchanged item ids), plus an overview-drift classification (`identical` / `similar` / `different` / `not_comparable`) computed with a pure-Python TF cosine over the two topic-page overview texts.
- **Vocab**: slugs added, slugs removed, and a count of unchanged slugs. Rename detection is out of scope for v1; one slug added alongside one slug removed is the user's cue that a rename may have happened.
- **Summary**: top-level counts (items in both, enriched in both, reassigned, reassigned_pct, vocab churn, topic-page counts), the same fields that JSON-format consumers anchor on.
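
As an illustration of the Items and Vocab sections, the counting can be sketched with a `Counter` and plain set arithmetic. This is a minimal sketch, not the code in `diff.py`: the function names `item_changes` and `vocab_changes` and the `{item_id: primary_topic or None}` input shape are assumptions made for the example.

```python
from collections import Counter


def item_changes(a, b, top_n=3):
    """Compare two {item_id: primary_topic or None} mappings (hypothetical shape)."""
    reassigned = Counter()  # both sides enriched, topic differs
    gained = Counter()      # None -> topic: enriched only in B
    # Only items present in both snapshots are compared; sorted for determinism.
    for item_id in sorted(a.keys() & b.keys()):
        before, after = a[item_id], b[item_id]
        if before is None and after is not None:
            gained[(None, after)] += 1
        elif before is not None and after is not None and before != after:
            reassigned[(before, after)] += 1
    return {
        "reassigned": sum(reassigned.values()),
        "top_transitions": reassigned.most_common(top_n),
        "gained_enrichment": dict(gained),
    }


def vocab_changes(slugs_a, slugs_b):
    """Set arithmetic over two collections of vocab slugs."""
    a, b = set(slugs_a), set(slugs_b)
    return {"added": sorted(b - a), "removed": sorted(a - b), "unchanged": len(a & b)}


snap_a = {"1": "ai-coding", "2": "ai-coding", "3": None, "4": "misc"}
snap_b = {"1": "software-engineering", "2": "software-engineering", "3": "misc", "4": "misc"}
print(item_changes(snap_a, snap_b))
print(vocab_changes(["ai-coding", "misc"], ["software-engineering", "misc"]))
```

In the sample data, one slug is added and one removed while two items move between exactly those slugs, which is the pattern a reader would interpret as a likely rename.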

**Pure-Python TF cosine**, not embeddings, not TF-IDF. Two reasons: (1) zero new dependencies (no scikit-learn, no sentence-transformers); (2) IDF degenerates on N=2 documents anyway, so plain TF gives the same `identical / similar / different` bucketing without the noise. The tokenizer covers Latin-1 accented characters (`à-ÿ`) so Spanish / French overviews compare correctly. Topics with fewer than 5 members never trigger a growth flag — a 2→3 jump is 50% growth but statistically meaningless on a tiny topic.
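
A dependency-free TF cosine along these lines might look like the following. It is a sketch under stated assumptions: the token pattern, the function names, and especially the two cut-off constants are hypothetical, since the real thresholds are not given in this section.

```python
import math
import re
from collections import Counter

# Lowercase ASCII letters, digits, and the Latin-1 accented range à-ÿ.
_TOKEN = re.compile(r"[a-z0-9à-ÿ]+")

# Hypothetical cut-offs; the actual thresholds are not stated in this document.
IDENTICAL_AT = 0.999
SIMILAR_AT = 0.5


def tokenize(text):
    """Lowercase the text and split it into word tokens."""
    return _TOKEN.findall(text.lower())


def tf_cosine(text_a, text_b):
    """Cosine similarity of raw term-frequency vectors, or None if either is empty."""
    tf_a, tf_b = Counter(tokenize(text_a)), Counter(tokenize(text_b))
    if not tf_a or not tf_b:
        return None
    # Counter returns 0 for missing terms, so only shared terms contribute.
    dot = sum(count * tf_b[term] for term, count in tf_a.items())
    norm_a = math.sqrt(sum(c * c for c in tf_a.values()))
    norm_b = math.sqrt(sum(c * c for c in tf_b.values()))
    return dot / (norm_a * norm_b)


def classify_drift(text_a, text_b):
    """Bucket the similarity score into the four drift labels."""
    score = tf_cosine(text_a, text_b)
    if score is None:
        return "not_comparable"
    if score >= IDENTICAL_AT:
        return "identical"
    if score >= SIMILAR_AT:
        return "similar"
    return "different"
```

The `identical` cut-off sits slightly below 1.0 because floating-point rounding can leave the cosine of two equal texts a hair under 1.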

**Output:** `--format text` (default, human-readable section blocks) or `--format json` (pydantic `model_dump_json`, stable schema for downstream consumers — the WS3 eval harness in issue #8 will read this).

`xbrain diff` is also the foundation for **drift monitoring** between runs: take a snapshot, re-enrich, diff. A jump in `reassigned_pct` on a small corpus change is a signal that the prompt or model output is unstable; that is the eval-by-comparison question WS3 will formalise.
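
A drift check on top of the JSON output could be as small as the sketch below. The JSON layout is an assumption: it supposes a top-level `summary` object carrying `reassigned_pct` on a 0-100 scale, and both the helper name `reassignment_alert` and the `max_pct` threshold are hypothetical.

```python
import json


def reassignment_alert(report_json, max_pct=10.0):
    """Return True when the report's reassigned_pct exceeds max_pct.

    Assumes a layout like {"summary": {"reassigned_pct": <number>}}; the real
    schema of the JSON report may differ.
    """
    report = json.loads(report_json)
    return report["summary"]["reassigned_pct"] > max_pct


# Hand-written sample payload, not real xbrain output.
sample = json.dumps({"summary": {"reassigned": 12, "reassigned_pct": 24.0}})
print(reassignment_alert(sample))
```

A check like this would run after the snapshot, re-enrich, diff cycle, with the threshold tuned to how much churn a given corpus change is expected to cause.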

---

## Invariants

These are the rules the rest of the architecture rests on. Breaking any of them produces silent data corruption or makes the system unreproducible.
42 changes: 40 additions & 2 deletions src/xbrain/executors/api.py
@@ -8,6 +8,7 @@

 from __future__ import annotations

+import json
 import sys

 from xbrain.executors.base import EnrichmentJudgment
@@ -18,6 +19,26 @@
 _MAX_TOKENS = 600


+def _recoverable_errors() -> tuple[type[Exception], ...]:
+    """Exception classes a per-item failure should swallow + log + continue on.
+
+    `anthropic.APIError` covers auth, rate-limit, server-side and network
+    errors the SDK normalises. `ValueError` covers validator rejections and
+    `pydantic.ValidationError` (a `ValueError` subclass in pydantic v2).
+    `json.JSONDecodeError` covers a malformed LLM response. `KeyError` covers
+    a response missing an expected field.
+
+    Lazy-imported because `anthropic` is an optional dependency in the test
+    environment (the client is faked).
+    """
+    try:
+        from anthropic import APIError
+
+        return (APIError, ValueError, json.JSONDecodeError, KeyError)
+    except ImportError:
+        return (ValueError, json.JSONDecodeError, KeyError)
+
+
 def _vocab_block(vocab: list[Topic]) -> str:
     return "\n".join(f"- {t.slug}: {t.description}" for t in vocab)

@@ -82,7 +103,9 @@ def __init__(self, model: str, output_language: str, client=None):

     def enrich_items(self, items: list[Item], vocab: list[Topic]) -> list[EnrichmentJudgment]:
         system = _system_prompt(self._output_language)
+        recoverable = _recoverable_errors()
         results: list[EnrichmentJudgment] = []
+        failures = 0
         for item in items:
             try:
                 response = self._client.messages.create(
@@ -105,12 +128,27 @@ def enrich_items(self, items: list[Item], vocab: list[Topic]) -> list[Enrichment
                         topics=list(judgment["topics"]),
                     )
                 )
-            except Exception as exc:  # noqa: BLE001
+            except recoverable as exc:
                 # One transient/malformed response must not abort the batch:
-                # the item stays pending and is retried on the next run.
+                # the item stays pending and is retried on the next run. Note:
+                # programmer bugs (`AttributeError`, …) and `KeyboardInterrupt`
+                # are NOT in `recoverable`; they propagate so the developer
+                # sees the traceback and Ctrl-C still works.
+                failures += 1
                 print(
                     f"warn: enrichment failed for item {item.id}: {exc}",
                     file=sys.stderr,
                 )
                 continue
+        if items and not results and failures > 0:
+            raise RuntimeError(
+                f"All {failures} items failed enrichment; see warnings above for details."
+            )
+        if failures > 0:
+            # SUMMARY prefix so the line is distinguishable from the per-item
+            # `warn:` lines that precede it in a partial-failure batch.
+            print(
+                f"SUMMARY: enriched: {len(results)}, failed: {failures}",
+                file=sys.stderr,
+            )
         return results
50 changes: 46 additions & 4 deletions src/xbrain/topic_synth.py
@@ -77,19 +77,49 @@ def _judgment_from_response(response, slug: str) -> OverviewJudgment:
     )


+def _recoverable_errors() -> tuple[type[Exception], ...]:
+    """Exception classes a per-topic failure should swallow + log + continue on.
+
+    Mirrors `xbrain.executors.api._recoverable_errors`. See that helper for
+    rationale. `anthropic.APIError` is lazy-imported because `anthropic` is
+    optional in the test environment (the client is faked).
+    """
+    try:
+        import json as _json
+        from anthropic import APIError
+
+        return (APIError, ValueError, _json.JSONDecodeError, KeyError)
+    except ImportError:
+        import json as _json
+
+        return (ValueError, _json.JSONDecodeError, KeyError)
+
+
 def synthesize_overviews_api(
     inputs: list[TopicInput],
     model: str,
     output_language: str,
     client=None,
 ) -> list[OverviewJudgment]:
-    """Synthesize topic overviews via the Anthropic API, one call per topic."""
+    """Synthesize topic overviews via the Anthropic API, one call per topic.
+
+    A `RuntimeError` is raised when EVERY input topic fails, exposed as a
+    non-zero exit through the CLI's `_handle_cli_errors` wrapper. Partial
+    success (some topics synthesise, some fail) returns the successes and
+    prints a summary line to stderr.
+
+    Programmer bugs (`AttributeError`, ...) and `KeyboardInterrupt` are not
+    in the recoverable set; they propagate so the developer sees the
+    traceback and Ctrl-C still works.
+    """
     if client is None:
         from anthropic import Anthropic  # lazy: tests inject a fake

         client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment
     system = _system_prompt(output_language)
+    recoverable = _recoverable_errors()
     results: list[OverviewJudgment] = []
+    failures = 0
     for topic in inputs:
         try:
             response = client.messages.create(
@@ -99,14 +129,26 @@ def synthesize_overviews_api(
                 messages=[{"role": "user", "content": _user_prompt(topic)}],
             )
             results.append(_judgment_from_response(response, topic.slug))
-        except Exception as exc:  # noqa: BLE001
-            # One failed topic must not abort the batch: it stays unsynthesized
-            # and is retried on the next run.
+        except recoverable as exc:
+            # One failed topic must not abort the batch: it stays
+            # unsynthesized and is retried on the next run.
+            failures += 1
             print(
                 f"warn: topic synthesis failed for {topic.slug}: {exc}",
                 file=sys.stderr,
             )
             continue
+    if inputs and not results and failures > 0:
+        raise RuntimeError(
+            f"All {failures} topics failed synthesis; see warnings above for details."
+        )
+    if failures > 0:
+        # SUMMARY prefix so the line stands out from the per-topic `warn:`
+        # lines that precede it in a partial-failure batch.
+        print(
+            f"SUMMARY: synthesized: {len(results)}, failed: {failures}",
+            file=sys.stderr,
+        )
     return results


87 changes: 85 additions & 2 deletions tests/test_executors_api.py
@@ -101,13 +101,96 @@ def test_api_executor_skips_wrong_shape_response(capsys):

 def test_api_executor_skips_item_on_api_failure(capsys):
     # A transient API failure on one item must not abort the whole batch.
+    from anthropic import APIError
+
     client = FakeAnthropic(
         [
-            RuntimeError("503 service unavailable"),
+            APIError("503 service unavailable", request=None, body=None),
             {"summary": "r", "primary_topic": "misc", "topics": ["misc"]},
         ]
     )
     ex = ApiExecutor(model="m", output_language="English", client=client)
     out = ex.enrich_items([_item("1"), _item("2")], VOCAB)
     assert {j.item_id for j in out} == {"2"}
-    assert "enrichment failed for item 1" in capsys.readouterr().err
+    captured = capsys.readouterr().err
+    assert "enrichment failed for item 1" in captured
+    # Partial-failure summary line is visible on stderr
+    assert "SUMMARY: enriched: 1, failed: 1" in captured
+
+
+def test_api_executor_raises_when_all_items_fail():
+    """An API key revocation / total outage must surface as non-zero exit, not
+    silent empty result. The CLI's _handle_cli_errors catches RuntimeError."""
+    import pytest
+    from anthropic import APIError
+
+    client = FakeAnthropic(
+        [
+            APIError("401 unauthorized", request=None, body=None),
+            APIError("401 unauthorized", request=None, body=None),
+        ]
+    )
+    ex = ApiExecutor(model="m", output_language="English", client=client)
+    with pytest.raises(RuntimeError, match="All 2 items failed enrichment"):
+        ex.enrich_items([_item("1"), _item("2")], VOCAB)
+
+
+def test_api_executor_propagates_programmer_bugs():
+    """`AttributeError` and other non-recoverable exceptions must NOT be
+    swallowed; the developer needs to see the traceback.
+    """
+    import pytest
+
+    class _Boom:
+        class messages:  # noqa: N801
+            @staticmethod
+            def create(*_args, **_kwargs):
+                raise AttributeError("programmer bug: undefined attribute")
+
+    ex = ApiExecutor(model="m", output_language="English", client=_Boom())
+    with pytest.raises(AttributeError, match="programmer bug"):
+        ex.enrich_items([_item("1")], VOCAB)
+
+
+def test_api_executor_propagates_keyboard_interrupt():
+    """Ctrl-C must NOT be swallowed by the recoverable-errors tuple. The
+    narrow catch uses Exception subclasses; KeyboardInterrupt inherits from
+    BaseException and falls through. This is the property of Python, but
+    pin it as a regression test: a future refactor that switches the
+    tuple to BaseException would silently break Ctrl-C without any failing
+    test.
+    """
+    import pytest
+
+    class _CtrlC:
+        class messages:  # noqa: N801
+            @staticmethod
+            def create(*_args, **_kwargs):
+                raise KeyboardInterrupt
+
+    ex = ApiExecutor(model="m", output_language="English", client=_CtrlC())
+    with pytest.raises(KeyboardInterrupt):
+        ex.enrich_items([_item("1")], VOCAB)
+
+
+def test_api_executor_emits_no_summary_on_total_failure(capsys):
+    """The all-failed branch raises before printing the summary; there is
+    no `SUMMARY: enriched: 0, ...` line on a total-failure run. The raised
+    RuntimeError is the signal."""
+    import pytest
+    from anthropic import APIError
+
+    client = FakeAnthropic([APIError("503", request=None, body=None)])
+    ex = ApiExecutor(model="m", output_language="English", client=client)
+    with pytest.raises(RuntimeError):
+        ex.enrich_items([_item("1")], VOCAB)
+    assert "SUMMARY:" not in capsys.readouterr().err
+
+
+def test_api_executor_emits_no_summary_when_all_succeed(capsys):
+    """No failures, no noise: a clean batch stays silent on stderr."""
+    payload = {"summary": "r", "primary_topic": "misc", "topics": ["misc"]}
+    client = FakeAnthropic([payload])
+    ex = ApiExecutor(model="m", output_language="English", client=client)
+    ex.enrich_items([_item("1")], VOCAB)
+    assert "SUMMARY:" not in capsys.readouterr().err