From 6638db7e6877faaf913dcfd98dd530633f561a6c Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Thu, 11 Jun 2026 12:12:18 +0300 Subject: [PATCH 1/4] refactor: acled --- Makefile | 4 +- src/_schemas.py | 20 + .../requirements.txt | 1 + src/helpers/acled.py | 216 +------ .../func_acled_fetch}/Makefile | 22 +- src/orchestration/func_acled_fetch/main.py | 36 ++ .../func_acled_fetch}/requirements.txt | 5 +- .../func_acled_update}/Makefile | 20 +- src/orchestration/func_acled_update/main.py | 49 ++ .../func_acled_update}/requirements.txt | 1 + src/questions/acled/fetch/main.py | 165 ----- src/questions/acled/update_questions/main.py | 151 ----- src/sources/acled.py | 519 ++++++++++++++- src/tests/conftest.py | 66 ++ src/tests/test_acled.py | 598 +++++++++++++++++- src/tests/test_acled_fetch.py | 186 ------ 16 files changed, 1317 insertions(+), 742 deletions(-) rename src/{questions/acled/fetch => orchestration/func_acled_fetch}/Makefile (62%) create mode 100644 src/orchestration/func_acled_fetch/main.py rename src/{questions/acled/update_questions => orchestration/func_acled_fetch}/requirements.txt (76%) rename src/{questions/acled/update_questions => orchestration/func_acled_update}/Makefile (68%) create mode 100644 src/orchestration/func_acled_update/main.py rename src/{questions/acled/fetch => orchestration/func_acled_update}/requirements.txt (94%) delete mode 100644 src/questions/acled/fetch/main.py delete mode 100644 src/questions/acled/update_questions/main.py delete mode 100644 src/tests/test_acled_fetch.py diff --git a/Makefile b/Makefile index b65e2f67..540423d3 100644 --- a/Makefile +++ b/Makefile @@ -151,10 +151,10 @@ infer-update-questions: acled: acled-fetch acled-update-questions acled-fetch: - $(MAKE) -C src/questions/acled/fetch || echo "* $@" >> $(MAKE_FAILURE_LOG) + $(MAKE) -C src/orchestration/func_acled_fetch || echo "* $@" >> $(MAKE_FAILURE_LOG) acled-update-questions: - $(MAKE) -C src/questions/acled/update_questions || echo "* $@" >> $(MAKE_FAILURE_LOG) + $(MAKE) -C src/orchestration/func_acled_update || echo "* $@" >> $(MAKE_FAILURE_LOG) yfinance: yfinance-fetch yfinance-update-questions diff --git a/src/_schemas.py b/src/_schemas.py index 76495e69..03e1f4d9 100644 --- a/src/_schemas.py +++ b/src/_schemas.py @@ -124,6 +124,26 @@ class Config: coerce = True +class AcledFetchFrame(pa.DataFrameModel): + """Output of AcledSource.fetch(). Raw per-event rows from the ACLED API.""" + + event_id_cnty: Series[str] + event_date: Series[str] + iso: Series[int] + region: Series[str] + country: Series[str] + admin1: Series[str] + event_type: Series[str] + fatalities: Series[int] + timestamp: Series[str] + + class Config: + """Schema configuration.""" + + strict = False + coerce = True + + class AcledResolutionFrame(pa.DataFrameModel): """ACLED-specific: aggregated events by country and date. diff --git a/src/base_eval/naive_and_dummy_forecasters/requirements.txt b/src/base_eval/naive_and_dummy_forecasters/requirements.txt index af107f8f..f55743b3 100644 --- a/src/base_eval/naive_and_dummy_forecasters/requirements.txt +++ b/src/base_eval/naive_and_dummy_forecasters/requirements.txt @@ -1,6 +1,7 @@ google-cloud-storage google-cloud-secret-manager gcsfs==2024.3.1 +backoff pandas>=2.2.2,<3.0 pandas-market-calendars tqdm diff --git a/src/helpers/acled.py b/src/helpers/acled.py index c0ba7905..17236e5a 100644 --- a/src/helpers/acled.py +++ b/src/helpers/acled.py @@ -1,15 +1,26 @@ -"""ACLED-specific variables.""" +"""ACLED shared helpers. + +Light home of ACLED's pure computation (aggregations + naive-forecast helpers): the module +imports only numpy/pandas and the lightweight metadata layer at load time. Hash-mapping access +routes through a lazily-instantiated ``AcledSource`` (see ``_get_source``), so importing this +module stays light — only *calling* the hash funcs pulls the (now heavy) ``sources.acled``. + +The sole caller of those hash funcs is the unrefactored ``base_eval`` naive forecaster, which +therefore declares ``backoff`` in its requirements. ``question_curation`` imports this module only +for the ``SOURCE_INTRO``/``RESOLUTION_CRITERIA`` constants (served from ``_metadata``); it never +triggers the lazy import, so it and its many consumers stay light. + +When ``base_eval`` is refactored to call ``AcledSource.get_naive_forecast()`` this computation can +move onto the source class (Phase 1 plan) and this module shrinks to a metadata-only shim. +""" from datetime import timedelta -from enum import Enum import numpy as np import pandas as pd from sources._metadata import SOURCE_METADATA -from . import data_utils - SOURCE_INTRO = SOURCE_METADATA["acled"]["source_intro"] RESOLUTION_CRITERIA = SOURCE_METADATA["acled"]["resolution_criteria"] @@ -54,134 +65,6 @@ def upload_hash_mapping(): _upload(raw_json, source) -FETCH_COLUMN_DTYPE = { - "event_id_cnty": str, - "event_date": str, - "iso": int, - "region": str, - "country": str, - "admin1": str, - "event_type": str, - "fatalities": int, - "timestamp": str, -} -FETCH_COLUMNS = list(FETCH_COLUMN_DTYPE.keys()) - -BACKGROUND = """ -ACLED classifies events into six distinct categories: - -1. Battles: violent interactions between two organized armed groups at a particular time and - location; -2. Protests: in-person public demonstrations of three or more participants in which the participants - do not engage in violence, though violence may be used against them; -3. Riots: violent events where demonstrators or mobs of three or more engage in violent or - destructive acts, including but not limited to physical fights, rock throwing, property - destruction, etc.; -4. Explosions/Remote violence: incidents in which one side uses weapon types that, by their nature, - are at range and widely destructive; -5. Violence against civilians: violent events where an organized armed group inflicts violence upon - unarmed non-combatants; and -6. Strategic developments: contextually important information regarding incidents and activities of - groups that are not recorded as any of the other event types, yet may trigger future events or - contribute to political dynamics within and across states. - -Detailed information about the categories can be found at: -https://acleddata.com/knowledge-base/codebook/#acled-events -""" - - -def read_dff(local_question_bank_dir=None) -> pd.DataFrame: - """ - Read fetch file and create dfr. - - Args: - local_question_bank_dir (str): the location where the question bank was unzipped. - - Returns: - df (pd.DataFrame): parsed and formatted fetch file. - dfr (pd.DataFrame): the ACLED resolution values. - """ - if local_question_bank_dir is None: - filenames = data_utils.generate_filenames(source=source) - df = data_utils.download_and_read( - filename=filenames["jsonl_fetch"], - local_filename=filenames["local_fetch"], - df_tmp=pd.DataFrame(columns=FETCH_COLUMNS), - dtype=FETCH_COLUMN_DTYPE, - ) - else: - filenames = data_utils.generate_filenames(source=source) - source_fetch_file = filenames.get("jsonl_fetch") - local_filename = f"{local_question_bank_dir}/{source_fetch_file}" - - df = pd.read_json( - local_filename, - lines=True, - dtype=FETCH_COLUMN_DTYPE, - convert_dates=False, - ) - - # The values for the `event_date` field in the following entries are incorrect. - # They are "0025-" for "2025" and "0024-" for "2024-" - # - # Bug reported to ACLED on 26 Sept 2025 - # - # 2025: - # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=ABW24 - # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=YEM104718 - # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=YEM99604 - # - # 2024: - # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=NCL346 - # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=NCL351 - # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=PYF127 - def fix_year_prefix(date_str): - if isinstance(date_str, str): - if date_str.startswith("0025-"): - return "2025-" + date_str[5:] - if date_str.startswith("0024-"): - return "2024-" + date_str[5:] - return date_str - - df["event_date"] = df["event_date"].apply(fix_year_prefix) - # End fix bug with ACLED data - - df["event_date"] = pd.to_datetime(df["event_date"]) - - df = df[["country", "event_date", "event_type", "fatalities"]].copy() - - dfr = ( - pd.get_dummies(df, columns=["event_type"], prefix="", prefix_sep="") - .groupby(["country", "event_date"]) - .sum() - .reset_index() - ) - - return df, dfr - - -def download_dff_and_prepare_dfr(local_question_bank_dir: str = None) -> tuple: - """Prepare ACLED data for resolution.""" - df, dfr = read_dff(local_question_bank_dir=local_question_bank_dir) - countries = df["country"].unique() - event_types = list(df["event_type"].unique()) + ["fatalities"] - return ( - dfr, - countries, - event_types, - ) - - -class QuestionType(Enum): - """Types of questions. - - These will determine how a given question is resolved. - """ - - N_30_DAYS_GT_30_DAY_AVG_OVER_PAST_360_DAYS = 0 - N_30_DAYS_X_10_GT_30_DAY_AVG_OVER_PAST_360_DAYS_PLUS_1 = 1 - - def get_forecast(comparison_value, dfr, country, col, ref_date): """Retrun the LHS of the comparison for the question. @@ -233,17 +116,6 @@ def get_base_comparison_value(key, dfr, country, col, ref_date): raise ValueError("Invalid key.") -def get_freeze_value(key, dfr, country, event_type, today): - """Return the freeze value given the key.""" - if key == "last30Days.gt.30DayAvgOverPast360Days": - return thirty_day_avg_over_past_360_days(dfr, country, event_type, today) - - if key == "last30DaysTimes10.gt.30DayAvgOverPast360DaysPlus1": - return thirty_day_avg_over_past_360_days_plus_1(dfr, country, event_type, today) - - raise Exception("Invalid key.") - - def sum_over_past_30_days(dfr, country, col, ref_date): """Sum over the 30 days before the ref_date.""" dfc = dfr[dfr["country"] == country].copy() @@ -269,61 +141,3 @@ def thirty_day_avg_over_past_360_days(dfr, country, col, ref_date): def thirty_day_avg_over_past_360_days_plus_1(dfr, country, col, ref_date): """Get 1 plus the 30 day average over the 360 days before the ref_date.""" return 1 + thirty_day_avg_over_past_360_days(dfr, country, col, ref_date) - - -QUESTIONS = { - "last30Days.gt.30DayAvgOverPast360Days": { - "question_type": QuestionType.N_30_DAYS_GT_30_DAY_AVG_OVER_PAST_360_DAYS, - "question": ( - ( - "Will there be more {event_type} in {country} for the 30 days before " - "{resolution_date} compared to the 30-day average of {event_type} over the 360 " - "days preceding {forecast_due_date}?" - "\n\n" - "e.g. If the forecast due date is 2024-01-01 and we have the following data:\n" - "Date,{event_type}\n" - "2023-11-11,1\n" - "2023-10-10,2\n" - "to calculate the 30-day average of {event_type} over the preceding 360 " - "days, we’d have: (1+2)/12=0.25.\n\n" - "In this example, for the question to resolve positively, 1 or more " - "{event_type} would need to occur in the 30 days leading up to the resolution." - ), - ("event_type", "country"), - ), - "freeze_datetime_value_explanation": ( - ( - "The 30-day average of {event_type} over the past 360 days in {country}. " - "This reference value will potentially change as ACLED updates its dataset." - ), - ("event_type", "country"), - ), - }, - "last30DaysTimes10.gt.30DayAvgOverPast360DaysPlus1": { - "question_type": QuestionType.N_30_DAYS_X_10_GT_30_DAY_AVG_OVER_PAST_360_DAYS_PLUS_1, - "question": ( - ( - "Will there be more than ten times as many {event_type} in {country} for the 30 " - "days before {resolution_date} compared to one plus the 30-day average of " - "{event_type} over the 360 days preceding {forecast_due_date}?" - "\n\n" - "e.g. If the forecast due date is 2024-01-01 and we have the following data:\n" - "Date,{event_type}\n" - "2023-11-11,1\n" - "2023-10-10,2\n" - "to calculate one plus the 30-day average of {event_type} over the preceding 360 " - "days, we’d have: 1+(1+2)/12=1.25.\n\n" - "In this example, for the question to resolve positively, 13 (10 x 1.25) or more " - "{event_type} would need to occur in the 30 days leading up to the resolution." - ), - ("event_type", "country"), - ), - "freeze_datetime_value_explanation": ( - ( - "One plus the 30-day average of {event_type} over the past 360 days in {country}. " - "This reference value will potentially change as ACLED updates its dataset." - ), - ("event_type", "country"), - ), - }, -} diff --git a/src/questions/acled/fetch/Makefile b/src/orchestration/func_acled_fetch/Makefile similarity index 62% rename from src/questions/acled/fetch/Makefile rename to src/orchestration/func_acled_fetch/Makefile index 803db85e..36e8ea4c 100644 --- a/src/questions/acled/fetch/Makefile +++ b/src/orchestration/func_acled_fetch/Makefile @@ -9,19 +9,23 @@ UPLOAD_DIR = upload .gcloudignore: cp -r $(ROOT_DIR)src/helpers/.gcloudignore . -Procfile: - cp -r $(ROOT_DIR)src/helpers/Procfile . +Dockerfile: $(ROOT_DIR)src/helpers/Dockerfile.template + sed \ + -e 's/REGION/$(CLOUD_DEPLOY_REGION)/g' \ + -e 's/STACK/google-22-full/g' \ + -e 's/PYTHON_VERSION/python312/g' \ + $< > Dockerfile -deploy : main.py .gcloudignore requirements.txt Procfile +deploy : main.py .gcloudignore requirements.txt Dockerfile mkdir -p $(UPLOAD_DIR) cp -r $(ROOT_DIR)utils $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ + cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/helpers + cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/sources mkdir -p $(UPLOAD_DIR)/orchestration cp $(ROOT_DIR)src/orchestration/__init__.py $(UPLOAD_DIR)/orchestration/ - cp $(ROOT_DIR)src/orchestration/_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/orchestration/_source_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ + cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ cp $^ $(UPLOAD_DIR)/ gcloud run jobs deploy \ func-data-acled-fetch \ @@ -37,4 +41,4 @@ deploy : main.py .gcloudignore requirements.txt Procfile --source $(UPLOAD_DIR) clean : - rm -rf $(UPLOAD_DIR) .gcloudignore Procfile + rm -rf $(UPLOAD_DIR) .gcloudignore Dockerfile diff --git a/src/orchestration/func_acled_fetch/main.py b/src/orchestration/func_acled_fetch/main.py new file mode 100644 index 00000000..c6c6aec7 --- /dev/null +++ b/src/orchestration/func_acled_fetch/main.py @@ -0,0 +1,36 @@ +"""ACLED fetch entry point.""" + +from __future__ import annotations + +import logging +from typing import Any + +from helpers import decorator, keys +from orchestration import _source_io +from sources.acled import AcledSource + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SOURCE = "acled" + + +@decorator.log_runtime +def driver(_: Any) -> None: + """Fetch ACLED data and store in GCP Cloud Storage.""" + source = AcledSource() + source.api_email = keys.API_EMAIL_ACLED + source.api_password = keys.API_PASSWORD_ACLED + + dff = source.fetch() + + if dff.empty: + logger.error("No ACLED data was downloaded.") + return + + _source_io.write_fetch_output(SOURCE, dff) + logger.info("Done.") + + +if __name__ == "__main__": + driver(None) diff --git a/src/questions/acled/update_questions/requirements.txt b/src/orchestration/func_acled_fetch/requirements.txt similarity index 76% rename from src/questions/acled/update_questions/requirements.txt rename to src/orchestration/func_acled_fetch/requirements.txt index cebc5cb6..8ae5ce4c 100644 --- a/src/questions/acled/update_questions/requirements.txt +++ b/src/orchestration/func_acled_fetch/requirements.txt @@ -1,6 +1,7 @@ google-cloud-storage google-cloud-secret-manager +backoff pandas>=2.2.2,<3.0 -tqdm +requests pandera -termcolor +numpy diff --git a/src/questions/acled/update_questions/Makefile b/src/orchestration/func_acled_update/Makefile similarity index 68% rename from src/questions/acled/update_questions/Makefile rename to src/orchestration/func_acled_update/Makefile index 12f66f3a..b5ddfcba 100644 --- a/src/questions/acled/update_questions/Makefile +++ b/src/orchestration/func_acled_update/Makefile @@ -9,19 +9,23 @@ UPLOAD_DIR = upload .gcloudignore: cp -r $(ROOT_DIR)src/helpers/.gcloudignore . -Procfile: - cp -r $(ROOT_DIR)src/helpers/Procfile . +Dockerfile: $(ROOT_DIR)src/helpers/Dockerfile.template + sed \ + -e 's/REGION/$(CLOUD_DEPLOY_REGION)/g' \ + -e 's/STACK/google-22-full/g' \ + -e 's/PYTHON_VERSION/python312/g' \ + $< > Dockerfile -deploy : main.py .gcloudignore requirements.txt Procfile +deploy : main.py .gcloudignore requirements.txt Dockerfile mkdir -p $(UPLOAD_DIR) cp -r $(ROOT_DIR)utils $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ + cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/helpers + cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/sources mkdir -p $(UPLOAD_DIR)/orchestration cp $(ROOT_DIR)src/orchestration/__init__.py $(UPLOAD_DIR)/orchestration/ cp $(ROOT_DIR)src/orchestration/_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ + cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ cp $^ $(UPLOAD_DIR)/ gcloud run jobs deploy \ func-data-acled-update-questions \ @@ -37,4 +41,4 @@ deploy : main.py .gcloudignore requirements.txt Procfile --source $(UPLOAD_DIR) clean : - rm -rf $(UPLOAD_DIR) .gcloudignore Procfile + rm -rf $(UPLOAD_DIR) .gcloudignore Dockerfile diff --git a/src/orchestration/func_acled_update/main.py b/src/orchestration/func_acled_update/main.py new file mode 100644 index 00000000..df8cdc52 --- /dev/null +++ b/src/orchestration/func_acled_update/main.py @@ -0,0 +1,49 @@ +"""ACLED update entry point.""" + +from __future__ import annotations + +import logging +from typing import Any + +import pandas as pd + +from helpers import data_utils, decorator +from orchestration import _io +from sources.acled import FETCH_COLUMN_DTYPE, FETCH_COLUMNS, AcledSource + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SOURCE = "acled" + + +@decorator.log_runtime +def driver(_: Any) -> None: + """Pull in fetched data and update questions in the question bank.""" + logger.info("Downloading previously-fetched ACLED data from Cloud.") + source = AcledSource() + source.populate_hash_mapping(_io.load_hash_mapping(SOURCE)) + + dfq = data_utils.get_data_from_cloud_storage(SOURCE, return_question_data=True) + + # Read the fetch file with explicit dtypes, replicating the legacy read: event_date must + # stay a string so the year-prefix fix inside update() can apply. + filenames = data_utils.generate_filenames(SOURCE) + dff = data_utils.download_and_read( + filename=filenames["jsonl_fetch"], + local_filename=filenames["local_fetch"], + df_tmp=pd.DataFrame(columns=FETCH_COLUMNS), + dtype=FETCH_COLUMN_DTYPE, + ) + + result = source.update(dfq, dff) + + logger.info("Uploading to GCP...") + data_utils.upload_questions(result.dfq, SOURCE) + if result.hash_mapping is not None: + _io.upload_hash_mapping(source.dump_hash_mapping(), SOURCE) + logger.info("Done.") + + +if __name__ == "__main__": + driver(None) diff --git a/src/questions/acled/fetch/requirements.txt b/src/orchestration/func_acled_update/requirements.txt similarity index 94% rename from src/questions/acled/fetch/requirements.txt rename to src/orchestration/func_acled_update/requirements.txt index 85fcbdd2..c4f1dbe3 100644 --- a/src/questions/acled/fetch/requirements.txt +++ b/src/orchestration/func_acled_update/requirements.txt @@ -4,4 +4,5 @@ backoff pandas>=2.2.2,<3.0 requests pandera +numpy termcolor diff --git a/src/questions/acled/fetch/main.py b/src/questions/acled/fetch/main.py deleted file mode 100644 index d8e68373..00000000 --- a/src/questions/acled/fetch/main.py +++ /dev/null @@ -1,165 +0,0 @@ -"""Fetch data from Acled API.""" - -import json -import logging -import os -import sys -from typing import Any - -import backoff -import pandas as pd -import requests - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../..")) -from helpers import acled, constants, data_utils, decorator, env, keys # noqa: E402 - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../../..")) -from utils import gcp # noqa: E402 - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -source = "acled" -filenames = data_utils.generate_filenames(source=source) -# Need 2 years of data to get monthly average over the year -# As ACLED only uses > filter so >2022 gets 2023 or more recent, providing yearly average for -# questions in 2024 -ACLED_START_YEAR = constants.BENCHMARK_START_YEAR - 2 - - -@backoff.on_exception( - backoff.expo, - (requests.exceptions.Timeout, requests.exceptions.ConnectionError), - max_time=60, - on_backoff=data_utils.print_error_info_handler, -) -def get_access_token() -> str: - """ - Authenticate with the ACLED API and retrieves an access token. - - Returns: - str: The access token if the request is successful. - """ - logger.info("Get ACLED access token.") - endpoint = "https://acleddata.com/oauth/token" - headers = { - "Content-Type": "application/x-www-form-urlencoded", - } - params = { - "username": keys.API_EMAIL_ACLED, - "password": keys.API_PASSWORD_ACLED, - "grant_type": "password", - "client_id": "acled", - "scope": "authenticated", - } - - try: - response = requests.post(endpoint, headers=headers, data=params) - logger.debug(f"Response status code: {response.status_code}") - logger.debug(f"Response headers: {response.headers}") - logger.debug(f"Response content: {response.text}") - response.raise_for_status() - - data = response.json() - if "access_token" not in data: - raise ValueError("Access token not found in response") - return data["access_token"] - - except requests.exceptions.RequestException as e: - raise requests.exceptions.RequestException( - f"Failed to authenticate with ACLED API: {str(e)}" - ) - except ValueError as e: - raise ValueError(f"Error processing API response: {str(e)}") - - -@backoff.on_exception( - backoff.expo, - requests.exceptions.RequestException, - max_time=3600, - on_backoff=data_utils.print_error_info_handler, -) -def get_acled_page( - endpoint: str, headers: dict[str, str], params: dict[str, Any] -) -> dict[str, Any]: - """Fetch a single ACLED page and retry transient request failures.""" - response = requests.get(endpoint, headers=headers, params=params, timeout=100) - - if not response.ok: - logger.error(f"Request to ACLED API endpoint {endpoint} failed with params {params}") - response.raise_for_status() - return response.json() - - -def get_acled_events(access_token: str) -> pd.DataFrame: - """ - Fetch data from the ACLED API and return it as a pandas DataFrame. - - Args: - access_token (str): OAuth2 bearer token for authenticating with the ACLED API. - - Returns: - pd.DataFrame: A DataFrame containing all retrieved ACLED events with standardized columns. - """ - endpoint = "https://acleddata.com/api/acled/read?_format=json" - headers = { - "Authorization": f"Bearer {access_token}", - "Content-Type": "application/json", - } - params = { - "fields": "|".join(acled.FETCH_COLUMNS), - "year": ACLED_START_YEAR, - "year_where": ">", - "page": 0, - } - - seen_ids = set() - dfs = [] - df = pd.DataFrame(columns=acled.FETCH_COLUMNS) - while True: - params["page"] += 1 - logger.info(f"Downloading page {params['page']}") - data = get_acled_page(endpoint=endpoint, headers=headers, params=params) - rows = data.get("data", []) - - if not rows: - logger.info(f"No ACLED rows returned on page {params['page']}; stopping pagination.") - break - - df_tmp = pd.DataFrame(rows).astype(acled.FETCH_COLUMN_DTYPE) - df_new_rows = df_tmp[~df_tmp["event_id_cnty"].isin(seen_ids)] - seen_ids.update(df_new_rows["event_id_cnty"]) - dfs.append(df_new_rows) - - df = pd.concat(dfs, ignore_index=True).sort_values(by="event_id_cnty", ignore_index=True) - logger.info(f"Downloaded {len(df)} rows.") - return df - - -@decorator.log_runtime -def driver(_: Any) -> None: - """Fetch Acled data and store in GCP Cloud Storage.""" - # Get the latest ACLED data - logger.info("Downloading ACLED data.") - access_token = get_access_token() - df = get_acled_events(access_token=access_token) - - if df.empty: - logger.error("No ACLED data was downloaded.") - return - - with open(filenames["local_fetch"], "w", encoding="utf-8") as f: - for record in df.to_dict(orient="records"): - jsonl_str = json.dumps(record, ensure_ascii=False) - f.write(jsonl_str + "\n") - - gcp.storage.upload( - bucket_name=env.QUESTION_BANK_BUCKET, - local_filename=filenames["local_fetch"], - ) - - logger.info("Done.") - - -if __name__ == "__main__": - driver(None) diff --git a/src/questions/acled/update_questions/main.py b/src/questions/acled/update_questions/main.py deleted file mode 100644 index f4439977..00000000 --- a/src/questions/acled/update_questions/main.py +++ /dev/null @@ -1,151 +0,0 @@ -"""Generate ACLED questions.""" - -import json -import logging -import os -import sys - -import pandas as pd -from tqdm import tqdm - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../..")) -from helpers import acled, constants, data_utils, dates, decorator, env # noqa: E402 - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../../..")) # noqa: E402 -from utils import gcp # noqa: E402 - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -source = "acled" -filenames = data_utils.generate_filenames(source=source) - - -def generate_forecast_questions(dfq, dfr, countries, event_types): - """Generate forecast questions given fetch data.""" - logger.info(f"Found {len(countries)} countries.") - logger.info(f"Found {len(event_types)} event_types.") - - TODAY = dates.get_date_today() - - def fill_template(template, fields, values): - fill_values = {field: values[field] for field in fields} - # Always maintain resolution_date and forecast_due_date when formatting the string - default_values = { - "resolution_date": "{resolution_date}", - "forecast_due_date": "{forecast_due_date}", - } - combined_fill_values = {**default_values, **fill_values} - return template.format(**combined_fill_values) - - def create_question(question_key, country, event_type, dfr): - question, variables = acled.QUESTIONS.get(question_key).get("question") - event_type_quoted = event_type if event_type == "fatalities" else f"'{event_type}'" - question = fill_template( - template=question, - fields=variables, - values={"event_type": event_type_quoted, "country": country}, - ) - aid = acled.id_hash( - {"key": question_key, "event_type": event_type, "country": country}, - ) - freeze_datetime_value_explanation, variables = acled.QUESTIONS.get(question_key).get( - "freeze_datetime_value_explanation" - ) - freeze_datetime_value_explanation = fill_template( - template=freeze_datetime_value_explanation, - fields=variables, - values={"event_type": event_type_quoted, "country": country}, - ) - freeze_datetime_value = acled.get_freeze_value( - key=question_key, dfr=dfr, country=country, event_type=event_type, today=TODAY - ) - return { - "id": aid, - "question": question, - "background": acled.BACKGROUND, - "freeze_datetime_value": str(freeze_datetime_value), - "freeze_datetime_value_explanation": freeze_datetime_value_explanation, - "market_info_resolution_criteria": "N/A", - "market_info_open_datetime": "N/A", - "market_info_close_datetime": "N/A", - "market_info_resolution_datetime": "N/A", - "url": "https://acleddata.com/", - "resolved": False, - "forecast_horizons": constants.FORECAST_HORIZONS_IN_DAYS, - } - - questions = [] - for country in tqdm(countries, "Creating questions"): - for event_type in event_types: - questions.append( - create_question( - question_key="last30Days.gt.30DayAvgOverPast360Days", - country=country, - event_type=event_type, - dfr=dfr, - ) - ) - questions.append( - create_question( - question_key="last30DaysTimes10.gt.30DayAvgOverPast360DaysPlus1", - country=country, - event_type=event_type, - dfr=dfr, - ) - ) - - df = pd.DataFrame(questions) - - if dfq.empty: - return df - rows_to_append = df[~df["id"].isin(dfq["id"])] - dfq = pd.concat([dfq, rows_to_append], ignore_index=True).sort_values( - by="id", ignore_index=True - ) - rows_to_update = df[df["id"].isin(dfq["id"])] - fields_to_update = [ - "question", - "background", - "freeze_datetime_value", - "freeze_datetime_value_explanation", - ] - for aid in rows_to_update["id"].unique(): - for field in fields_to_update: - dfq.loc[dfq["id"] == aid, field] = df.loc[df["id"] == aid, field].iloc[0] - return dfq - - -@decorator.log_runtime -def driver(_): - """Pull in fetched data and update questions and resolved values in question bank.""" - # Download pertinent files from Cloud Storage - logger.info("Downloading previously-fetched ACLED data from Cloud.") - - acled.populate_hash_mapping() - dfr, countries, event_types = acled.download_dff_and_prepare_dfr() - dfq = data_utils.get_data_from_cloud_storage(source="acled", return_question_data=True) - - # Update the existing questions - dfq = generate_forecast_questions(dfq, dfr, countries, event_types) - dfq = dfq[constants.QUESTION_FILE_COLUMNS] - logger.info(f"Found {len(dfq):,} questions.") - - # Save - with open(filenames["local_question"], "w", encoding="utf-8") as f: - for record in dfq.to_dict(orient="records"): - jsonl_str = json.dumps(record, ensure_ascii=False) - f.write(jsonl_str + "\n") - - # Upload - gcp.storage.upload( - bucket_name=env.QUESTION_BANK_BUCKET, - local_filename=filenames["local_question"], - ) - acled.upload_hash_mapping() - - logger.info("Done.") - - -if __name__ == "__main__": - driver(None) diff --git a/src/sources/acled.py b/src/sources/acled.py index 2c76514b..e3e46c06 100644 --- a/src/sources/acled.py +++ b/src/sources/acled.py @@ -5,17 +5,134 @@ import hashlib import json import logging -from typing import ClassVar +from enum import Enum +from typing import Any, ClassVar +import backoff import numpy as np import pandas as pd +import pandera.pandas as pa +import requests +from pandera.typing import DataFrame -from _schemas import AcledResolutionFrame +from _fb_types import UpdateResult +from _schemas import AcledFetchFrame, AcledResolutionFrame, QuestionFrame +from helpers import constants, data_utils, dates from ._dataset import DatasetSource logger = logging.getLogger(__name__) +# Need 2 years of data to get monthly average over the year +# As ACLED only uses > filter so >2022 gets 2023 or more recent, providing yearly average for +# questions in 2024 +_ACLED_START_YEAR = constants.BENCHMARK_START_YEAR - 2 + +# Read/write dtypes for the fetch file. Also used by the update job to read the fetch file +# with explicit dtypes (event_date must stay a string so the year-prefix fix can apply). +FETCH_COLUMN_DTYPE = { + "event_id_cnty": str, + "event_date": str, + "iso": int, + "region": str, + "country": str, + "admin1": str, + "event_type": str, + "fatalities": int, + "timestamp": str, +} +FETCH_COLUMNS = list(FETCH_COLUMN_DTYPE.keys()) + +_BACKGROUND = """ +ACLED classifies events into six distinct categories: + +1. Battles: violent interactions between two organized armed groups at a particular time and + location; +2. Protests: in-person public demonstrations of three or more participants in which the participants + do not engage in violence, though violence may be used against them; +3. Riots: violent events where demonstrators or mobs of three or more engage in violent or + destructive acts, including but not limited to physical fights, rock throwing, property + destruction, etc.; +4. Explosions/Remote violence: incidents in which one side uses weapon types that, by their nature, + are at range and widely destructive; +5. Violence against civilians: violent events where an organized armed group inflicts violence upon + unarmed non-combatants; and +6. Strategic developments: contextually important information regarding incidents and activities of + groups that are not recorded as any of the other event types, yet may trigger future events or + contribute to political dynamics within and across states. + +Detailed information about the categories can be found at: +https://acleddata.com/knowledge-base/codebook/#acled-events +""" + + +class QuestionType(Enum): + """Types of questions. + + These will determine how a given question is resolved. + """ + + N_30_DAYS_GT_30_DAY_AVG_OVER_PAST_360_DAYS = 0 + N_30_DAYS_X_10_GT_30_DAY_AVG_OVER_PAST_360_DAYS_PLUS_1 = 1 + + +_QUESTIONS = { + "last30Days.gt.30DayAvgOverPast360Days": { + "question_type": QuestionType.N_30_DAYS_GT_30_DAY_AVG_OVER_PAST_360_DAYS, + "question": ( + ( + "Will there be more {event_type} in {country} for the 30 days before " + "{resolution_date} compared to the 30-day average of {event_type} over the 360 " + "days preceding {forecast_due_date}?" + "\n\n" + "e.g. If the forecast due date is 2024-01-01 and we have the following data:\n" + "Date,{event_type}\n" + "2023-11-11,1\n" + "2023-10-10,2\n" + "to calculate the 30-day average of {event_type} over the preceding 360 " + "days, we’d have: (1+2)/12=0.25.\n\n" + "In this example, for the question to resolve positively, 1 or more " + "{event_type} would need to occur in the 30 days leading up to the resolution." + ), + ("event_type", "country"), + ), + "freeze_datetime_value_explanation": ( + ( + "The 30-day average of {event_type} over the past 360 days in {country}. " + "This reference value will potentially change as ACLED updates its dataset." + ), + ("event_type", "country"), + ), + }, + "last30DaysTimes10.gt.30DayAvgOverPast360DaysPlus1": { + "question_type": QuestionType.N_30_DAYS_X_10_GT_30_DAY_AVG_OVER_PAST_360_DAYS_PLUS_1, + "question": ( + ( + "Will there be more than ten times as many {event_type} in {country} for the 30 " + "days before {resolution_date} compared to one plus the 30-day average of " + "{event_type} over the 360 days preceding {forecast_due_date}?" + "\n\n" + "e.g. If the forecast due date is 2024-01-01 and we have the following data:\n" + "Date,{event_type}\n" + "2023-11-11,1\n" + "2023-10-10,2\n" + "to calculate one plus the 30-day average of {event_type} over the preceding 360 " + "days, we’d have: 1+(1+2)/12=1.25.\n\n" + "In this example, for the question to resolve positively, 13 (10 x 1.25) or more " + "{event_type} would need to occur in the 30 days leading up to the resolution." + ), + ("event_type", "country"), + ), + "freeze_datetime_value_explanation": ( + ( + "One plus the 30-day average of {event_type} over the past 360 days in {country}. " + "This reference value will potentially change as ACLED updates its dataset." + ), + ("event_type", "country"), + ), + }, +} + class AcledSource(DatasetSource): """Armed Conflict Location & Event Data source with custom resolution logic.""" @@ -23,6 +140,384 @@ class AcledSource(DatasetSource): name: ClassVar[str] = "acled" resolution_schema: ClassVar[type] = AcledResolutionFrame + def __init__(self) -> None: + """Initialize with ACLED credential slots.""" + super().__init__() + self.api_email: str | None = None + self.api_password: str | None = None + + # ------------------------------------------------------------------ + # Public: fetch + # ------------------------------------------------------------------ + + @pa.check_types + def fetch(self, **kwargs: Any) -> DataFrame[AcledFetchFrame]: + """Fetch all ACLED events since _ACLED_START_YEAR. + + Authenticates via OAuth2, then paginates through the events endpoint, + deduplicating events by event_id_cnty. + """ + self._require_credentials() + logger.info("Downloading ACLED data.") + access_token = self._get_access_token() + return self._get_events(access_token=access_token) + + # ------------------------------------------------------------------ + # Public: update + # ------------------------------------------------------------------ + + @pa.check_types + def update( + self, + dfq: DataFrame[QuestionFrame], + dff: DataFrame[AcledFetchFrame], + **kwargs: Any, + ) -> UpdateResult: + """Generate and update ACLED questions from fetched event data. + + ACLED produces no per-question resolution files: questions are resolved + directly from the aggregated fetch data. populate_hash_mapping() must be + called before update() so newly hashed question IDs accumulate into the + existing mapping. + + Args: + dfq (DataFrame[QuestionFrame]): Existing questions. + dff (DataFrame[AcledFetchFrame]): Raw ACLED event rows from fetch(). + """ + today = dates.get_date_today() + dfr, countries, event_types = self._prepare_resolution_data(dff) + dfq = self._generate_questions(dfq, dfr, countries, event_types, today=today) + dfq = dfq[constants.QUESTION_FILE_COLUMNS] + logger.info(f"Found {len(dfq):,} questions.") + return UpdateResult(dfq=dfq, hash_mapping=self.hash_mapping) + + # ------------------------------------------------------------------ + # Private: API calls + # ------------------------------------------------------------------ + + def _require_credentials(self) -> None: + """Raise if the ACLED API credentials are not set.""" + if not self.api_email or not self.api_password: + raise RuntimeError( + "AcledSource.api_email and AcledSource.api_password must be set before " + "calling fetch(). Set them in the orchestration layer." + ) + + @backoff.on_exception( + backoff.expo, + (requests.exceptions.Timeout, requests.exceptions.ConnectionError), + max_time=60, + on_backoff=data_utils.print_error_info_handler, + ) + def _get_access_token(self) -> str: + """ + Authenticate with the ACLED API and retrieve an access token. + + Returns: + str: The access token if the request is successful. + """ + logger.info("Get ACLED access token.") + endpoint = "https://acleddata.com/oauth/token" + headers = { + "Content-Type": "application/x-www-form-urlencoded", + } + params = { + "username": self.api_email, + "password": self.api_password, + "grant_type": "password", + "client_id": "acled", + "scope": "authenticated", + } + + try: + response = requests.post(endpoint, headers=headers, data=params) + logger.debug(f"Response status code: {response.status_code}") + logger.debug(f"Response headers: {response.headers}") + logger.debug(f"Response content: {response.text}") + response.raise_for_status() + + data = response.json() + if "access_token" not in data: + raise ValueError("Access token not found in response") + return data["access_token"] + + except requests.exceptions.RequestException as e: + raise requests.exceptions.RequestException( + f"Failed to authenticate with ACLED API: {str(e)}" + ) + except ValueError as e: + raise ValueError(f"Error processing API response: {str(e)}") + + @backoff.on_exception( + backoff.expo, + requests.exceptions.RequestException, + max_time=3600, + on_backoff=data_utils.print_error_info_handler, + ) + def _get_page( + self, endpoint: str, headers: dict[str, str], params: dict[str, Any] + ) -> dict[str, Any]: + """Fetch a single ACLED page and retry transient request failures.""" + response = requests.get(endpoint, headers=headers, params=params, timeout=100) + + if not response.ok: + logger.error(f"Request to ACLED API endpoint {endpoint} failed with params {params}") + response.raise_for_status() + return response.json() + + def _get_events(self, access_token: str) -> DataFrame[AcledFetchFrame]: + """ + Fetch data from the ACLED API and return it as a pandas DataFrame. + + The per-page astype(FETCH_COLUMN_DTYPE) makes the returned frame conform to + AcledFetchFrame (notably timestamp int -> str). + + Args: + access_token (str): OAuth2 bearer token for authenticating with the ACLED API. + + Returns: + DataFrame[AcledFetchFrame]: All retrieved ACLED events with standardized columns. + """ + endpoint = "https://acleddata.com/api/acled/read?_format=json" + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + } + params = { + "fields": "|".join(FETCH_COLUMNS), + "year": _ACLED_START_YEAR, + "year_where": ">", + "page": 0, + } + + seen_ids: set[str] = set() + dfs: list[pd.DataFrame] = [] + while True: + params["page"] += 1 + logger.info(f"Downloading page {params['page']}") + data = self._get_page(endpoint=endpoint, headers=headers, params=params) + rows = data.get("data", []) + + if not rows: + logger.info( + f"No ACLED rows returned on page {params['page']}; stopping pagination." + ) + break + + df_tmp = pd.DataFrame(rows).astype(FETCH_COLUMN_DTYPE) + df_new_rows = df_tmp[~df_tmp["event_id_cnty"].isin(seen_ids)] + seen_ids.update(df_new_rows["event_id_cnty"]) + dfs.append(df_new_rows) + + df = pd.concat(dfs, ignore_index=True).sort_values(by="event_id_cnty", ignore_index=True) + logger.info(f"Downloaded {len(df)} rows.") + return df + + # ------------------------------------------------------------------ + # Private: data transformation + # ------------------------------------------------------------------ + + @staticmethod + def _prepare_resolution_data(dff: pd.DataFrame) -> tuple[pd.DataFrame, list, list]: + """Aggregate raw event rows into the ACLED resolution frame. + + Args: + dff (pd.DataFrame): Raw ACLED event rows. + + Returns: + Tuple of (dfr, countries, event_types): events one-hot encoded by event_type and + summed by (country, event_date), plus the unique countries and event types + (with "fatalities" appended) used to generate questions. + """ + df = dff.copy() + + # The values for the `event_date` field in the following entries are incorrect. + # They are "0025-" for "2025" and "0024-" for "2024-" + # + # Bug reported to ACLED on 26 Sept 2025 + # + # 2025: + # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=ABW24 + # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=YEM104718 + # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=YEM99604 + # + # 2024: + # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=NCL346 + # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=NCL351 + # * https://acleddata.com/api/acled/read?_format=json&event_id_cnty=PYF127 + def fix_year_prefix(date_str): + if isinstance(date_str, str): + if date_str.startswith("0025-"): + return "2025-" + date_str[5:] + if date_str.startswith("0024-"): + return "2024-" + date_str[5:] + return date_str + + df["event_date"] = df["event_date"].apply(fix_year_prefix) + # End fix bug with ACLED data + + df["event_date"] = pd.to_datetime(df["event_date"]) + + df = df[["country", "event_date", "event_type", "fatalities"]].copy() + + dfr = ( + pd.get_dummies(df, columns=["event_type"], prefix="", prefix_sep="") + .groupby(["country", "event_date"]) + .sum() + .reset_index() + ) + + countries = df["country"].unique() + event_types = list(df["event_type"].unique()) + ["fatalities"] + + return dfr, countries, event_types + + # ------------------------------------------------------------------ + # Private: question generation + # ------------------------------------------------------------------ + + def _generate_questions( + self, + dfq: pd.DataFrame, + dfr: pd.DataFrame, + countries: list, + event_types: list, + today, + ) -> pd.DataFrame: + """Generate forecast questions for all (country, event_type, question key) combinations. + + Args: + dfq (pd.DataFrame): Existing questions (may be empty). + dfr (pd.DataFrame): Aggregated resolution frame. + countries (list): Unique countries from the fetch data. + event_types (list): Unique event types plus "fatalities". + today (date): Reference date for freeze value calculation. + """ + logger.info(f"Found {len(countries)} countries.") + logger.info(f"Found {len(event_types)} event_types.") + + questions = [] + for country in countries: + for event_type in event_types: + for question_key in _QUESTIONS: + questions.append( + self._create_question( + question_key=question_key, + country=country, + event_type=event_type, + dfr=dfr, + today=today, + ) + ) + + df = pd.DataFrame(questions) + + if dfq.empty: + return df + rows_to_append = df[~df["id"].isin(dfq["id"])] + dfq = pd.concat([dfq, rows_to_append], ignore_index=True).sort_values( + by="id", ignore_index=True + ) + rows_to_update = df[df["id"].isin(dfq["id"])] + fields_to_update = [ + "question", + "background", + "freeze_datetime_value", + "freeze_datetime_value_explanation", + ] + for aid in rows_to_update["id"].unique(): + for field in fields_to_update: + dfq.loc[dfq["id"] == aid, field] = df.loc[df["id"] == aid, field].iloc[0] + return dfq + + def _create_question( + self, + question_key: str, + country: str, + event_type: str, + dfr: pd.DataFrame, + today, + ) -> dict: + """Create a single ACLED question dict. + + Args: + question_key (str): One of the keys in _QUESTIONS. + country (str): Country name. + event_type (str): Event type column name. + dfr (pd.DataFrame): Aggregated resolution frame. + today (date): Reference date for freeze value calculation. + """ + question_template, variables = _QUESTIONS[question_key]["question"] + event_type_quoted = event_type if event_type == "fatalities" else f"'{event_type}'" + question = self._fill_template( + template=question_template, + fields=variables, + values={"event_type": event_type_quoted, "country": country}, + ) + aid = self._id_hash( + {"key": question_key, "event_type": event_type, "country": country}, + ) + explanation_template, variables = _QUESTIONS[question_key][ + "freeze_datetime_value_explanation" + ] + freeze_datetime_value_explanation = self._fill_template( + template=explanation_template, + fields=variables, + values={"event_type": event_type_quoted, "country": country}, + ) + freeze_datetime_value = self._get_freeze_value( + key=question_key, dfr=dfr, country=country, event_type=event_type, today=today + ) + return { + "id": aid, + "question": question, + "background": _BACKGROUND, + "freeze_datetime_value": str(freeze_datetime_value), + "freeze_datetime_value_explanation": freeze_datetime_value_explanation, + "market_info_resolution_criteria": "N/A", + "market_info_open_datetime": "N/A", + "market_info_close_datetime": "N/A", + "market_info_resolution_datetime": "N/A", + "url": "https://acleddata.com/", + "resolved": False, + "forecast_horizons": constants.FORECAST_HORIZONS_IN_DAYS, + } + + @staticmethod + def _fill_template(template: str, fields: tuple, values: dict) -> str: + """Fill a template, preserving the {resolution_date}/{forecast_due_date} placeholders. + + Args: + template (str): Question or explanation template. + fields (tuple): Field names to fill. + values (dict): Values for the fields. + """ + fill_values = {field: values[field] for field in fields} + # Always maintain resolution_date and forecast_due_date when formatting the string + default_values = { + "resolution_date": "{resolution_date}", + "forecast_due_date": "{forecast_due_date}", + } + combined_fill_values = {**default_values, **fill_values} + return template.format(**combined_fill_values) + + @staticmethod + def _get_freeze_value(key, dfr, country, event_type, today): + """Return the freeze value given the key.""" + if key == "last30Days.gt.30DayAvgOverPast360Days": + return AcledSource._thirty_day_avg_over_past_360_days(dfr, country, event_type, today) + + if key == "last30DaysTimes10.gt.30DayAvgOverPast360DaysPlus1": + return AcledSource._thirty_day_avg_over_past_360_days_plus_1( + dfr, country, event_type, today + ) + + raise Exception("Invalid key.") + + # ------------------------------------------------------------------ + # Private: resolution + # ------------------------------------------------------------------ + def _resolve(self, df: pd.DataFrame, dfq: pd.DataFrame, dfr: pd.DataFrame) -> pd.DataFrame: """Resolve ACLED questions row by row.""" logger.info("Resolving ACLED questions.") @@ -98,6 +593,14 @@ def _acled_resolve(key, dfr, country, event_type, forecast_due_date, resolution_ ) return int(lhs > rhs) + # ------------------------------------------------------------------ + # Private: aggregation helpers + # + # The implementations live in helpers/acled.py (a light module the unrefactored + # base_eval naive forecaster imports without the fetch deps). We delegate here rather + # than the reverse so importing this heavy source module is not forced on base_eval. + # ------------------------------------------------------------------ + @staticmethod def _sum_over_past_30_days(dfr, country, col, ref_date): """Sum of col for country over the 30 days before ref_date.""" @@ -148,15 +651,3 @@ def _id_hash(self, d: dict) -> str: def _id_unhash(self, hash_key: str): """Look up the original question dict from a hash key.""" return self.hash_mapping.get(hash_key) - - # ------------------------------------------------------------------ - # Fetch / update (not yet implemented) - # ------------------------------------------------------------------ - - def fetch(self, **kwargs): - """Fetch ACLED data from external API.""" - raise NotImplementedError - - def update(self, dfq, dff, **kwargs): - """Process fetched ACLED data into questions and resolution files.""" - raise NotImplementedError diff --git a/src/tests/conftest.py b/src/tests/conftest.py index 48da008d..3d92fcff 100644 --- a/src/tests/conftest.py +++ b/src/tests/conftest.py @@ -71,6 +71,15 @@ def acled_source(): return AcledSource() +@pytest.fixture() +def acled_source_with_creds(): + """Return an AcledSource instance with fake API credentials.""" + src = AcledSource() + src.api_email = "test@example.com" + src.api_password = "test-password" + return src + + @pytest.fixture() def infer_source(): """Return an InferSource instance with a fake API key.""" @@ -179,6 +188,63 @@ def make_question_set_df(rows): return pd.DataFrame(rows) +# --------------------------------------------------------------------------- +# ACLED-specific factories +# --------------------------------------------------------------------------- + + +def make_acled_api_auth_response(**overrides): + """Build a realistic ACLED OAuth token response dict.""" + base = { + "token_type": "Bearer", + "expires_in": 86400, + "access_token": "test-token-abc123", + "refresh_token": "test-refresh-xyz", + } + base.update(overrides) + return base + + +def make_acled_api_data_response(data, count=None, **overrides): + """Build a realistic ACLED data API page response dict.""" + base = { + "status": 200, + "success": True, + "count": count if count is not None else len(data), + "data": data, + "filename": "results.json", + } + base.update(overrides) + return base + + +def make_acled_event(**overrides): + """Build a single ACLED event dict as returned by the API.""" + base = { + "event_id_cnty": "TST001", + "event_date": "2025-06-15", + "iso": 706, + "region": "Eastern Africa", + "country": "Somalia", + "admin1": "Banadir", + "event_type": "Battles", + "fatalities": 1, + "timestamp": "1750000000", + } + base.update(overrides) + return base + + +def make_acled_fetch_df(rows): + """Build a DataFrame matching AcledFetchFrame from partial row dicts.""" + defaults = make_acled_event() + df = pd.DataFrame(rows) + for col, default in defaults.items(): + if col not in df.columns: + df[col] = default + return df + + # --------------------------------------------------------------------------- # INFER-specific factories # --------------------------------------------------------------------------- diff --git a/src/tests/test_acled.py b/src/tests/test_acled.py index 98dffa95..aa0f7f49 100644 --- a/src/tests/test_acled.py +++ b/src/tests/test_acled.py @@ -1,12 +1,24 @@ -"""Tests for AcledSource: aggregation functions, _acled_resolve, hash mapping.""" +"""Tests for AcledSource: aggregation, resolution, hash mapping, fetch, update.""" from datetime import date, timedelta +from unittest.mock import patch +import backoff._sync import pandas as pd import pytest - -from sources.acled import AcledSource -from tests.conftest import make_acled_resolution_df +import requests + +from _schemas import AcledFetchFrame +from helpers import constants +from sources.acled import FETCH_COLUMN_DTYPE, FETCH_COLUMNS, AcledSource +from tests.conftest import ( + make_acled_api_auth_response, + make_acled_api_data_response, + make_acled_event, + make_acled_fetch_df, + make_acled_resolution_df, + make_question_df, +) # --------------------------------------------------------------------------- # Shared test data factory @@ -268,3 +280,581 @@ def test_id_unhash_not_found(self): source = AcledSource() source.hash_mapping = {} assert source._id_unhash("missing") is None + + +# --------------------------------------------------------------------------- +# _id_hash +# --------------------------------------------------------------------------- + + +class TestIdHash: + """Test hash encoding of question IDs.""" + + def test_deterministic(self): + d = {"key": "k1", "event_type": "Battles", "country": "Somalia"} + assert AcledSource()._id_hash(d) == AcledSource()._id_hash(d) + + def test_stored_in_hash_mapping(self): + source = AcledSource() + d = {"key": "k1", "event_type": "Battles", "country": "Somalia"} + aid = source._id_hash(d) + assert source.hash_mapping[aid] == d + + def test_different_inputs_different_hashes(self): + source = AcledSource() + h1 = source._id_hash({"key": "k1", "event_type": "Battles", "country": "Somalia"}) + h2 = source._id_hash({"key": "k1", "event_type": "Riots", "country": "Somalia"}) + assert h1 != h2 + + +# --------------------------------------------------------------------------- +# _fill_template +# --------------------------------------------------------------------------- + + +class TestFillTemplate: + """Test question template filling.""" + + def test_fills_event_type_and_country(self): + result = AcledSource._fill_template( + template="More {event_type} in {country}?", + fields=("event_type", "country"), + values={"event_type": "'Battles'", "country": "Somalia"}, + ) + assert result == "More 'Battles' in Somalia?" + + def test_preserves_date_placeholders(self): + result = AcledSource._fill_template( + template="{event_type} before {resolution_date} vs {forecast_due_date} in {country}?", + fields=("event_type", "country"), + values={"event_type": "'Battles'", "country": "Somalia"}, + ) + assert "{resolution_date}" in result + assert "{forecast_due_date}" in result + + +# --------------------------------------------------------------------------- +# _get_freeze_value +# --------------------------------------------------------------------------- + + +class TestGetFreezeValue: + """Test freeze value dispatch on key string.""" + + def test_key_avg(self): + dfr = _make_acled_dfr() + result = AcledSource._get_freeze_value( + key="last30Days.gt.30DayAvgOverPast360Days", + dfr=dfr, + country="CountryA", + event_type="Battles", + today=date(2024, 12, 31), + ) + expected = AcledSource._thirty_day_avg_over_past_360_days( + dfr, "CountryA", "Battles", date(2024, 12, 31) + ) + assert result == expected + + def test_key_plus_1(self): + dfr = _make_acled_dfr() + result = AcledSource._get_freeze_value( + key="last30DaysTimes10.gt.30DayAvgOverPast360DaysPlus1", + dfr=dfr, + country="CountryA", + event_type="Battles", + today=date(2024, 12, 31), + ) + expected = AcledSource._thirty_day_avg_over_past_360_days_plus_1( + dfr, "CountryA", "Battles", date(2024, 12, 31) + ) + assert result == expected + + def test_invalid_key_raises(self): + with pytest.raises(Exception, match="Invalid key"): + AcledSource._get_freeze_value( + key="invalid_key", + dfr=_make_acled_dfr(), + country="CountryA", + event_type="Battles", + today=date(2024, 12, 31), + ) + + +# --------------------------------------------------------------------------- +# _prepare_resolution_data +# --------------------------------------------------------------------------- + + +class TestPrepareResolutionData: + """Test the fetch-data-to-resolution-frame transformation.""" + + def test_basic_transformation(self): + dff = make_acled_fetch_df( + [ + {"event_id_cnty": "A1", "country": "X", "event_type": "Battles", "fatalities": 2}, + {"event_id_cnty": "A2", "country": "Y", "event_type": "Riots", "fatalities": 0}, + ] + ) + dfr, countries, event_types = AcledSource._prepare_resolution_data(dff) + + assert set(countries) == {"X", "Y"} + assert event_types == ["Battles", "Riots", "fatalities"] + assert {"country", "event_date", "Battles", "Riots", "fatalities"} <= set(dfr.columns) + + def test_year_prefix_bug_fix(self): + dff = make_acled_fetch_df( + [ + {"event_id_cnty": "A1", "event_date": "0025-01-03"}, + {"event_id_cnty": "A2", "event_date": "0024-12-31"}, + {"event_id_cnty": "A3", "event_date": "2025-02-01"}, + ] + ) + dfr, _, _ = AcledSource._prepare_resolution_data(dff) + + assert pd.Timestamp("2025-01-03") in set(dfr["event_date"]) + assert pd.Timestamp("2024-12-31") in set(dfr["event_date"]) + assert dfr["event_date"].min() >= pd.Timestamp("2024-01-01") + + def test_groupby_sums_events_per_country_and_date(self): + dff = make_acled_fetch_df( + [ + { + "event_id_cnty": "A1", + "country": "X", + "event_date": "2025-06-15", + "fatalities": 1, + }, + { + "event_id_cnty": "A2", + "country": "X", + "event_date": "2025-06-15", + "fatalities": 2, + }, + ] + ) + dfr, _, _ = AcledSource._prepare_resolution_data(dff) + + assert len(dfr) == 1 + row = dfr.iloc[0] + assert row["Battles"] == 2 + assert row["fatalities"] == 3 + + +# --------------------------------------------------------------------------- +# _create_question +# --------------------------------------------------------------------------- + + +class TestCreateQuestion: + """Test single question dict creation.""" + + def test_basic_question_structure(self): + source = AcledSource() + question = source._create_question( + question_key="last30Days.gt.30DayAvgOverPast360Days", + country="CountryA", + event_type="Battles", + dfr=_make_acled_dfr(), + today=date(2024, 12, 31), + ) + + assert set(question.keys()) == set(constants.QUESTION_FILE_COLUMNS) + assert question["id"] in source.hash_mapping + assert question["resolved"] is False + assert question["url"] == "https://acleddata.com/" + assert question["forecast_horizons"] == constants.FORECAST_HORIZONS_IN_DAYS + + def test_event_type_quoted_in_question_text(self): + source = AcledSource() + question = source._create_question( + question_key="last30Days.gt.30DayAvgOverPast360Days", + country="CountryA", + event_type="Battles", + dfr=_make_acled_dfr(), + today=date(2024, 12, 31), + ) + assert "'Battles'" in question["question"] + + def test_fatalities_not_quoted_in_question_text(self): + source = AcledSource() + dfr = _make_acled_dfr() + dfr["fatalities"] = 1 + question = source._create_question( + question_key="last30Days.gt.30DayAvgOverPast360Days", + country="CountryA", + event_type="fatalities", + dfr=dfr, + today=date(2024, 12, 31), + ) + assert "more fatalities" in question["question"] + assert "'fatalities'" not in question["question"] + + def test_freeze_value_is_string(self): + source = AcledSource() + question = source._create_question( + question_key="last30Days.gt.30DayAvgOverPast360Days", + country="CountryA", + event_type="Battles", + dfr=_make_acled_dfr(), + today=date(2024, 12, 31), + ) + assert isinstance(question["freeze_datetime_value"], str) + + +# --------------------------------------------------------------------------- +# _generate_questions +# --------------------------------------------------------------------------- + + +class TestGenerateQuestions: + """Test question generation and upsert into dfq.""" + + def test_generates_two_questions_per_country_event_type(self): + source = AcledSource() + dfq = pd.DataFrame() + df = source._generate_questions( + dfq=dfq, + dfr=_make_acled_dfr(), + countries=["CountryA", "CountryB"], + event_types=["Battles", "Riots"], + today=date(2024, 12, 31), + ) + assert len(df) == 2 * 2 * 2 + + def test_upserts_existing_question(self): + source = AcledSource() + aid = source._id_hash( + { + "key": "last30Days.gt.30DayAvgOverPast360Days", + "event_type": "Battles", + "country": "CountryA", + } + ) + dfq = make_question_df([{"id": aid, "question": "stale text"}]) + + df = source._generate_questions( + dfq=dfq, + dfr=_make_acled_dfr(), + countries=["CountryA"], + event_types=["Battles"], + today=date(2024, 12, 31), + ) + + assert len(df) == 2 + assert df.loc[df["id"] == aid, "question"].iloc[0] != "stale text" + assert sorted(df["id"]) == list(df["id"]) + + +# --------------------------------------------------------------------------- +# _get_access_token (mock requests.post) +# --------------------------------------------------------------------------- + + +class _FakeResponse: + """Minimal `requests.Response` test double.""" + + def __init__(self, payload, *, error=None): + self._payload = payload + self._error = error + self.status_code = 524 if error else 200 + self.headers = {} + self.text = "" + + @property + def ok(self): + return self._error is None + + def raise_for_status(self): + if self._error is not None: + raise self._error + + def json(self): + return self._payload + + +class TestGetAccessToken: + """Test OAuth authentication.""" + + @patch("sources.acled.requests.post") + def test_successful_auth(self, mock_post, acled_source_with_creds): + mock_post.return_value = _FakeResponse(make_acled_api_auth_response()) + + token = acled_source_with_creds._get_access_token() + + assert token == "test-token-abc123" + + @patch("sources.acled.requests.post") + def test_requests_authenticated_scope(self, mock_post, acled_source_with_creds): + """Regression: the oauth request must carry scope=authenticated (prod fix ebcadc8).""" + mock_post.return_value = _FakeResponse(make_acled_api_auth_response()) + + acled_source_with_creds._get_access_token() + + params = mock_post.call_args.kwargs["data"] + assert params["scope"] == "authenticated" + assert params["grant_type"] == "password" + assert params["client_id"] == "acled" + + @patch("sources.acled.requests.post") + def test_missing_token_in_response(self, mock_post, acled_source_with_creds): + mock_post.return_value = _FakeResponse({"token_type": "Bearer"}) + + with pytest.raises(ValueError, match="Access token not found"): + acled_source_with_creds._get_access_token() + + +# --------------------------------------------------------------------------- +# _get_events (mock requests.get) +# --------------------------------------------------------------------------- + + +class TestGetEvents: + """Test event pagination, dedup, and retry semantics.""" + + def test_page_scoped_retry_does_not_restart_pagination( + self, monkeypatch, acled_source_with_creds + ): + """Regression: a failing page is retried alone, not from page 1 (prod fix 2da4643).""" + monkeypatch.setattr(backoff._sync.time, "sleep", lambda _: None) + requested_pages = [] + page_attempts = {} + + def fake_get(_endpoint, headers=None, params=None, timeout=None): + del headers + assert timeout == 100 + page = params["page"] + requested_pages.append(page) + page_attempts[page] = page_attempts.get(page, 0) + 1 + + if page == 1: + return _FakeResponse( + make_acled_api_data_response([make_acled_event(event_id_cnty="evt-1")]) + ) + if page == 2 and page_attempts[page] == 1: + error = requests.exceptions.HTTPError("524 Server Error") + return _FakeResponse({}, error=error) + if page == 2: + return _FakeResponse( + make_acled_api_data_response([make_acled_event(event_id_cnty="evt-2")]) + ) + if page == 3: + return _FakeResponse(make_acled_api_data_response([])) + raise AssertionError(f"Unexpected page request: {page}") + + monkeypatch.setattr("sources.acled.requests.get", fake_get) + + df = acled_source_with_creds._get_events(access_token="token") + + assert requested_pages == [1, 2, 2, 3] + assert list(df["event_id_cnty"]) == ["evt-1", "evt-2"] + + def test_empty_data_page_stops_pagination_when_count_is_null( + self, monkeypatch, acled_source_with_creds + ): + """Regression: pagination stops on an empty data list even when count is not 0 + (prod fix b833376).""" + requested_pages = [] + + def fake_get(_endpoint, headers=None, params=None, timeout=None): + del headers + assert timeout == 100 + page = params["page"] + requested_pages.append(page) + + if page == 1: + return _FakeResponse( + make_acled_api_data_response( + [make_acled_event(event_id_cnty="evt-1")], count=None + ) + ) + if page == 2: + return _FakeResponse(make_acled_api_data_response([], count=None)) + raise AssertionError(f"Unexpected page request: {page}") + + monkeypatch.setattr("sources.acled.requests.get", fake_get) + + df = acled_source_with_creds._get_events(access_token="token") + + assert requested_pages == [1, 2] + assert list(df["event_id_cnty"]) == ["evt-1"] + + def test_deduplicates_by_event_id_across_pages(self, monkeypatch, acled_source_with_creds): + responses = iter( + [ + _FakeResponse( + make_acled_api_data_response( + [ + make_acled_event(event_id_cnty="DUP1"), + make_acled_event(event_id_cnty="UNIQUE"), + ] + ) + ), + _FakeResponse( + make_acled_api_data_response([make_acled_event(event_id_cnty="DUP1")]) + ), + _FakeResponse(make_acled_api_data_response([])), + ] + ) + monkeypatch.setattr("sources.acled.requests.get", lambda *args, **kwargs: next(responses)) + + df = acled_source_with_creds._get_events(access_token="token") + + assert sorted(df["event_id_cnty"]) == ["DUP1", "UNIQUE"] + + def test_sorts_by_event_id(self, monkeypatch, acled_source_with_creds): + responses = iter( + [ + _FakeResponse( + make_acled_api_data_response( + [ + make_acled_event(event_id_cnty="ZZZ"), + make_acled_event(event_id_cnty="AAA"), + ] + ) + ), + _FakeResponse(make_acled_api_data_response([])), + ] + ) + monkeypatch.setattr("sources.acled.requests.get", lambda *args, **kwargs: next(responses)) + + df = acled_source_with_creds._get_events(access_token="token") + + assert list(df["event_id_cnty"]) == ["AAA", "ZZZ"] + + +# --------------------------------------------------------------------------- +# fetch() +# --------------------------------------------------------------------------- + + +class TestFetch: + """Test the public fetch entry point.""" + + @patch.object(AcledSource, "_get_events") + @patch.object(AcledSource, "_get_access_token") + def test_basic_fetch(self, mock_token, mock_events, acled_source_with_creds): + mock_token.return_value = "fake-token" + mock_events.return_value = make_acled_fetch_df( + [ + {"event_id_cnty": "A1", "event_date": "2025-06-15"}, + {"event_id_cnty": "A2", "event_date": "2025-06-16"}, + ] + ) + + dff = acled_source_with_creds.fetch() + + assert len(dff) == 2 + AcledFetchFrame.validate(dff) + + def test_credentials_required(self): + with pytest.raises(RuntimeError, match="api_email"): + AcledSource().fetch() + + +# --------------------------------------------------------------------------- +# update() +# --------------------------------------------------------------------------- + + +class TestUpdate: + """Test the public update entry point.""" + + def test_basic_update(self, freeze_today): + freeze_today(date(2025, 6, 20)) + source = AcledSource() + dfq = pd.DataFrame(columns=constants.QUESTION_FILE_COLUMNS) + dff = make_acled_fetch_df( + [ + { + "event_id_cnty": "A1", + "country": "Somalia", + "event_type": "Battles", + "event_date": "2025-06-15", + "fatalities": 3, + }, + ] + ) + + result = source.update(dfq, dff) + + # 1 country x 2 event_types (Battles + fatalities) x 2 question keys + assert len(result.dfq) == 4 + assert list(result.dfq.columns) == constants.QUESTION_FILE_COLUMNS + assert result.resolution_files is None + + def test_hash_mapping_returned_for_all_questions(self, freeze_today): + freeze_today(date(2025, 6, 20)) + source = AcledSource() + dfq = pd.DataFrame(columns=constants.QUESTION_FILE_COLUMNS) + dff = make_acled_fetch_df( + [ + { + "event_id_cnty": "A1", + "country": "X", + "event_type": "Battles", + "event_date": "2025-06-15", + "fatalities": 0, + }, + ] + ) + + result = source.update(dfq, dff) + + assert result.hash_mapping is not None + assert set(result.dfq["id"]) == set(result.hash_mapping.keys()) + for entry in result.hash_mapping.values(): + assert {"key", "country", "event_type"} == set(entry.keys()) + + def test_updates_existing_question_fields(self, freeze_today): + freeze_today(date(2025, 6, 20)) + source = AcledSource() + aid = source._id_hash( + { + "key": "last30Days.gt.30DayAvgOverPast360Days", + "event_type": "Battles", + "country": "X", + } + ) + dfq = make_question_df( + [{"id": aid, "question": "stale text", "freeze_datetime_value": "999"}] + ) + dff = make_acled_fetch_df( + [ + { + "event_id_cnty": "A1", + "country": "X", + "event_type": "Battles", + "event_date": "2025-06-15", + "fatalities": 0, + }, + ] + ) + + result = source.update(dfq, dff) + + row = result.dfq[result.dfq["id"] == aid].iloc[0] + assert row["question"] != "stale text" + assert row["freeze_datetime_value"] != "999" + assert len(result.dfq) == 4 + + +# --------------------------------------------------------------------------- +# FETCH_COLUMN_DTYPE <-> AcledFetchFrame consistency +# --------------------------------------------------------------------------- + + +class TestFetchColumnDtypeMatchesSchema: + """FETCH_COLUMN_DTYPE is the operational twin of AcledFetchFrame. + + The dict drives the API `fields` param and the fetch-file read dtypes; the + pandera model validates the resulting frame. These pin the two in sync. + """ + + def test_same_columns_in_same_order(self): + assert list(AcledFetchFrame.to_schema().columns.keys()) == FETCH_COLUMNS + + def test_dict_typed_data_satisfies_schema_unchanged(self): + """Data typed per FETCH_COLUMN_DTYPE passes validation with no coercion drift.""" + df = make_acled_fetch_df([make_acled_event()]).astype(FETCH_COLUMN_DTYPE) + validated = AcledFetchFrame.validate(df) + pd.testing.assert_frame_equal(validated, df) diff --git a/src/tests/test_acled_fetch.py b/src/tests/test_acled_fetch.py deleted file mode 100644 index 7a99ab98..00000000 --- a/src/tests/test_acled_fetch.py +++ /dev/null @@ -1,186 +0,0 @@ -"""Regression tests for the ACLED fetch job.""" - -import importlib.util -import sys -import types -import warnings -from collections import Counter -from pathlib import Path - -import backoff._sync -import requests - - -def _load_acled_fetch_module(monkeypatch): - """Load the ACLED fetch entrypoint with stubbed external dependencies.""" - import helpers - - fake_keys = types.ModuleType("helpers.keys") - fake_keys.API_EMAIL_ACLED = "test@example.com" - fake_keys.API_PASSWORD_ACLED = "secret" - monkeypatch.setitem(sys.modules, "helpers.keys", fake_keys) - monkeypatch.setattr(helpers, "keys", fake_keys, raising=False) - - fake_gcp = types.ModuleType("utils.gcp") - fake_gcp.storage = types.SimpleNamespace(upload=lambda **kwargs: None) - fake_archiving = types.ModuleType("utils.archiving") - fake_utils = types.ModuleType("utils") - fake_utils.archiving = fake_archiving - fake_utils.gcp = fake_gcp - monkeypatch.setitem(sys.modules, "utils.gcp", fake_gcp) - monkeypatch.setitem(sys.modules, "utils.archiving", fake_archiving) - monkeypatch.setitem(sys.modules, "utils", fake_utils) - - module_path = Path(__file__).resolve().parents[1] / "questions" / "acled" / "fetch" / "main.py" - module_name = "tests._acled_fetch_main" - sys.modules.pop(module_name, None) - - spec = importlib.util.spec_from_file_location(module_name, module_path) - module = importlib.util.module_from_spec(spec) - sys.modules[module_name] = module - assert spec.loader is not None - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", - message=r".*asyncio\.iscoroutinefunction.*deprecated.*", - category=DeprecationWarning, - ) - spec.loader.exec_module(module) - return module - - -class _FakeResponse: - """Minimal `requests.Response` test double.""" - - def __init__(self, payload, *, error=None): - self._payload = payload - self._error = error - self.status_code = 524 if error else 200 - self.text = "" - - @property - def ok(self): - return self._error is None - - def raise_for_status(self): - if self._error is not None: - raise self._error - - def json(self): - return self._payload - - -def test_page_scoped_retry_does_not_restart_pagination(monkeypatch): - module = _load_acled_fetch_module(monkeypatch) - monkeypatch.setattr(backoff._sync.time, "sleep", lambda _: None) - - page_attempts = Counter() - requested_pages = [] - - def fake_get(_endpoint, headers=None, params=None, timeout=None): - del headers - assert timeout == 100 - page = params["page"] - requested_pages.append(page) - page_attempts[page] += 1 - - if page == 1: - return _FakeResponse( - { - "count": 1, - "data": [ - { - "event_id_cnty": "evt-1", - "event_date": "2024-01-01", - "iso": 1, - "region": "Region", - "country": "Country", - "admin1": "Admin", - "event_type": "Battles", - "fatalities": 1, - "timestamp": "1704067200", - } - ], - } - ) - - if page == 2 and page_attempts[page] == 1: - error = requests.exceptions.HTTPError("524 Server Error") - return _FakeResponse({}, error=error) - - if page == 2: - return _FakeResponse( - { - "count": 1, - "data": [ - { - "event_id_cnty": "evt-2", - "event_date": "2024-01-02", - "iso": 1, - "region": "Region", - "country": "Country", - "admin1": "Admin", - "event_type": "Riots", - "fatalities": 2, - "timestamp": "1704153600", - } - ], - } - ) - - if page == 3: - return _FakeResponse({"count": 0, "data": []}) - - raise AssertionError(f"Unexpected page request: {page}") - - monkeypatch.setattr(module.requests, "get", fake_get) - - df = module.get_acled_events(access_token="token") - - assert requested_pages == [1, 2, 2, 3] - assert requested_pages.count(1) == 1 - assert list(df["event_id_cnty"]) == ["evt-1", "evt-2"] - - -def test_empty_data_page_stops_pagination_when_count_is_null(monkeypatch): - module = _load_acled_fetch_module(monkeypatch) - - requested_pages = [] - - def fake_get(_endpoint, headers=None, params=None, timeout=None): - del headers - assert timeout == 100 - page = params["page"] - requested_pages.append(page) - - if page == 1: - return _FakeResponse( - { - "count": None, - "data": [ - { - "event_id_cnty": "evt-1", - "event_date": "2024-01-01", - "iso": 1, - "region": "Region", - "country": "Country", - "admin1": "Admin", - "event_type": "Battles", - "fatalities": 1, - "timestamp": "1704067200", - } - ], - } - ) - - if page == 2: - return _FakeResponse({"count": None, "data": []}) - - raise AssertionError(f"Unexpected page request: {page}") - - monkeypatch.setattr(module.requests, "get", fake_get) - - df = module.get_acled_events(access_token="token") - - assert requested_pages == [1, 2] - assert list(df["event_id_cnty"]) == ["evt-1"] From 415362e353dffa6e0b25dbbdf540adb7d2527666 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 13 Jun 2026 10:21:39 +0300 Subject: [PATCH 2/4] fix: DRY-ify `_io._read_acled_dfr` to reuse `acled._prepare_resolution_data()` --- src/orchestration/_io.py | 54 ++++++++++------------------------------ 1 file changed, 13 insertions(+), 41 deletions(-) diff --git a/src/orchestration/_io.py b/src/orchestration/_io.py index f2f7f1ac..9b537a85 100644 --- a/src/orchestration/_io.py +++ b/src/orchestration/_io.py @@ -18,7 +18,7 @@ from termcolor import colored from _fb_types import QuestionBank, SourceQuestionBank -from _schemas import AcledResolutionFrame, QuestionFrame, ResolutionFrame +from _schemas import QuestionFrame, ResolutionFrame from helpers import constants, data_utils, dates, env, keys from sources import ALL_SOURCE_NAMES, MARKET_SOURCE_NAMES from sources._base import BaseSource @@ -34,51 +34,23 @@ def _read_acled_dfr(local_question_bank_dir: str) -> pd.DataFrame: - """Read ACLED fetch file and prepare the resolution DataFrame.""" - acled_fetch_column_dtype = { - "event_id_cnty": str, - "event_date": str, - "iso": int, - "region": str, - "country": str, - "admin1": str, - "event_type": str, - "fatalities": int, - "timestamp": str, - } - filenames = data_utils.generate_filenames("acled") - source_fetch_file = filenames.get("jsonl_fetch") - local_filename = f"{local_question_bank_dir}/{source_fetch_file}" + """Read the ACLED fetch file and build the resolution DataFrame. + + The dff -> dfr transform (year-prefix fix, one-hot encode, aggregate) is owned by the + domain layer; this is just the IO wrapper that reads the fetch file off the unzipped + question bank and delegates to it, so the transform lives in exactly one place. + """ + from sources.acled import FETCH_COLUMN_DTYPE, AcledSource - df = pd.read_json( + filenames = data_utils.generate_filenames("acled") + local_filename = f"{local_question_bank_dir}/{filenames['jsonl_fetch']}" + dff = pd.read_json( local_filename, lines=True, - dtype=acled_fetch_column_dtype, + dtype=FETCH_COLUMN_DTYPE, convert_dates=False, ) - - # Fix year prefix bug in ACLED data - def fix_year_prefix(date_str): - if isinstance(date_str, str): - if date_str.startswith("0025-"): - return "2025-" + date_str[5:] - if date_str.startswith("0024-"): - return "2024-" + date_str[5:] - return date_str - - df["event_date"] = df["event_date"].apply(fix_year_prefix) - df = AcledResolutionFrame.validate(df) - df["event_date"] = pd.to_datetime(df["event_date"]) - - df = df[["country", "event_date", "event_type", "fatalities"]].copy() - - dfr = ( - pd.get_dummies(df, columns=["event_type"], prefix="", prefix_sep="") - .groupby(["country", "event_date"]) - .sum() - .reset_index() - ) - + dfr, _, _ = AcledSource._prepare_resolution_data(dff) return dfr From 38b43b13d37134d065a944d9c5eda271305c3d53 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 13 Jun 2026 10:23:19 +0300 Subject: [PATCH 3/4] fix: guard no events results on first fetch --- src/sources/acled.py | 5 +++++ src/tests/test_acled.py | 13 +++++++++++++ 2 files changed, 18 insertions(+) diff --git a/src/sources/acled.py b/src/sources/acled.py index e3e46c06..a28e9c34 100644 --- a/src/sources/acled.py +++ b/src/sources/acled.py @@ -309,6 +309,11 @@ def _get_events(self, access_token: str) -> DataFrame[AcledFetchFrame]: seen_ids.update(df_new_rows["event_id_cnty"]) dfs.append(df_new_rows) + if not dfs: + # No data on any page: return an empty frame so the job's `if dff.empty` guard can + # handle it gracefully (pd.concat([]) would otherwise raise ValueError). + return pd.DataFrame(columns=FETCH_COLUMNS) + df = pd.concat(dfs, ignore_index=True).sort_values(by="event_id_cnty", ignore_index=True) logger.info(f"Downloaded {len(df)} rows.") return df diff --git a/src/tests/test_acled.py b/src/tests/test_acled.py index aa0f7f49..b9df55f6 100644 --- a/src/tests/test_acled.py +++ b/src/tests/test_acled.py @@ -721,6 +721,19 @@ def test_sorts_by_event_id(self, monkeypatch, acled_source_with_creds): assert list(df["event_id_cnty"]) == ["AAA", "ZZZ"] + def test_empty_first_page_returns_empty_frame(self, monkeypatch, acled_source_with_creds): + """Regression: empty data on the first page returns an empty frame instead of raising + ValueError from pd.concat([]), so the job's `if dff.empty` guard is reachable.""" + monkeypatch.setattr( + "sources.acled.requests.get", + lambda *args, **kwargs: _FakeResponse(make_acled_api_data_response([], count=0)), + ) + + df = acled_source_with_creds._get_events(access_token="token") + + assert df.empty + assert list(df.columns) == FETCH_COLUMNS + # --------------------------------------------------------------------------- # fetch() From 315f5bed1ff9f4fc5c3cc076e43070ef7ae41a1c Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Sat, 13 Jun 2026 10:25:08 +0300 Subject: [PATCH 4/4] fix: let backoff handle access token auth failures --- src/sources/acled.py | 31 +++++++++++++------------------ src/tests/test_acled.py | 18 ++++++++++++++++++ 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/src/sources/acled.py b/src/sources/acled.py index a28e9c34..5c9ecace 100644 --- a/src/sources/acled.py +++ b/src/sources/acled.py @@ -229,24 +229,19 @@ def _get_access_token(self) -> str: "scope": "authenticated", } - try: - response = requests.post(endpoint, headers=headers, data=params) - logger.debug(f"Response status code: {response.status_code}") - logger.debug(f"Response headers: {response.headers}") - logger.debug(f"Response content: {response.text}") - response.raise_for_status() - - data = response.json() - if "access_token" not in data: - raise ValueError("Access token not found in response") - return data["access_token"] - - except requests.exceptions.RequestException as e: - raise requests.exceptions.RequestException( - f"Failed to authenticate with ACLED API: {str(e)}" - ) - except ValueError as e: - raise ValueError(f"Error processing API response: {str(e)}") + # No try/except: let Timeout/ConnectionError propagate to the @backoff decorator so they + # are retried. Wrapping them in a plain RequestException (as the legacy code did) defeated + # the retry, since backoff only retries Timeout/ConnectionError, not their base class. + response = requests.post(endpoint, headers=headers, data=params) + logger.debug(f"Response status code: {response.status_code}") + logger.debug(f"Response headers: {response.headers}") + logger.debug(f"Response content: {response.text}") + response.raise_for_status() + + data = response.json() + if "access_token" not in data: + raise ValueError("Access token not found in response") + return data["access_token"] @backoff.on_exception( backoff.expo, diff --git a/src/tests/test_acled.py b/src/tests/test_acled.py index b9df55f6..05c8471c 100644 --- a/src/tests/test_acled.py +++ b/src/tests/test_acled.py @@ -602,6 +602,24 @@ def test_missing_token_in_response(self, mock_post, acled_source_with_creds): with pytest.raises(ValueError, match="Access token not found"): acled_source_with_creds._get_access_token() + @patch("sources.acled.requests.post") + def test_timeout_is_retried_by_backoff(self, mock_post, acled_source_with_creds, monkeypatch): + """Regression: a transient Timeout during auth must reach @backoff and be retried. + + The legacy try/except re-raised Timeout/ConnectionError as a plain RequestException, + which @backoff (configured for Timeout/ConnectionError only) never retried. + """ + monkeypatch.setattr(backoff._sync.time, "sleep", lambda _: None) + mock_post.side_effect = [ + requests.exceptions.Timeout("transient"), + _FakeResponse(make_acled_api_auth_response()), + ] + + token = acled_source_with_creds._get_access_token() + + assert token == "test-token-abc123" + assert mock_post.call_count == 2 + # --------------------------------------------------------------------------- # _get_events (mock requests.get)