diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 20b6235..b62164d 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -395,6 +395,7 @@ These are the rules the rest of the architecture rests on. Breaking any of them 6. **`fetch` is cached per item id.** Re-runs do not re-hit the network without `--force` (or, in the future, transient-retry — issue #19). 7. **Operation names, not query ids.** The extractor anchors to X GraphQL operation names because X rotates the ids. Anything that hardcodes an id will break. 8. **Destructive ops are reversible.** Every command that overwrites a `data/` artifact (`vocab --regenerate`, `topics --resynth`, `fetch --force`) snapshots `data/` first to `data/snapshots/-pre-/`. `xbrain snapshot restore ` is the recovery path. A snapshot failure aborts the destructive op. +9. **Fetch records are tagged unions.** A `ContentSource` on `items.json` is either a `Success` (with required `text`) or a `Failure` (with required `failure_reason`). Mixed shapes are not representable — pydantic rejects them at construction, and mypy rejects them statically (via the `pydantic.mypy` plugin). Legacy records with `ok: bool` (pre-#20) are normalised on read by a `BeforeValidator` on the union, so existing `data/items.json` files keep working without a manual migration. The static contract is pinned by `tests/type_probes/illegal_states.py`. --- diff --git a/pyproject.toml b/pyproject.toml index 0b3f3ad..8f001e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,17 @@ target-version = "py312" python_version = "3.12" ignore_missing_imports = true files = ["src/xbrain"] +# Pydantic mypy plugin turns model __init__ into a typed signature so +# mypy can flag missing required fields (e.g. ContentSourceSuccess +# without `text`). The #20 refactor relies on this to make illegal +# states unrepresentable at the type level — verified by the +# tests/type_probes/illegal_states.py probe. +plugins = ["pydantic.mypy"] + +[tool.pydantic-mypy] +init_forbid_extra = true +init_typed = true +warn_required_dynamic_aliases = true [tool.interrogate] ignore-init-method = true diff --git a/src/xbrain/executors/api.py b/src/xbrain/executors/api.py index 8c56c01..c6175e8 100644 --- a/src/xbrain/executors/api.py +++ b/src/xbrain/executors/api.py @@ -12,7 +12,7 @@ from xbrain.executors.base import EnrichmentJudgment from xbrain.llm_json import json_from_response -from xbrain.models import Item, Topic +from xbrain.models import ContentSourceSuccess, Item, Topic from xbrain.rubrics import ARTICLE_CHAR_LIMIT, load_rubric _MAX_TOKENS = 600 @@ -57,8 +57,9 @@ def _user_prompt(item: Item, vocab: list[Topic]) -> str: ] parts += [f"- {ln.url} (domain: {ln.domain})" for ln in item.links] if item.content and item.content.sources: + # Narrow to the success variant — only those carry `title`/`text`. for src in item.content.sources: - if src.ok and src.text: + if isinstance(src, ContentSourceSuccess) and src.text: parts += [ "", f"Linked article ({src.title or src.url}):", diff --git a/src/xbrain/extract/threads.py b/src/xbrain/extract/threads.py index 6c72b2f..38dd68a 100644 --- a/src/xbrain/extract/threads.py +++ b/src/xbrain/extract/threads.py @@ -9,7 +9,13 @@ from xbrain.extract.browser import is_logged_out, x_context from xbrain.extract.graphql import parse_tweets -from xbrain.models import Content, ContentSource, Item +from xbrain.models import ( + Content, + ContentSource, + ContentSourceFailure, + ContentSourceSuccess, + Item, +) _SETTLE_MS = 4000 @@ -42,16 +48,17 @@ def expand_threads(store: dict[str, Item], storage_state_path: Path, force: bool with x_context(storage_state_path) as context: for item in pending: text = _fetch_thread_text(context, item) - _attach_thread( - item, - ContentSource( + source: ContentSource + if text: + source = ContentSourceSuccess(kind="thread", url=item.url, text=text) + else: + source = ContentSourceFailure( kind="thread", url=item.url, - text=text or None, - ok=bool(text), - error=None if text else "No se pudo recuperar el hilo.", - ), - ) + failure_reason="empty_content", + error="No se pudo recuperar el hilo.", + ) + _attach_thread(item, source) return len(pending) diff --git a/src/xbrain/fetch.py b/src/xbrain/fetch.py index bcccc68..0fb0723 100644 --- a/src/xbrain/fetch.py +++ b/src/xbrain/fetch.py @@ -4,6 +4,11 @@ records *structured evidence* (HTTP status + a categorised reason) so a broken link is demonstrable, not assumed (design §4). x.com links are skipped here — they are handled by `xbrain.fetch_x`. + +`FetchResult` is the in-memory return type of the extractors and is a tagged +union (`FetchSuccess | FetchFailure`) so callers cannot accidentally read a +success-only field off a failure record (or vice versa). The persisted +`ContentSource` is itself a tagged union — see `xbrain.models`. """ from __future__ import annotations @@ -13,14 +18,21 @@ import socket import urllib.error import urllib.request -from dataclasses import dataclass from datetime import datetime, timezone -from typing import Callable +from typing import Callable, Union from urllib.parse import urlparse import trafilatura +from pydantic import BaseModel -from xbrain.models import Content, ContentSource, FailureReason, Item +from xbrain.models import ( + Content, + ContentSource, + ContentSourceFailure, + ContentSourceSuccess, + FailureReason, + Item, +) _X_HOSTS = {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com"} _UA = "Mozilla/5.0 (compatible; XBrain/1.0)" @@ -29,18 +41,40 @@ _FIRECRAWL_TIMEOUT = 60 -@dataclass -class FetchResult: - """The structured outcome of one content-extraction attempt.""" +class FetchSuccess(BaseModel): + """The successful outcome of one content-extraction attempt. + + `text` is required: a success without text is not a success. The type + system enforces this — callers do not need to defensively check for + `result.text is not None`. + """ title: str | None = None - text: str | None = None + text: str http_status: int | None = None + attempts: int = 1 + + +class FetchFailure(BaseModel): + """The failed outcome of one content-extraction attempt. + + `failure_reason` may be `None` when the failure has not yet been + categorised (e.g. an uncaught network error). Callers that persist the + result fall back to ``"unknown_error"`` — a transient bucket — to satisfy + the required field on `ContentSourceFailure` while preserving the + "uncategorised = retry-worthy" invariant from #19. + """ + failure_reason: FailureReason | None = None error: str | None = None + http_status: int | None = None attempts: int = 1 +# The in-memory FetchResult — internal to this module. Never persisted (the +# wire format is `ContentSource`, which has its own discriminator). Callers +# narrow via `isinstance(result, FetchSuccess)` / `isinstance(result, FetchFailure)`. +FetchResult = Union[FetchSuccess, FetchFailure] ArticleExtractor = Callable[[str], FetchResult] @@ -107,10 +141,10 @@ def trafilatura_extract( downloaded = fetch(url) if downloaded is None: status, reason, error = prober(url) - return FetchResult(http_status=status, failure_reason=reason, error=error, attempts=1) + return FetchFailure(http_status=status, failure_reason=reason, error=error, attempts=1) text = extract(downloaded) if not text: - return FetchResult( + return FetchFailure( http_status=200, failure_reason="empty_content", error="La página se descargó pero no tiene un artículo extraíble.", @@ -118,7 +152,7 @@ def trafilatura_extract( ) metadata = trafilatura.extract_metadata(downloaded) title = metadata.title if metadata else None - return FetchResult(title=title, text=text, http_status=200, attempts=1) + return FetchSuccess(title=title, text=text, http_status=200, attempts=1) def _firecrawl_extract( @@ -143,9 +177,9 @@ def _firecrawl_extract( with opener(req, timeout=_FIRECRAWL_TIMEOUT) as resp: payload = json.loads(resp.read()) except (urllib.error.URLError, TimeoutError, OSError, ValueError) as exc: - return FetchResult(error=f"Firecrawl falló: {exc}", attempts=1) + return FetchFailure(error=f"Firecrawl falló: {exc}", attempts=1) if payload.get("success") is False or payload.get("error"): - return FetchResult( + return FetchFailure( error=f"Firecrawl falló: {payload.get('error') or 'respuesta de error'}", attempts=1, ) @@ -153,9 +187,11 @@ def _firecrawl_extract( text = data.get("markdown") if text: title = (data.get("metadata") or {}).get("title") - return FetchResult(title=title, text=text, http_status=200, attempts=1) - return FetchResult( - failure_reason="empty_content", error="Firecrawl no devolvió contenido.", attempts=1 + return FetchSuccess(title=title, text=text, http_status=200, attempts=1) + return FetchFailure( + failure_reason="empty_content", + error="Firecrawl no devolvió contenido.", + attempts=1, ) @@ -165,22 +201,64 @@ def extract_article( primary: Callable = trafilatura_extract, firecrawl: Callable = _firecrawl_extract, ) -> FetchResult: - """Default extractor: trafilatura, then Firecrawl for JS-rendered pages.""" + """Default extractor: trafilatura, then Firecrawl for JS-rendered pages. + + Switches on the `FetchResult` variant: a `FetchSuccess` is returned + as-is. A `FetchFailure` whose `failure_reason` is in + ``{"js_required", "empty_content"}`` triggers the Firecrawl fallback; + anything else (404, dns, ...) is terminal and Firecrawl is not called. + """ result = primary(url) - if result.text: + if isinstance(result, FetchSuccess): return result + # mypy now narrows `result` to FetchFailure for the rest of this function. if result.failure_reason not in ("js_required", "empty_content"): - return result # a hard failure (404, dns, ...) — Firecrawl will not help + return result # hard failure (404, dns, ...) — Firecrawl will not help fallback = firecrawl(url) if fallback is None: return result # Firecrawl not configured — keep the original evidence - if fallback.text: - fallback.attempts = result.attempts + 1 - return fallback - result.attempts = result.attempts + 1 # both attempts exhausted; keep first evidence - if fallback.error: - result.error = f"{result.error or 'sin contenido'} | Firecrawl: {fallback.error}" - return result + if isinstance(fallback, FetchSuccess): + return fallback.model_copy(update={"attempts": result.attempts + 1}) + # Both attempts exhausted — merge evidence and bump the attempt counter. + merged_error = ( + f"{result.error or 'sin contenido'} | Firecrawl: {fallback.error}" + if fallback.error + else result.error + ) + return result.model_copy(update={"attempts": result.attempts + 1, "error": merged_error}) + + +def _content_source_from(url: str, result: FetchResult) -> ContentSource: + """Build the persisted `ContentSource` variant matching the fetch outcome. + + Maps `FetchSuccess` → `ContentSourceSuccess` and `FetchFailure` → + `ContentSourceFailure`. A failure without a categorised reason falls back + to ``"empty_content"`` — the catch-all bucket — because + `ContentSourceFailure.failure_reason` is a required field (the type + system says a failure without a reason is not demonstrable evidence). + The free-form `error` string still carries the original explanation. + """ + if isinstance(result, FetchSuccess): + return ContentSourceSuccess( + kind="external_article", + url=url, + title=result.title, + text=result.text, + http_status=result.http_status, + attempts=result.attempts, + ) + return ContentSourceFailure( + kind="external_article", + url=url, + error=result.error, + http_status=result.http_status, + # `unknown_error` (a transient bucket) — NOT `empty_content` (terminal). + # An uncategorised failure must stay self-healing on the next + # `fetch_pending` run, mirroring the #19 invariant that a missing + # `failure_reason` was retry-worthy by default. + failure_reason=result.failure_reason or "unknown_error", + attempts=result.attempts, + ) def fetch_item(item: Item, extractor: ArticleExtractor = extract_article) -> Content: @@ -198,39 +276,20 @@ def fetch_item(item: Item, extractor: ArticleExtractor = extract_article) -> Con result = extractor(url) except Exception as exc: # noqa: BLE001 - one bad URL must not abort the batch new_sources.append( - ContentSource( + ContentSourceFailure( kind="external_article", url=url, - ok=False, + # Transient bucket: an uncaught extractor exception is + # almost always something that may succeed on the next + # run (network blip, intermittent SSL, …). #19 relied on + # this being retry-worthy; preserve that. + failure_reason="unknown_error", error=f"Error al descargar el artículo: {exc}", attempts=1, ) ) continue - if result.text: - new_sources.append( - ContentSource( - kind="external_article", - url=url, - title=result.title, - text=result.text, - ok=True, - http_status=result.http_status, - attempts=result.attempts, - ) - ) - else: - new_sources.append( - ContentSource( - kind="external_article", - url=url, - ok=False, - error=result.error, - http_status=result.http_status, - failure_reason=result.failure_reason, - attempts=result.attempts, - ) - ) + new_sources.append(_content_source_from(url, result)) kept = [s for s in item.content.sources if s.kind != "external_article"] if item.content else [] return Content(fetched_at=datetime.now(timezone.utc), sources=kept + new_sources) @@ -238,7 +297,13 @@ def fetch_item(item: Item, extractor: ArticleExtractor = extract_article) -> Con # Failure reasons that justify an automatic retry on the next run. Everything # else (`not_found`, `forbidden`, `paywall`, `js_required`, `empty_content`) # is treated as terminal — only `--force` re-fetches those. -_TRANSIENT_FAILURES: frozenset[FailureReason] = frozenset({"timeout", "dns_error"}) +# +# `unknown_error` is the uncategorised-failure bucket — an extractor exception +# or any failure path that did not pin a specific reason. We retry by default: +# the alternative is silently classifying every uncaught failure as terminal, +# which would break the #19 invariant ("a failure without a categorised reason +# is anomalous; re-fetching gives it a chance to land on a known result"). +_TRANSIENT_FAILURES: frozenset[FailureReason] = frozenset({"timeout", "dns_error", "unknown_error"}) def _should_refetch(content: Content | None, force: bool) -> bool: @@ -246,16 +311,22 @@ def _should_refetch(content: Content | None, force: bool) -> bool: - `content is None` (never fetched) → True. - `force=True` → True regardless of recorded state. - - Otherwise, True only if every `external_article` source on `content` is - a failure whose `failure_reason` is in `_TRANSIENT_FAILURES` OR `None`. - `failure_reason=None` is treated as transient: an `ok=False` source - with no categorised reason is anomalous (pre-Fase-2 records, an - uncaught `extractor` exception path, …) — re-fetching gives that case - a chance to land on a categorised result rather than staying invisibly - stuck. A single successful source, or any *categorised* terminal - failure, skips. No `external_article` sources at all → skip (there is - nothing here for `fetch_pending`; the x.com sources are handled by - `fetch_x`). + - Otherwise, True only if every `external_article` source on `content` + is a `ContentSourceFailure` whose `failure_reason` is in + `_TRANSIENT_FAILURES`. A single successful source (any + `ContentSourceSuccess`) or a categorised terminal failure (e.g. + `not_found`, `paywall`) skips. No `external_article` sources at all + → skip (there is nothing here for `fetch_pending`; the x.com sources + are handled by `fetch_x`). + + The pre-#20 helper read `src.ok` / `src.failure_reason` as Optionals; + after the tagged-union refactor the variant `isinstance` check is the + single switch and `failure_reason` is required on the failure variant, + so no `is None` special-case is needed. Pre-#20 records that lacked a + categorised `failure_reason` are migrated to `timeout` by + `_normalise_legacy_content_source` (see `xbrain.models`) — they + therefore get one automatic retry under #19, matching the prior + behaviour without an extra branch here. """ if content is None: return True @@ -265,7 +336,7 @@ def _should_refetch(content: Content | None, force: bool) -> bool: if not external: return False return all( - (not src.ok) and (src.failure_reason is None or src.failure_reason in _TRANSIENT_FAILURES) + isinstance(src, ContentSourceFailure) and src.failure_reason in _TRANSIENT_FAILURES for src in external ) diff --git a/src/xbrain/fetch_x.py b/src/xbrain/fetch_x.py index c424024..bfb0d99 100644 --- a/src/xbrain/fetch_x.py +++ b/src/xbrain/fetch_x.py @@ -20,7 +20,13 @@ from xbrain.extract.browser import is_logged_out, x_context from xbrain.extract.graphql import parse_tweets from xbrain.fetch import is_x_url -from xbrain.models import Content, ContentSource, Item +from xbrain.models import ( + Content, + ContentSource, + ContentSourceFailure, + ContentSourceSuccess, + Item, +) _SETTLE_MS = 4000 _STATUS_RE = re.compile(r"/[^/]+/status/(\d+)") @@ -86,10 +92,9 @@ def on_response(response: Response) -> None: page.goto(url, wait_until="domcontentloaded") page.wait_for_timeout(_SETTLE_MS) except Exception: # noqa: BLE001 - navigation failure -> empty result - return ContentSource( + return ContentSourceFailure( kind="x_article", url=url, - ok=False, failure_reason="timeout", error="No se pudo cargar el tweet.", attempts=1, @@ -100,11 +105,10 @@ def on_response(response: Response) -> None: finally: page.close() if text: - return ContentSource(kind="x_article", url=url, text=text, ok=True, attempts=1) - return ContentSource( + return ContentSourceSuccess(kind="x_article", url=url, text=text, attempts=1) + return ContentSourceFailure( kind="x_article", url=url, - ok=False, failure_reason="empty_content", error="No se pudo recuperar el contenido del tweet.", attempts=1, @@ -119,10 +123,9 @@ def _fetch_rendered(context: BrowserContext, url: str) -> ContentSource: page.goto(url, wait_until="domcontentloaded") page.wait_for_timeout(_SETTLE_MS) except Exception: # noqa: BLE001 - navigation failure -> empty result - return ContentSource( + return ContentSourceFailure( kind="x_article", url=url, - ok=False, failure_reason="timeout", error="No se pudo cargar el artículo de X.", attempts=1, @@ -134,13 +137,12 @@ def _fetch_rendered(context: BrowserContext, url: str) -> ContentSource: page.close() text = trafilatura.extract(html) if text: - return ContentSource( - kind="x_article", url=url, text=text, ok=True, http_status=200, attempts=1 + return ContentSourceSuccess( + kind="x_article", url=url, text=text, http_status=200, attempts=1 ) - return ContentSource( + return ContentSourceFailure( kind="x_article", url=url, - ok=False, failure_reason="empty_content", error="No se pudo extraer el contenido del artículo de X.", attempts=1, diff --git a/src/xbrain/generate.py b/src/xbrain/generate.py index 9d4d421..15d9503 100644 --- a/src/xbrain/generate.py +++ b/src/xbrain/generate.py @@ -8,7 +8,13 @@ from xbrain.config import SUPPORTED_TOPIC_STYLES from xbrain.i18n import Strings, strings_for -from xbrain.models import Content, ContentSource, FailureReason, Item +from xbrain.models import ( + Content, + ContentSourceFailure, + ContentSourceSuccess, + FailureReason, + Item, +) from xbrain.notes_io import DEFAULT_TAIL, note_filename, slugify, title_of, user_tail, wrap logger = logging.getLogger(__name__) @@ -21,16 +27,20 @@ "dns_error": "dominio no resuelto", "js_required": "requiere JavaScript", "empty_content": "sin contenido extraíble", + "unknown_error": "error desconocido", } -def _broken_link_line(source: ContentSource, fetched_at: datetime) -> str: - """A one-line, human-readable record of a link that could not be fetched.""" +def _broken_link_line(source: ContentSourceFailure, fetched_at: datetime) -> str: + """A one-line, human-readable record of a link that could not be fetched. + + Accepts only the failure variant — the type system enforces that + `failure_reason` is present (no Optional check needed). + """ bits: list[str] = [] if source.http_status: bits.append(f"HTTP {source.http_status}") - if source.failure_reason: - bits.append(_FAILURE_ES.get(source.failure_reason, source.failure_reason)) + bits.append(_FAILURE_ES.get(source.failure_reason, source.failure_reason)) detail = " · ".join(bits) or "no se pudo recuperar" date = fetched_at.date().isoformat() return f"> ⚠ Enlace roto: <{source.url}> — {detail} (verificado {date})" @@ -148,13 +158,20 @@ def _enrichment_lines(item: Item, strings: Strings, topic_style: str) -> list[st def _content_lines(content: Content, strings: Strings) -> list[str]: - """Rendered article bodies + broken-link evidence for a fetched item.""" + """Rendered article bodies + broken-link evidence for a fetched item. + + Switches on the `ContentSource` variant: the success variant is + rendered as a content block; the failure variant is rendered as a + broken-link line *only* for external articles and X articles (a + failed thread fetch is silently elided, matching the pre-refactor + behaviour — `source.kind` is what guarded that path before). + """ lines: list[str] = [] for source in content.sources: - if source.ok and source.text: + if isinstance(source, ContentSourceSuccess): heading = source.title or source.url lines += [f"## {strings.content_header}: {heading}", "", source.text, ""] - elif not source.ok and source.kind in ("external_article", "x_article"): + elif source.kind in ("external_article", "x_article"): lines += [_broken_link_line(source, content.fetched_at), ""] return lines diff --git a/src/xbrain/models.py b/src/xbrain/models.py index 745fc87..c5b745b 100644 --- a/src/xbrain/models.py +++ b/src/xbrain/models.py @@ -2,10 +2,13 @@ from __future__ import annotations +import logging from datetime import datetime -from typing import Literal +from typing import Annotated, Any, Literal, Union -from pydantic import BaseModel, Field +from pydantic import BaseModel, BeforeValidator, Field, TypeAdapter + +logger = logging.getLogger(__name__) # The set of enrichment executor names — one source of truth shared by the # data model, the config loader and the enrichment phase. @@ -25,49 +28,188 @@ "dns_error", "js_required", "empty_content", + "unknown_error", # catch-all for uncategorised failures (e.g. an extractor + # exception we did not classify). Transient by default — `_should_refetch` + # in fetch.py treats it as retry-worthy on the next run, mirroring the + # pre-#20 behaviour where `failure_reason=None` meant transient. ] +# The set of content-source kinds — one source of truth shared by the data +# model, the fetch stage and the wiki renderer. +ContentKind = Literal["external_article", "x_article", "thread", "quoted_tweet"] + class Author(BaseModel): + """The X account that authored an item.""" + handle: str name: str class Link(BaseModel): + """One external URL extracted from an item's text.""" + url: str domain: str class Media(BaseModel): + """One photo or video attached to an item.""" + type: Literal["photo", "video"] url: str class ThreadInfo(BaseModel): + """Marker that an item is part of a multi-tweet thread.""" + is_thread: bool = True root_id: str position: int | None = None -class ContentSource(BaseModel): - kind: Literal["external_article", "x_article", "thread", "quoted_tweet"] +class ContentSourceSuccess(BaseModel): + """A fetched article whose body was successfully extracted. + + The success variant of the `ContentSource` tagged union. `text` is + required — a success without text is not a success — and the type + system enforces this at construction time. + """ + + outcome: Literal["success"] = "success" + kind: ContentKind url: str title: str | None = None - text: str | None = None - ok: bool = True + text: str + http_status: int | None = None + # extraction attempts: 1 = single pass, 2 = + Firecrawl fallback; + # 0 only on pre-Fase-2 records. + attempts: int = 0 + + +class ContentSourceFailure(BaseModel): + """A fetched article whose body could not be extracted. + + The failure variant of the `ContentSource` tagged union — structured + broken-link evidence so the wiki can render a ``⚠ Enlace roto`` line + rather than pretending the link was never there (design §4). + `failure_reason` is required: a failure without a reason is not + demonstrable evidence. + """ + + outcome: Literal["failure"] = "failure" + kind: ContentKind + url: str + failure_reason: FailureReason error: str | None = None http_status: int | None = None - failure_reason: FailureReason | None = None - # extraction attempts: 1 = single pass, 2 = + Firecrawl fallback; 0 only on pre-Fase-2 records + # extraction attempts: 1 = single pass, 2 = + Firecrawl fallback; + # 0 only on pre-Fase-2 records. attempts: int = 0 +def _normalise_legacy_content_source(value: Any) -> Any: + """Map the legacy ``{ok: bool, ...}`` shape to the tagged-union shape. + + Older ``data/items.json`` records (pre-#20) carry ``ok: True`` / + ``ok: False`` instead of ``outcome: "success"`` / ``outcome: "failure"``. + The mapping is one-to-one: + + - ``ok=True`` (success) → ``outcome="success"`` + - ``ok=False`` (failure) → ``outcome="failure"`` + + Records that already carry ``outcome`` are returned unchanged. Records + that have neither discriminator are rejected — silently inventing one + would mask data corruption. + + Fields irrelevant to the new variant (e.g. ``title`` / ``text`` on the + failure variant) are dropped during normalisation so the resulting dict + matches the variant's declared fields exactly. This is purely defensive + — extra fields on a pydantic model are ignored by default, but stripping + them up front keeps the on-the-wire shape clean once the record is + re-dumped. + """ + if not isinstance(value, dict): + return value + if "outcome" in value: + return value + if "ok" not in value: + # Include enough context to find the offending record in a big file. + url = value.get("url", "") + raise ValueError( + f"ContentSource record missing both 'outcome' and 'ok' " + f"discriminator (url={url!r}); the record cannot be safely " + "categorised as success or failure." + ) + payload = {k: v for k, v in value.items() if k != "ok"} + payload["outcome"] = "success" if value["ok"] else "failure" + if payload["outcome"] == "success": + # success has no failure_reason / error — drop if present so the + # re-dumped record is clean (pydantic ignores extras anyway). + payload.pop("failure_reason", None) + payload.pop("error", None) + else: + # failure has no title / text + payload.pop("title", None) + payload.pop("text", None) + # Legacy records sometimes recorded a failure (`ok=False`) with no + # categorised `failure_reason` (e.g. an HTTP 429 that the old code + # did not map). The new variant requires the field — bucket those + # under `unknown_error` (a transient retry-worthy reason added in + # the #20 review pass, see `xbrain.fetch._TRANSIENT_FAILURES`). + # `unknown_error` is preferable to `timeout` here because the + # actual cause is unknown — "timeout" would be a lie that hides + # 429s, SSL handshake failures, and other distinct error modes. + if payload.get("failure_reason") in (None, ""): + payload["failure_reason"] = "unknown_error" + logger.warning( + "Legacy ContentSource without failure_reason bucketed as " + "'unknown_error' (url=%s). The next `fetch_pending` run will " + "retry it; use `--force` to suppress the retry.", + value.get("url", ""), + ) + return payload + + +# The persisted ContentSource type — a discriminated union over the success +# and failure variants, wrapped in an outer `BeforeValidator` that normalises +# the legacy `ok: bool` records on read so existing `data/items.json` files +# keep working. +# +# The wrapping is layered on purpose: the `BeforeValidator` must run BEFORE +# pydantic dispatches on the `outcome` discriminator. If both annotations were +# on the same `Annotated`, the discriminator check would run first and reject +# legacy records that carry `ok` instead of `outcome`. The outer Annotated +# guarantees the right ordering. +_ContentSourceTagged = Annotated[ + Union[ContentSourceSuccess, ContentSourceFailure], + Field(discriminator="outcome"), +] +ContentSource = Annotated[ + _ContentSourceTagged, + BeforeValidator(_normalise_legacy_content_source), +] + + +# A TypeAdapter is the documented pydantic-v2 entry point for validating / +# dumping a discriminated-union *type alias* (since the alias itself is not a +# class with `.model_validate`). Tests use this; production code goes through +# `Item` and `Content` which carry the union as a field. +ContentSourceAdapter: TypeAdapter[Union[ContentSourceSuccess, ContentSourceFailure]] = TypeAdapter( + ContentSource +) + + class Content(BaseModel): + """The fetched article(s) attached to an item, with their fetch timestamp.""" + fetched_at: datetime sources: list[ContentSource] = Field(default_factory=list) class Enrichment(BaseModel): + """LLM-generated summary and topic assignment for an item.""" + enriched_at: datetime executor: ExecutorName summary: str | None = None @@ -99,6 +241,8 @@ class TopicPage(BaseModel): class Item(BaseModel): + """One captured X post (bookmark or own tweet) with all its derived data.""" + id: str source: SourceName url: str @@ -116,16 +260,22 @@ class Item(BaseModel): class SourceCursor(BaseModel): + """Per-source extractor cursor: where we left off last run.""" + last_seen_id: str | None = None last_run: datetime | None = None class ArchiveImport(BaseModel): + """Marker recording a one-off X archive import.""" + file: str at: datetime class State(BaseModel): + """Top-level extractor state persisted in `data/state.json`.""" + bookmarks: SourceCursor = Field(default_factory=SourceCursor) own_tweets: SourceCursor = Field(default_factory=SourceCursor) archive_imported: ArchiveImport | None = None diff --git a/src/xbrain/notes_io.py b/src/xbrain/notes_io.py index d3325d3..552d52b 100644 --- a/src/xbrain/notes_io.py +++ b/src/xbrain/notes_io.py @@ -54,10 +54,17 @@ def slugify(text: str) -> str: def title_of(item: Item) -> str: - """A display title for an item — a fetched article title, else the text.""" + """A display title for an item — a fetched article title, else the text. + + Only the success variant of `ContentSource` carries a `title` field; the + failure variant has no title, so the isinstance narrowing both satisfies + mypy and silently skips broken-link sources. + """ + from xbrain.models import ContentSourceSuccess + if item.content: for source in item.content.sources: - if source.title: + if isinstance(source, ContentSourceSuccess) and source.title: return source.title return item.text.replace("\n", " ")[:80] or item.id diff --git a/src/xbrain/worksheet.py b/src/xbrain/worksheet.py index bf0ecb8..35e657b 100644 --- a/src/xbrain/worksheet.py +++ b/src/xbrain/worksheet.py @@ -12,15 +12,21 @@ from datetime import datetime, timezone from pathlib import Path -from xbrain.models import Item, Topic +from xbrain.models import ContentSourceSuccess, Item, Topic from xbrain.rubrics import ARTICLE_CHAR_LIMIT, load_rubric def _article_text(item: Item) -> str | None: + """First successful fetched article body, truncated, or None. + + Only the success variant of `ContentSource` carries `text`. The + isinstance narrowing satisfies mypy and silently skips broken-link + sources, matching the pre-#20 ``src.ok and src.text`` behaviour. + """ if not item.content: return None for src in item.content.sources: - if src.ok and src.text: + if isinstance(src, ContentSourceSuccess) and src.text: return src.text[:ARTICLE_CHAR_LIMIT] return None diff --git a/tests/test_cli.py b/tests/test_cli.py index aff456d..fc00c1d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -283,7 +283,7 @@ def test_cli_fetch_persists_partial_work_when_a_stage_raises(tmp_path, monkeypat # earlier stages already produced. This proves that `finally` contract. _setup_repo(tmp_path, monkeypatch) import xbrain.cli as cli - from xbrain.models import Content, ContentSource + from xbrain.models import Content, ContentSourceSuccess from xbrain.store import load_store save_store({"1": _linked_item("1")}, tmp_path / "data" / "items.json") @@ -294,11 +294,10 @@ def _fake_fetch_pending(store, *a, **k): store["1"].content = Content( fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), sources=[ - ContentSource( + ContentSourceSuccess( kind="external_article", url="https://example.com/p", text="cuerpo", - ok=True, ) ], ) diff --git a/tests/test_fetch.py b/tests/test_fetch.py index fb274fd..759bf7b 100644 --- a/tests/test_fetch.py +++ b/tests/test_fetch.py @@ -4,7 +4,9 @@ from datetime import datetime, timezone from xbrain.fetch import ( + FetchFailure, FetchResult, + FetchSuccess, _categorize_url_error, _probe_status, _reason_for_status, @@ -13,7 +15,14 @@ fetch_pending, trafilatura_extract, ) -from xbrain.models import Author, Content, ContentSource, Item, Link +from xbrain.models import ( + Author, + Content, + ContentSourceFailure, + ContentSourceSuccess, + Item, + Link, +) def _item(item_id: str, urls: list[str]) -> Item: @@ -30,7 +39,7 @@ def _item(item_id: str, urls: list[str]) -> Item: def _fake_extractor(url: str) -> FetchResult: - return FetchResult(title="Título", text=f"cuerpo de {url}", http_status=200) + return FetchSuccess(title="Título", text=f"cuerpo de {url}", http_status=200) def test_reason_for_status_maps_http_codes(): @@ -64,9 +73,9 @@ def test_trafilatura_extract_success(): extract=lambda html: "el cuerpo", prober=lambda url: (None, None, ""), ) + assert isinstance(result, FetchSuccess) assert result.text == "el cuerpo" assert result.http_status == 200 - assert result.failure_reason is None def test_trafilatura_extract_empty_content_when_no_article(): @@ -76,7 +85,7 @@ def test_trafilatura_extract_empty_content_when_no_article(): extract=lambda html: None, prober=lambda url: (None, None, ""), ) - assert result.text is None + assert isinstance(result, FetchFailure) assert result.failure_reason == "empty_content" @@ -87,15 +96,17 @@ def test_trafilatura_extract_probes_when_download_fails(): extract=lambda html: None, prober=lambda url: (404, "not_found", "HTTP 404"), ) + assert isinstance(result, FetchFailure) assert result.http_status == 404 assert result.failure_reason == "not_found" def test_fetch_item_extracts_external_articles(): content = fetch_item(_item("1", ["https://example.com/p"]), _fake_extractor) - assert content.sources[0].kind == "external_article" - assert content.sources[0].ok is True - assert content.sources[0].text == "cuerpo de https://example.com/p" + source = content.sources[0] + assert isinstance(source, ContentSourceSuccess) + assert source.kind == "external_article" + assert source.text == "cuerpo de https://example.com/p" def test_fetch_item_skips_x_urls(): @@ -107,11 +118,12 @@ def test_fetch_item_skips_x_urls(): def test_fetch_item_records_failure_evidence(): content = fetch_item( _item("1", ["https://example.com/p"]), - lambda url: FetchResult(http_status=404, failure_reason="not_found", error="HTTP 404"), + lambda url: FetchFailure(http_status=404, failure_reason="not_found", error="HTTP 404"), ) - assert content.sources[0].ok is False - assert content.sources[0].http_status == 404 - assert content.sources[0].failure_reason == "not_found" + source = content.sources[0] + assert isinstance(source, ContentSourceFailure) + assert source.http_status == 404 + assert source.failure_reason == "not_found" def test_fetch_item_isolates_extractor_exception(): @@ -120,15 +132,16 @@ def _raising(url): content = fetch_item(_item("1", ["https://example.com/p"]), _raising) assert len(content.sources) == 1 - assert content.sources[0].ok is False - assert "boom" in content.sources[0].error + source = content.sources[0] + assert isinstance(source, ContentSourceFailure) + assert "boom" in (source.error or "") def test_fetch_item_preserves_non_external_sources_on_refetch(): item = _item("1", ["https://example.com/p"]) item.content = Content( fetched_at=datetime.now(timezone.utc), - sources=[ContentSource(kind="thread", url="u", text="hilo", ok=True)], + sources=[ContentSourceSuccess(kind="thread", url="u", text="hilo")], ) content = fetch_item(item, _fake_extractor) kinds = {s.kind for s in content.sources} @@ -206,54 +219,56 @@ def opener(req, timeout=0): return _Resp(json.dumps(payload).encode()) result = _firecrawl_extract("https://e.com/a", opener=opener) - assert result is not None + assert isinstance(result, FetchSuccess) assert result.text == "el cuerpo" assert result.title == "T" def test_extract_article_falls_back_to_firecrawl_on_js_required(): - from xbrain.fetch import FetchResult, extract_article + from xbrain.fetch import extract_article def primary(url): - return FetchResult(failure_reason="js_required", error="js", attempts=1) + return FetchFailure(failure_reason="js_required", error="js", attempts=1) def firecrawl(url): - return FetchResult(text="rescatado por firecrawl", attempts=1) + return FetchSuccess(text="rescatado por firecrawl", attempts=1) result = extract_article("https://e.com/a", primary=primary, firecrawl=firecrawl) + assert isinstance(result, FetchSuccess) assert result.text == "rescatado por firecrawl" assert result.attempts == 2 def test_extract_article_keeps_evidence_when_firecrawl_unavailable(): - from xbrain.fetch import FetchResult, extract_article + from xbrain.fetch import extract_article def primary(url): - return FetchResult(failure_reason="js_required", error="js", attempts=1) + return FetchFailure(failure_reason="js_required", error="js", attempts=1) def firecrawl(url): return None result = extract_article("https://e.com/a", primary=primary, firecrawl=firecrawl) - assert result.text is None + assert isinstance(result, FetchFailure) assert result.failure_reason == "js_required" assert result.attempts == 1 def test_extract_article_does_not_retry_hard_failures(): - from xbrain.fetch import FetchResult, extract_article + from xbrain.fetch import extract_article # A 404 is definitive — Firecrawl must not even be called. calls = [] def primary(url): - return FetchResult(http_status=404, failure_reason="not_found", attempts=1) + return FetchFailure(http_status=404, failure_reason="not_found", attempts=1) def firecrawl(url): calls.append(url) - return FetchResult(text="x") + return FetchSuccess(text="x") result = extract_article("https://e.com/a", primary=primary, firecrawl=firecrawl) + assert isinstance(result, FetchFailure) assert result.failure_reason == "not_found" assert calls == [] @@ -313,8 +328,7 @@ def opener(req, timeout=0): raise urllib.error.URLError("network down") result = _firecrawl_extract("https://e.com/a", opener=opener) - assert result is not None - assert result.text is None + assert isinstance(result, FetchFailure) assert "Firecrawl falló" in (result.error or "") @@ -330,23 +344,22 @@ def opener(req, timeout=0): return _CtxResp(json.dumps(payload).encode()) result = _firecrawl_extract("https://e.com/a", opener=opener) - assert result is not None - assert result.text is None + assert isinstance(result, FetchFailure) assert "rate limited" in (result.error or "") def test_extract_article_merges_firecrawl_error_when_both_fail(): - from xbrain.fetch import FetchResult, extract_article + from xbrain.fetch import extract_article def primary(url): - return FetchResult(failure_reason="js_required", error="js", attempts=1) + return FetchFailure(failure_reason="js_required", error="js", attempts=1) def firecrawl(url): # Firecrawl reachable but returned no text — carries its own evidence. - return FetchResult(error="Firecrawl no devolvió contenido.", attempts=1) + return FetchFailure(error="Firecrawl no devolvió contenido.", attempts=1) result = extract_article("https://e.com/a", primary=primary, firecrawl=firecrawl) - assert result.text is None + assert isinstance(result, FetchFailure) assert result.attempts == 2 assert "Firecrawl: Firecrawl no devolvió contenido." in (result.error or "") @@ -354,19 +367,22 @@ def firecrawl(url): # --------------------------------------------------------------------- #19: transient retry -def _content_with_source(*, ok: bool, failure_reason=None, kind="external_article", text=None): - """Helper: build a Content with a single ContentSource of the requested shape.""" +def _content_with_source(*, ok: bool, failure_reason="timeout", kind="external_article", text=None): + """Helper: build a Content with a single ContentSource of the requested shape. + + `ok=True` builds a `ContentSourceSuccess` (text required); `ok=False` + builds a `ContentSourceFailure` (failure_reason required, default + ``"timeout"`` — the transient bucket). + """ + if ok: + source = ContentSourceSuccess(kind=kind, url="https://example.com/p", text=text or "body") + else: + source = ContentSourceFailure( + kind=kind, url="https://example.com/p", failure_reason=failure_reason + ) return Content( fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), - sources=[ - ContentSource( - kind=kind, - url="https://example.com/p", - ok=ok, - failure_reason=failure_reason, - text=text, - ) - ], + sources=[source], ) @@ -433,8 +449,8 @@ def test_should_skip_mixed_transient_and_terminal(): c = Content( fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), sources=[ - ContentSource(kind="external_article", url="a", ok=False, failure_reason="timeout"), - ContentSource(kind="external_article", url="b", ok=False, failure_reason="not_found"), + ContentSourceFailure(kind="external_article", url="a", failure_reason="timeout"), + ContentSourceFailure(kind="external_article", url="b", failure_reason="not_found"), ], ) assert _should_refetch(c, force=False) is False @@ -445,8 +461,8 @@ def test_should_skip_mixed_transient_and_success(): c = Content( fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), sources=[ - ContentSource(kind="external_article", url="a", ok=False, failure_reason="timeout"), - ContentSource(kind="external_article", url="b", ok=True, text="got it"), + ContentSourceFailure(kind="external_article", url="a", failure_reason="timeout"), + ContentSourceSuccess(kind="external_article", url="b", text="got it"), ], ) assert _should_refetch(c, force=False) is False @@ -457,10 +473,9 @@ def test_should_skip_only_x_sources(): c = Content( fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), sources=[ - ContentSource( + ContentSourceFailure( kind="x_article", url="https://x.com/i/article/1", - ok=False, failure_reason="timeout", ), ], @@ -484,7 +499,8 @@ def test_fetch_pending_retries_timeout_without_force(): assert count == 1 # The retry overwrote the failure with a fresh, successful source src = store["1"].content.sources[0] - assert src.ok and src.text is not None + assert isinstance(src, ContentSourceSuccess) + assert src.text def test_fetch_pending_skips_not_found_without_force(): @@ -493,7 +509,8 @@ def test_fetch_pending_skips_not_found_without_force(): assert count == 0 # The recorded failure is preserved untouched src = store["1"].content.sources[0] - assert (not src.ok) and src.failure_reason == "not_found" + assert isinstance(src, ContentSourceFailure) + assert src.failure_reason == "not_found" def test_fetch_pending_force_refetches_not_found(): @@ -513,19 +530,38 @@ def test_fetch_pending_skips_transient_failures_outside_date_range(): ) assert count == 0 # And the recorded failure is preserved - assert store["1"].content.sources[0].failure_reason == "timeout" + src = store["1"].content.sources[0] + assert isinstance(src, ContentSourceFailure) + assert src.failure_reason == "timeout" # --- Additional fixes from PR #26 review pipeline --- -def test_should_refetch_uncategorized_failure_treated_as_transient(): - """A failure with `failure_reason=None` (default field, pre-Fase-2 records, - uncaught extractor exceptions) is treated as transient — re-fetching gives - those anomalous records a chance to land on a categorised result rather - than staying invisibly stuck under a default-skip policy. +def test_should_refetch_legacy_uncategorized_failure_migrates_to_transient(): + """A pre-#20 record with `ok=False, failure_reason=None` (anomalous — + uncategorised) is normalised on read to `failure_reason="unknown_error"` + by the legacy-shape validator, so the next `fetch_pending` run retries it + automatically. This preserves the #19 behaviour (uncategorised failures + get one auto-retry) under the new tagged-union shape, even though after + the refactor a new failure record can no longer have a `None` reason. """ - c = _content_with_source(ok=False, failure_reason=None) + from xbrain.models import ContentSourceAdapter + + src = ContentSourceAdapter.validate_python( + { + "kind": "external_article", + "url": "https://e.com/p", + "ok": False, + "failure_reason": None, + "error": "anomalous failure", + } + ) + assert isinstance(src, ContentSourceFailure) + # migrator picked the transient `unknown_error` bucket (not `timeout`, + # which would mislabel the actual cause). + assert src.failure_reason == "unknown_error" + c = Content(fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), sources=[src]) assert _should_refetch(c, force=False) is True @@ -538,16 +574,14 @@ def test_should_refetch_external_transient_alongside_xcom_source(): c = Content( fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), sources=[ - ContentSource( + ContentSourceFailure( kind="external_article", url="https://ext.com/a", - ok=False, failure_reason="timeout", ), - ContentSource( + ContentSourceFailure( kind="x_article", url="https://x.com/i/article/9", - ok=False, failure_reason="not_found", ), ], @@ -568,4 +602,48 @@ def test_fetch_pending_replaces_sources_does_not_append(): assert len(store["1"].content.sources) == 1 # And the lone source is now the fresh successful fetch src = store["1"].content.sources[0] - assert src.ok and src.text is not None + assert isinstance(src, ContentSourceSuccess) + assert src.text + + +def test_extractor_exception_persists_as_transient_failure(): + """An uncaught extractor exception must persist as a TRANSIENT failure + (`unknown_error`), so `_should_refetch` retries it on the next run. + + Regression guard: pre-#20 the bare-except in fetch_item wrote + `failure_reason=None`, and `_should_refetch` (post-#19) treated `None` + as transient. After #20 introduced the discriminated union, the field + became required — the temporary fix bucketed to `empty_content`, which + is TERMINAL and silently broke #19's invariant. This test pins the + correct behaviour: uncaught exceptions stay self-healing. + """ + + def _raising(url): + raise RuntimeError("network blip simulated") + + content = fetch_item(_item("1", ["https://example.com/p"]), _raising) + assert len(content.sources) == 1 + failure = content.sources[0] + assert isinstance(failure, ContentSourceFailure) + assert failure.failure_reason == "unknown_error" + assert "network blip" in failure.error + # And `_should_refetch` correctly retries on the next run + assert _should_refetch(content, force=False) is True + + +def test_content_source_from_uncategorised_failure_is_transient(): + """`_content_source_from` is the public helper that maps an in-memory + `FetchFailure` to a persisted `ContentSourceFailure`. A `FetchFailure` + with `failure_reason=None` must fall back to `unknown_error` (transient), + NOT `empty_content` (terminal). + """ + from xbrain.fetch import FetchFailure, _content_source_from + + failure_result = FetchFailure( + failure_reason=None, + error="something went wrong", + attempts=1, + ) + src = _content_source_from("https://example.com/p", failure_result) + assert isinstance(src, ContentSourceFailure) + assert src.failure_reason == "unknown_error" diff --git a/tests/test_fetch_x.py b/tests/test_fetch_x.py index 7c46d09..80e689d 100644 --- a/tests/test_fetch_x.py +++ b/tests/test_fetch_x.py @@ -2,7 +2,13 @@ from datetime import datetime, timezone from xbrain.fetch_x import _classify_x_url, _x_status_id, assemble_linked_thread, fetch_x_articles -from xbrain.models import Author, Content, ContentSource, Item, Link +from xbrain.models import ( + Author, + Content, + ContentSourceSuccess, + Item, + Link, +) def _item(item_id: str, urls: list[str]) -> Item: @@ -82,7 +88,7 @@ def test_fetch_x_articles_attaches_sources_via_injected_fetcher(): fetched = fetch_x_articles( store, storage_state_path=None, - link_fetcher=lambda url: ContentSource(kind="x_article", url=url, text="hilo", ok=True), + link_fetcher=lambda url: ContentSourceSuccess(kind="x_article", url=url, text="hilo"), ) assert fetched == 1 assert store["1"].content is not None @@ -103,7 +109,7 @@ def test_fetch_x_articles_skips_already_fetched_unless_forced(): store = {"1": _item("1", ["https://x.com/jack/status/9"])} def fetcher(url): - return ContentSource(kind="x_article", url=url, text="hilo", ok=True) + return ContentSourceSuccess(kind="x_article", url=url, text="hilo") assert fetch_x_articles(store, None, link_fetcher=fetcher) == 1 assert fetch_x_articles(store, None, link_fetcher=fetcher) == 0 @@ -129,7 +135,7 @@ def test_fetch_x_articles_respects_since_until(): storage_state_path=None, since=datetime(2026, 5, 1, tzinfo=timezone.utc), until=datetime(2026, 7, 1, tzinfo=timezone.utc), - link_fetcher=lambda url: ContentSource(kind="x_article", url=url, text="x", ok=True), + link_fetcher=lambda url: ContentSourceSuccess(kind="x_article", url=url, text="x"), ) assert count == 1 assert store["1"].content is None @@ -141,7 +147,7 @@ def test_fetch_x_articles_dedups_repeated_x_urls(): fetch_x_articles( store, storage_state_path=None, - link_fetcher=lambda url: ContentSource(kind="x_article", url=url, text="hilo", ok=True), + link_fetcher=lambda url: ContentSourceSuccess(kind="x_article", url=url, text="hilo"), ) sources = store["1"].content.sources assert len(sources) == 1 @@ -151,13 +157,13 @@ def test_fetch_x_articles_preserves_external_sources(): item = _item("1", ["https://x.com/jack/status/9"]) item.content = Content( fetched_at=datetime.now(timezone.utc), - sources=[ContentSource(kind="external_article", url="https://e.com", text="art", ok=True)], + sources=[ContentSourceSuccess(kind="external_article", url="https://e.com", text="art")], ) store = {"1": item} fetch_x_articles( store, None, - link_fetcher=lambda url: ContentSource(kind="x_article", url=url, text="x", ok=True), + link_fetcher=lambda url: ContentSourceSuccess(kind="x_article", url=url, text="x"), ) kinds = {s.kind for s in store["1"].content.sources} assert kinds == {"external_article", "x_article"} diff --git a/tests/test_generate.py b/tests/test_generate.py index a392468..49d586d 100644 --- a/tests/test_generate.py +++ b/tests/test_generate.py @@ -3,7 +3,15 @@ from pathlib import Path from xbrain.generate import generate -from xbrain.models import Author, Content, ContentSource, Enrichment, Item, Link +from xbrain.models import ( + Author, + Content, + ContentSourceFailure, + ContentSourceSuccess, + Enrichment, + Item, + Link, +) from xbrain.notes_io import slugify @@ -165,7 +173,6 @@ def test_note_renders_broken_link_evidence(tmp_path): from datetime import datetime, timezone from xbrain.generate import generate - from xbrain.models import Author, Content, ContentSource, Item, Link item = Item( id="1", @@ -179,10 +186,9 @@ def test_note_renders_broken_link_evidence(tmp_path): content=Content( fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), sources=[ - ContentSource( + ContentSourceFailure( kind="external_article", url="https://dead.example.com/p", - ok=False, http_status=404, failure_reason="not_found", ) @@ -220,7 +226,7 @@ def _enriched_item(item_id: str = "9") -> Item: content=Content( fetched_at=datetime(2026, 5, 16, tzinfo=timezone.utc), sources=[ - ContentSource( + ContentSourceSuccess( kind="external_article", url="https://example.com/p", title="Article title", diff --git a/tests/test_models.py b/tests/test_models.py index 4f84f60..989555c 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -78,34 +78,160 @@ def test_item_has_optional_bookmark_folder(): assert Item(**base, bookmark_folder="AI papers").bookmark_folder == "AI papers" -def test_content_source_carries_failure_evidence(): - from xbrain.models import ContentSource +def test_content_source_failure_variant_carries_failure_evidence(): + from xbrain.models import ContentSourceFailure - src = ContentSource( + src = ContentSourceFailure( kind="external_article", url="https://example.com/x", - ok=False, http_status=404, failure_reason="not_found", attempts=2, error="HTTP 404", ) + assert src.outcome == "failure" assert src.http_status == 404 assert src.failure_reason == "not_found" assert src.attempts == 2 -def test_content_source_failure_fields_default_cleanly(): - # An old-shape dict (no failure fields) must still validate — the existing - # items.json predates these fields. - from xbrain.models import ContentSource +def test_content_source_loads_legacy_ok_true_shape(): + """A pre-#20 record with `ok=True` must read into the success variant.""" + from xbrain.models import ContentSourceAdapter, ContentSourceSuccess + + src = ContentSourceAdapter.validate_python( + { + "kind": "external_article", + "url": "https://e.com/x", + "ok": True, + "title": "T", + "text": "body", + "http_status": 200, + "failure_reason": None, + "error": None, + "attempts": 1, + } + ) + assert isinstance(src, ContentSourceSuccess) + assert src.outcome == "success" + assert src.text == "body" + assert src.http_status == 200 + + +def test_content_source_loads_legacy_ok_false_shape(): + """A pre-#20 record with `ok=False` must read into the failure variant.""" + from xbrain.models import ContentSourceAdapter, ContentSourceFailure + + src = ContentSourceAdapter.validate_python( + { + "kind": "external_article", + "url": "https://e.com/x", + "ok": False, + "title": None, + "text": None, + "http_status": 404, + "failure_reason": "not_found", + "error": "HTTP 404", + "attempts": 2, + } + ) + assert isinstance(src, ContentSourceFailure) + assert src.outcome == "failure" + assert src.failure_reason == "not_found" + assert src.error == "HTTP 404" + + +def test_content_source_dumps_with_outcome_discriminator(): + """A success variant dumps with `outcome: "success"` and NO `ok` field.""" + from xbrain.models import ContentSourceSuccess + + src = ContentSourceSuccess(kind="external_article", url="u", text="t") + dumped = src.model_dump(mode="json") + assert dumped["outcome"] == "success" + assert "ok" not in dumped + + +def test_content_source_failure_dumps_with_outcome_discriminator(): + """A failure variant dumps with `outcome: "failure"` and NO `ok` field.""" + from xbrain.models import ContentSourceFailure + + src = ContentSourceFailure(kind="external_article", url="u", failure_reason="not_found") + dumped = src.model_dump(mode="json") + assert dumped["outcome"] == "failure" + assert "ok" not in dumped + + +def test_content_source_rejects_record_with_neither_discriminator(): + """Silently inventing `outcome` would mask data corruption — reject loudly.""" + import pytest + from pydantic import ValidationError + + from xbrain.models import ContentSourceAdapter + + with pytest.raises(ValidationError): + ContentSourceAdapter.validate_python({"kind": "external_article", "url": "u"}) + + +def test_content_source_success_requires_text(): + """The whole point of the refactor: a success without text is a type error. + + Pydantic raises a ValidationError at construction; mypy raises an error + statically (see tests/test_type_safety.py). + """ + import pytest + from pydantic import ValidationError + + from xbrain.models import ContentSourceSuccess + + with pytest.raises(ValidationError): + ContentSourceSuccess(kind="external_article", url="u") # missing `text` + + +def test_content_source_failure_requires_failure_reason(): + """Symmetric: a failure without a `failure_reason` is not demonstrable evidence.""" + import pytest + from pydantic import ValidationError - src = ContentSource.model_validate( - {"kind": "external_article", "url": "https://e.com", "ok": True} + from xbrain.models import ContentSourceFailure + + with pytest.raises(ValidationError): + ContentSourceFailure(kind="external_article", url="u") # missing `failure_reason` + + +def test_content_source_legacy_failure_without_reason_buckets_as_transient(): + """A pre-#20 record with `ok=False, failure_reason=None` (e.g. HTTP 429 that + the old code did not categorise) migrates losslessly: the `error` text is + preserved, and `failure_reason` is bucketed under `unknown_error` (a + transient retry-worthy reason added in the #20 review pass) so: + + 1. The wiki still renders a broken-link line. + 2. The next `fetch_pending` run auto-retries the record (issue #19 + retries `timeout`/`dns_error`/`unknown_error`), giving it one chance + to land on a proper category rather than staying invisibly stuck. + + `unknown_error` is preferred over `timeout` for honesty — "timeout" + would mislabel 429s / SSL handshake failures / other distinct error + modes that the legacy code dumped without a reason. + """ + from xbrain.models import ContentSourceAdapter, ContentSourceFailure + + src = ContentSourceAdapter.validate_python( + { + "kind": "external_article", + "url": "https://e.com/throttled", + "ok": False, + "title": None, + "text": None, + "http_status": 429, + "failure_reason": None, + "error": "HTTP 429: Too Many Requests", + "attempts": 1, + } ) - assert src.http_status is None - assert src.failure_reason is None - assert src.attempts == 0 + assert isinstance(src, ContentSourceFailure) + assert src.failure_reason == "unknown_error" + assert src.error == "HTTP 429: Too Many Requests" + assert src.http_status == 429 def test_topic_page_model_round_trips(): diff --git a/tests/test_store.py b/tests/test_store.py index e29fc5c..44a6e07 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -75,3 +75,113 @@ def test_load_topic_pages_returns_empty_when_absent(tmp_path): from xbrain.store import load_topic_pages assert load_topic_pages(tmp_path / "missing.json") == {} + + +def test_load_store_migrates_legacy_content_source_records_in_place(tmp_path: Path): + """Pre-#20 `data/items.json` files use `ok: bool` on each ContentSource. + + `load_store` must read those records into the new tagged-union variants, + and the next `save_store` must persist them with the `outcome` + discriminator (no `ok` field). This is the load-bearing test for the + upgrade story: existing users do not need to run any migration command — + a single read/write cycle is enough. + """ + import json + + from xbrain.models import ContentSourceFailure, ContentSourceSuccess + + legacy_items = { + "1": { + "id": "1", + "source": "bookmark", + "url": "https://x.com/a/status/1", + "author": {"handle": "a", "name": "A"}, + "text": "t", + "created_at": "2026-05-10T00:00:00+00:00", + "captured_at": "2026-05-16T00:00:00+00:00", + "media": [], + "links": [], + "content": { + "fetched_at": "2026-05-17T00:00:00+00:00", + "sources": [ + { + "kind": "external_article", + "url": "https://e.com/good", + "ok": True, + "title": "T", + "text": "body", + "http_status": 200, + "failure_reason": None, + "error": None, + "attempts": 1, + }, + { + "kind": "external_article", + "url": "https://e.com/dead", + "ok": False, + "title": None, + "text": None, + "http_status": 404, + "failure_reason": "not_found", + "error": "HTTP 404", + "attempts": 2, + }, + ], + }, + } + } + path = tmp_path / "items.json" + path.write_text(json.dumps(legacy_items), encoding="utf-8") + + store = load_store(path) + sources = store["1"].content.sources + assert isinstance(sources[0], ContentSourceSuccess) + assert sources[0].text == "body" + assert sources[0].title == "T" + assert isinstance(sources[1], ContentSourceFailure) + assert sources[1].failure_reason == "not_found" + assert sources[1].error == "HTTP 404" + + save_store(store, path) + raw = json.loads(path.read_text(encoding="utf-8")) + persisted = raw["1"]["content"]["sources"] + assert persisted[0]["outcome"] == "success" + assert "ok" not in persisted[0] + assert "failure_reason" not in persisted[0] + assert persisted[1]["outcome"] == "failure" + assert "ok" not in persisted[1] + assert "title" not in persisted[1] + + # The post-migration file is itself idempotent under reload. + assert load_store(path) == store + + +def test_load_store_rejects_content_source_without_any_discriminator(tmp_path: Path): + """A record with neither `outcome` nor `ok` must fail loudly, not default.""" + import json + + import pytest + + legacy_items = { + "1": { + "id": "1", + "source": "bookmark", + "url": "https://x.com/a/status/1", + "author": {"handle": "a", "name": "A"}, + "text": "t", + "created_at": "2026-05-10T00:00:00+00:00", + "captured_at": "2026-05-16T00:00:00+00:00", + "media": [], + "links": [], + "content": { + "fetched_at": "2026-05-17T00:00:00+00:00", + "sources": [ + {"kind": "external_article", "url": "https://e.com/x"}, + ], + }, + } + } + path = tmp_path / "items.json" + path.write_text(json.dumps(legacy_items), encoding="utf-8") + with pytest.raises(Exception): # noqa: BLE001 - pydantic ValidationError wraps the ValueError + load_store(path) diff --git a/tests/test_type_safety.py b/tests/test_type_safety.py new file mode 100644 index 0000000..eae468e --- /dev/null +++ b/tests/test_type_safety.py @@ -0,0 +1,89 @@ +"""Static-type-safety regression test for the #20 tagged-union refactor. + +The whole point of the #20 refactor is *illegal states unrepresentable at +the type level*. This test shells out to ``mypy`` against the +``tests/type_probes/illegal_states.py`` probe and asserts the four +intended errors are reported. + +If a future edit weakens the type contract (e.g. a Union[..] sneaks in, +or a required field becomes Optional), one of these checks stops failing +under mypy and this test goes red — that is exactly the regression we want +to catch, because it would silently allow the class of bugs the refactor +is meant to prevent. +""" + +from __future__ import annotations + +import os +import subprocess +import sys +from pathlib import Path + + +PROBE = Path(__file__).parent / "type_probes" / "illegal_states.py" +REPO_ROOT = Path(__file__).resolve().parent.parent +SRC_DIR = REPO_ROOT / "src" + + +def test_illegal_states_probe_is_rejected_by_mypy(): + """Every annotation in the probe must produce one mypy error. + + Four illegal states, four errors expected: + 1. `ContentSourceSuccess(kind=..., url=...)` missing `text`. + 2. `ContentSourceFailure(kind=..., url=...)` missing `failure_reason`. + 3. Reading `.failure_reason` off a `ContentSourceSuccess`. + 4. Reading `.text` off a `ContentSourceFailure`. + """ + import pytest + + # Use the SAME Python interpreter that runs pytest — guarantees the + # mypy executable matches the project's venv and the pydantic plugin + # resolves. `shutil.which("mypy")` is unreliable: under `uv run` the + # venv's bin is on PATH but under bare `pytest` it might not be. + try: + import mypy # noqa: F401 - import probe only + except ImportError: + pytest.skip("mypy not installed — install dev deps") + + env = { + **os.environ, + "MYPYPATH": str(SRC_DIR), + } + result = subprocess.run( + [ + sys.executable, + "-m", + "mypy", + "--no-incremental", + "--show-error-codes", + "--config-file", + str(REPO_ROOT / "pyproject.toml"), + str(PROBE), + ], + capture_output=True, + text=True, + check=False, + cwd=str(REPO_ROOT), + env=env, + ) + + error_lines = [line for line in result.stdout.splitlines() if "error:" in line] + expected_errors = { + # call-arg: missing required field on construction + 'Missing named argument "text" for "ContentSourceSuccess"', + 'Missing named argument "failure_reason" for "ContentSourceFailure"', + # attr-defined: the field does not exist on the narrowed variant + '"ContentSourceSuccess" has no attribute "failure_reason"', + '"ContentSourceFailure" has no attribute "text"', + } + matched = {expected for expected in expected_errors if any(expected in e for e in error_lines)} + + assert matched == expected_errors, ( + f"mypy did not report the expected errors.\n" + f"matched: {matched}\n" + f"missing: {expected_errors - matched}\n" + f"actual stdout:\n{result.stdout}\n" + f"actual stderr:\n{result.stderr}" + ) + # And mypy must exit non-zero overall. + assert result.returncode != 0, "mypy returned 0 — illegal states slipped through" diff --git a/tests/type_probes/__init__.py b/tests/type_probes/__init__.py new file mode 100644 index 0000000..97612b8 --- /dev/null +++ b/tests/type_probes/__init__.py @@ -0,0 +1,6 @@ +"""Mypy probes — fixtures that exist only to be checked by mypy. + +These files are NOT picked up by pytest's normal discovery; they are +imported / type-checked by tests in `tests/test_type_safety.py` which +shells out to mypy and asserts the expected errors are reported. +""" diff --git a/tests/type_probes/illegal_states.py b/tests/type_probes/illegal_states.py new file mode 100644 index 0000000..37873d5 --- /dev/null +++ b/tests/type_probes/illegal_states.py @@ -0,0 +1,31 @@ +"""Mypy probe: every annotation below MUST be a static type error. + +Verified at runtime by ``tests/test_type_safety.py``, which shells out to +``mypy`` on this file and asserts the exact errors fire. The point of the +#20 refactor is *illegal states unrepresentable*; this probe is the only +test that proves the property keeps holding under future edits. + +DO NOT add ``# type: ignore`` to these — that would defeat the test. +""" + +from xbrain.models import ContentSourceFailure, ContentSourceSuccess + + +def missing_text_on_success() -> ContentSourceSuccess: + """`ContentSourceSuccess` requires `text` — mypy must flag the omission.""" + return ContentSourceSuccess(kind="external_article", url="x") + + +def missing_failure_reason_on_failure() -> ContentSourceFailure: + """`ContentSourceFailure` requires `failure_reason` — mypy must flag the omission.""" + return ContentSourceFailure(kind="external_article", url="x") + + +def cannot_read_failure_reason_off_success(src: ContentSourceSuccess) -> str: + """A `failure_reason` field does not exist on the success variant.""" + return src.failure_reason + + +def cannot_read_text_off_failure(src: ContentSourceFailure) -> str: + """A `text` field does not exist on the failure variant.""" + return src.text