Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ These are the rules the rest of the architecture rests on. Breaking any of them
6. **`fetch` is cached per item id.** Re-runs do not re-hit the network without `--force` (or, in the future, transient-retry — issue #19).
7. **Operation names, not query ids.** The extractor anchors to X GraphQL operation names because X rotates the ids. Anything that hardcodes an id will break.
8. **Destructive ops are reversible.** Every command that overwrites a `data/` artifact (`vocab --regenerate`, `topics --resynth`, `fetch --force`) snapshots `data/` first to `data/snapshots/<ts>-pre-<command>/`. `xbrain snapshot restore <name>` is the recovery path. A snapshot failure aborts the destructive op.
9. **Fetch records are tagged unions.** A `ContentSource` on `items.json` is either a `Success` (with required `text`) or a `Failure` (with required `failure_reason`). Mixed shapes are not representable — pydantic rejects them at construction, and mypy rejects them statically (via the `pydantic.mypy` plugin). Legacy records with `ok: bool` (pre-#20) are normalised on read by a `BeforeValidator` on the union, so existing `data/items.json` files keep working without a manual migration. The static contract is pinned by `tests/type_probes/illegal_states.py`.

---

Expand Down
11 changes: 11 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ target-version = "py312"
python_version = "3.12"
ignore_missing_imports = true
files = ["src/xbrain"]
# Pydantic mypy plugin turns model __init__ into a typed signature so
# mypy can flag missing required fields (e.g. ContentSourceSuccess
# without `text`). The #20 refactor relies on this to make illegal
# states unrepresentable at the type level — verified by the
# tests/type_probes/illegal_states.py probe.
plugins = ["pydantic.mypy"]

[tool.pydantic-mypy]
init_forbid_extra = true
init_typed = true
warn_required_dynamic_aliases = true

[tool.interrogate]
ignore-init-method = true
Expand Down
5 changes: 3 additions & 2 deletions src/xbrain/executors/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from xbrain.executors.base import EnrichmentJudgment
from xbrain.llm_json import json_from_response
from xbrain.models import Item, Topic
from xbrain.models import ContentSourceSuccess, Item, Topic
from xbrain.rubrics import ARTICLE_CHAR_LIMIT, load_rubric

_MAX_TOKENS = 600
Expand Down Expand Up @@ -57,8 +57,9 @@ def _user_prompt(item: Item, vocab: list[Topic]) -> str:
]
parts += [f"- {ln.url} (domain: {ln.domain})" for ln in item.links]
if item.content and item.content.sources:
# Narrow to the success variant — only those carry `title`/`text`.
for src in item.content.sources:
if src.ok and src.text:
if isinstance(src, ContentSourceSuccess) and src.text:
parts += [
"",
f"Linked article ({src.title or src.url}):",
Expand Down
25 changes: 16 additions & 9 deletions src/xbrain/extract/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@

from xbrain.extract.browser import is_logged_out, x_context
from xbrain.extract.graphql import parse_tweets
from xbrain.models import Content, ContentSource, Item
from xbrain.models import (
Content,
ContentSource,
ContentSourceFailure,
ContentSourceSuccess,
Item,
)

_SETTLE_MS = 4000

Expand Down Expand Up @@ -42,16 +48,17 @@ def expand_threads(store: dict[str, Item], storage_state_path: Path, force: bool
with x_context(storage_state_path) as context:
for item in pending:
text = _fetch_thread_text(context, item)
_attach_thread(
item,
ContentSource(
source: ContentSource
if text:
source = ContentSourceSuccess(kind="thread", url=item.url, text=text)
else:
source = ContentSourceFailure(
kind="thread",
url=item.url,
text=text or None,
ok=bool(text),
error=None if text else "No se pudo recuperar el hilo.",
),
)
failure_reason="empty_content",
error="No se pudo recuperar el hilo.",
)
_attach_thread(item, source)
return len(pending)


Expand Down
197 changes: 134 additions & 63 deletions src/xbrain/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
records *structured evidence* (HTTP status + a categorised reason) so a broken
link is demonstrable, not assumed (design §4). x.com links are skipped here —
they are handled by `xbrain.fetch_x`.

`FetchResult` is the in-memory return type of the extractors and is a tagged
union (`FetchSuccess | FetchFailure`) so callers cannot accidentally read a
success-only field off a failure record (or vice versa). The persisted
`ContentSource` is itself a tagged union — see `xbrain.models`.
"""

from __future__ import annotations
Expand All @@ -13,14 +18,21 @@
import socket
import urllib.error
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable
from typing import Callable, Union
from urllib.parse import urlparse

import trafilatura
from pydantic import BaseModel

from xbrain.models import Content, ContentSource, FailureReason, Item
from xbrain.models import (
Content,
ContentSource,
ContentSourceFailure,
ContentSourceSuccess,
FailureReason,
Item,
)

_X_HOSTS = {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com"}
_UA = "Mozilla/5.0 (compatible; XBrain/1.0)"
Expand All @@ -29,18 +41,40 @@
_FIRECRAWL_TIMEOUT = 60


@dataclass
class FetchResult:
"""The structured outcome of one content-extraction attempt."""
class FetchSuccess(BaseModel):
"""The successful outcome of one content-extraction attempt.

`text` is required: a success without text is not a success. The type
system enforces this — callers do not need to defensively check for
`result.text is not None`.
"""

title: str | None = None
text: str | None = None
text: str
http_status: int | None = None
attempts: int = 1


class FetchFailure(BaseModel):
"""The failed outcome of one content-extraction attempt.

`failure_reason` may be `None` when the failure has not yet been
categorised (e.g. an uncaught network error). Callers that persist the
result fall back to ``"unknown_error"`` — a transient bucket — to satisfy
the required field on `ContentSourceFailure` while preserving the
"uncategorised = retry-worthy" invariant from #19.
"""

failure_reason: FailureReason | None = None
error: str | None = None
http_status: int | None = None
attempts: int = 1


# The in-memory FetchResult — internal to this module. Never persisted (the
# wire format is `ContentSource`, which has its own discriminator). Callers
# narrow via `isinstance(result, FetchSuccess)` / `isinstance(result, FetchFailure)`.
FetchResult = Union[FetchSuccess, FetchFailure]
ArticleExtractor = Callable[[str], FetchResult]


Expand Down Expand Up @@ -107,18 +141,18 @@ def trafilatura_extract(
downloaded = fetch(url)
if downloaded is None:
status, reason, error = prober(url)
return FetchResult(http_status=status, failure_reason=reason, error=error, attempts=1)
return FetchFailure(http_status=status, failure_reason=reason, error=error, attempts=1)
text = extract(downloaded)
if not text:
return FetchResult(
return FetchFailure(
http_status=200,
failure_reason="empty_content",
error="La página se descargó pero no tiene un artículo extraíble.",
attempts=1,
)
metadata = trafilatura.extract_metadata(downloaded)
title = metadata.title if metadata else None
return FetchResult(title=title, text=text, http_status=200, attempts=1)
return FetchSuccess(title=title, text=text, http_status=200, attempts=1)


def _firecrawl_extract(
Expand All @@ -143,19 +177,21 @@ def _firecrawl_extract(
with opener(req, timeout=_FIRECRAWL_TIMEOUT) as resp:
payload = json.loads(resp.read())
except (urllib.error.URLError, TimeoutError, OSError, ValueError) as exc:
return FetchResult(error=f"Firecrawl falló: {exc}", attempts=1)
return FetchFailure(error=f"Firecrawl falló: {exc}", attempts=1)
if payload.get("success") is False or payload.get("error"):
return FetchResult(
return FetchFailure(
error=f"Firecrawl falló: {payload.get('error') or 'respuesta de error'}",
attempts=1,
)
data = payload.get("data") or {}
text = data.get("markdown")
if text:
title = (data.get("metadata") or {}).get("title")
return FetchResult(title=title, text=text, http_status=200, attempts=1)
return FetchResult(
failure_reason="empty_content", error="Firecrawl no devolvió contenido.", attempts=1
return FetchSuccess(title=title, text=text, http_status=200, attempts=1)
return FetchFailure(
failure_reason="empty_content",
error="Firecrawl no devolvió contenido.",
attempts=1,
)


Expand All @@ -165,22 +201,64 @@ def extract_article(
primary: Callable = trafilatura_extract,
firecrawl: Callable = _firecrawl_extract,
) -> FetchResult:
"""Default extractor: trafilatura, then Firecrawl for JS-rendered pages."""
"""Default extractor: trafilatura, then Firecrawl for JS-rendered pages.

Switches on the `FetchResult` variant: a `FetchSuccess` is returned
as-is. A `FetchFailure` whose `failure_reason` is in
``{"js_required", "empty_content"}`` triggers the Firecrawl fallback;
anything else (404, dns, ...) is terminal and Firecrawl is not called.
"""
result = primary(url)
if result.text:
if isinstance(result, FetchSuccess):
return result
# mypy now narrows `result` to FetchFailure for the rest of this function.
if result.failure_reason not in ("js_required", "empty_content"):
return result # a hard failure (404, dns, ...) — Firecrawl will not help
return result # hard failure (404, dns, ...) — Firecrawl will not help
fallback = firecrawl(url)
if fallback is None:
return result # Firecrawl not configured — keep the original evidence
if fallback.text:
fallback.attempts = result.attempts + 1
return fallback
result.attempts = result.attempts + 1 # both attempts exhausted; keep first evidence
if fallback.error:
result.error = f"{result.error or 'sin contenido'} | Firecrawl: {fallback.error}"
return result
if isinstance(fallback, FetchSuccess):
return fallback.model_copy(update={"attempts": result.attempts + 1})
# Both attempts exhausted — merge evidence and bump the attempt counter.
merged_error = (
f"{result.error or 'sin contenido'} | Firecrawl: {fallback.error}"
if fallback.error
else result.error
)
return result.model_copy(update={"attempts": result.attempts + 1, "error": merged_error})


def _content_source_from(url: str, result: FetchResult) -> ContentSource:
"""Build the persisted `ContentSource` variant matching the fetch outcome.

Maps `FetchSuccess` → `ContentSourceSuccess` and `FetchFailure` →
`ContentSourceFailure`. A failure without a categorised reason falls back
to ``"empty_content"`` — the catch-all bucket — because
`ContentSourceFailure.failure_reason` is a required field (the type
system says a failure without a reason is not demonstrable evidence).
The free-form `error` string still carries the original explanation.
"""
if isinstance(result, FetchSuccess):
return ContentSourceSuccess(
kind="external_article",
url=url,
title=result.title,
text=result.text,
http_status=result.http_status,
attempts=result.attempts,
)
return ContentSourceFailure(
kind="external_article",
url=url,
error=result.error,
http_status=result.http_status,
# `unknown_error` (a transient bucket) — NOT `empty_content` (terminal).
# An uncategorised failure must stay self-healing on the next
# `fetch_pending` run, mirroring the #19 invariant that a missing
# `failure_reason` was retry-worthy by default.
failure_reason=result.failure_reason or "unknown_error",
attempts=result.attempts,
)


def fetch_item(item: Item, extractor: ArticleExtractor = extract_article) -> Content:
Expand All @@ -198,64 +276,57 @@ def fetch_item(item: Item, extractor: ArticleExtractor = extract_article) -> Con
result = extractor(url)
except Exception as exc: # noqa: BLE001 - one bad URL must not abort the batch
new_sources.append(
ContentSource(
ContentSourceFailure(
kind="external_article",
url=url,
ok=False,
# Transient bucket: an uncaught extractor exception is
# almost always something that may succeed on the next
# run (network blip, intermittent SSL, …). #19 relied on
# this being retry-worthy; preserve that.
failure_reason="unknown_error",
error=f"Error al descargar el artículo: {exc}",
attempts=1,
)
)
continue
if result.text:
new_sources.append(
ContentSource(
kind="external_article",
url=url,
title=result.title,
text=result.text,
ok=True,
http_status=result.http_status,
attempts=result.attempts,
)
)
else:
new_sources.append(
ContentSource(
kind="external_article",
url=url,
ok=False,
error=result.error,
http_status=result.http_status,
failure_reason=result.failure_reason,
attempts=result.attempts,
)
)
new_sources.append(_content_source_from(url, result))
kept = [s for s in item.content.sources if s.kind != "external_article"] if item.content else []
return Content(fetched_at=datetime.now(timezone.utc), sources=kept + new_sources)


# Failure reasons that justify an automatic retry on the next run. Everything
# else (`not_found`, `forbidden`, `paywall`, `js_required`, `empty_content`)
# is treated as terminal — only `--force` re-fetches those.
_TRANSIENT_FAILURES: frozenset[FailureReason] = frozenset({"timeout", "dns_error"})
#
# `unknown_error` is the uncategorised-failure bucket — an extractor exception
# or any failure path that did not pin a specific reason. We retry by default:
# the alternative is silently classifying every uncaught failure as terminal,
# which would break the #19 invariant ("a failure without a categorised reason
# is anomalous; re-fetching gives it a chance to land on a known result").
_TRANSIENT_FAILURES: frozenset[FailureReason] = frozenset({"timeout", "dns_error", "unknown_error"})


def _should_refetch(content: Content | None, force: bool) -> bool:
"""Return True if `fetch_pending` should (re)fetch this item.

- `content is None` (never fetched) → True.
- `force=True` → True regardless of recorded state.
- Otherwise, True only if every `external_article` source on `content` is
a failure whose `failure_reason` is in `_TRANSIENT_FAILURES` OR `None`.
`failure_reason=None` is treated as transient: an `ok=False` source
with no categorised reason is anomalous (pre-Fase-2 records, an
uncaught `extractor` exception path, …) — re-fetching gives that case
a chance to land on a categorised result rather than staying invisibly
stuck. A single successful source, or any *categorised* terminal
failure, skips. No `external_article` sources at all → skip (there is
nothing here for `fetch_pending`; the x.com sources are handled by
`fetch_x`).
- Otherwise, True only if every `external_article` source on `content`
is a `ContentSourceFailure` whose `failure_reason` is in
`_TRANSIENT_FAILURES`. A single successful source (any
`ContentSourceSuccess`) or a categorised terminal failure (e.g.
`not_found`, `paywall`) skips. No `external_article` sources at all
→ skip (there is nothing here for `fetch_pending`; the x.com sources
are handled by `fetch_x`).

The pre-#20 helper read `src.ok` / `src.failure_reason` as Optionals;
after the tagged-union refactor the variant `isinstance` check is the
single switch and `failure_reason` is required on the failure variant,
so no `is None` special-case is needed. Pre-#20 records that lacked a
categorised `failure_reason` are migrated to `timeout` by
`_normalise_legacy_content_source` (see `xbrain.models`) — they
therefore get one automatic retry under #19, matching the prior
behaviour without an extra branch here.
"""
if content is None:
return True
Expand All @@ -265,7 +336,7 @@ def _should_refetch(content: Content | None, force: bool) -> bool:
if not external:
return False
return all(
(not src.ok) and (src.failure_reason is None or src.failure_reason in _TRANSIENT_FAILURES)
isinstance(src, ContentSourceFailure) and src.failure_reason in _TRANSIENT_FAILURES
for src in external
)

Expand Down
Loading
Loading