From 55334eebc84f5339d2ddad5af65bbfc217e461dc Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Wed, 10 Jun 2026 14:03:01 +0300 Subject: [PATCH] refactor: dbnomics --- Makefile | 4 +- src/_schemas.py | 17 + src/helpers/dbnomics.py | 106 +------ .../func_dbnomics_fetch}/Makefile | 22 +- src/orchestration/func_dbnomics_fetch/main.py | 30 ++ .../func_dbnomics_fetch}/requirements.txt | 7 +- .../func_dbnomics_update}/Makefile | 22 +- .../func_dbnomics_update/main.py | 37 +++ .../func_dbnomics_update}/requirements.txt | 6 +- src/questions/dbnomics/fetch/main.py | 93 ------ .../dbnomics/update_questions/main.py | 156 ---------- src/sources/dbnomics.py | 290 +++++++++++++++++- src/tests/conftest.py | 60 ++++ src/tests/test_dbnomics.py | 203 ++++++++++++ 14 files changed, 672 insertions(+), 381 deletions(-) rename src/{questions/dbnomics/fetch => orchestration/func_dbnomics_fetch}/Makefile (62%) create mode 100644 src/orchestration/func_dbnomics_fetch/main.py rename src/{questions/dbnomics/fetch => orchestration/func_dbnomics_fetch}/requirements.txt (54%) rename src/{questions/dbnomics/update_questions => orchestration/func_dbnomics_update}/Makefile (62%) create mode 100644 src/orchestration/func_dbnomics_update/main.py rename src/{questions/dbnomics/update_questions => orchestration/func_dbnomics_update}/requirements.txt (52%) delete mode 100644 src/questions/dbnomics/fetch/main.py delete mode 100644 src/questions/dbnomics/update_questions/main.py create mode 100644 src/tests/test_dbnomics.py diff --git a/Makefile b/Makefile index 35c9b03e..a9776922 100644 --- a/Makefile +++ b/Makefile @@ -191,10 +191,10 @@ fred-update-questions: dbnomics: dbnomics-fetch dbnomics-update-questions dbnomics-fetch: - $(MAKE) -C src/questions/dbnomics/fetch || echo "* $@" >> $(MAKE_FAILURE_LOG) + $(MAKE) -C src/orchestration/func_dbnomics_fetch || echo "* $@" >> $(MAKE_FAILURE_LOG) dbnomics-update-questions: - $(MAKE) -C src/questions/dbnomics/update_questions || echo "* $@" >> $(MAKE_FAILURE_LOG) + $(MAKE) -C src/orchestration/func_dbnomics_update || echo "* $@" >> $(MAKE_FAILURE_LOG) tag-questions: $(MAKE) -C src/metadata/tag_questions || echo "* $@" >> $(MAKE_FAILURE_LOG) diff --git a/src/_schemas.py b/src/_schemas.py index e8ac7064..920a1785 100644 --- a/src/_schemas.py +++ b/src/_schemas.py @@ -131,6 +131,23 @@ class Config: coerce = True +class DbnomicsFetchFrame(pa.DataFrameModel): + """Output of DbnomicsSource.fetch(). Per-observation rows from the DBnomics API.""" + + id: Series[str] + period: Series[str] + value: Series[object] # float observation or the string "NA" for missing values + provider_name: Series[str] + dataset_name: Series[str] + series_name: Series[str] + + class Config: + """Schema configuration.""" + + strict = False + coerce = True + + class AcledResolutionFrame(pa.DataFrameModel): """ACLED-specific: aggregated events by country and date. diff --git a/src/helpers/dbnomics.py b/src/helpers/dbnomics.py index 8403c065..2780c74e 100644 --- a/src/helpers/dbnomics.py +++ b/src/helpers/dbnomics.py @@ -1,102 +1,12 @@ -"""DBnomics-specific variables.""" +"""DBnomics constants — thin re-export shim over the lightweight sources._metadata layer.""" -from sources._metadata import SOURCE_METADATA +import os +import sys -SOURCE_INTRO = SOURCE_METADATA["dbnomics"]["source_intro"] -RESOLUTION_CRITERIA = SOURCE_METADATA["dbnomics"]["resolution_criteria"] +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) -FETCH_COLUMN_DTYPE = { - "id": str, - "period": str, - "value": str, - "question_text": str, - "value_at_freeze_datetime_explanation": str, -} -FETCH_COLUMNS = list(FETCH_COLUMN_DTYPE.keys()) +from sources._metadata import SOURCE_METADATA # noqa: E402 -METEOFRANCE_STATIONS = [ - {"id": "07005", "station": "Abbeville"}, - {"id": "07015", "station": "Lille Airport"}, - {"id": "07020", "station": "Pointe De La Hague"}, - {"id": "07027", "station": "Caen – Carpiquet Airport"}, - {"id": "07037", "station": "Rouen Airport"}, - {"id": "07072", "station": "Reims – Prunay Aerodrome"}, - {"id": "07110", "station": "Brest Bretagne Airport"}, - {"id": "07117", "station": "Ploumanac'h"}, - {"id": "07130", "station": "Rennes–Saint-Jacques Airport"}, - {"id": "07139", "station": "Alençon"}, - {"id": "07149", "station": "Orly"}, - {"id": "07168", "station": "Troyes-Barberey Airport"}, - {"id": "07181", "station": "Nancy – Ochey Air Base"}, - {"id": "07190", "station": "Strasbourg Airport"}, - {"id": "07222", "station": "Nantes Atlantique Airport"}, - {"id": "07240", "station": "Tours"}, - {"id": "07255", "station": "Bourges"}, - {"id": "07280", "station": "Dijon-Bourgogne Airport"}, - {"id": "07299", "station": "EuroAirport Basel Mulhouse Freiburg"}, - {"id": "07335", "station": "Poitiers–Biard Airport"}, - {"id": "07434", "station": "Limoges – Bellegarde Airport"}, - {"id": "07460", "station": "Clermont-Ferrand Auvergne Airport"}, - {"id": "07471", "station": "Le Puy – Loudes Airport"}, - {"id": "07481", "station": "Lyon–Saint Exupéry Airport"}, - {"id": "07510", "station": "Bordeaux–Mérignac Airport"}, - {"id": "07535", "station": "Gourdon"}, - {"id": "07558", "station": "Millau"}, - {"id": "07577", "station": "Montélimar"}, - {"id": "07591", "station": "Embrun"}, - {"id": "07607", "station": "Mont-de-Marsan"}, - {"id": "07621", "station": "Tarbes–Lourdes–Pyrénées Airport"}, - {"id": "07627", "station": "Saint-Girons"}, - {"id": "07630", "station": "Toulouse–Blagnac Airport"}, - {"id": "07650", "station": "Marignane"}, - {"id": "07690", "station": "Nice"}, - {"id": "07747", "station": "Perpignan"}, - {"id": "07761", "station": "Ajaccio"}, - {"id": "61968", "station": "Glorioso Islands"}, - {"id": "61970", "station": "Juan de Nova Island"}, - {"id": "61972", "station": "Europa Island"}, - {"id": "61976", "station": "Tromelin Island"}, - {"id": "61980", "station": "Roland Garros Airport"}, - {"id": "61996", "station": "Amsterdam Island"}, - {"id": "61997", "station": "Île de la Possession"}, - {"id": "61998", "station": "Grande Terre"}, - {"id": "67005", "station": "Pamandzi"}, - {"id": "71805", "station": "Saint-Pierre"}, - {"id": "78890", "station": "La Désirade"}, - {"id": "78894", "station": "Saint Barthélemy"}, - {"id": "78897", "station": "Pointe-à-Pitre International Airport"}, - {"id": "78925", "station": "Martinique Aimé Césaire International Airport"}, - {"id": "81401", "station": "Saint-Laurent"}, - {"id": "81405", "station": "Cayenne – Félix Éboué Airport"}, -] - -QUESTION_TEMPLATES = { - "meteofrance": ( - "What is the probability that the daily average temperature at the French weather station " - "at {station} will be higher on {resolution_date} than on {forecast_due_date}?" - ) -} - -VALUE_EXPLANATIONS = { - "meteofrance": "The daily average temperature at the French weather station at {station}." -} - - -def create_meteofrance_constants(STATIONS): - """Convert PRE-CONSTANTS data to format expected by fetch and update_questions functions.""" - constants = [] - for item in STATIONS: - id = item["id"] - station = item["station"] - question_text = QUESTION_TEMPLATES["meteofrance"].replace("{station}", station) - explanation = VALUE_EXPLANATIONS["meteofrance"].format(station=station) - new_entry = { - "id": f"meteofrance/TEMPERATURE/celsius.{id}.D", - "question_text": question_text, - "freeze_datetime_value_explanation": explanation, - } - constants.append(new_entry) - return constants - - -CONSTANTS = create_meteofrance_constants(METEOFRANCE_STATIONS) +_META = SOURCE_METADATA["dbnomics"] +SOURCE_INTRO = _META["source_intro"] +RESOLUTION_CRITERIA = _META["resolution_criteria"] diff --git a/src/questions/dbnomics/fetch/Makefile b/src/orchestration/func_dbnomics_fetch/Makefile similarity index 62% rename from src/questions/dbnomics/fetch/Makefile rename to src/orchestration/func_dbnomics_fetch/Makefile index dac115c6..4c79372e 100644 --- a/src/questions/dbnomics/fetch/Makefile +++ b/src/orchestration/func_dbnomics_fetch/Makefile @@ -9,19 +9,23 @@ UPLOAD_DIR = upload .gcloudignore: cp -r $(ROOT_DIR)src/helpers/.gcloudignore . -Procfile: - cp -r $(ROOT_DIR)src/helpers/Procfile . +Dockerfile: $(ROOT_DIR)src/helpers/Dockerfile.template + sed \ + -e 's/REGION/$(CLOUD_DEPLOY_REGION)/g' \ + -e 's/STACK/google-22-full/g' \ + -e 's/PYTHON_VERSION/python312/g' \ + $< > Dockerfile -deploy : main.py .gcloudignore requirements.txt Procfile +deploy : main.py .gcloudignore requirements.txt Dockerfile mkdir -p $(UPLOAD_DIR) cp -r $(ROOT_DIR)utils $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ + cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/helpers + cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/sources mkdir -p $(UPLOAD_DIR)/orchestration cp $(ROOT_DIR)src/orchestration/__init__.py $(UPLOAD_DIR)/orchestration/ - cp $(ROOT_DIR)src/orchestration/_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/orchestration/_source_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ + cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ cp $^ $(UPLOAD_DIR)/ gcloud run jobs deploy \ func-data-dbnomics-fetch \ @@ -37,4 +41,4 @@ deploy : main.py .gcloudignore requirements.txt Procfile --source $(UPLOAD_DIR) clean : - rm -rf $(UPLOAD_DIR) .gcloudignore Procfile + rm -rf $(UPLOAD_DIR) .gcloudignore Dockerfile diff --git a/src/orchestration/func_dbnomics_fetch/main.py b/src/orchestration/func_dbnomics_fetch/main.py new file mode 100644 index 00000000..23327b6e --- /dev/null +++ b/src/orchestration/func_dbnomics_fetch/main.py @@ -0,0 +1,30 @@ +"""DBnomics fetch entry point.""" + +from __future__ import annotations + +import logging +from typing import Any + +from helpers import decorator +from orchestration import _source_io +from sources.dbnomics import DbnomicsSource + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SOURCE = "dbnomics" + + +@decorator.log_runtime +def driver(_: Any) -> None: + """Fetch DBnomics data and upload to question bank.""" + source = DbnomicsSource() + + dff = source.fetch() + + _source_io.write_fetch_output(SOURCE, dff) + logger.info("Done.") + + +if __name__ == "__main__": + driver(None) diff --git a/src/questions/dbnomics/fetch/requirements.txt b/src/orchestration/func_dbnomics_fetch/requirements.txt similarity index 54% rename from src/questions/dbnomics/fetch/requirements.txt rename to src/orchestration/func_dbnomics_fetch/requirements.txt index 1d4383a3..961b4524 100644 --- a/src/questions/dbnomics/fetch/requirements.txt +++ b/src/orchestration/func_dbnomics_fetch/requirements.txt @@ -1,7 +1,6 @@ google-cloud-storage -google-cloud-secret-manager -backoff -certifi pandas>=2.2.2,<3.0 pandera -termcolor +requests +backoff +numpy diff --git a/src/questions/dbnomics/update_questions/Makefile b/src/orchestration/func_dbnomics_update/Makefile similarity index 62% rename from src/questions/dbnomics/update_questions/Makefile rename to src/orchestration/func_dbnomics_update/Makefile index c12ab14c..b0fbd4ae 100644 --- a/src/questions/dbnomics/update_questions/Makefile +++ b/src/orchestration/func_dbnomics_update/Makefile @@ -9,19 +9,23 @@ UPLOAD_DIR = upload .gcloudignore: cp -r $(ROOT_DIR)src/helpers/.gcloudignore . -Procfile: - cp -r $(ROOT_DIR)src/helpers/Procfile . +Dockerfile: $(ROOT_DIR)src/helpers/Dockerfile.template + sed \ + -e 's/REGION/$(CLOUD_DEPLOY_REGION)/g' \ + -e 's/STACK/google-22-full/g' \ + -e 's/PYTHON_VERSION/python312/g' \ + $< > Dockerfile -deploy : main.py .gcloudignore requirements.txt Procfile +deploy : main.py .gcloudignore requirements.txt Dockerfile mkdir -p $(UPLOAD_DIR) cp -r $(ROOT_DIR)utils $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/ - cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ - cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ + cp -r $(ROOT_DIR)src/helpers $(UPLOAD_DIR)/helpers + cp -r $(ROOT_DIR)src/sources $(UPLOAD_DIR)/sources mkdir -p $(UPLOAD_DIR)/orchestration cp $(ROOT_DIR)src/orchestration/__init__.py $(UPLOAD_DIR)/orchestration/ - cp $(ROOT_DIR)src/orchestration/_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/orchestration/_source_io.py $(UPLOAD_DIR)/orchestration/ + cp $(ROOT_DIR)src/_fb_types.py $(UPLOAD_DIR)/ + cp $(ROOT_DIR)src/_schemas.py $(UPLOAD_DIR)/ cp $^ $(UPLOAD_DIR)/ gcloud run jobs deploy \ func-data-dbnomics-update-questions \ @@ -37,4 +41,4 @@ deploy : main.py .gcloudignore requirements.txt Procfile --source $(UPLOAD_DIR) clean : - rm -rf $(UPLOAD_DIR) .gcloudignore Procfile + rm -rf $(UPLOAD_DIR) .gcloudignore Dockerfile diff --git a/src/orchestration/func_dbnomics_update/main.py b/src/orchestration/func_dbnomics_update/main.py new file mode 100644 index 00000000..680e4527 --- /dev/null +++ b/src/orchestration/func_dbnomics_update/main.py @@ -0,0 +1,37 @@ +"""DBnomics update entry point.""" + +from __future__ import annotations + +import logging +from typing import Any + +from helpers import data_utils, decorator +from orchestration import _source_io +from sources.dbnomics import DbnomicsSource + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +SOURCE = "dbnomics" + + +@decorator.log_runtime +def driver(_: Any) -> None: + """Update DBnomics questions and resolution files.""" + source = DbnomicsSource() + + dfq, dff = data_utils.get_data_from_cloud_storage( + SOURCE, return_question_data=True, return_fetch_data=True + ) + + result = source.update(dfq, dff) + + logger.info("Uploading to GCP...") + data_utils.upload_questions(result.dfq, SOURCE) + if result.resolution_files: + _source_io.upload_resolution_files(SOURCE, result.resolution_files) + logger.info("Done.") + + +if __name__ == "__main__": + driver(None) diff --git a/src/questions/dbnomics/update_questions/requirements.txt b/src/orchestration/func_dbnomics_update/requirements.txt similarity index 52% rename from src/questions/dbnomics/update_questions/requirements.txt rename to src/orchestration/func_dbnomics_update/requirements.txt index cebc5cb6..961b4524 100644 --- a/src/questions/dbnomics/update_questions/requirements.txt +++ b/src/orchestration/func_dbnomics_update/requirements.txt @@ -1,6 +1,6 @@ google-cloud-storage -google-cloud-secret-manager pandas>=2.2.2,<3.0 -tqdm pandera -termcolor +requests +backoff +numpy diff --git a/src/questions/dbnomics/fetch/main.py b/src/questions/dbnomics/fetch/main.py deleted file mode 100644 index f47948cb..00000000 --- a/src/questions/dbnomics/fetch/main.py +++ /dev/null @@ -1,93 +0,0 @@ -"""Fetch data from DBnomics API.""" - -import json -import logging -import os -import sys - -import backoff -import pandas as pd -import requests - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../..")) -from helpers import constants, data_utils, dates, dbnomics, decorator, env # noqa: E402 - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../../..")) -from utils import gcp # noqa: E402 - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -source = "dbnomics" -filenames = data_utils.generate_filenames(source=source) - - -@backoff.on_exception( - backoff.expo, - requests.exceptions.RequestException, - max_time=300, - on_backoff=data_utils.print_error_info_handler, -) -def _call_endpoint(id): - """Fetch data from DBnomics.""" - logger.info(f"Calling DBnomics for series {id}") - endpoint = "https://api.db.nomics.world/v22/series/" + id - params = {"observations": "true"} - response = requests.get(url=endpoint, params=params) - if not response.ok: - logger.error("Request to DBnomics API endpoint failed.") - response.raise_for_status() - data = response.json() - docs = data.get("series", {}).get("docs", [{}])[0] - id_safe = id.replace("/", "_") - df = pd.DataFrame( - { - "id": id_safe, - "period": docs.get("period"), - "value": docs.get("value"), - "provider_name": data.get("provider", {}).get("name"), - "dataset_name": docs.get("dataset_name"), - "series_name": docs.get("series_name"), - } - ) - df["period"] = pd.to_datetime(df["period"]).dt.date - # Filter to record start date and beyond - df = df[ - (df["period"] >= constants.QUESTION_BANK_DATA_STORAGE_START_DATE) - & (df["period"] < dates.get_date_today()) - ].reset_index(drop=True) - return df if not df.empty else None - - -@decorator.log_runtime -def driver(_): - """Fetch DBnomics data and store in GCP Cloud Storage.""" - # Get the latest DBnomics data - logger.info("Downloading DBnomics data.") - - df = None - - for row in pd.DataFrame(dbnomics.CONSTANTS).itertuples(): - id = row.id - new_rows = _call_endpoint(id=id) - df = new_rows if df is None else pd.concat([df, new_rows]) - - df["period"] = df["period"].astype(str) - - # Save - with open(filenames["local_fetch"], "w", encoding="utf-8") as f: - for record in df.to_dict(orient="records"): - jsonl_str = json.dumps(record, ensure_ascii=False) - f.write(jsonl_str + "\n") - - # Upload - gcp.storage.upload( - bucket_name=env.QUESTION_BANK_BUCKET, - local_filename=filenames["local_fetch"], - ) - - logger.info("Done.") - - -if __name__ == "__main__": - driver(None) diff --git a/src/questions/dbnomics/update_questions/main.py b/src/questions/dbnomics/update_questions/main.py deleted file mode 100644 index 0e8904f1..00000000 --- a/src/questions/dbnomics/update_questions/main.py +++ /dev/null @@ -1,156 +0,0 @@ -"""Generate DBnomics questions.""" - -import json -import logging -import os -import sys - -import pandas as pd - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../..")) -from helpers import constants, data_utils, dates, dbnomics, decorator, env # noqa: E402 - -sys.path.append(os.path.join(os.path.dirname(__file__), "../../../..")) # noqa: E402 -from utils import gcp # noqa: E402 - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -source = "dbnomics" -filenames = data_utils.generate_filenames(source=source) - -""" Some dataseries with regular updates have large numbers of NA values during -periods in which data is not being reported. observations_without_data is -designed to detect these quiet periods and exclude the series from being -formed into a question during them (since it's unclear if we'll be able to -resolve them and the freeze values become increasingly irrelevant). """ -observations_without_data = 10 - - -def create_resolution_file(id, df): - """ - Create or update a resolution file for a given question. - - Args: - id (str): Identifier for the question - df (DataFrame): dataframe containing fetch information related to the question - """ - basename = f"{id}.jsonl" - remote_filename = f"{source}/{basename}" - local_filename = "/tmp/tmp.jsonl" - - df = df[["id", "period", "value"]].rename(columns={"period": "date"}) - df = df.astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE) - - df["value"] = df["value"].replace("NA", "N/A") - - df.to_json(local_filename, orient="records", lines=True, date_format="iso") - gcp.storage.upload( - bucket_name=env.QUESTION_BANK_BUCKET, - local_filename=local_filename, - filename=remote_filename, - ) - - -def _construct_questions(dff, dfq): - """Construct question and resolution tables.""" - # For each seriesIds, construct question data from request - new_series = None - for row in dbnomics.CONSTANTS: - id = row["id"].replace("/", "_") - create_resolution_file(id, df=dff[dff["id"] == id]) - provider_name = dff[dff["id"] == id]["provider_name"].iloc[0] - dataset_name = dff[dff["id"] == id]["dataset_name"].iloc[0] - series_name = dff[dff["id"] == id]["series_name"].iloc[0] - question = row["question_text"] - url = f"https://db.nomics.world/{row['id']}" - background = ( - f"The history of {dataset_name} - {series_name} from {provider_name} is available at " - f"{url}." - ) - freeze_datetime_value_explanation = row["freeze_datetime_value_explanation"] - series_values = dff[dff["id"] == id]["value"] - series_dates = pd.to_datetime(dff[dff["id"] == id]["period"]) - - last_fetch_date = series_dates.iloc[-1] - last_fetch_value = series_values.iloc[-1] - freeze_datetime_value = ( - float(last_fetch_value) - if last_fetch_date.date() > dates.get_date_yesterday() and last_fetch_value != "NA" - else "N/A" - ) - - if (series_values.tail(observations_without_data) != "NA").any(): - new_row = { - "id": id, - "question": question, - "background": background, - "market_info_resolution_criteria": "N/A", - "market_info_open_datetime": "N/A", - "market_info_close_datetime": "N/A", - "url": url, - "market_info_resolution_datetime": "N/A", - "resolved": False, - "forecast_horizons": constants.FORECAST_HORIZONS_IN_DAYS, - "freeze_datetime_value": freeze_datetime_value, - "freeze_datetime_value_explanation": freeze_datetime_value_explanation, - } - new_row = pd.DataFrame([new_row]) - if id not in dfq["id"].tolist(): - new_series = ( - new_row - if new_series is None - else pd.concat([new_series, new_row], ignore_index=True) - ) - else: - dfq.loc[dfq["id"] == id, "freeze_datetime_value"] = float( - series_values[series_values != "NA"].iloc[-1] - ) - dfq.loc[dfq["id"] == id, "url"] = url - dfq.loc[dfq["id"] == id, "background"] = background - - new_series = new_series if new_series is not None else pd.DataFrame() - return new_series - - -@decorator.log_runtime -def driver(_): - """Pull in fetched data and update questions and resolved values in question bank.""" - # Download pertinent files from Cloud Storage - logger.info("Downloading previously-fetched DBnomics data from Cloud.") - dff = data_utils.download_and_read( - filename=filenames["jsonl_fetch"], - local_filename=filenames["local_fetch"], - df_tmp=pd.DataFrame(columns=dbnomics.FETCH_COLUMNS), - dtype=dbnomics.FETCH_COLUMN_DTYPE, - ) - dfq = data_utils.download_and_read( - filename=filenames["jsonl_question"], - local_filename=filenames["local_question"], - df_tmp=pd.DataFrame(columns=constants.QUESTION_FILE_COLUMNS), - dtype=constants.QUESTION_FILE_COLUMN_DTYPE, - ) - - # Update questions file - new_series = _construct_questions(dff=dff, dfq=dfq) - dfq = pd.concat([dfq, new_series]) - - logger.info(f"Found {len(dfq):,} questions of {len(dbnomics.CONSTANTS):,} possible.") - - # Save - with open(filenames["local_question"], "w", encoding="utf-8") as f: - for record in dfq.to_dict(orient="records"): - jsonl_str = json.dumps(record, ensure_ascii=False) - f.write(jsonl_str + "\n") - - # Upload - gcp.storage.upload( - bucket_name=env.QUESTION_BANK_BUCKET, - local_filename=filenames["local_question"], - ) - - logger.info("Done.") - - -if __name__ == "__main__": - driver(None) diff --git a/src/sources/dbnomics.py b/src/sources/dbnomics.py index 3294be82..f56a4a6a 100644 --- a/src/sources/dbnomics.py +++ b/src/sources/dbnomics.py @@ -2,20 +2,296 @@ from __future__ import annotations -from typing import ClassVar +import logging +from typing import Any, ClassVar + +import backoff +import pandas as pd +import pandera.pandas as pa +import requests +from pandera.typing import DataFrame + +from _fb_types import UpdateResult +from _schemas import DbnomicsFetchFrame, QuestionFrame, ResolutionFrame +from helpers import constants, data_utils, dates from ._dataset import DatasetSource +logger = logging.getLogger(__name__) + +_BASE_URL = "https://api.db.nomics.world/v22/series/" + +# Some dataseries with regular updates have large numbers of NA values during periods in which +# data is not being reported. _OBSERVATIONS_WITHOUT_DATA detects these quiet periods and excludes +# the series from being formed into a question during them (since it's unclear if we'll be able to +# resolve them and the freeze values become increasingly irrelevant). +_OBSERVATIONS_WITHOUT_DATA = 10 + +_METEOFRANCE_STATIONS = [ + {"id": "07005", "station": "Abbeville"}, + {"id": "07015", "station": "Lille Airport"}, + {"id": "07020", "station": "Pointe De La Hague"}, + {"id": "07027", "station": "Caen – Carpiquet Airport"}, + {"id": "07037", "station": "Rouen Airport"}, + {"id": "07072", "station": "Reims – Prunay Aerodrome"}, + {"id": "07110", "station": "Brest Bretagne Airport"}, + {"id": "07117", "station": "Ploumanac'h"}, + {"id": "07130", "station": "Rennes–Saint-Jacques Airport"}, + {"id": "07139", "station": "Alençon"}, + {"id": "07149", "station": "Orly"}, + {"id": "07168", "station": "Troyes-Barberey Airport"}, + {"id": "07181", "station": "Nancy – Ochey Air Base"}, + {"id": "07190", "station": "Strasbourg Airport"}, + {"id": "07222", "station": "Nantes Atlantique Airport"}, + {"id": "07240", "station": "Tours"}, + {"id": "07255", "station": "Bourges"}, + {"id": "07280", "station": "Dijon-Bourgogne Airport"}, + {"id": "07299", "station": "EuroAirport Basel Mulhouse Freiburg"}, + {"id": "07335", "station": "Poitiers–Biard Airport"}, + {"id": "07434", "station": "Limoges – Bellegarde Airport"}, + {"id": "07460", "station": "Clermont-Ferrand Auvergne Airport"}, + {"id": "07471", "station": "Le Puy – Loudes Airport"}, + {"id": "07481", "station": "Lyon–Saint Exupéry Airport"}, + {"id": "07510", "station": "Bordeaux–Mérignac Airport"}, + {"id": "07535", "station": "Gourdon"}, + {"id": "07558", "station": "Millau"}, + {"id": "07577", "station": "Montélimar"}, + {"id": "07591", "station": "Embrun"}, + {"id": "07607", "station": "Mont-de-Marsan"}, + {"id": "07621", "station": "Tarbes–Lourdes–Pyrénées Airport"}, + {"id": "07627", "station": "Saint-Girons"}, + {"id": "07630", "station": "Toulouse–Blagnac Airport"}, + {"id": "07650", "station": "Marignane"}, + {"id": "07690", "station": "Nice"}, + {"id": "07747", "station": "Perpignan"}, + {"id": "07761", "station": "Ajaccio"}, + {"id": "61968", "station": "Glorioso Islands"}, + {"id": "61970", "station": "Juan de Nova Island"}, + {"id": "61972", "station": "Europa Island"}, + {"id": "61976", "station": "Tromelin Island"}, + {"id": "61980", "station": "Roland Garros Airport"}, + {"id": "61996", "station": "Amsterdam Island"}, + {"id": "61997", "station": "Île de la Possession"}, + {"id": "61998", "station": "Grande Terre"}, + {"id": "67005", "station": "Pamandzi"}, + {"id": "71805", "station": "Saint-Pierre"}, + {"id": "78890", "station": "La Désirade"}, + {"id": "78894", "station": "Saint Barthélemy"}, + {"id": "78897", "station": "Pointe-à-Pitre International Airport"}, + {"id": "78925", "station": "Martinique Aimé Césaire International Airport"}, + {"id": "81401", "station": "Saint-Laurent"}, + {"id": "81405", "station": "Cayenne – Félix Éboué Airport"}, +] + +_QUESTION_TEMPLATES = { + "meteofrance": ( + "What is the probability that the daily average temperature at the French weather station " + "at {station} will be higher on {resolution_date} than on {forecast_due_date}?" + ) +} + +_VALUE_EXPLANATIONS = { + "meteofrance": "The daily average temperature at the French weather station at {station}." +} + + +def _create_meteofrance_constants(stations: list[dict]) -> list[dict]: + """Convert station data into the series config consumed by fetch and update. + + Args: + stations (list[dict]): MeteoFrance station entries with ``id`` and ``station``. + """ + constants_list = [] + for item in stations: + station_id = item["id"] + station = item["station"] + question_text = _QUESTION_TEMPLATES["meteofrance"].replace("{station}", station) + explanation = _VALUE_EXPLANATIONS["meteofrance"].format(station=station) + constants_list.append( + { + "id": f"meteofrance/TEMPERATURE/celsius.{station_id}.D", + "question_text": question_text, + "freeze_datetime_value_explanation": explanation, + } + ) + return constants_list + + +_CONSTANTS = _create_meteofrance_constants(_METEOFRANCE_STATIONS) + class DbnomicsSource(DatasetSource): """DBnomics economic data source.""" name: ClassVar[str] = "dbnomics" - def fetch(self, **kwargs): - """Fetch DBnomics data from external API.""" - raise NotImplementedError + # ------------------------------------------------------------------ + # Public: fetch + # ------------------------------------------------------------------ + + @pa.check_types + def fetch(self, **kwargs: Any) -> DataFrame[DbnomicsFetchFrame]: + """Fetch DBnomics series data from the public API.""" + # Compute 'today' once and thread it to every series call so a run straddling midnight + # uses one consistent upper bound across all of its requests. + today = dates.get_date_today() + logger.info("Downloading DBnomics data.") + + df = None + for row in _CONSTANTS: + new_rows = self._call_endpoint(id=row["id"], today=today) + df = new_rows if df is None else pd.concat([df, new_rows]) + + df["period"] = df["period"].astype(str) + return df + + # ------------------------------------------------------------------ + # Public: update + # ------------------------------------------------------------------ + + @pa.check_types + def update( + self, + dfq: DataFrame[QuestionFrame], + dff: DataFrame[DbnomicsFetchFrame], + **kwargs: Any, + ) -> UpdateResult: + """Process fetched data into updated questions and resolution files. + + Args: + dfq (DataFrame[QuestionFrame]): Existing questions. + dff (DataFrame[DbnomicsFetchFrame]): Freshly fetched data. + """ + # Reproduce the legacy FETCH_COLUMN_DTYPE read: id/period/value are strings. Without this + # the value column would be inferred as floats and resolution files would store JSON + # numbers instead of the strings ("NA"/"12.3") the legacy job wrote. + dff = dff.copy() + dff[["id", "period", "value"]] = dff[["id", "period", "value"]].astype(str) + + yesterday = dates.get_date_yesterday() + resolution_files: dict[str, pd.DataFrame] = {} + + new_series = None + for row in _CONSTANTS: + id = row["id"].replace("/", "_") + df_series = dff[dff["id"] == id] + + resolution_files[id] = self._build_resolution_df(df_series) + + provider_name = df_series["provider_name"].iloc[0] + dataset_name = df_series["dataset_name"].iloc[0] + series_name = df_series["series_name"].iloc[0] + question = row["question_text"] + url = f"https://db.nomics.world/{row['id']}" + background = ( + f"The history of {dataset_name} - {series_name} from {provider_name} is available " + f"at {url}." + ) + freeze_datetime_value_explanation = row["freeze_datetime_value_explanation"] + series_values = df_series["value"] + series_dates = pd.to_datetime(df_series["period"]) + + last_fetch_date = series_dates.iloc[-1] + last_fetch_value = series_values.iloc[-1] + freeze_datetime_value = ( + float(last_fetch_value) + if last_fetch_date.date() > yesterday and last_fetch_value != "NA" + else "N/A" + ) + + if (series_values.tail(_OBSERVATIONS_WITHOUT_DATA) != "NA").any(): + new_row = { + "id": id, + "question": question, + "background": background, + "market_info_resolution_criteria": "N/A", + "market_info_open_datetime": "N/A", + "market_info_close_datetime": "N/A", + "url": url, + "market_info_resolution_datetime": "N/A", + "resolved": False, + "forecast_horizons": constants.FORECAST_HORIZONS_IN_DAYS, + "freeze_datetime_value": freeze_datetime_value, + "freeze_datetime_value_explanation": freeze_datetime_value_explanation, + } + new_row = pd.DataFrame([new_row]) + if id not in dfq["id"].tolist(): + new_series = ( + new_row + if new_series is None + else pd.concat([new_series, new_row], ignore_index=True) + ) + else: + dfq.loc[dfq["id"] == id, "freeze_datetime_value"] = float( + series_values[series_values != "NA"].iloc[-1] + ) + dfq.loc[dfq["id"] == id, "url"] = url + dfq.loc[dfq["id"] == id, "background"] = background + + if new_series is not None: + dfq = pd.concat([dfq, new_series]) + + logger.info(f"Found {len(dfq):,} questions of {len(_CONSTANTS):,} possible.") + + return UpdateResult(dfq=dfq, resolution_files=resolution_files) + + # ------------------------------------------------------------------ + # Private: API calls + # ------------------------------------------------------------------ + + @backoff.on_exception( + backoff.expo, + requests.exceptions.RequestException, + max_time=300, + on_backoff=data_utils.print_error_info_handler, + ) + def _call_endpoint(self, id: str, today) -> pd.DataFrame | None: + """Fetch a single DBnomics series and return its observation rows (or None if empty). + + Args: + id (str): DBnomics series ID (with ``/`` separators). + today (date): Exclusive upper bound for observation periods. + """ + logger.info(f"Calling DBnomics for series {id}") + endpoint = _BASE_URL + id + params = {"observations": "true"} + response = requests.get(url=endpoint, params=params) + if not response.ok: + logger.error("Request to DBnomics API endpoint failed.") + response.raise_for_status() + data = response.json() + docs = data.get("series", {}).get("docs", [{}])[0] + id_safe = id.replace("/", "_") + df = pd.DataFrame( + { + "id": id_safe, + "period": docs.get("period"), + "value": docs.get("value"), + "provider_name": data.get("provider", {}).get("name"), + "dataset_name": docs.get("dataset_name"), + "series_name": docs.get("series_name"), + } + ) + df["period"] = pd.to_datetime(df["period"]).dt.date + # Filter to record start date and beyond. + df = df[ + (df["period"] >= constants.QUESTION_BANK_DATA_STORAGE_START_DATE) + & (df["period"] < today) + ].reset_index(drop=True) + return df if not df.empty else None + + # ------------------------------------------------------------------ + # Private: resolution dataframe building + # ------------------------------------------------------------------ + + @staticmethod + def _build_resolution_df(df: pd.DataFrame) -> DataFrame[ResolutionFrame]: + """Build a resolution DataFrame ([id, date, value]) for a single series. - def update(self, dfq, dff, **kwargs): - """Process fetched DBnomics data into questions and resolution files.""" - raise NotImplementedError + Args: + df (pd.DataFrame): Fetched rows for this series. + """ + df = df[["id", "period", "value"]].rename(columns={"period": "date"}) + df = df.astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE) + df["value"] = df["value"].replace("NA", "N/A") + return df diff --git a/src/tests/conftest.py b/src/tests/conftest.py index 0af0d240..f5f741c6 100644 --- a/src/tests/conftest.py +++ b/src/tests/conftest.py @@ -8,6 +8,7 @@ import pytest from sources.acled import AcledSource +from sources.dbnomics import DbnomicsSource from sources.fred import FredSource from sources.infer import InferSource from sources.manifold import ManifoldSource @@ -106,6 +107,12 @@ def yfinance_source(): return YfinanceSource() +@pytest.fixture() +def dbnomics_source(): + """Return a DbnomicsSource instance.""" + return DbnomicsSource() + + # --------------------------------------------------------------------------- # DataFrame factories # --------------------------------------------------------------------------- @@ -519,3 +526,56 @@ def make_polymarket_fetch_df(rows): if col not in df.columns: df[col] = default return df + + +# --------------------------------------------------------------------------- +# DBnomics-specific factories +# --------------------------------------------------------------------------- + + +def make_dbnomics_api_response( + period_values, + provider_name="MeteoFrance", + dataset_name="Temperature", + series_name="Abbeville", +): + """Build a DBnomics ``/series`` API response dict. + + Args: + period_values (list): List of (period_str, value) tuples; value is a float or "NA". + provider_name (str): Provider name returned under provider.name. + dataset_name (str): Dataset name on the series doc. + series_name (str): Series name on the series doc. + """ + periods = [p for p, _ in period_values] + values = [v for _, v in period_values] + return { + "provider": {"name": provider_name}, + "series": { + "docs": [ + { + "period": periods, + "value": values, + "dataset_name": dataset_name, + "series_name": series_name, + } + ] + }, + } + + +def make_dbnomics_fetch_df(rows): + """Build a DataFrame matching DbnomicsFetchFrame (one row per observation). + + Each row should have at least 'id', 'period', 'value'. Missing columns get defaults. + """ + defaults = { + "provider_name": "MeteoFrance", + "dataset_name": "Temperature", + "series_name": "Abbeville", + } + df = pd.DataFrame(rows) + for col, default in defaults.items(): + if col not in df.columns: + df[col] = default + return df diff --git a/src/tests/test_dbnomics.py b/src/tests/test_dbnomics.py new file mode 100644 index 00000000..1117478e --- /dev/null +++ b/src/tests/test_dbnomics.py @@ -0,0 +1,203 @@ +"""Tests for DbnomicsSource: fetch and update logic.""" + +from datetime import date +from unittest.mock import Mock, patch + +import pandas as pd + +from helpers import constants +from helpers import dbnomics as dbnomics_helper +from sources.dbnomics import DbnomicsSource + +from .conftest import ( + make_dbnomics_api_response, + make_dbnomics_fetch_df, + make_question_df, +) + +# A small stand-in for the 52-station _CONSTANTS so update tests don't need every series present +# in dff (legacy/update indexes each series' first row, so absent series would raise). +_TEST_CONSTANTS = [ + { + "id": "meteofrance/TEMPERATURE/celsius.07005.D", + "question_text": ( + "What is the probability that the daily average temperature at the French weather " + "station at Abbeville will be higher on {resolution_date} than on {forecast_due_date}?" + ), + "freeze_datetime_value_explanation": ( + "The daily average temperature at the French weather station at Abbeville." + ), + } +] +_RAW_ID = "meteofrance/TEMPERATURE/celsius.07005.D" +_SAFE_ID = "meteofrance_TEMPERATURE_celsius.07005.D" + + +def _empty_dfq(): + """Return an empty question bank with the canonical columns (so dfq['id'] exists).""" + return pd.DataFrame(columns=constants.QUESTION_FILE_COLUMNS) + + +# --------------------------------------------------------------------------- +# Backwards-compat shim +# --------------------------------------------------------------------------- + + +class TestHelperShim: + """The helpers/dbnomics.py shim still exposes identity for question_curation.""" + + def test_exposes_intro_and_criteria(self): + assert isinstance(dbnomics_helper.SOURCE_INTRO, str) and dbnomics_helper.SOURCE_INTRO + assert ( + isinstance(dbnomics_helper.RESOLUTION_CRITERIA, str) + and dbnomics_helper.RESOLUTION_CRITERIA + ) + + +# --------------------------------------------------------------------------- +# _call_endpoint +# --------------------------------------------------------------------------- + + +class TestCallEndpoint: + """Single-series API call: id safe-ification + date-window filtering.""" + + def test_safe_ids_and_window_filter(self, dbnomics_source): + today = date(2026, 1, 15) + resp = Mock() + resp.ok = True + resp.json.return_value = make_dbnomics_api_response( + [ + ("2025-01-01", 5.0), # in window -> keep + ("2026-01-14", 7.0), # yesterday -> keep + ("2026-01-15", 9.0), # today -> dropped (period < today) + ("2026-01-20", 9.0), # future -> dropped + ] + ) + with patch("sources.dbnomics.requests.get", return_value=resp): + df = dbnomics_source._call_endpoint(id=_RAW_ID, today=today) + + assert df["id"].unique().tolist() == [_SAFE_ID] + assert set(df["period"]) == {date(2025, 1, 1), date(2026, 1, 14)} + + def test_returns_none_when_window_empty(self, dbnomics_source): + today = date(2026, 1, 15) + resp = Mock() + resp.ok = True + resp.json.return_value = make_dbnomics_api_response([("2026-01-20", 9.0)]) + with patch("sources.dbnomics.requests.get", return_value=resp): + df = dbnomics_source._call_endpoint(id=_RAW_ID, today=today) + + assert df is None + + +# --------------------------------------------------------------------------- +# fetch +# --------------------------------------------------------------------------- + + +class TestFetch: + """fetch() threads a single 'today' and concatenates per-series frames.""" + + @patch("sources.dbnomics._CONSTANTS", _TEST_CONSTANTS) + def test_concatenates_and_casts_period_to_str(self, dbnomics_source, freeze_today): + freeze_today(date(2026, 1, 15)) + fake = pd.DataFrame( + { + "id": _SAFE_ID, + "period": [date(2026, 1, 13), date(2026, 1, 14)], + "value": [5.0, 6.0], + "provider_name": "MeteoFrance", + "dataset_name": "Temperature", + "series_name": "Abbeville", + } + ) + with patch.object(DbnomicsSource, "_call_endpoint", return_value=fake) as mocked: + dff = dbnomics_source.fetch() + + # today computed once on the surface and threaded down. + assert mocked.call_args.kwargs["today"] == date(2026, 1, 15) + assert dff["period"].tolist() == ["2026-01-13", "2026-01-14"] + assert dff["id"].tolist() == [_SAFE_ID, _SAFE_ID] + + +# --------------------------------------------------------------------------- +# update +# --------------------------------------------------------------------------- + + +class TestUpdate: + """update() builds resolution files for every series and upserts questions.""" + + @patch("sources.dbnomics._CONSTANTS", _TEST_CONSTANTS) + def test_new_question_inserted_with_resolution_file(self, dbnomics_source, freeze_today): + freeze_today(date(2026, 1, 15)) + dff = make_dbnomics_fetch_df( + [ + {"id": _SAFE_ID, "period": "2026-01-13", "value": 5.0}, + {"id": _SAFE_ID, "period": "2026-01-14", "value": 6.0}, + ] + ) + result = dbnomics_source.update(_empty_dfq(), dff) + + assert _SAFE_ID in result.dfq["id"].tolist() + row = result.dfq[result.dfq["id"] == _SAFE_ID].iloc[0] + assert not row["resolved"] + assert row["url"] == f"https://db.nomics.world/{_RAW_ID}" + + assert _SAFE_ID in result.resolution_files + rf = result.resolution_files[_SAFE_ID] + assert list(rf.columns) == ["id", "date", "value"] + + @patch("sources.dbnomics._CONSTANTS", _TEST_CONSTANTS) + def test_existing_question_updated_in_place(self, dbnomics_source, freeze_today): + freeze_today(date(2026, 1, 15)) + dff = make_dbnomics_fetch_df( + [ + {"id": _SAFE_ID, "period": "2026-01-13", "value": 5.0}, + {"id": _SAFE_ID, "period": "2026-01-14", "value": 6.0}, + ] + ) + dfq = make_question_df([{"id": _SAFE_ID, "freeze_datetime_value": "stale"}]) + + result = dbnomics_source.update(dfq, dff) + + # No duplicate row; freeze value refreshed to the last non-NA value. The QuestionFrame + # contract coerces freeze_datetime_value to a string on update() output (downstream always + # reads it back as str), so the float 6.0 surfaces as "6.0". + assert (result.dfq["id"] == _SAFE_ID).sum() == 1 + row = result.dfq[result.dfq["id"] == _SAFE_ID].iloc[0] + assert row["freeze_datetime_value"] == "6.0" + + @patch("sources.dbnomics._CONSTANTS", _TEST_CONSTANTS) + def test_all_na_window_skipped_from_questions_but_resolution_built( + self, dbnomics_source, freeze_today + ): + freeze_today(date(2026, 1, 15)) + # 11 consecutive "NA"s: the last 10 are all NA -> series is not minted into a question. + rows = [{"id": _SAFE_ID, "period": f"2026-01-{d:02d}", "value": "NA"} for d in range(1, 12)] + dff = make_dbnomics_fetch_df(rows) + + result = dbnomics_source.update(_empty_dfq(), dff) + + assert _SAFE_ID not in result.dfq["id"].tolist() + assert _SAFE_ID in result.resolution_files # resolution file still written + + @patch("sources.dbnomics._CONSTANTS", _TEST_CONSTANTS) + def test_resolution_file_na_to_not_available_and_values_are_strings( + self, dbnomics_source, freeze_today + ): + freeze_today(date(2026, 1, 15)) + dff = make_dbnomics_fetch_df( + [ + {"id": _SAFE_ID, "period": "2026-01-13", "value": "NA"}, + {"id": _SAFE_ID, "period": "2026-01-14", "value": 6.0}, + ] + ) + + result = dbnomics_source.update(_empty_dfq(), dff) + values = result.resolution_files[_SAFE_ID]["value"].tolist() + + assert "N/A" in values # "NA" rewritten + assert "NA" not in values + assert "6.0" in values # numeric value preserved as a string (legacy parity)