From db176f4111ef2b8b0c79991f1ca28e4e349646b3 Mon Sep 17 00:00:00 2001 From: Jigar <1252580+artvandelay@users.noreply.github.com> Date: Sat, 10 Jan 2026 00:52:14 -0800 Subject: [PATCH] Add MediaWiki diff enrichment worker --- services/__init__.py | 1 + services/enrich/__init__.py | 5 + services/enrich/diff_parser.py | 97 ++++++++++++++++++++ services/enrich/http_client.py | 68 ++++++++++++++ services/enrich/metadata.py | 63 +++++++++++++ services/enrich/storage.py | 158 ++++++++++++++++++++++++++++++++ services/enrich/worker.py | 87 ++++++++++++++++++ tests/fixtures/sample_diff.html | 15 +++ tests/test_diff_parser.py | 29 ++++++ 9 files changed, 523 insertions(+) create mode 100644 services/__init__.py create mode 100644 services/enrich/__init__.py create mode 100644 services/enrich/diff_parser.py create mode 100644 services/enrich/http_client.py create mode 100644 services/enrich/metadata.py create mode 100644 services/enrich/storage.py create mode 100644 services/enrich/worker.py create mode 100644 tests/fixtures/sample_diff.html create mode 100644 tests/test_diff_parser.py diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..ec8b54c --- /dev/null +++ b/services/__init__.py @@ -0,0 +1 @@ +"""Service layer for background workers.""" diff --git a/services/enrich/__init__.py b/services/enrich/__init__.py new file mode 100644 index 0000000..8119eec --- /dev/null +++ b/services/enrich/__init__.py @@ -0,0 +1,5 @@ +"""Worker service for MediaWiki diff enrichment.""" + +from .worker import EnrichWorker + +__all__ = ["EnrichWorker"] diff --git a/services/enrich/diff_parser.py b/services/enrich/diff_parser.py new file mode 100644 index 0000000..8c18ff6 --- /dev/null +++ b/services/enrich/diff_parser.py @@ -0,0 +1,97 @@ +"""Parse MediaWiki HTML diffs into structured diff fragments.""" + +from __future__ import annotations + +from dataclasses import dataclass +from html.parser import HTMLParser +from typing import List, Optional + + 
+@dataclass +class DiffFragment: + added_text: str + removed_text: str + context: str + + +class _DiffHTMLParser(HTMLParser): + def __init__(self) -> None: + super().__init__() + self._rows: List[List[dict]] = [] + self._current_row: List[dict] = [] + self._current_cell: Optional[dict] = None + self._cell_text_parts: List[str] = [] + + @property + def rows(self) -> List[List[dict]]: + return self._rows + + def handle_starttag(self, tag: str, attrs: List[tuple]) -> None: + attrs_dict = dict(attrs) + if tag == "tr": + self._current_row = [] + elif tag == "td": + classes = attrs_dict.get("class", "") + self._current_cell = {"class": classes, "text": ""} + self._cell_text_parts = [] + elif tag in {"ins", "del", "span"}: + # Preserve inline changes text; we treat them as normal text nodes. + pass + + def handle_endtag(self, tag: str) -> None: + if tag == "td" and self._current_cell is not None: + text = "".join(self._cell_text_parts) + self._current_cell["text"] = " ".join(text.split()) + self._current_row.append(self._current_cell) + self._current_cell = None + self._cell_text_parts = [] + elif tag == "tr": + if self._current_row: + self._rows.append(self._current_row) + self._current_row = [] + + def handle_data(self, data: str) -> None: + if self._current_cell is not None: + self._cell_text_parts.append(data) + + +def parse_diff_html(html: str) -> List[DiffFragment]: + """Parse MediaWiki diff HTML and return structured diff fragments. + + The parser extracts added and removed text spans and associates them with + the most recent context line when available. 
+ """ + parser = _DiffHTMLParser() + parser.feed(html) + + fragments: List[DiffFragment] = [] + last_context = "" + + for row in parser.rows: + context_in_row = None + added_text = "" + removed_text = "" + + for cell in row: + classes = cell.get("class", "") + text = cell.get("text", "") + if "diff-context" in classes and text: + context_in_row = text + if "diff-addedline" in classes: + added_text = text + if "diff-deletedline" in classes: + removed_text = text + + if context_in_row is not None: + last_context = context_in_row + + if added_text or removed_text: + fragments.append( + DiffFragment( + added_text=added_text, + removed_text=removed_text, + context=last_context, + ) + ) + + return fragments diff --git a/services/enrich/http_client.py b/services/enrich/http_client.py new file mode 100644 index 0000000..1bb5aa2 --- /dev/null +++ b/services/enrich/http_client.py @@ -0,0 +1,68 @@ +"""HTTP utilities with retry and backoff for MediaWiki API calls.""" + +from __future__ import annotations + +import json +import time +from dataclasses import dataclass +from typing import Any, Dict, Optional +from urllib.error import HTTPError, URLError +from urllib.parse import urlencode +from urllib.request import Request, urlopen + + +@dataclass +class HttpResponse: + status: int + data: Dict[str, Any] + headers: Dict[str, str] + + +class HttpClient: + def __init__(self, user_agent: str, max_retries: int = 5, base_backoff: float = 1.0) -> None: + self.user_agent = user_agent + self.max_retries = max_retries + self.base_backoff = base_backoff + + def get_json(self, base_url: str, params: Dict[str, Any]) -> HttpResponse: + attempt = 0 + while True: + attempt += 1 + try: + url = f"{base_url}?{urlencode(params)}" + request = Request(url, headers={"User-Agent": self.user_agent}) + with urlopen(request, timeout=30) as response: + payload = response.read().decode("utf-8") + return HttpResponse( + status=response.status, + data=json.loads(payload), + headers=dict(response.headers), + 
) + except HTTPError as exc: + if not self._should_retry(exc.code, exc.headers, attempt): + raise + except URLError: + if attempt >= self.max_retries: + raise + self._sleep_with_backoff(attempt, None) + + def _should_retry(self, status: int, headers: Optional[Dict[str, str]], attempt: int) -> bool: + if status not in {429, 500, 502, 503, 504}: + return False + if attempt >= self.max_retries: + return False + retry_after = None + if headers is not None: + retry_after = headers.get("Retry-After") + self._sleep_with_backoff(attempt, retry_after) + return True + + def _sleep_with_backoff(self, attempt: int, retry_after: Optional[str]) -> None: + if retry_after: + try: + delay = float(retry_after) + except ValueError: + delay = self.base_backoff * (2 ** (attempt - 1)) + else: + delay = self.base_backoff * (2 ** (attempt - 1)) + time.sleep(delay) diff --git a/services/enrich/metadata.py b/services/enrich/metadata.py new file mode 100644 index 0000000..5fe9fe1 --- /dev/null +++ b/services/enrich/metadata.py @@ -0,0 +1,63 @@ +"""Metadata fetcher for categories and Wikidata QID.""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import List, Optional + +from .http_client import HttpClient +from .storage import DiffStorage, MetadataRecord + + +@dataclass +class PageMetadata: + page_id: int + qid: Optional[str] + categories: List[str] + + +class MetadataFetcher: + def __init__( + self, + storage: DiffStorage, + http_client: HttpClient, + api_base_url: str, + ttl: timedelta = timedelta(hours=6), + ) -> None: + self.storage = storage + self.http_client = http_client + self.api_base_url = api_base_url + self.ttl = ttl + + def get_metadata(self, page_id: int) -> PageMetadata: + cached = self.storage.get_cached_metadata(page_id, self.ttl) + if cached: + return PageMetadata(page_id=page_id, qid=cached.qid, categories=cached.categories) + + params = { + "action": "query", + "format": "json", 
+ "pageids": page_id, + "prop": "categories|pageprops", + "cllimit": "max", + "redirects": 1, + } + response = self.http_client.get_json(self.api_base_url, params) + pages = response.data.get("query", {}).get("pages", {}) + page = pages.get(str(page_id), {}) + categories = [ + category.get("title") + for category in page.get("categories", []) + if category.get("title") + ] + qid = page.get("pageprops", {}).get("wikibase_item") + + record = MetadataRecord( + page_id=page_id, + qid=qid, + categories=categories, + fetched_at=datetime.now(timezone.utc), + ) + self.storage.upsert_metadata(record) + return PageMetadata(page_id=page_id, qid=qid, categories=categories) diff --git a/services/enrich/storage.py b/services/enrich/storage.py new file mode 100644 index 0000000..7a81cbd --- /dev/null +++ b/services/enrich/storage.py @@ -0,0 +1,158 @@ +"""SQLite-backed storage for diff artifacts and metadata cache.""" + +from __future__ import annotations + +import json +import sqlite3 +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Iterable, List, Optional + + +@dataclass +class DiffRecord: + page_id: int + from_rev_id: int + to_rev_id: int + added_text: str + removed_text: str + context: str + + +@dataclass +class MetadataRecord: + page_id: int + qid: Optional[str] + categories: List[str] + fetched_at: datetime + + +class DiffStorage: + def __init__(self, db_path: str) -> None: + self.db_path = Path(db_path) + self._ensure_schema() + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + return conn + + def _ensure_schema(self) -> None: + with self._connect() as conn: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + page_id INTEGER NOT NULL, + from_rev_id INTEGER NOT NULL, + to_rev_id INTEGER NOT NULL, + processed_at TEXT + ); + + CREATE TABLE IF NOT EXISTS diffs ( + id 
INTEGER PRIMARY KEY AUTOINCREMENT, + page_id INTEGER NOT NULL, + from_rev_id INTEGER NOT NULL, + to_rev_id INTEGER NOT NULL, + added_text TEXT, + removed_text TEXT, + context TEXT, + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS metadata_cache ( + page_id INTEGER PRIMARY KEY, + qid TEXT, + categories TEXT NOT NULL, + fetched_at TEXT NOT NULL + ); + """ + ) + + def fetch_pending_events(self, limit: int = 100) -> List[sqlite3.Row]: + with self._connect() as conn: + rows = conn.execute( + """ + SELECT id, page_id, from_rev_id, to_rev_id + FROM events + WHERE processed_at IS NULL + ORDER BY id ASC + LIMIT ? + """, + (limit,), + ).fetchall() + return rows + + def mark_event_processed(self, event_id: int) -> None: + with self._connect() as conn: + conn.execute( + "UPDATE events SET processed_at = ? WHERE id = ?", + (datetime.now(timezone.utc).isoformat(), event_id), + ) + + def insert_diffs(self, records: Iterable[DiffRecord]) -> None: + rows = [ + ( + record.page_id, + record.from_rev_id, + record.to_rev_id, + record.added_text, + record.removed_text, + record.context, + datetime.now(timezone.utc).isoformat(), + ) + for record in records + ] + if not rows: + return + with self._connect() as conn: + conn.executemany( + """ + INSERT INTO diffs ( + page_id, from_rev_id, to_rev_id, + added_text, removed_text, context, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?) 
+ """, + rows, + ) + + def get_cached_metadata(self, page_id: int, ttl: timedelta) -> Optional[MetadataRecord]: + with self._connect() as conn: + row = conn.execute( + "SELECT page_id, qid, categories, fetched_at FROM metadata_cache WHERE page_id = ?", + (page_id,), + ).fetchone() + if row is None: + return None + + fetched_at = datetime.fromisoformat(row["fetched_at"]) + if datetime.now(timezone.utc) - fetched_at > ttl: + return None + + categories = json.loads(row["categories"]) if row["categories"] else [] + return MetadataRecord( + page_id=row["page_id"], + qid=row["qid"], + categories=categories, + fetched_at=fetched_at, + ) + + def upsert_metadata(self, record: MetadataRecord) -> None: + with self._connect() as conn: + conn.execute( + """ + INSERT INTO metadata_cache (page_id, qid, categories, fetched_at) + VALUES (?, ?, ?, ?) + ON CONFLICT(page_id) DO UPDATE SET + qid = excluded.qid, + categories = excluded.categories, + fetched_at = excluded.fetched_at + """, + ( + record.page_id, + record.qid, + json.dumps(record.categories), + record.fetched_at.isoformat(), + ), + ) diff --git a/services/enrich/worker.py b/services/enrich/worker.py new file mode 100644 index 0000000..42734e9 --- /dev/null +++ b/services/enrich/worker.py @@ -0,0 +1,87 @@ +"""Worker that enriches stored events with MediaWiki diffs and metadata.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Iterable, List + +from .diff_parser import DiffFragment, parse_diff_html +from .http_client import HttpClient +from .metadata import MetadataFetcher +from .storage import DiffRecord, DiffStorage + + +@dataclass +class Event: + id: int + page_id: int + from_rev_id: int + to_rev_id: int + + +class EnrichWorker: + def __init__( + self, + db_path: str, + api_base_url: str, + user_agent: str = "nlbt-enrich/0.1", + ) -> None: + self.storage = DiffStorage(db_path) + self.http_client = HttpClient(user_agent=user_agent) + self.api_base_url = api_base_url + 
self.metadata_fetcher = MetadataFetcher( + storage=self.storage, + http_client=self.http_client, + api_base_url=api_base_url, + ) + + def run(self, limit: int = 100) -> int: + events = self.storage.fetch_pending_events(limit=limit) + processed = 0 + for row in events: + event = Event( + id=row["id"], + page_id=row["page_id"], + from_rev_id=row["from_rev_id"], + to_rev_id=row["to_rev_id"], + ) + self._process_event(event) + self.storage.mark_event_processed(event.id) + processed += 1 + return processed + + def _process_event(self, event: Event) -> None: + diff_html = self._fetch_diff(event.from_rev_id, event.to_rev_id) + fragments = parse_diff_html(diff_html) + records = self._build_diff_records(event, fragments) + self.storage.insert_diffs(records) + self.metadata_fetcher.get_metadata(event.page_id) + + def _fetch_diff(self, from_rev_id: int, to_rev_id: int) -> str: + params = { + "action": "compare", + "fromrev": from_rev_id, + "torev": to_rev_id, + "format": "json", + } + response = self.http_client.get_json(self.api_base_url, params) + compare = response.data.get("compare", {}) + html = compare.get("*", "") + if not html: + html = compare.get("body", "") + return html + + def _build_diff_records( + self, event: Event, fragments: Iterable[DiffFragment] + ) -> List[DiffRecord]: + return [ + DiffRecord( + page_id=event.page_id, + from_rev_id=event.from_rev_id, + to_rev_id=event.to_rev_id, + added_text=fragment.added_text, + removed_text=fragment.removed_text, + context=fragment.context, + ) + for fragment in fragments + ] diff --git a/tests/fixtures/sample_diff.html b/tests/fixtures/sample_diff.html new file mode 100644 index 0000000..5d83508 --- /dev/null +++ b/tests/fixtures/sample_diff.html @@ -0,0 +1,15 @@ +
+<table class="diff">
+  <tr>
+    <td class="diff-context">This is a context line.</td>
+  </tr>
+  <tr>
+    <td class="diff-deletedline"><del>Old content removed.</del></td>
+    <td class="diff-addedline"><ins>New content added.</ins></td>
+  </tr>
+  <tr>
+    <td class="diff-context">Another context line.</td>
+  </tr>
+  <tr>
+    <td class="diff-addedline"><ins>Inserted text.</ins></td>
+  </tr>
+</table>