1 change: 1 addition & 0 deletions services/__init__.py
@@ -0,0 +1 @@
"""Service layer for background workers."""
5 changes: 5 additions & 0 deletions services/enrich/__init__.py
@@ -0,0 +1,5 @@
"""Worker service for MediaWiki diff enrichment."""

from .worker import EnrichWorker

__all__ = ["EnrichWorker"]
97 changes: 97 additions & 0 deletions services/enrich/diff_parser.py
@@ -0,0 +1,97 @@
"""Parse MediaWiki HTML diffs into structured diff fragments."""

from __future__ import annotations

from dataclasses import dataclass
from html.parser import HTMLParser
from typing import List, Optional, Tuple


@dataclass
class DiffFragment:
added_text: str
removed_text: str
context: str


class _DiffHTMLParser(HTMLParser):
def __init__(self) -> None:
super().__init__()
self._rows: List[List[dict]] = []
self._current_row: List[dict] = []
self._current_cell: Optional[dict] = None
self._cell_text_parts: List[str] = []

@property
def rows(self) -> List[List[dict]]:
return self._rows

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
        attrs_dict = dict(attrs)
        if tag == "tr":
            self._current_row = []
        elif tag == "td":
            # Attribute values may be None for valueless attributes, so
            # normalize the class list to a string.
            classes = attrs_dict.get("class") or ""
            self._current_cell = {"class": classes, "text": ""}
            self._cell_text_parts = []
        elif tag in {"ins", "del", "span"}:
            # Inline change markers: their text content is collected by
            # handle_data, so no special handling is needed here.
            pass

def handle_endtag(self, tag: str) -> None:
if tag == "td" and self._current_cell is not None:
text = "".join(self._cell_text_parts)
self._current_cell["text"] = " ".join(text.split())
self._current_row.append(self._current_cell)
self._current_cell = None
self._cell_text_parts = []
elif tag == "tr":
if self._current_row:
self._rows.append(self._current_row)
self._current_row = []

def handle_data(self, data: str) -> None:
if self._current_cell is not None:
self._cell_text_parts.append(data)


def parse_diff_html(html: str) -> List[DiffFragment]:
"""Parse MediaWiki diff HTML and return structured diff fragments.

The parser extracts added and removed text spans and associates them with
the most recent context line when available.
"""
parser = _DiffHTMLParser()
parser.feed(html)

fragments: List[DiffFragment] = []
last_context = ""

for row in parser.rows:
context_in_row = None
added_text = ""
removed_text = ""

for cell in row:
classes = cell.get("class", "")
text = cell.get("text", "")
if "diff-context" in classes and text:
context_in_row = text
if "diff-addedline" in classes:
added_text = text
if "diff-deletedline" in classes:
removed_text = text

if context_in_row is not None:
last_context = context_in_row

if added_text or removed_text:
fragments.append(
DiffFragment(
added_text=added_text,
removed_text=removed_text,
context=last_context,
)
)

return fragments
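
A minimal usage sketch for parse_diff_html (not part of this PR). The HTML below is a hand-written approximation of MediaWiki's table-based diff markup, not captured API output; the diff-context / diff-addedline / diff-deletedline classes are what the parser keys on:

from services.enrich.diff_parser import parse_diff_html

sample_html = """
<table class="diff">
  <tr>
    <td class="diff-context">Shared context line</td>
    <td class="diff-context">Shared context line</td>
  </tr>
  <tr>
    <td class="diff-deletedline"><del>old wording</del></td>
    <td class="diff-addedline"><ins>new wording</ins></td>
  </tr>
</table>
"""

for fragment in parse_diff_html(sample_html):
    # Prints: Shared context line | old wording -> new wording
    print(fragment.context, "|", fragment.removed_text, "->", fragment.added_text)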
68 changes: 68 additions & 0 deletions services/enrich/http_client.py
@@ -0,0 +1,68 @@
"""HTTP utilities with retry and backoff for MediaWiki API calls."""

from __future__ import annotations

import json
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen


@dataclass
class HttpResponse:
status: int
data: Dict[str, Any]
headers: Dict[str, str]


class HttpClient:
def __init__(self, user_agent: str, max_retries: int = 5, base_backoff: float = 1.0) -> None:
self.user_agent = user_agent
self.max_retries = max_retries
self.base_backoff = base_backoff

def get_json(self, base_url: str, params: Dict[str, Any]) -> HttpResponse:
attempt = 0
while True:
attempt += 1
try:
url = f"{base_url}?{urlencode(params)}"
request = Request(url, headers={"User-Agent": self.user_agent})
with urlopen(request, timeout=30) as response:
payload = response.read().decode("utf-8")
return HttpResponse(
status=response.status,
data=json.loads(payload),
headers=dict(response.headers),
)
            except HTTPError as exc:
                if not self._is_retryable(exc.code) or attempt >= self.max_retries:
                    raise
                # Honour the server's Retry-After hint when one is provided.
                retry_after = exc.headers.get("Retry-After") if exc.headers else None
                self._sleep_with_backoff(attempt, retry_after)
            except URLError:
                if attempt >= self.max_retries:
                    raise
                self._sleep_with_backoff(attempt, None)

    @staticmethod
    def _is_retryable(status: int) -> bool:
        return status in {429, 500, 502, 503, 504}

    def _sleep_with_backoff(self, attempt: int, retry_after: Optional[str]) -> None:
        # Retry-After may be a number of seconds or an HTTP-date; fall back to
        # exponential backoff when it is absent or not a plain number.
        if retry_after:
            try:
                delay = float(retry_after)
            except ValueError:
                delay = self.base_backoff * (2 ** (attempt - 1))
        else:
            delay = self.base_backoff * (2 ** (attempt - 1))
        time.sleep(delay)
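
A hedged sketch of exercising HttpClient against the MediaWiki API; the endpoint and User-Agent string below are placeholders, not values from this PR:

from services.enrich.http_client import HttpClient

client = HttpClient(user_agent="enrich-worker/0.1 (ops@example.org)")
response = client.get_json(
    "https://en.wikipedia.org/w/api.php",
    {"action": "query", "format": "json", "meta": "siteinfo"},
)
print(response.status, sorted(response.data))

With the default base_backoff of 1.0, retryable failures wait 1, 2, 4, 8, ... seconds between attempts unless the server's Retry-After header supplies a numeric delay.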
63 changes: 63 additions & 0 deletions services/enrich/metadata.py
@@ -0,0 +1,63 @@
"""Metadata fetcher for categories and Wikidata QID."""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional

from .http_client import HttpClient
from .storage import DiffStorage, MetadataRecord


@dataclass
class PageMetadata:
page_id: int
qid: Optional[str]
categories: List[str]


class MetadataFetcher:
def __init__(
self,
storage: DiffStorage,
http_client: HttpClient,
api_base_url: str,
ttl: timedelta = timedelta(hours=6),
) -> None:
self.storage = storage
self.http_client = http_client
self.api_base_url = api_base_url
self.ttl = ttl

def get_metadata(self, page_id: int) -> PageMetadata:
cached = self.storage.get_cached_metadata(page_id, self.ttl)
if cached:
return PageMetadata(page_id=page_id, qid=cached.qid, categories=cached.categories)

params = {
"action": "query",
"format": "json",
"pageids": page_id,
"prop": "categories|pageprops",
"cllimit": "max",
"redirects": 1,
}
response = self.http_client.get_json(self.api_base_url, params)
        pages = response.data.get("query", {}).get("pages", {})
        # redirects=1 may resolve the request to a different page id than the
        # one asked for, so read the single returned page entry rather than
        # indexing by the original id.
        page = next(iter(pages.values()), {})
        # Note: only the first batch of categories is read; API continuation
        # for pages with more than cllimit entries is not followed.
        categories = [
            category.get("title")
            for category in page.get("categories", [])
            if category.get("title")
        ]
qid = page.get("pageprops", {}).get("wikibase_item")

record = MetadataRecord(
page_id=page_id,
qid=qid,
categories=categories,
fetched_at=datetime.now(timezone.utc),
)
self.storage.upsert_metadata(record)
return PageMetadata(page_id=page_id, qid=qid, categories=categories)
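
Wiring sketch for MetadataFetcher, reusing the client and storage classes from this PR; the database path, endpoint, and page id are illustrative assumptions:

from services.enrich.http_client import HttpClient
from services.enrich.metadata import MetadataFetcher
from services.enrich.storage import DiffStorage

storage = DiffStorage("enrich.db")
client = HttpClient(user_agent="enrich-worker/0.1 (ops@example.org)")
fetcher = MetadataFetcher(storage, client, "https://en.wikipedia.org/w/api.php")

meta = fetcher.get_metadata(12345)  # arbitrary page id, for illustration only
print(meta.qid, meta.categories[:3])

A second call within the six-hour TTL is served from the metadata_cache table without touching the network.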
158 changes: 158 additions & 0 deletions services/enrich/storage.py
@@ -0,0 +1,158 @@
"""SQLite-backed storage for diff artifacts and metadata cache."""

from __future__ import annotations

import json
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterable, List, Optional


@dataclass
class DiffRecord:
page_id: int
from_rev_id: int
to_rev_id: int
added_text: str
removed_text: str
context: str


@dataclass
class MetadataRecord:
page_id: int
qid: Optional[str]
categories: List[str]
fetched_at: datetime


class DiffStorage:
def __init__(self, db_path: str) -> None:
self.db_path = Path(db_path)
self._ensure_schema()

    def _connect(self) -> sqlite3.Connection:
        # Callers use the connection as a context manager, which commits or
        # rolls back the transaction but does not close the connection;
        # closing is left to garbage collection.
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

def _ensure_schema(self) -> None:
with self._connect() as conn:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
page_id INTEGER NOT NULL,
from_rev_id INTEGER NOT NULL,
to_rev_id INTEGER NOT NULL,
processed_at TEXT
);

CREATE TABLE IF NOT EXISTS diffs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
page_id INTEGER NOT NULL,
from_rev_id INTEGER NOT NULL,
to_rev_id INTEGER NOT NULL,
added_text TEXT,
removed_text TEXT,
context TEXT,
created_at TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS metadata_cache (
page_id INTEGER PRIMARY KEY,
qid TEXT,
categories TEXT NOT NULL,
fetched_at TEXT NOT NULL
);
"""
)

def fetch_pending_events(self, limit: int = 100) -> List[sqlite3.Row]:
with self._connect() as conn:
rows = conn.execute(
"""
SELECT id, page_id, from_rev_id, to_rev_id
FROM events
WHERE processed_at IS NULL
ORDER BY id ASC
LIMIT ?
""",
(limit,),
).fetchall()
return rows

def mark_event_processed(self, event_id: int) -> None:
with self._connect() as conn:
conn.execute(
"UPDATE events SET processed_at = ? WHERE id = ?",
(datetime.now(timezone.utc).isoformat(), event_id),
)

def insert_diffs(self, records: Iterable[DiffRecord]) -> None:
rows = [
(
record.page_id,
record.from_rev_id,
record.to_rev_id,
record.added_text,
record.removed_text,
record.context,
datetime.now(timezone.utc).isoformat(),
)
for record in records
]
if not rows:
return
with self._connect() as conn:
conn.executemany(
"""
INSERT INTO diffs (
page_id, from_rev_id, to_rev_id,
added_text, removed_text, context, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
rows,
)

def get_cached_metadata(self, page_id: int, ttl: timedelta) -> Optional[MetadataRecord]:
with self._connect() as conn:
row = conn.execute(
"SELECT page_id, qid, categories, fetched_at FROM metadata_cache WHERE page_id = ?",
(page_id,),
).fetchone()
if row is None:
return None

fetched_at = datetime.fromisoformat(row["fetched_at"])
if datetime.now(timezone.utc) - fetched_at > ttl:
return None

categories = json.loads(row["categories"]) if row["categories"] else []
return MetadataRecord(
page_id=row["page_id"],
qid=row["qid"],
categories=categories,
fetched_at=fetched_at,
)

def upsert_metadata(self, record: MetadataRecord) -> None:
with self._connect() as conn:
conn.execute(
"""
INSERT INTO metadata_cache (page_id, qid, categories, fetched_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(page_id) DO UPDATE SET
qid = excluded.qid,
categories = excluded.categories,
fetched_at = excluded.fetched_at
""",
(
record.page_id,
record.qid,
json.dumps(record.categories),
record.fetched_at.isoformat(),
),
)
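
End-to-end sketch of the storage round trip. Nothing in this PR enqueues events, so the INSERT below stands in for whatever ingest process feeds the events table:

import sqlite3

from services.enrich.storage import DiffRecord, DiffStorage

storage = DiffStorage("enrich.db")

# Stand-in for the (out-of-scope) ingest process.
with sqlite3.connect("enrich.db") as conn:
    conn.execute(
        "INSERT INTO events (page_id, from_rev_id, to_rev_id) VALUES (?, ?, ?)",
        (12345, 1000, 1001),
    )

for event in storage.fetch_pending_events(limit=10):
    storage.insert_diffs([
        DiffRecord(
            page_id=event["page_id"],
            from_rev_id=event["from_rev_id"],
            to_rev_id=event["to_rev_id"],
            added_text="new text",
            removed_text="old text",
            context="context line",
        )
    ])
    storage.mark_event_processed(event["id"])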