From 9663c1c410f4c1cc289f73363f264982988a6ed6 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sun, 14 Jun 2026 14:44:31 +0300 Subject: [PATCH] refactor: wikipedia --- Makefile | 4 +- src/_fb_types.py | 5 + src/helpers/wikipedia.py | 296 +------ src/orchestration/_source_io.py | 60 ++ .../func_wikipedia_fetch}/Makefile | 22 +- .../func_wikipedia_fetch/main.py | 33 + .../func_wikipedia_fetch}/requirements.txt | 9 +- .../func_wikipedia_update}/Makefile | 21 +- .../func_wikipedia_update/main.py | 40 + .../func_wikipedia_update}/requirements.txt | 2 - src/questions/wikipedia/fetch/main.py | 202 ----- .../wikipedia/update_questions/main.py | 263 ------- src/sources/wikipedia.py | 745 +++++++++++++++++- src/tests/conftest.py | 7 + src/tests/test_wikipedia.py | 474 ++++++++++- 15 files changed, 1388 insertions(+), 795 deletions(-) rename src/{questions/wikipedia/fetch => orchestration/func_wikipedia_fetch}/Makefile (64%) create mode 100644 src/orchestration/func_wikipedia_fetch/main.py rename src/{questions/wikipedia/fetch => orchestration/func_wikipedia_fetch}/requirements.txt (58%) rename src/{questions/wikipedia/update_questions => orchestration/func_wikipedia_update}/Makefile (64%) create mode 100644 src/orchestration/func_wikipedia_update/main.py rename src/{questions/wikipedia/update_questions => orchestration/func_wikipedia_update}/requirements.txt (88%) delete mode 100644 src/questions/wikipedia/fetch/main.py delete mode 100644 src/questions/wikipedia/update_questions/main.py diff --git a/Makefile b/Makefile index b65e2f67..c9b95d5d 100644 --- a/Makefile +++ b/Makefile @@ -175,10 +175,10 @@ polymarket-update-questions: wikipedia: wikipedia-fetch wikipedia-update-questions wikipedia-fetch: - $(MAKE) -C src/questions/wikipedia/fetch || echo "* $@" >> $(MAKE_FAILURE_LOG) + $(MAKE) -C src/orchestration/func_wikipedia_fetch || echo "* $@" >> $(MAKE_FAILURE_LOG) wikipedia-update-questions: - $(MAKE) -C src/questions/wikipedia/update_questions || echo "* $@" >> $(MAKE_FAILURE_LOG) + $(MAKE) -C src/orchestration/func_wikipedia_update || echo "* $@" >> $(MAKE_FAILURE_LOG) fred: fred-fetch fred-update-questions diff --git a/src/_fb_types.py b/src/_fb_types.py index 99586587..782bf135 100644 --- a/src/_fb_types.py +++ b/src/_fb_types.py @@ -47,6 +47,11 @@ class SourceQuestionBank: QuestionBank = dict[str, SourceQuestionBank] +# Wikipedia's fetch() returns one DataFrame per page, keyed by id_root (columns vary per page). +# Shared between WikipediaSource.fetch()/update() and the orchestration fetch IO. +WikipediaFetchResult = dict[str, pd.DataFrame] + + @dataclass class UpdateResult: """Return value of a source's update() method. diff --git a/src/helpers/wikipedia.py b/src/helpers/wikipedia.py index 234cf103..3023d2cd 100644 --- a/src/helpers/wikipedia.py +++ b/src/helpers/wikipedia.py @@ -1,10 +1,20 @@ # -*- coding: utf-8 -*- -"""Wikipedia constants.""" +"""Wikipedia shared helpers. + +Light home of Wikipedia's naive-forecast computation (scipy/numpy/pandas) plus the hash-mapping +and identity access used by the still-unrefactored ``base_eval`` naive forecaster and by +``question_curation``. Hash-mapping access routes through a lazily-instantiated ``WikipediaSource`` +(see ``_get_source``); ``sources.wikipedia`` lazy-imports its scraping deps (requests/bs4) inside +fetch, so importing this module — and the many modules that import it — stays light. + +When ``base_eval`` is refactored to call ``WikipediaSource.get_naive_forecast()`` this computation +can move onto the source class (Phase 1 plan) and this module shrinks to a metadata-only shim. +""" import logging import os import sys -from datetime import datetime, timedelta +from datetime import timedelta import numpy as np import pandas as pd @@ -14,10 +24,11 @@ from sources._metadata import SOURCE_METADATA # noqa: E402 from sources.wikipedia import _IDS_TO_NULLIFY as IDS_TO_NULLIFY # noqa: F401, E402 +from sources.wikipedia import _PAGES as PAGES # noqa: E402 from sources.wikipedia import ( # noqa: F401, E402 _TRANSFORM_ID_MAPPING as transform_id_mapping, ) -from sources.wikipedia import QuestionType # noqa: F401, E402 +from sources.wikipedia import QuestionType # noqa: E402 from . import constants # noqa: E402 @@ -27,8 +38,6 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -HEADERS = {"User-Agent": constants.BENCHMARK_USER_AGENT} - WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATETIME = ( constants.QUESTION_BANK_DATA_STORAGE_START_DATETIME - timedelta(days=360 * 4) ) @@ -38,8 +47,6 @@ source = "wikipedia" -fetch_directory = f"{source}/fetch" - # Lazy import to avoid circular imports at module level _source = None @@ -79,11 +86,6 @@ def ffill_dfr(dfr): return _get_source()._ffill_dfr(dfr) -def get_fetch_filename(question_id_root: str) -> str: - """Provide the name of the fetch file for the id_root.""" - return f"{question_id_root}.jsonl" - - def id_hash(id_root: str, id_field_value: str) -> str: """Encode wikipedia Ids.""" return _get_source()._id_hash(id_root=id_root, id_field_value=id_field_value) @@ -94,23 +96,6 @@ def id_unhash(hash_key: str) -> tuple: return _get_source()._id_unhash(hash_key) -def clean_FIDE_rankings(df): - """Clean fetched data for `FIDE_rankings`. - - Fix inconsistent player names. - """ - df = df[~df["Player"].str.contains("Change from the previous month")].copy() - replacements = { - "Gukesh D.": "Gukesh Dommaraju", - "Gukesh D": "Gukesh Dommaraju", - "Leinier Dominguez": "Leinier Domínguez Pérez", - "Leinier Dominguez Pérez": "Leinier Domínguez Pérez", - "Nana Dzagnidze]": "Nana Dzagnidze", - } - df["Player"] = df["Player"].replace(replacements) - return df - - def get_probability_forecast(mid, comparison_value, forecast_mean, forecast_std): """Get forecast based on question type. @@ -174,55 +159,6 @@ def get_min_max_possible_value(mid): raise ValueError(f"Could not find min/max for {id_root}.") -def clean_List_of_world_records_in_swimming(df): - """Clean fetched data for `List_of_world_records_in_swimming`. - - Drop any rows that contain parens. - """ - df = df[~df["Name"].str.contains(r"[()]")].reset_index(drop=True) - df = df[~df["Name"].str.contains("eventsort")].reset_index(drop=True) - df = df[~df["Name"].str.contains("recordinfo")].reset_index(drop=True) - return df - - -def clean_List_of_infectious_diseases(df): - """Clean fetched data for `List_of_infectious_diseases`. - - * Remove rows with multiple answers. - * Change all `Under research[x]` to `No` - * Change all `No` to 0 - * Change all `Yes` to 1 - """ - duplicates = df[df.duplicated(subset=["date", "Common name"], keep=False)] - df = df.drop(duplicates.index).reset_index(drop=True) - # On and before this date the `"Vaccine(s)"` field had other info in it. - df = df[df["date"] > pd.Timestamp("2021-07-07")] - df["Vaccine(s)"] = df["Vaccine(s)"].replace( - { - r"Under research.*": "No", - r"Under Development.*": "No", - r"Yes.*": "Yes", - r"No.*": "No", - }, - regex=True, - ) - df.loc[df["Vaccine(s)"] == "No", "Vaccine(s)"] = 0 - df.loc[df["Vaccine(s)"] == "Yes", "Vaccine(s)"] = 1 - df["Vaccine(s)"] = df["Vaccine(s)"].astype(int) - df = df.dropna(ignore_index=True) - return df - - -def is_resolved_List_of_infectious_diseases(value): - """Return true if the vaccine has been developed.""" - return value == 1 or str(value).lower() == "yes" - - -def get_value_List_of_infectious_diseases(value): - """Return Yes/No instead of 1/0.""" - return "Yes" if value else "No" - - def get_question_type(mid): """Retun the question type given mid.""" d = id_unhash(mid) @@ -275,207 +211,3 @@ def backfill_for_forecast(mid, dfr): dfr = pd.concat([fill_df, dfr]).sort_values("date") return dfr - - -FIDE_BACKGROUND = ( - ( - "The International Chess Federation (FIDE) governs international chess " - "competition. Each month, FIDE publishes the lists 'Top 100 Players', 'Top 100 " - "Women', 'Top 100 Juniors' and 'Top 100 Girls' and rankings of countries according " - "to the average rating of their top 10 players and top 10 female players.\n" - "To create the rankings, FIDE uses the Elo rating system, which is a method for " - "calculating the relative skill levels of players in zero-sum games such as chess. " - "The difference in the ratings between two players serves as a predictor of the " - "outcome of a match. Two players with equal ratings who play against each other " - "are expected to score an equal number of wins. A player whose rating is 100 " - "points greater than their opponent's is expected to score 64%; if the difference " - "is 200 points, then the expected score for the stronger player is 76%.\n" - "A player's Elo rating is a number which may change depending on the outcome of " - "rated games played. After every game, the winning player takes points from the " - "losing one. The difference between the ratings of the winner and loser determines " - "the total number of points gained or lost after a game. If the higher-rated " - "player wins, then only a few rating points will be taken from the lower-rated " - "player. However, if the lower-rated player scores an upset win, many rating " - "points will be transferred. The lower-rated player will also gain a few points " - "from the higher rated player in the event of a draw. This means that this rating " - "system is self-correcting. Players whose ratings are too low or too high should, " - "in the long run, do better or worse correspondingly than the rating system " - "predicts and thus gain or lose rating points until the ratings reflect their true " - "playing strength.\n" - "Elo ratings are comparative only, and are valid only within the rating pool in " - "which they were calculated, rather than being an absolute measure of a player's " - "strength." - ), - tuple(), -) - -PAGES = [ - { - "id_root": "FIDE_rankings_elo_rating", - "page_title": "FIDE_rankings", - "table_index": [ - { - "start_date": WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE, - "table_index": [1, 3], - }, - ], - "question_type": QuestionType.ONE_PERCENT_MORE, - "key": { - "id", - }, - "fields": { - "id": "Player", - "value": "Rating", - }, - "resolution_file_value_column_dtype": int, - "question": ( - ( - "According to Wikipedia, will {id} have an Elo rating on {resolution_date} that's " - "at least 1% higher than on {forecast_due_date}?" - ), - ("id",), - ), - "background": FIDE_BACKGROUND, - "freeze_datetime_value_explanation": ( - "{id}'s ELO rating.", - ("id",), - ), - "clean_func": "clean_FIDE_rankings", - }, - { - "id_root": "FIDE_rankings_ranking", - "page_title": "FIDE_rankings", - "table_index": [ - { - "start_date": WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE, - "table_index": [1, 3], - }, - ], - "question_type": QuestionType.SAME_OR_LESS, - "key": { - "id", - }, - "fields": { - "id": "Player", - "value": "Rank", - }, - "resolution_file_value_column_dtype": int, - "question": ( - ( - "According to Wikipedia, will {id} have a FIDE ranking on {resolution_date} as " - "high or higher than their ranking on {forecast_due_date}?" - ), - ("id",), - ), - "background": FIDE_BACKGROUND, - "freeze_datetime_value_explanation": ( - "{id}'s FIDE ranking.", - ("id",), - ), - "clean_func": "clean_FIDE_rankings", - }, - { - "id_root": "List_of_world_records_in_swimming", - "page_title": "List_of_world_records_in_swimming", - "table_index": [ - { - "start_date": WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE, - "table_index": [0, 2], - }, - { - "start_date": datetime(2025, 5, 4).date(), - "table_index": [0, 1], - }, - ], - "question_type": QuestionType.SAME, - "key": { - "id", - "value", - }, - "fields": { - "id": "Name", - "value": "Event", - }, - "resolution_file_value_column_dtype": str, - "question": ( - ( - "According to Wikipedia, will {id} still hold the world record for {value} in " - "long course (50 metres) swimming pools on {resolution_date}?" - ), - ("id", "value"), - ), - "background": ( - ( - "The world records in swimming are ratified by World Aquatics (formerly known as FINA), " - "the international governing body of swimming. Records can be set in long course (50 " - "metres) or short course (25 metres) swimming pools.\n" - "The ratification process is described in FINA Rule SW12, and involves submission of " - "paperwork certifying the accuracy of the timing system and the length of the pool, " - "satisfaction of FINA rules regarding swimwear and a negative doping test by the " - "swimmer(s) involved. Records can be set at intermediate distances in an individual " - "race and for the first leg of a relay race. Records which have not yet been fully " - "ratified are marked with a '#' symbol in these lists." - ), - tuple(), - ), - "freeze_datetime_value_explanation": ( - "{id} is a record holder in the {value}.", - ( - "id", - "value", - ), - ), - "clean_func": "clean_List_of_world_records_in_swimming", - }, - { - "id_root": "List_of_infectious_diseases", - "page_title": "List_of_infectious_diseases", - "table_index": [ - { - "start_date": WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE, - "table_index": 0, - }, - ], - "question_type": QuestionType.MORE, - "key": { - "id", - }, - "fields": { - "id": "Common name", - "value": "Vaccine(s)", - }, - "resolution_file_value_column_dtype": str, - "question": ( - ( - "According to Wikipedia, will a vaccine have been developed for {id} by " - "{resolution_date}?" - ), - ("id",), - ), - "background": ( - ( - "According to Wikipedia, {id} is the common name of an infectious disease. A vaccine " - "is a biological preparation that provides active acquired immunity to a particular " - "infectious or malignant disease. The safety and effectiveness of vaccines has " - "been widely studied and verified. A vaccine typically contains an agent that " - "resembles a disease-causing microorganism and is often made from weakened or killed " - "forms of the microbe, its toxins, or one of its surface proteins. The agent " - "stimulates the body's immune system to recognize the agent as a threat, destroy it, " - "and recognize further and destroy any of the microorganisms associated with that " - "agent that it may encounter in the future." - ), - ("id",), - ), - "freeze_datetime_value_explanation": ( - "Vaccine status for {id}. 'No' means that a vaccine has not yet been created. " - "'Yes' means that it has.", - ("id",), - ), - "clean_func": "clean_List_of_infectious_diseases", - "is_resolved_func": "is_resolved_List_of_infectious_diseases", - "value_func": "get_value_List_of_infectious_diseases", - }, -] - -for page in PAGES: - page["table_index"].sort(key=lambda e: e["start_date"]) diff --git a/src/orchestration/_source_io.py b/src/orchestration/_source_io.py index 10a7d436..e8d8d11a 100644 --- a/src/orchestration/_source_io.py +++ b/src/orchestration/_source_io.py @@ -9,6 +9,7 @@ import pandas as pd +from _fb_types import WikipediaFetchResult from helpers import constants, data_utils, env from utils import gcp @@ -106,3 +107,62 @@ def upload_resolution_files(source: str, resolution_files: dict[str, pd.DataFram filename=remote_filename, ) logger.info(f"Uploaded {len(resolution_files)} resolution files for {source}.") + + +# --------------------------------------------------------------------------- +# Wikipedia per-page fetch IO +# +# Wikipedia's fetch returns one DataFrame per page (keyed by id_root) with page-varying columns, +# so it cannot use write_fetch_output's single-file layout. Files live under wikipedia/fetch/. +# --------------------------------------------------------------------------- + +_WIKIPEDIA_FETCH_DIR = "wikipedia/fetch" + + +def write_wikipedia_fetch_output(fetch_result: WikipediaFetchResult) -> None: + """Write per-page Wikipedia fetch DataFrames to wikipedia/fetch/.jsonl. + + Args: + fetch_result (WikipediaFetchResult): Mapping of id_root to fetched table DataFrame. + """ + for id_root, df in fetch_result.items(): + filename = f"{id_root}.jsonl" + local_filename = f"/tmp/{filename}" + df.to_json(local_filename, orient="records", lines=True, force_ascii=False) + gcp.storage.upload( + bucket_name=env.QUESTION_BANK_BUCKET, + local_filename=local_filename, + destination_folder=_WIKIPEDIA_FETCH_DIR, + ) + logger.info(f"Uploaded {len(fetch_result)} Wikipedia fetch files.") + + +def read_wikipedia_fetch_files() -> WikipediaFetchResult: + """Download per-page Wikipedia fetch files from wikipedia/fetch/. + + Returns: + WikipediaFetchResult mapping id_root to fetched table DataFrame. + """ + files = gcp.storage.list_with_prefix( + bucket_name=env.QUESTION_BANK_BUCKET, + prefix=f"{_WIKIPEDIA_FETCH_DIR}/", + ) + result: WikipediaFetchResult = {} + for remote_path in files: + if not remote_path.endswith(".jsonl"): + continue + basename = os.path.basename(remote_path) + id_root = basename.removesuffix(".jsonl") + local_filename = f"/tmp/{basename}" + + gcp.storage.download_no_error_message_on_404( + bucket_name=env.QUESTION_BANK_BUCKET, + filename=remote_path, + local_filename=local_filename, + ) + if os.path.exists(local_filename): + df = pd.read_json(local_filename, lines=True, dtype={}, convert_dates=False) + if not df.empty: + result[id_root] = df + logger.info(f"Loaded {len(result)} Wikipedia fetch files.") + return result diff --git a/src/questions/wikipedia/fetch/Makefile b/src/orchestration/func_wikipedia_fetch/Makefile similarity index 64% rename from src/questions/wikipedia/fetch/Makefile rename to src/orchestration/func_wikipedia_fetch/Makefile index c6ad8efc..6e450328 100644 --- a/src/questions/wikipedia/fetch/Makefile +++ b/src/orchestration/func_wikipedia_fetch/Makefile @@ -9,21 +9,25 @@ UPLOAD_DIR = upload .gcloudignore: cp -r $(ROOT_DIR)src/helpers/.gcloudignore . -Procfile: - cp -r $(ROOT_DIR)src/helpers/Procfile . +Dockerfile: $(ROOT_DIR)src/helpers/Dockerfile.template + sed \ + -e 's/REGION/$(CLOUD_DEPLOY_REGION)/g' \ + -e 's/STACK/google-22-full/g' \ + -e 's/PYTHON_VERSION/python312/g' \ + $< > Dockerfile NUM_CPUS = 4 -deploy : main.py .gcloudignore requirements.txt Procfile +deploy : main.py .gcloudignore requirements.txt Dockerfile mkdir -p $(UPLOAD_DIR) cp -r $(ROOT_DIR)utils $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ + cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/helpers + cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/sources mkdir -p $(UPLOAD_DIR)/orchestration cp $(ROOT_DIR)src/orchestration/__init__.py $(UPLOAD_DIR)/orchestration/ - cp $(ROOT_DIR)src/orchestration/_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/orchestration/_source_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ + cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ cp $^ $(UPLOAD_DIR)/ gcloud run jobs deploy \ func-data-wikipedia-fetch \ @@ -40,4 +44,4 @@ deploy : main.py .gcloudignore requirements.txt Procfile --source $(UPLOAD_DIR) clean : - rm -rf $(UPLOAD_DIR) .gcloudignore Procfile + rm -rf $(UPLOAD_DIR) .gcloudignore Dockerfile diff --git a/src/orchestration/func_wikipedia_fetch/main.py b/src/orchestration/func_wikipedia_fetch/main.py new file mode 100644 index 00000000..2fca7a5f --- /dev/null +++ b/src/orchestration/func_wikipedia_fetch/main.py @@ -0,0 +1,33 @@ +"""Wikipedia fetch entry point.""" + +from __future__ import annotations + +import logging +from typing import Any + +from helpers import decorator +from orchestration import _source_io +from sources.wikipedia import WikipediaSource + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SOURCE = "wikipedia" + + +@decorator.log_runtime +def driver(_: Any) -> None: + """Fetch Wikipedia data and store in GCP Cloud Storage.""" + source = WikipediaSource() + + fetch_result = source.fetch() + if not fetch_result: + logger.error("No Wikipedia data was downloaded.") + return + + _source_io.write_wikipedia_fetch_output(fetch_result) + logger.info("Done.") + + +if __name__ == "__main__": + driver(None) diff --git a/src/questions/wikipedia/fetch/requirements.txt b/src/orchestration/func_wikipedia_fetch/requirements.txt similarity index 58% rename from src/questions/wikipedia/fetch/requirements.txt rename to src/orchestration/func_wikipedia_fetch/requirements.txt index 329d30c8..c000948f 100644 --- a/src/questions/wikipedia/fetch/requirements.txt +++ b/src/orchestration/func_wikipedia_fetch/requirements.txt @@ -1,9 +1,6 @@ google-cloud-storage -google-cloud-secret-manager -beautifulsoup4 pandas>=2.2.2,<3.0 -tqdm -lxml -scipy pandera -termcolor +requests +beautifulsoup4 +lxml diff --git a/src/questions/wikipedia/update_questions/Makefile b/src/orchestration/func_wikipedia_update/Makefile similarity index 64% rename from src/questions/wikipedia/update_questions/Makefile rename to src/orchestration/func_wikipedia_update/Makefile index 9313238b..78f7f470 100644 --- a/src/questions/wikipedia/update_questions/Makefile +++ b/src/orchestration/func_wikipedia_update/Makefile @@ -9,19 +9,24 @@ UPLOAD_DIR = upload .gcloudignore: cp -r $(ROOT_DIR)src/helpers/.gcloudignore . -Procfile: - cp -r $(ROOT_DIR)src/helpers/Procfile . +Dockerfile: $(ROOT_DIR)src/helpers/Dockerfile.template + sed \ + -e 's/REGION/$(CLOUD_DEPLOY_REGION)/g' \ + -e 's/STACK/google-22-full/g' \ + -e 's/PYTHON_VERSION/python312/g' \ + $< > Dockerfile -deploy : main.py .gcloudignore requirements.txt Procfile +deploy : main.py .gcloudignore requirements.txt Dockerfile mkdir -p $(UPLOAD_DIR) cp -r $(ROOT_DIR)utils $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ + cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/helpers + cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/sources mkdir -p $(UPLOAD_DIR)/orchestration cp $(ROOT_DIR)src/orchestration/__init__.py $(UPLOAD_DIR)/orchestration/ cp $(ROOT_DIR)src/orchestration/_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/orchestration/_source_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ + cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ cp $^ $(UPLOAD_DIR)/ gcloud run jobs deploy \ func-data-wikipedia-update-questions \ @@ -37,4 +42,4 @@ deploy : main.py .gcloudignore requirements.txt Procfile --source $(UPLOAD_DIR) clean : - rm -rf $(UPLOAD_DIR) .gcloudignore Procfile + rm -rf $(UPLOAD_DIR) .gcloudignore Dockerfile diff --git a/src/orchestration/func_wikipedia_update/main.py b/src/orchestration/func_wikipedia_update/main.py new file mode 100644 index 00000000..ab49f0cd --- /dev/null +++ b/src/orchestration/func_wikipedia_update/main.py @@ -0,0 +1,40 @@ +"""Wikipedia update entry point.""" + +from __future__ import annotations + +import logging +from typing import Any + +from helpers import data_utils, decorator +from orchestration import _io, _source_io +from sources.wikipedia import WikipediaSource + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SOURCE = "wikipedia" + + +@decorator.log_runtime +def driver(_: Any) -> None: + """Pull in fetched data and update questions and resolution values in the question bank.""" + logger.info("Downloading previously-fetched Wikipedia data from Cloud.") + source = WikipediaSource() + source.populate_hash_mapping(_io.load_hash_mapping(SOURCE)) + + dfq = data_utils.get_data_from_cloud_storage(SOURCE, return_question_data=True) + dff = _source_io.read_wikipedia_fetch_files() + + result = source.update(dfq, dff) + + logger.info("Uploading to GCP...") + data_utils.upload_questions(result.dfq, SOURCE) + if result.resolution_files: + _source_io.upload_resolution_files(SOURCE, result.resolution_files) + if result.hash_mapping is not None: + _io.upload_hash_mapping(source.dump_hash_mapping(), SOURCE) + logger.info("Done.") + + +if __name__ == "__main__": + driver(None) diff --git a/src/questions/wikipedia/update_questions/requirements.txt b/src/orchestration/func_wikipedia_update/requirements.txt similarity index 88% rename from src/questions/wikipedia/update_questions/requirements.txt rename to src/orchestration/func_wikipedia_update/requirements.txt index a1bc7c37..2fdeb8d2 100644 --- a/src/questions/wikipedia/update_questions/requirements.txt +++ b/src/orchestration/func_wikipedia_update/requirements.txt @@ -1,7 +1,5 @@ google-cloud-storage google-cloud-secret-manager pandas>=2.2.2,<3.0 -tqdm -scipy pandera termcolor diff --git a/src/questions/wikipedia/fetch/main.py b/src/questions/wikipedia/fetch/main.py deleted file mode 100644 index cd6985ae..00000000 --- a/src/questions/wikipedia/fetch/main.py +++ /dev/null @@ -1,202 +0,0 @@ -"""Fetch data from Wikipedia.""" - -import logging -import os -import sys -import time -from concurrent.futures import ProcessPoolExecutor -from datetime import datetime -from email.utils import parsedate_to_datetime -from io import BytesIO -from urllib.parse import parse_qs, urlparse - -import pandas as pd -import requests -from bs4 import BeautifulSoup -from requests.adapters import HTTPAdapter -from tqdm import tqdm -from urllib3.util.retry import Retry - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../..")) -from helpers import data_utils, decorator, env, wikipedia # noqa: E402 - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../../..")) -from utils import gcp # noqa: E402 - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -source = "wikipedia" -filenames = data_utils.generate_filenames(source=source) - - -def make_session(): - """Make a session for requests.""" - session = requests.Session() - session.headers.update(wikipedia.HEADERS) - _retry = Retry(total=3, backoff_factor=0.25, status_forcelist=[429, 500, 502, 503, 504]) - session.mount("https://", HTTPAdapter(pool_connections=8, pool_maxsize=8, max_retries=_retry)) - return session - - -def get_edit_history(page_title): - """Get the edit history of a wikipedia page. - - Get the last edit of the day for each day between today and - wikipedia.WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE - """ - base_history_url = ( - f"https://en.wikipedia.org/w/index.php?title={page_title}&action=history&limit=200" - ) - offset = "" - edit_history = [] - last_seen_dates = set() - - session = make_session() - while True: - history_url = base_history_url + offset - response = session.get(history_url, timeout=30) - soup = BeautifulSoup(response.text, "html.parser") - edits = soup.find_all("li", attrs={"data-mw-revid": True}) - - for edit in edits: - edit_date_str = edit.find("a", class_="mw-changeslist-date").text - edit_date = datetime.strptime(edit_date_str, "%H:%M, %d %B %Y") - edit_url = ( - "https://en.wikipedia.org" + edit.find("a", class_="mw-changeslist-date")["href"] - ) - oldid = parse_qs(urlparse(edit_url).query).get("oldid", [None])[0] - - if edit_date.date() not in last_seen_dates: - edit_history.append((edit_date, oldid)) - last_seen_dates.add(edit_date.date()) - - if edit_date.date() <= wikipedia.WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE: - return [ - (dt, rev) - for dt, rev in edit_history - if dt.date() >= wikipedia.WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE - ] - - next_page = soup.find("a", {"class": "mw-nextlink"}) - if not next_page: - break - offset = "&offset=" + next_page["href"].split("offset=")[1] - - return edit_history - - -def download_wikipedia_table(page_title, edit_date, revid, table_index, session): - """Download tables from url.""" - url = f"https://en.wikipedia.org/api/rest_v1/page/html/{page_title}/{revid}" - while True: - response = session.get(url, timeout=30) - if response.status_code == 429: - retry_after = response.headers.get("Retry-After") - delay = 5 - if retry_after: - try: - delay = int(retry_after) - except ValueError: - try: - retry_dt = parsedate_to_datetime(retry_after) - now = datetime.now(tz=retry_dt.tzinfo) - delay = max(0, int((retry_dt - now).total_seconds())) - except Exception: - delay = 5 - - print(f"\n{delay} seconds\n") - time.sleep(delay) - continue - response.raise_for_status() - break - - tables = pd.read_html(BytesIO(response.content)) - table_index_to_use = max( - [e for e in table_index if e["start_date"] <= edit_date.date()], - key=lambda e: e["start_date"], - ) - ti = table_index_to_use["table_index"] - return tables[ti] if isinstance(ti, int) else pd.concat([tables[i] for i in ti]) - - -def download_tables(page): - """Download all historical changes for the tables on the page.""" - session = make_session() - - page_title = page.get("page_title") - n_rows_to_keep = page.get("table_keep_first_n_rows") - table_index = page.get("table_index", 0) - columns = list(page.get("fields").values()) - - edit_history = get_edit_history(page_title=page_title) - edit_history.sort(reverse=True, key=lambda x: x[0]) - - value_col = page["fields"]["value"] - value_col_dtype = page["resolution_file_value_column_dtype"] - - df_list = [] - for edit_date, revid in tqdm(edit_history, f"Downloading edit histories for {page_title}"): - try: - dfw = download_wikipedia_table( - page_title=page_title, - edit_date=edit_date, - revid=revid, - table_index=table_index, - session=session, - ) - if n_rows_to_keep is not None: - dfw = dfw.iloc[:n_rows_to_keep] - dfw = dfw[columns] - dfw["date"] = edit_date.date().isoformat() - if value_col_dtype in (int, float): - dfw[value_col] = pd.to_numeric(dfw[value_col], errors="coerce") - elif value_col_dtype is str: - pass - else: - raise ValueError(f"`{value_col_dtype}` dytpe not yet supported.") - dfw = dfw.dropna() - dfw[value_col] = dfw[value_col].astype(value_col_dtype) - df_list.append(dfw.dropna()) - except Exception as e: - logger.error(f"In {edit_date} {revid}\n{e}\n") - df = pd.concat(df_list, ignore_index=True) if df_list else None - return df - - -def download_and_store_wikipedia_tables(page): - """Fetch and upload data for each page object in wikipedia.PAGES.""" - question_id_root = page.get("id_root") - filename = wikipedia.get_fetch_filename(question_id_root) - local_filename = f"/tmp/{filename}" - logger.info(f"Downloading data for {question_id_root}.") - - df = download_tables(page=page) - df.to_json(local_filename, orient="records", lines=True, force_ascii=False) - gcp.storage.upload( - bucket_name=env.QUESTION_BANK_BUCKET, - local_filename=local_filename, - destination_folder=wikipedia.fetch_directory, - ) - - -@decorator.log_runtime -def driver(_): - """Fetch Wikipedia data and store in GCP Cloud Storage.""" - # Get the latest Wikipedia data - logger.info("Downloading Wikipedia data.") - - with ProcessPoolExecutor(max_workers=min(env.NUM_CPUS, len(wikipedia.PAGES))) as ex: - list( - tqdm( - ex.map(download_and_store_wikipedia_tables, wikipedia.PAGES), - total=len(wikipedia.PAGES), - desc="Downloading pages", - ) - ) - - logger.info("Done.") - - -if __name__ == "__main__": - driver(None) diff --git a/src/questions/wikipedia/update_questions/main.py b/src/questions/wikipedia/update_questions/main.py deleted file mode 100644 index 3e0dc968..00000000 --- a/src/questions/wikipedia/update_questions/main.py +++ /dev/null @@ -1,263 +0,0 @@ -"""Generate Wikipedia questions.""" - -import json -import logging -import os -import sys - -import pandas as pd - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../..")) -from helpers import constants, data_utils, decorator, env, wikipedia # noqa: E402 - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../../..")) # noqa: E402 -from utils import gcp # noqa: E402 - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -source = "wikipedia" -filenames = data_utils.generate_filenames(source=source) - - -def create_resolution_file(dff, page, wid, question_key: pd.Series): - """Create the resolution file. Overwrite it every time. - - filename is: `{source}/{wid}.jsonl` - """ - id_field = page["fields"]["id"] - value_field = page["fields"]["value"] - - mask = pd.Series(True, index=dff.index) - for field_name in question_key.index: - mask &= dff[field_name] == question_key[field_name] - - df = dff[mask].copy() - if df["date"].max().date() < constants.QUESTION_BANK_DATA_STORAGE_START_DATE: - # Fetching more data than we need for naive forecasts. Don't need to create resolution - # files for events that are no longer current - return None - - df.rename(columns={id_field: "id", value_field: "value"}, inplace=True) - df["id"] = wid - - def fill_missing_with_nan(df, dff): - """Sometimes values drop out of the table then reappear. - - This could be for valid reasons, e.g. someone had a world record, lost it, then got it - again. - - This could be for invalid reasons: a name change, e.g. Erigaisi Arjun -> Arjun Erigaisi - - Either way, fill these with nan. Invalid reasons will need to be caught by hand and - invalidated in `src/helpers/wikipedia.py` IDS_TO_NULLIFY. - """ - # fill in nan where the item has dropped out of the table - all_dates = dff["date"].sort_values().unique() - all_dates = all_dates[all_dates >= constants.QUESTION_BANK_DATA_STORAGE_START_DATETIME] - next_after_df_max_date = all_dates[all_dates > df["date"].max()] - max_cutoff = ( - next_after_df_max_date.min() if len(next_after_df_max_date) > 0 else df["date"].max() - ) - all_dates = all_dates[(all_dates <= max_cutoff) & (all_dates >= df["date"].min())] - drop_out_dates = [] - for drop_out_date in [date for date in all_dates if date not in df["date"].unique()]: - drop_out_dates.append( - { - "id": wid, - "value": None, - "date": drop_out_date, - } - ) - df = pd.concat([df, pd.DataFrame(drop_out_dates)], ignore_index=True) - return df - - df = fill_missing_with_nan(df=df, dff=dff) - - df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") - df = df.sort_values(by="date", ignore_index=True) - - local_filename = f"/tmp/{wid}.jsonl" - df.to_json(local_filename, orient="records", lines=True, date_format="iso") - gcp.storage.upload( - bucket_name=env.QUESTION_BANK_BUCKET, - local_filename=local_filename, - destination_folder=source, - ) - return df - - -def add_to_dfq(dfq, dfr, page, wid, id_field_value): - """Add the question to dfq.""" - - def fill_template(page, page_key, values): - fill_values = {field: values[field] for field in page["question"][1]} - # Always maintain resolution_date and forecast_due_date when formatting the string - default_values = { - "resolution_date": "{resolution_date}", - "forecast_due_date": "{forecast_due_date}", - } - combined_fill_values = {**default_values, **fill_values} - return page[page_key][0].format(**combined_fill_values) - - dfr = dfr.sort_values(by="date") - value = dfr.iloc[-1]["value"] - - resolved = value is None - if "is_resolved_func" in page.keys(): - resolved = eval(f"wikipedia.{page['is_resolved_func']}(value)") - - if "value_func" in page.keys(): - value = eval(f"wikipedia.{page['value_func']}(value)") - - values = { - "id": id_field_value, - "value": value, - } - question = fill_template(page=page, page_key="question", values=values) - freeze_datetime_value_explanation = fill_template( - page=page, page_key="freeze_datetime_value_explanation", values=values - ) - - background = fill_template(page=page, page_key="background", values=values) - - row = { - "id": wid, - "question": question, - "background": background, - "market_info_resolution_criteria": "N/A", - "market_info_open_datetime": "N/A", - "market_info_close_datetime": "N/A", - "url": f"https://en.wikipedia.org/wiki/{page['page_title']}", - "market_info_resolution_datetime": "N/A", - "resolved": resolved, - "forecast_horizons": [] if resolved else constants.FORECAST_HORIZONS_IN_DAYS, - "freeze_datetime_value": value, - "freeze_datetime_value_explanation": freeze_datetime_value_explanation, - } - - df_question = pd.DataFrame([row]) - if row["id"] not in dfq["id"].values: - return df_question if dfq.empty else pd.concat([dfq, df_question], ignore_index=True) - - # Update the row where `dfq["id"] == df_question["id"]` - dfq = dfq.set_index("id") - df_question = df_question.set_index("id") - dfq.update(df_question) - return dfq.reset_index() - - -def update_page_questions(page, dfq, dff): - """Update questions and resolutions for the provided Wikipedia page.""" - question_id_root = page.get("id_root") - logger.info(f"Updating questions for for {question_id_root}.") - - # The `key` field of each page contains the unique entry/entries that make a question. - # See issue #123. - id_fields = [page["fields"][key] for key in page["key"]] - for _, row in dff[id_fields].drop_duplicates().iterrows(): - id_field_value_for_wid = str(row.iloc[0]) if len(row) == 1 else str(sorted(row)) - wid = wikipedia.id_hash(id_root=question_id_root, id_field_value=id_field_value_for_wid) - try: - dfr = create_resolution_file(dff=dff, page=page, wid=wid, question_key=row) - if dfr is not None: - dfq = add_to_dfq( - dfq=dfq, - dfr=dfr, - page=page, - wid=wid, - id_field_value=row[page["fields"]["id"]], - ) - except Exception as e: - logger.warning(f"Couldn't add {question_id_root} {wid}: {row}") - logger.warning(f"Exception encountered: {e}") - - return dfq - - -def resolve_questions_for_dropped_pages(dfq): - """Resolve questions for pages that have been removed from page.PAGES. - - If we ever remove pages, we want to stop sampling from those questions. - Simply resolve them. - """ - id_roots = [d["id_root"] for d in wikipedia.PAGES] - for index, row in dfq.iterrows(): - d = wikipedia.id_unhash(hash_key=row["id"]) - if d is None or d.get("id_root") not in id_roots: - dfq.loc[index, "resolved"] = True - return dfq - - -def resolve_questions_for_id_transformations(dfq): - """Resolve questions for keys in `wikipedia.transform_id_mapping`. - - `wikipedia.transform_id_mapping` contains keys of questions that were erroneously made for one - reason or another. Those keys point to the correct IDs for those questions. When the correct ID - is resolved, ensure the original question ID is resolved too. - """ - for key, value in wikipedia.transform_id_mapping.items(): - resolved_series = dfq[dfq["id"] == value]["resolved"] - if not resolved_series.empty and resolved_series.iloc[0]: - dfq.loc[dfq["id"] == key, "resolved"] = True - logger.info(f"Resolving: {key}") - return dfq - - -def update_all_forecast_questions(dfq): - """For each set of pages that is still being updated, download the associated fetch file.""" - for page in wikipedia.PAGES: - filename = wikipedia.get_fetch_filename(page.get("id_root")) - local_filename = f"/tmp/{filename}" - remote_filename = f"{wikipedia.fetch_directory}/{filename}" - dff = data_utils.download_and_read( - filename=remote_filename, local_filename=local_filename, df_tmp=pd.DataFrame(), dtype={} - ) - if not dff.empty: - dff["date"] = pd.to_datetime(dff["date"]) - if "clean_func" in page.keys(): - dff = eval(f"wikipedia.{page['clean_func']}(dff)") - dfq = update_page_questions(page=page, dfq=dfq, dff=dff) - - dfq = resolve_questions_for_dropped_pages(dfq=dfq) - dfq = resolve_questions_for_id_transformations(dfq=dfq) - return dfq - - -@decorator.log_runtime -def driver(_): - """Pull in fetched data and update questions and resolved values in question bank.""" - # Download pertinent files from Cloud Storage - logger.info("Downloading previously-fetched Wikipedia data from Cloud.") - - wikipedia.populate_hash_mapping() - - # We'll overwrite all questions for wikipedia.PAGES that we are still getting - # Only pull this in to save pages we've stopped fetching for one reason or another. - dfq = data_utils.get_data_from_cloud_storage(source=source, return_question_data=True) - - # Update the existing questions - dfq = update_all_forecast_questions(dfq) - - logger.info(f"Found {len(dfq)} questions.") - - # Save - with open(filenames["local_question"], "w", encoding="utf-8") as f: - for record in dfq.to_dict(orient="records"): - jsonl_str = json.dumps(record, ensure_ascii=False) - f.write(jsonl_str + "\n") - - # Upload Questions - gcp.storage.upload( - bucket_name=env.QUESTION_BANK_BUCKET, - local_filename=filenames["local_question"], - ) - - # Upload hash - wikipedia.upload_hash_mapping() - - logger.info("Done.") - - -if __name__ == "__main__": - driver(None) diff --git a/src/sources/wikipedia.py b/src/sources/wikipedia.py index 55027f41..b06c7db3 100644 --- a/src/sources/wikipedia.py +++ b/src/sources/wikipedia.py @@ -5,18 +5,30 @@ import hashlib import json import logging +import time +from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta +from email.utils import parsedate_to_datetime from enum import Enum -from typing import ClassVar +from io import BytesIO +from typing import TYPE_CHECKING, ClassVar +from urllib.parse import parse_qs, urlparse import numpy as np import pandas as pd +import pandera.pandas as pa +from pandera.typing import DataFrame +from _fb_types import UpdateResult, WikipediaFetchResult +from _schemas import QuestionFrame from helpers import constants, dates from ._dataset import DatasetSource from ._metadata import SOURCE_METADATA +if TYPE_CHECKING: + import requests + logger = logging.getLogger(__name__) @@ -30,11 +42,724 @@ class QuestionType(Enum): SAME_OR_LESS = 4 +# --------------------------------------------------------------------------- +# Module-level constants +# --------------------------------------------------------------------------- + +_HEADERS = {"User-Agent": constants.BENCHMARK_USER_AGENT} + +_WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATETIME = ( + constants.QUESTION_BANK_DATA_STORAGE_START_DATETIME - timedelta(days=360 * 4) +) +_WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE = ( + _WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATETIME.date() +) + +_FIDE_BACKGROUND = ( + "The International Chess Federation (FIDE) governs international chess " + "competition. Each month, FIDE publishes the lists 'Top 100 Players', 'Top 100 " + "Women', 'Top 100 Juniors' and 'Top 100 Girls' and rankings of countries according " + "to the average rating of their top 10 players and top 10 female players.\n" + "To create the rankings, FIDE uses the Elo rating system, which is a method for " + "calculating the relative skill levels of players in zero-sum games such as chess. " + "The difference in the ratings between two players serves as a predictor of the " + "outcome of a match. Two players with equal ratings who play against each other " + "are expected to score an equal number of wins. A player whose rating is 100 " + "points greater than their opponent's is expected to score 64%; if the difference " + "is 200 points, then the expected score for the stronger player is 76%.\n" + "A player's Elo rating is a number which may change depending on the outcome of " + "rated games played. After every game, the winning player takes points from the " + "losing one. The difference between the ratings of the winner and loser determines " + "the total number of points gained or lost after a game. If the higher-rated " + "player wins, then only a few rating points will be taken from the lower-rated " + "player. However, if the lower-rated player scores an upset win, many rating " + "points will be transferred. The lower-rated player will also gain a few points " + "from the higher rated player in the event of a draw. This means that this rating " + "system is self-correcting. Players whose ratings are too low or too high should, " + "in the long run, do better or worse correspondingly than the rating system " + "predicts and thus gain or lose rating points until the ratings reflect their true " + "playing strength.\n" + "Elo ratings are comparative only, and are valid only within the rating pool in " + "which they were calculated, rather than being an absolute measure of a player's " + "strength." +) + +_PAGES = [ + { + "id_root": "FIDE_rankings_elo_rating", + "page_title": "FIDE_rankings", + "table_index": [ + { + "start_date": _WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE, + "table_index": [1, 3], + }, + ], + "question_type": QuestionType.ONE_PERCENT_MORE, + "key": { + "id", + }, + "fields": { + "id": "Player", + "value": "Rating", + }, + "resolution_file_value_column_dtype": int, + "question": ( + ( + "According to Wikipedia, will {id} have an Elo rating on {resolution_date} that's " + "at least 1% higher than on {forecast_due_date}?" + ), + ("id",), + ), + "background": (_FIDE_BACKGROUND, tuple()), + "freeze_datetime_value_explanation": ( + "{id}'s ELO rating.", + ("id",), + ), + "clean_func": "clean_FIDE_rankings", + }, + { + "id_root": "FIDE_rankings_ranking", + "page_title": "FIDE_rankings", + "table_index": [ + { + "start_date": _WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE, + "table_index": [1, 3], + }, + ], + "question_type": QuestionType.SAME_OR_LESS, + "key": { + "id", + }, + "fields": { + "id": "Player", + "value": "Rank", + }, + "resolution_file_value_column_dtype": int, + "question": ( + ( + "According to Wikipedia, will {id} have a FIDE ranking on {resolution_date} as " + "high or higher than their ranking on {forecast_due_date}?" + ), + ("id",), + ), + "background": (_FIDE_BACKGROUND, tuple()), + "freeze_datetime_value_explanation": ( + "{id}'s FIDE ranking.", + ("id",), + ), + "clean_func": "clean_FIDE_rankings", + }, + { + "id_root": "List_of_world_records_in_swimming", + "page_title": "List_of_world_records_in_swimming", + "table_index": [ + { + "start_date": _WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE, + "table_index": [0, 2], + }, + { + "start_date": datetime(2025, 5, 4).date(), + "table_index": [0, 1], + }, + ], + "question_type": QuestionType.SAME, + "key": { + "id", + "value", + }, + "fields": { + "id": "Name", + "value": "Event", + }, + "resolution_file_value_column_dtype": str, + "question": ( + ( + "According to Wikipedia, will {id} still hold the world record for {value} in " + "long course (50 metres) swimming pools on {resolution_date}?" + ), + ("id", "value"), + ), + "background": ( + ( + "The world records in swimming are ratified by World Aquatics (formerly known as " + "FINA), the international governing body of swimming. Records can be set in long " + "course (50 metres) or short course (25 metres) swimming pools.\n" + "The ratification process is described in FINA Rule SW12, and involves submission " + "of paperwork certifying the accuracy of the timing system and the length of the " + "pool, satisfaction of FINA rules regarding swimwear and a negative doping test by " + "the swimmer(s) involved. Records can be set at intermediate distances in an " + "individual race and for the first leg of a relay race. Records which have not yet " + "been fully ratified are marked with a '#' symbol in these lists." + ), + tuple(), + ), + "freeze_datetime_value_explanation": ( + "{id} is a record holder in the {value}.", + ( + "id", + "value", + ), + ), + "clean_func": "clean_List_of_world_records_in_swimming", + }, + { + "id_root": "List_of_infectious_diseases", + "page_title": "List_of_infectious_diseases", + "table_index": [ + { + "start_date": _WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE, + "table_index": 0, + }, + ], + "question_type": QuestionType.MORE, + "key": { + "id", + }, + "fields": { + "id": "Common name", + "value": "Vaccine(s)", + }, + "resolution_file_value_column_dtype": str, + "question": ( + ( + "According to Wikipedia, will a vaccine have been developed for {id} by " + "{resolution_date}?" + ), + ("id",), + ), + "background": ( + ( + "According to Wikipedia, {id} is the common name of an infectious disease. A " + "vaccine is a biological preparation that provides active acquired immunity to a " + "particular infectious or malignant disease. The safety and effectiveness of " + "vaccines has been widely studied and verified. A vaccine typically contains an " + "agent that resembles a disease-causing microorganism and is often made from " + "weakened or killed forms of the microbe, its toxins, or one of its surface " + "proteins. The agent stimulates the body's immune system to recognize the agent " + "as a threat, destroy it, and recognize further and destroy any of the " + "microorganisms associated with that agent that it may encounter in the future." + ), + ("id",), + ), + "freeze_datetime_value_explanation": ( + "Vaccine status for {id}. 'No' means that a vaccine has not yet been created. " + "'Yes' means that it has.", + ("id",), + ), + "clean_func": "clean_List_of_infectious_diseases", + "is_resolved_func": "is_resolved_List_of_infectious_diseases", + "value_func": "get_value_List_of_infectious_diseases", + }, +] + +for _page in _PAGES: + _page["table_index"].sort(key=lambda e: e["start_date"]) + + class WikipediaSource(DatasetSource): """Wikipedia dataset source with custom row-by-row resolution logic.""" name: ClassVar[str] = "wikipedia" + # ------------------------------------------------------------------ + # Public: fetch + # ------------------------------------------------------------------ + + def fetch(self, **kwargs) -> WikipediaFetchResult: + """Fetch Wikipedia table data for all configured pages. + + Returns a dict mapping id_root -> DataFrame of raw table data. + """ + + def _download_page(page): + session = self._make_session() + return page["id_root"], self._download_tables(page, session) + + results: WikipediaFetchResult = {} + with ThreadPoolExecutor(max_workers=len(_PAGES)) as ex: + for id_root, df in ex.map(_download_page, _PAGES): + if df is None or df.empty: + raise ValueError(f"No Wikipedia data was downloaded for {id_root}.") + results[id_root] = df + + return results + + # ------------------------------------------------------------------ + # Public: update + # ------------------------------------------------------------------ + + @pa.check_types + def update( + self, + dfq: DataFrame[QuestionFrame], + dff: WikipediaFetchResult, + **kwargs, + ) -> UpdateResult: + """Process fetched Wikipedia data into questions and resolution files. + + Args: + dfq (DataFrame[QuestionFrame]): Existing questions. + dff (WikipediaFetchResult): dict mapping id_root -> fetched table DataFrame. + """ + resolution_files: dict[str, pd.DataFrame] = {} + + for page in _PAGES: + id_root = page["id_root"] + page_dff = dff.get(id_root) + if page_dff is None or page_dff.empty: + continue + + page_dff = page_dff.copy() + page_dff["date"] = pd.to_datetime(page_dff["date"]) + if "clean_func" in page: + page_dff = eval(f"WikipediaSource.{page['clean_func']}(page_dff)") + + dfq, page_res = self._update_page_questions(page=page, dfq=dfq, dff=page_dff) + resolution_files.update(page_res) + + dfq = self._resolve_questions_for_dropped_pages(dfq) + dfq = self._resolve_questions_for_id_transformations(dfq) + + return UpdateResult( + dfq=dfq, + resolution_files=resolution_files, + hash_mapping=self.hash_mapping, + ) + + # ------------------------------------------------------------------ + # Private: fetch helpers + # ------------------------------------------------------------------ + + @staticmethod + def _make_session() -> requests.Session: + """Create an HTTP session with retry logic.""" + # NB: requests/bs4 are imported lazily here (and in _get_edit_history) rather than at module + # top level so that importing `sources.wikipedia` stays light. `helpers.wikipedia` and + # `sources.registry` both import this module, and those are pulled (directly or via + # `helpers.question_curation`) by ~13 jobs that never scrape Wikipedia (resolve, metaculus, + # metadata, curate, leaderboard, nightly, base_eval). A top-level import would force + # beautifulsoup4/lxml into all of their images. + # TODO: revisit once requirements are refactored — if those consumers stop importing this + # module at load time (e.g. question_curation rewired to sources._metadata), these can move + # back to module-level imports. + import requests + from requests.adapters import HTTPAdapter + from urllib3.util.retry import Retry + + session = requests.Session() + session.headers.update(_HEADERS) + _retry = Retry(total=3, backoff_factor=0.25, status_forcelist=[429, 500, 502, 503, 504]) + session.mount( + "https://", HTTPAdapter(pool_connections=8, pool_maxsize=8, max_retries=_retry) + ) + return session + + @staticmethod + def _get_edit_history(page_title: str, session: requests.Session) -> list[tuple]: + """Get the edit history of a Wikipedia page. + + Get the last edit of the day for each day between today and + _WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE. + """ + # Lazy import (see _make_session for rationale); TODO: revisit when requirements refactor. + from bs4 import BeautifulSoup + + base_history_url = ( + f"https://en.wikipedia.org/w/index.php?title={page_title}&action=history&limit=200" + ) + offset = "" + edit_history = [] + last_seen_dates = set() + + while True: + history_url = base_history_url + offset + response = session.get(history_url, timeout=30) + soup = BeautifulSoup(response.text, "html.parser") + edits = soup.find_all("li", attrs={"data-mw-revid": True}) + + for edit in edits: + edit_date_str = edit.find("a", class_="mw-changeslist-date").text + edit_date = datetime.strptime(edit_date_str, "%H:%M, %d %B %Y") + edit_url = ( + "https://en.wikipedia.org" + + edit.find("a", class_="mw-changeslist-date")["href"] + ) + oldid = parse_qs(urlparse(edit_url).query).get("oldid", [None])[0] + + if edit_date.date() not in last_seen_dates: + edit_history.append((edit_date, oldid)) + last_seen_dates.add(edit_date.date()) + + if edit_date.date() <= _WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE: + return [ + (dt, rev) + for dt, rev in edit_history + if dt.date() >= _WIKIPEDIA_QUESTION_BANK_DATA_STORAGE_START_DATE + ] + + next_page = soup.find("a", {"class": "mw-nextlink"}) + if not next_page: + break + offset = "&offset=" + next_page["href"].split("offset=")[1] + + return edit_history + + @staticmethod + def _download_wikipedia_table( + page_title: str, + edit_date: datetime, + revid: str, + table_index: list, + session: requests.Session, + ) -> pd.DataFrame: + """Download tables from url.""" + url = f"https://en.wikipedia.org/api/rest_v1/page/html/{page_title}/{revid}" + while True: + response = session.get(url, timeout=30) + if response.status_code == 429: + retry_after = response.headers.get("Retry-After") + delay = 5 + if retry_after: + try: + delay = int(retry_after) + except ValueError: + try: + retry_dt = parsedate_to_datetime(retry_after) + now = datetime.now(tz=retry_dt.tzinfo) + delay = max(0, int((retry_dt - now).total_seconds())) + except Exception: + delay = 5 + + logger.info(f"Rate limited, waiting {delay}s") + time.sleep(delay) + continue + response.raise_for_status() + break + + tables = pd.read_html(BytesIO(response.content)) + table_index_to_use = max( + [e for e in table_index if e["start_date"] <= edit_date.date()], + key=lambda e: e["start_date"], + ) + ti = table_index_to_use["table_index"] + return tables[ti] if isinstance(ti, int) else pd.concat([tables[i] for i in ti]) + + @staticmethod + def _download_tables(page: dict, session: requests.Session) -> pd.DataFrame | None: + """Download all historical changes for the tables on the page.""" + page_title = page.get("page_title") + n_rows_to_keep = page.get("table_keep_first_n_rows") + table_index = page.get("table_index", 0) + columns = list(page.get("fields").values()) + + edit_history = WikipediaSource._get_edit_history(page_title=page_title, session=session) + edit_history.sort(reverse=True, key=lambda x: x[0]) + + value_col = page["fields"]["value"] + value_col_dtype = page["resolution_file_value_column_dtype"] + + df_list = [] + for edit_date, revid in edit_history: + try: + dfw = WikipediaSource._download_wikipedia_table( + page_title=page_title, + edit_date=edit_date, + revid=revid, + table_index=table_index, + session=session, + ) + if n_rows_to_keep is not None: + dfw = dfw.iloc[:n_rows_to_keep] + dfw = dfw[columns] + dfw["date"] = edit_date.date().isoformat() + if value_col_dtype in (int, float): + dfw[value_col] = pd.to_numeric(dfw[value_col], errors="coerce") + elif value_col_dtype is str: + pass + else: + raise ValueError(f"`{value_col_dtype}` dtype not yet supported.") + dfw = dfw.dropna() + dfw[value_col] = dfw[value_col].astype(value_col_dtype) + df_list.append(dfw.dropna()) + except Exception as e: + logger.error(f"In {edit_date} {revid}\n{e}\n") + df = pd.concat(df_list, ignore_index=True) if df_list else None + return df + + # ------------------------------------------------------------------ + # Private: update helpers + # ------------------------------------------------------------------ + + @staticmethod + def _fill_template(page: dict, page_key: str, values: dict) -> str: + """Fill a question/background/explanation template.""" + fill_values = {field: values[field] for field in page[page_key][1]} + # Always maintain resolution_date and forecast_due_date when formatting the string. + default_values = { + "resolution_date": "{resolution_date}", + "forecast_due_date": "{forecast_due_date}", + } + combined_fill_values = {**default_values, **fill_values} + return page[page_key][0].format(**combined_fill_values) + + @staticmethod + def _build_resolution_df( + dff: pd.DataFrame, page: dict, wid: str, question_key: pd.Series + ) -> pd.DataFrame | None: + """Build the per-question resolution DataFrame. Returns a DataFrame or None. + + Validation is intentionally left to UpdateResult.__post_init__ (which validates every + resolution file against ResolutionFrame); here we only cast the id/date dtypes, mirroring + the other sources' resolution-building helpers. + + Args: + dff (pd.DataFrame): Fetched data DataFrame for a page. + page (dict): Page config dict. + wid (str): Hashed question ID. + question_key (pd.Series): Series with key field values identifying the question. + """ + id_field = page["fields"]["id"] + value_field = page["fields"]["value"] + + mask = pd.Series(True, index=dff.index) + for field_name in question_key.index: + mask &= dff[field_name] == question_key[field_name] + + df = dff[mask].copy() + if df["date"].max().date() < constants.QUESTION_BANK_DATA_STORAGE_START_DATE: + # Fetching more data than we need for naive forecasts. Don't need to create resolution + # files for events that are no longer current. + return None + + df.rename(columns={id_field: "id", value_field: "value"}, inplace=True) + df["id"] = wid + + def fill_missing_with_nan(df, dff): + """Fill in nan where the item has dropped out of the table. + + Sometimes values drop out of the table then reappear. This could be for valid reasons, + e.g. someone had a world record, lost it, then got it again. Either way, fill these with + nan. Invalid reasons (e.g. name changes) need to be caught by hand and nullified. + """ + all_dates = dff["date"].sort_values().unique() + all_dates = all_dates[all_dates >= constants.QUESTION_BANK_DATA_STORAGE_START_DATETIME] + next_after_df_max_date = all_dates[all_dates > df["date"].max()] + max_cutoff = ( + next_after_df_max_date.min() + if len(next_after_df_max_date) > 0 + else df["date"].max() + ) + all_dates = all_dates[(all_dates <= max_cutoff) & (all_dates >= df["date"].min())] + drop_out_dates = [] + for drop_out_date in [date for date in all_dates if date not in df["date"].unique()]: + drop_out_dates.append( + { + "id": wid, + "value": None, + "date": drop_out_date, + } + ) + df = pd.concat([df, pd.DataFrame(drop_out_dates)], ignore_index=True) + return df + + df = fill_missing_with_nan(df=df, dff=dff) + + df["date"] = pd.to_datetime(df["date"]).dt.strftime("%Y-%m-%d") + df = df.sort_values(by="date", ignore_index=True) + + return df[["id", "date", "value"]].astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE) + + def _add_to_dfq( + self, + dfq: pd.DataFrame, + dfr: pd.DataFrame, + page: dict, + wid: str, + id_field_value: str, + ) -> pd.DataFrame: + """Add the question to dfq.""" + dfr = dfr.sort_values(by="date") + value = dfr.iloc[-1]["value"] + + resolved = value is None + if "is_resolved_func" in page.keys(): + resolved = eval(f"WikipediaSource.{page['is_resolved_func']}(value)") + + if "value_func" in page.keys(): + value = eval(f"WikipediaSource.{page['value_func']}(value)") + + values = { + "id": id_field_value, + "value": value, + } + question = self._fill_template(page=page, page_key="question", values=values) + freeze_datetime_value_explanation = self._fill_template( + page=page, page_key="freeze_datetime_value_explanation", values=values + ) + + background = self._fill_template(page=page, page_key="background", values=values) + + row = { + "id": wid, + "question": question, + "background": background, + "market_info_resolution_criteria": "N/A", + "market_info_open_datetime": "N/A", + "market_info_close_datetime": "N/A", + "url": f"https://en.wikipedia.org/wiki/{page['page_title']}", + "market_info_resolution_datetime": "N/A", + "resolved": resolved, + "forecast_horizons": [] if resolved else constants.FORECAST_HORIZONS_IN_DAYS, + "freeze_datetime_value": value, + "freeze_datetime_value_explanation": freeze_datetime_value_explanation, + } + + df_question = pd.DataFrame([row]) + if row["id"] not in dfq["id"].values: + return df_question if dfq.empty else pd.concat([dfq, df_question], ignore_index=True) + + # Update the row where `dfq["id"] == df_question["id"]`. + dfq = dfq.set_index("id") + df_question = df_question.set_index("id") + dfq.update(df_question) + return dfq.reset_index() + + def _update_page_questions( + self, page: dict, dfq: pd.DataFrame, dff: pd.DataFrame + ) -> tuple[pd.DataFrame, dict[str, pd.DataFrame]]: + """Update questions and resolutions for the provided Wikipedia page. + + Returns (dfq, resolution_files_dict). + """ + question_id_root = page.get("id_root") + logger.info(f"Updating questions for {question_id_root}.") + + # The `key` field of each page contains the unique entry/entries that make a question. + # See issue #123. + id_fields = [page["fields"][key] for key in page["key"]] + resolution_files = {} + + for _, row in dff[id_fields].drop_duplicates().iterrows(): + id_field_value_for_wid = str(row.iloc[0]) if len(row) == 1 else str(sorted(row)) + wid = self._id_hash(id_root=question_id_root, id_field_value=id_field_value_for_wid) + try: + dfr = self._build_resolution_df(dff=dff, page=page, wid=wid, question_key=row) + if dfr is not None: + resolution_files[wid] = dfr + dfq = self._add_to_dfq( + dfq=dfq, + dfr=dfr, + page=page, + wid=wid, + id_field_value=row[page["fields"]["id"]], + ) + except Exception as e: + logger.warning(f"Couldn't add {question_id_root} {wid}: {row}") + logger.warning(f"Exception encountered: {e}") + + return dfq, resolution_files + + def _resolve_questions_for_dropped_pages(self, dfq: pd.DataFrame) -> pd.DataFrame: + """Resolve questions for pages that have been removed from _PAGES. + + If we ever remove pages, we want to stop sampling from those questions. Simply resolve them. + """ + id_roots = [d["id_root"] for d in _PAGES] + for index, row in dfq.iterrows(): + d = self._id_unhash(hash_key=row["id"]) + if d is None or d.get("id_root") not in id_roots: + dfq.loc[index, "resolved"] = True + return dfq + + @staticmethod + def _resolve_questions_for_id_transformations(dfq: pd.DataFrame) -> pd.DataFrame: + """Resolve questions for keys in `_TRANSFORM_ID_MAPPING`. + + `_TRANSFORM_ID_MAPPING` contains keys of questions that were erroneously made for one reason + or another. Those keys point to the correct IDs for those questions. When the correct ID is + resolved, ensure the original question ID is resolved too. + """ + for key, value in _TRANSFORM_ID_MAPPING.items(): + resolved_series = dfq[dfq["id"] == value]["resolved"] + if not resolved_series.empty and resolved_series.iloc[0]: + dfq.loc[dfq["id"] == key, "resolved"] = True + logger.info(f"Resolving: {key}") + return dfq + + # ------------------------------------------------------------------ + # Clean / value / resolved functions (referenced by _PAGES via eval) + # ------------------------------------------------------------------ + + @staticmethod + def clean_FIDE_rankings(df: pd.DataFrame) -> pd.DataFrame: + """Clean fetched data for FIDE_rankings. + + Fix inconsistent player names. + """ + df = df[~df["Player"].str.contains("Change from the previous month")].copy() + replacements = { + "Gukesh D.": "Gukesh Dommaraju", + "Gukesh D": "Gukesh Dommaraju", + "Leinier Dominguez": "Leinier Domínguez Pérez", + "Leinier Dominguez Pérez": "Leinier Domínguez Pérez", + "Nana Dzagnidze]": "Nana Dzagnidze", + } + df["Player"] = df["Player"].replace(replacements) + return df + + @staticmethod + def clean_List_of_world_records_in_swimming(df: pd.DataFrame) -> pd.DataFrame: + """Clean fetched data for List_of_world_records_in_swimming. + + Drop any rows that contain parens. + """ + df = df[~df["Name"].str.contains(r"[()]")].reset_index(drop=True) + df = df[~df["Name"].str.contains("eventsort")].reset_index(drop=True) + df = df[~df["Name"].str.contains("recordinfo")].reset_index(drop=True) + return df + + @staticmethod + def clean_List_of_infectious_diseases(df: pd.DataFrame) -> pd.DataFrame: + """Clean fetched data for List_of_infectious_diseases. + + * Remove rows with multiple answers. + * Change all `Under research[x]` to `No` + * Change all `No` to 0 + * Change all `Yes` to 1 + """ + duplicates = df[df.duplicated(subset=["date", "Common name"], keep=False)] + df = df.drop(duplicates.index).reset_index(drop=True) + # On and before this date the `"Vaccine(s)"` field had other info in it. + df = df[df["date"] > pd.Timestamp("2021-07-07")] + df["Vaccine(s)"] = df["Vaccine(s)"].replace( + { + r"Under research.*": "No", + r"Under Development.*": "No", + r"Yes.*": "Yes", + r"No.*": "No", + }, + regex=True, + ) + df.loc[df["Vaccine(s)"] == "No", "Vaccine(s)"] = 0 + df.loc[df["Vaccine(s)"] == "Yes", "Vaccine(s)"] = 1 + df["Vaccine(s)"] = df["Vaccine(s)"].astype(int) + df = df.dropna(ignore_index=True) + return df + + @staticmethod + def is_resolved_List_of_infectious_diseases(value) -> bool: + """Return true if the vaccine has been developed.""" + return value == 1 or str(value).lower() == "yes" + + @staticmethod + def get_value_List_of_infectious_diseases(value) -> str: + """Return Yes/No instead of 1/0.""" + return "Yes" if value else "No" + + # ------------------------------------------------------------------ + # Resolve + # ------------------------------------------------------------------ + def _resolve(self, df: pd.DataFrame, dfq: pd.DataFrame, dfr: pd.DataFrame) -> pd.DataFrame: """Resolve Wikipedia questions row by row.""" logger.info("Resolving Wikipedia questions.") @@ -99,11 +824,7 @@ def get_value(dfr, mid, date): ) return np.nan - # lazy to avoid circular import - # TO DO: fix during wikipedia refactor - from helpers.wikipedia import PAGES - - question_type = [q["question_type"] for q in PAGES if q["id_root"] == d["id_root"]] + question_type = [q["question_type"] for q in _PAGES if q["id_root"] == d["id_root"]] if len(question_type) != 1: logger.error( f"Nullifying Wikipedia market {mid}. Couldn't find comparison type " @@ -194,18 +915,6 @@ def _id_unhash(self, hash_key: str): hash_key = self._transform_id(hash_key) return self.hash_mapping.get(hash_key) - # ------------------------------------------------------------------ - # Fetch / update (not yet implemented) - # ------------------------------------------------------------------ - - def fetch(self, **kwargs): - """Fetch Wikipedia data.""" - raise NotImplementedError - - def update(self, dfq, dff, **kwargs): - """Process fetched Wikipedia data into questions and resolution files.""" - raise NotImplementedError - # flake8: noqa: B950 _TRANSFORM_ID_MAPPING = { diff --git a/src/tests/conftest.py b/src/tests/conftest.py index 48da008d..7e9ee1fb 100644 --- a/src/tests/conftest.py +++ b/src/tests/conftest.py @@ -13,6 +13,7 @@ from sources.manifold import ManifoldSource from sources.metaculus import MetaculusSource from sources.polymarket import PolymarketSource +from sources.wikipedia import WikipediaSource # --------------------------------------------------------------------------- # Time-freezing fixture @@ -99,6 +100,12 @@ def polymarket_source(): return PolymarketSource() +@pytest.fixture() +def wikipedia_source(): + """Return a WikipediaSource instance.""" + return WikipediaSource() + + # --------------------------------------------------------------------------- # DataFrame factories # --------------------------------------------------------------------------- diff --git a/src/tests/test_wikipedia.py b/src/tests/test_wikipedia.py index 3dcf9bc3..cb87ce68 100644 --- a/src/tests/test_wikipedia.py +++ b/src/tests/test_wikipedia.py @@ -1,12 +1,13 @@ -"""Tests for WikipediaSource: _compare_values, _ffill_dfr, _transform_id, hash mapping.""" +"""Tests for WikipediaSource: resolution, fetch, and update.""" -from datetime import date +from datetime import date, datetime +from unittest.mock import patch import pandas as pd import pytest from sources.wikipedia import QuestionType, WikipediaSource -from tests.conftest import make_resolution_df +from tests.conftest import make_forecast_df, make_question_df, make_resolution_df # --------------------------------------------------------------------------- # _compare_values @@ -229,3 +230,470 @@ def test_nullified_questions_are_nullified_question_instances(self): assert isinstance(nq, NullifiedQuestion) assert isinstance(nq.id, str) assert isinstance(nq.nullification_start_date, date) + + +# --------------------------------------------------------------------------- +# Clean / value / resolved functions +# --------------------------------------------------------------------------- + + +class TestCleanFunctions: + """Test the per-page cleaning helpers referenced by _PAGES.""" + + def test_clean_fide_rankings_replaces_names_and_drops_change_rows(self): + df = pd.DataFrame( + { + "Player": [ + "Gukesh D.", + "Leinier Dominguez", + "Change from the previous month", + "Magnus Carlsen", + ], + "Rating": [2780, 2750, 0, 2839], + } + ) + result = WikipediaSource.clean_FIDE_rankings(df) + players = list(result["Player"]) + assert "Change from the previous month" not in players + assert "Gukesh Dommaraju" in players + assert "Leinier Domínguez Pérez" in players + assert "Magnus Carlsen" in players + + def test_clean_swimming_drops_parens_and_metadata_rows(self): + df = pd.DataFrame( + { + "Name": ["Sarah Sjöström", "Someone (relay)", "eventsort", "recordinfo"], + "Event": ["50m freestyle", "x", "y", "z"], + } + ) + result = WikipediaSource.clean_List_of_world_records_in_swimming(df) + assert list(result["Name"]) == ["Sarah Sjöström"] + + def test_clean_infectious_diseases_maps_yes_no_to_binary(self): + df = pd.DataFrame( + { + "date": pd.to_datetime(["2024-06-01", "2024-06-01", "2024-06-01", "2024-06-01"]), + "Common name": ["Measles", "Smallpox", "DiseaseX", "DiseaseY"], + "Vaccine(s)": ["Yes, since 1963", "No", "Under research[1]", "Under Development"], + } + ) + result = WikipediaSource.clean_List_of_infectious_diseases(df) + mapping = dict(zip(result["Common name"], result["Vaccine(s)"])) + assert mapping["Measles"] == 1 + assert mapping["Smallpox"] == 0 + assert mapping["DiseaseX"] == 0 + assert mapping["DiseaseY"] == 0 + + def test_is_resolved_infectious_diseases(self): + assert WikipediaSource.is_resolved_List_of_infectious_diseases(1) is True + assert WikipediaSource.is_resolved_List_of_infectious_diseases("Yes") is True + assert WikipediaSource.is_resolved_List_of_infectious_diseases(0) is False + + def test_get_value_infectious_diseases(self): + assert WikipediaSource.get_value_List_of_infectious_diseases(1) == "Yes" + assert WikipediaSource.get_value_List_of_infectious_diseases(0) == "No" + + +# --------------------------------------------------------------------------- +# _fill_template +# --------------------------------------------------------------------------- + + +class TestFillTemplate: + """Test question/explanation template filling.""" + + def test_fills_id_keeps_date_placeholders(self): + page = { + "question": ( + "Will {id} have an Elo rating on {resolution_date} higher than " + "{forecast_due_date}?", + ("id",), + ) + } + result = WikipediaSource._fill_template( + page=page, page_key="question", values={"id": "Magnus Carlsen"} + ) + assert "Magnus Carlsen" in result + # date placeholders are preserved for later formatting + assert "{resolution_date}" in result + assert "{forecast_due_date}" in result + + +# --------------------------------------------------------------------------- +# _build_resolution_df +# --------------------------------------------------------------------------- + + +def _fide_elo_page(): + from sources.wikipedia import _PAGES + + return next(p for p in _PAGES if p["id_root"] == "FIDE_rankings_elo_rating") + + +class TestBuildResolutionDf: + """Test resolution-DataFrame construction from fetched page data.""" + + def test_returns_none_when_data_is_stale(self): + page = _fide_elo_page() + # All data well before QUESTION_BANK_DATA_STORAGE_START_DATE (2023-05-07). + dff = pd.DataFrame( + { + "Player": ["Old Player", "Old Player"], + "Rating": [2700, 2710], + "date": pd.to_datetime(["2020-01-01", "2020-02-01"]), + } + ) + question_key = pd.Series({"Player": "Old Player"}) + result = WikipediaSource._build_resolution_df( + dff=dff, page=page, wid="wid1", question_key=question_key + ) + assert result is None + + def test_builds_id_date_value_frame(self): + page = _fide_elo_page() + dff = pd.DataFrame( + { + "Player": ["Magnus Carlsen", "Magnus Carlsen"], + "Rating": [2839, 2850], + "date": pd.to_datetime(["2024-06-01", "2024-07-01"]), + } + ) + question_key = pd.Series({"Player": "Magnus Carlsen"}) + result = WikipediaSource._build_resolution_df( + dff=dff, page=page, wid="wid1", question_key=question_key + ) + assert list(result.columns) == ["id", "date", "value"] + assert (result["id"] == "wid1").all() + # id/date are cast to str (RESOLUTION_FILE_COLUMN_DTYPE); value stays mixed (ANY). + assert result["id"].dtype == object and isinstance(result["id"].iloc[0], str) + assert isinstance(result["date"].iloc[0], str) + assert list(result["value"]) == [2839, 2850] + + def test_fills_nan_when_item_drops_out_of_table(self): + """A date present in the page but missing for this question gets a None value. + + Player A is absent on 2024-07-01 (a date that exists in the page because Player B was + recorded then), so a None row must be inserted for A — the core fill_missing_with_nan + behavior described in the method docstring. + """ + page = _fide_elo_page() + dff = pd.DataFrame( + { + "Player": ["A", "B", "A"], + "Rating": [2800, 2700, 2820], + "date": pd.to_datetime(["2024-06-01", "2024-07-01", "2024-08-01"]), + } + ) + question_key = pd.Series({"Player": "A"}) + result = WikipediaSource._build_resolution_df( + dff=dff, page=page, wid="widA", question_key=question_key + ) + by_date = dict(zip(result["date"], result["value"])) + assert set(by_date) == {"2024-06-01", "2024-07-01", "2024-08-01"} + assert by_date["2024-06-01"] == 2800 + assert by_date["2024-08-01"] == 2820 + assert pd.isna(by_date["2024-07-01"]) # dropped out -> None + + +# --------------------------------------------------------------------------- +# update() — end to end +# --------------------------------------------------------------------------- + + +class TestUpdate: + """Behavioral tests for WikipediaSource.update().""" + + def test_creates_question_and_resolution_file(self, wikipedia_source): + dff = { + "FIDE_rankings_elo_rating": pd.DataFrame( + { + "Player": ["Magnus Carlsen", "Magnus Carlsen"], + "Rating": [2839, 2850], + "date": ["2024-06-01", "2024-07-01"], + } + ) + } + dfq = make_question_df([{"id": "seed"}]).iloc[0:0] + result = wikipedia_source.update(dfq, dff) + + # One question created. + added = result.dfq[result.dfq["question"].str.contains("Magnus Carlsen", na=False)] + assert len(added) == 1 + row = added.iloc[0] + assert "Elo rating" in row["question"] + assert row["freeze_datetime_value_explanation"] == "Magnus Carlsen's ELO rating." + assert row["url"] == "https://en.wikipedia.org/wiki/FIDE_rankings" + assert not row["resolved"] + # freeze_datetime_value is the latest fetched rating (coerced to str by QuestionFrame). + assert row["freeze_datetime_value"] == "2850" + + # One resolution file, keyed by the hashed question id. + wid = row["id"] + assert wid in result.resolution_files + assert list(result.resolution_files[wid].columns) == ["id", "date", "value"] + + # Hash mapping populated so the id can be unhashed back to its root. + assert result.hash_mapping[wid]["id_root"] == "FIDE_rankings_elo_rating" + + def test_skips_pages_absent_from_fetch(self, wikipedia_source): + dfq = make_question_df([{"id": "seed"}]).iloc[0:0] + result = wikipedia_source.update(dfq, {}) + # No fetch data -> no questions added, no resolution files. + assert result.resolution_files == {} + + def test_resolves_questions_for_dropped_pages(self, wikipedia_source): + # A pre-existing question whose id is not in the hash mapping / not a current page + # is marked resolved. + dfq = make_question_df([{"id": "unknown_id", "resolved": False}]) + result = wikipedia_source.update(dfq, {}) + assert result.dfq[result.dfq["id"] == "unknown_id"]["resolved"].iloc[0] + + def test_resolves_questions_for_id_transformations(self): + old_id = "d4fd9e41e71c3e5a2992b9c8b36ff655eb7265b7a46a434484f1267eabd59b92" + new_id = "a1c131d5c2ad476fc579b30b72ea6762e3b6324b0252a57c10c890436604f44f" + dfq = make_question_df( + [ + {"id": old_id, "resolved": False}, + {"id": new_id, "resolved": True}, + ] + ) + result = WikipediaSource._resolve_questions_for_id_transformations(dfq) + # When the new id is resolved, the deprecated old id is resolved too. + assert result[result["id"] == old_id]["resolved"].iloc[0] + + def test_infectious_disease_uses_is_resolved_and_value_funcs(self, wikipedia_source): + """The is_resolved_func / value_func path: vaccine=Yes -> resolved, freeze value 'Yes'.""" + dff = { + "List_of_infectious_diseases": pd.DataFrame( + { + "Common name": ["Measles", "DiseaseX"], + "Vaccine(s)": ["Yes", "No"], + "date": ["2024-06-01", "2024-06-01"], + } + ) + } + dfq = make_question_df([{"id": "seed"}]).iloc[0:0] + result = wikipedia_source.update(dfq, dff) + + measles = result.dfq[result.dfq["question"].str.contains("Measles", na=False)].iloc[0] + diseasex = result.dfq[result.dfq["question"].str.contains("DiseaseX", na=False)].iloc[0] + # Vaccine present -> resolved, freeze value rendered as "Yes" by value_func. + assert measles["resolved"] + assert measles["freeze_datetime_value"] == "Yes" + # No vaccine -> unresolved, "No". + assert not diseasex["resolved"] + assert diseasex["freeze_datetime_value"] == "No" + + def test_updates_existing_question_in_place(self, wikipedia_source): + """A question already in dfq is updated, not duplicated (the _add_to_dfq update branch).""" + wid = wikipedia_source._id_hash( + id_root="FIDE_rankings_elo_rating", id_field_value="Magnus Carlsen" + ) + dfq = make_question_df([{"id": wid, "question": "STALE QUESTION", "resolved": False}]) + dff = { + "FIDE_rankings_elo_rating": pd.DataFrame( + { + "Player": ["Magnus Carlsen"], + "Rating": [2850], + "date": ["2024-07-01"], + } + ) + } + result = wikipedia_source.update(dfq, dff) + matching = result.dfq[result.dfq["id"] == wid] + assert len(matching) == 1 # updated in place, not appended + assert "Magnus Carlsen" in matching.iloc[0]["question"] + assert "STALE" not in matching.iloc[0]["question"] + + +# --------------------------------------------------------------------------- +# _download_tables (fetch data shaping, network calls mocked) +# --------------------------------------------------------------------------- + + +class TestDownloadTables: + """_download_tables selects columns, stamps the date, coerces value dtype, drops NaN.""" + + def test_coerces_value_dtype_and_drops_unparseable_rows(self): + page = _fide_elo_page() # fields Player/Rating, value dtype int + raw = pd.DataFrame({"Player": ["A", "B"], "Rating": ["2800", "not-a-number"]}) + with patch.object( + WikipediaSource, + "_get_edit_history", + return_value=[(datetime(2024, 6, 1, 12, 0), "rev1")], + ), patch.object(WikipediaSource, "_download_wikipedia_table", return_value=raw.copy()): + result = WikipediaSource._download_tables(page, session=object()) + + # "not-a-number" -> NaN -> row dropped; A kept as an int; edit date stamped. + assert list(result["Player"]) == ["A"] + assert result["Rating"].iloc[0] == 2800 + assert result["Rating"].dtype.kind in "iu" + assert result["date"].iloc[0] == "2024-06-01" + + +# --------------------------------------------------------------------------- +# fetch() +# --------------------------------------------------------------------------- + + +class TestFetch: + """fetch() orchestrates per-page downloads and returns a dict keyed by id_root.""" + + def test_returns_dict_keyed_by_id_root(self, wikipedia_source): + fake_df = pd.DataFrame({"Player": ["A"], "Rating": [2800], "date": ["2024-06-01"]}) + + def fake_download_tables(page, session): + return fake_df.copy() + + with patch.object(WikipediaSource, "_make_session", return_value=object()), patch.object( + WikipediaSource, "_download_tables", side_effect=fake_download_tables + ): + result = wikipedia_source.fetch() + + from sources.wikipedia import _PAGES + + assert set(result.keys()) == {p["id_root"] for p in _PAGES} + for df in result.values(): + assert not df.empty + + def test_raises_when_page_returns_no_rows(self, wikipedia_source): + def fake_download_tables(page, session): + if page["id_root"] == "List_of_infectious_diseases": + return pd.DataFrame({"Common name": ["X"], "Vaccine(s)": ["No"], "date": ["x"]}) + return None + + with patch.object(WikipediaSource, "_make_session", return_value=object()), patch.object( + WikipediaSource, "_download_tables", side_effect=fake_download_tables + ): + with pytest.raises(ValueError, match="No Wikipedia data was downloaded"): + wikipedia_source.fetch() + + def test_raises_when_page_returns_empty_dataframe(self, wikipedia_source): + def fake_download_tables(page, session): + if page["id_root"] == "List_of_infectious_diseases": + return pd.DataFrame() + return pd.DataFrame({"Common name": ["X"], "Vaccine(s)": ["No"], "date": ["x"]}) + + with patch.object(WikipediaSource, "_make_session", return_value=object()), patch.object( + WikipediaSource, "_download_tables", side_effect=fake_download_tables + ): + with pytest.raises(ValueError, match="No Wikipedia data was downloaded"): + wikipedia_source.fetch() + + +# --------------------------------------------------------------------------- +# resolve() — custom row-by-row resolution +# --------------------------------------------------------------------------- + + +class TestResolve: + """End-to-end tests for the custom resolution path (resolve -> _resolve).""" + + def test_resolves_single_question_to_binary(self, wikipedia_source, freeze_today): + freeze_today(date(2025, 6, 2)) # yesterday = 2025-06-01 + wid = wikipedia_source._id_hash( + id_root="FIDE_rankings_ranking", id_field_value="Test Player" + ) + df = make_forecast_df( + [ + { + "id": wid, + "source": "wikipedia", + "forecast_due_date": "2025-01-01", + "resolution_date": "2025-06-01", + } + ] + ) + dfr = make_resolution_df( + [ + {"id": wid, "date": "2025-01-01", "value": 5}, + {"id": wid, "date": "2025-06-01", "value": 3}, + ] + ) + resolved, warnings = wikipedia_source.resolve( + df, pd.DataFrame(), dfr, forecast_due_date=date(2025, 1, 1) + ) + row = resolved.iloc[0] + # SAME_OR_LESS: rank improved from 5 to 3 (3 <= 5) -> resolves True (1.0). + assert row["resolved_to"] == 1.0 + assert row["resolved"] + assert warnings == [] + + def test_nullified_question_resolves_to_nan(self, wikipedia_source, freeze_today): + freeze_today(date(2025, 6, 2)) + # Monkeypox -> Mpox id, nullified from 2022-08-21. + nid = "f9323386a651ce67fc0da31285bee22a4ec53b8a2ea5220431ecb4560fb44c77" + df = make_forecast_df( + [ + { + "id": nid, + "source": "wikipedia", + "forecast_due_date": "2025-01-01", + "resolution_date": "2025-06-01", + } + ] + ) + dfr = make_resolution_df([{"id": "unused", "date": "2025-01-01", "value": 1}]) + resolved, _ = wikipedia_source.resolve( + df, pd.DataFrame(), dfr, forecast_due_date=date(2025, 1, 1) + ) + row = resolved.iloc[0] + assert pd.isna(row["resolved_to"]) + assert row["resolved"] + + def test_future_resolution_date_not_resolved(self, wikipedia_source, freeze_today): + freeze_today(date(2025, 6, 2)) + wid = wikipedia_source._id_hash( + id_root="FIDE_rankings_ranking", id_field_value="Future Player" + ) + df = make_forecast_df( + [ + { + "id": wid, + "source": "wikipedia", + "forecast_due_date": "2025-01-01", + "resolution_date": "2025-12-31", # after yesterday -> not yet resolvable + } + ] + ) + dfr = make_resolution_df( + [ + {"id": wid, "date": "2025-01-01", "value": 5}, + {"id": wid, "date": "2025-06-01", "value": 3}, + ] + ) + resolved, _ = wikipedia_source.resolve( + df, pd.DataFrame(), dfr, forecast_due_date=date(2025, 1, 1) + ) + row = resolved.iloc[0] + assert pd.isna(row["resolved_to"]) + assert not row["resolved"] + + def test_resolves_combo_question(self, wikipedia_source, freeze_today): + freeze_today(date(2025, 6, 2)) + w1 = wikipedia_source._id_hash(id_root="FIDE_rankings_ranking", id_field_value="P1") + w2 = wikipedia_source._id_hash(id_root="FIDE_rankings_ranking", id_field_value="P2") + df = make_forecast_df( + [ + { + "id": (w1, w2), + "source": "wikipedia", + "direction": (1, 1), + "forecast_due_date": "2025-01-01", + "resolution_date": "2025-06-01", + } + ] + ) + dfr = make_resolution_df( + [ + {"id": w1, "date": "2025-01-01", "value": 5}, + {"id": w1, "date": "2025-06-01", "value": 3}, + {"id": w2, "date": "2025-01-01", "value": 8}, + {"id": w2, "date": "2025-06-01", "value": 4}, + ] + ) + resolved, _ = wikipedia_source.resolve( + df, pd.DataFrame(), dfr, forecast_due_date=date(2025, 1, 1) + ) + # Both sub-questions resolve True; direction (1, 1) -> 1.0 * 1.0 = 1.0. + assert resolved.iloc[0]["resolved_to"] == 1.0