From 0f091ee85960b6171725aeac65d2ce0679d1f084 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Fri, 5 Jun 2026 21:19:42 +0300 Subject: [PATCH] refactor: harmonize resolution-file existence checks across market sources --- src/orchestration/_source_io.py | 25 ++++++++++++++----- src/orchestration/func_infer_fetch/main.py | 9 +++---- .../func_manifold_update/main.py | 9 +++---- .../func_metaculus_update/main.py | 9 +++---- src/sources/infer.py | 9 ++++--- src/sources/manifold.py | 10 ++++---- src/sources/metaculus.py | 11 ++++---- src/tests/test_infer.py | 6 ++--- src/tests/test_manifold.py | 4 +-- src/tests/test_metaculus.py | 2 +- 10 files changed, 49 insertions(+), 45 deletions(-) diff --git a/src/orchestration/_source_io.py b/src/orchestration/_source_io.py index 10a7d436..a28a6a7f 100644 --- a/src/orchestration/_source_io.py +++ b/src/orchestration/_source_io.py @@ -34,6 +34,24 @@ def write_fetch_output(source: str, dff: pd.DataFrame) -> None: ) +def list_existing_resolution_ids(source: str) -> set[str]: + """Return the set of bare question IDs that already have a resolution file. + + Lists {source}/*.jsonl and strips to bare IDs. This is a pure existence + listing: it includes files whose contents are empty, mirroring the old + inline list_with_prefix check. Do NOT derive this from + load_existing_resolution_files, which drops empty frames. + + Args: + source (str): Source name (e.g. "infer"). + + Returns: + set of bare question IDs present in storage for this source. + """ + paths = gcp.storage.list_with_prefix(bucket_name=env.QUESTION_BANK_BUCKET, prefix=f"{source}/") + return {os.path.basename(p).removesuffix(".jsonl") for p in paths if p.endswith(".jsonl")} + + def load_existing_resolution_files( source: str, ids: Iterable[str] | None = None, @@ -52,12 +70,7 @@ def load_existing_resolution_files( dict mapping question_id to its resolution DataFrame. 
""" if ids is None: - paths = gcp.storage.list_with_prefix( - bucket_name=env.QUESTION_BANK_BUCKET, prefix=f"{source}/" - ) - question_ids = [ - os.path.basename(p).removesuffix(".jsonl") for p in paths if p.endswith(".jsonl") - ] + question_ids = list(list_existing_resolution_ids(source)) else: question_ids = [str(qid) for qid in ids] diff --git a/src/orchestration/func_infer_fetch/main.py b/src/orchestration/func_infer_fetch/main.py index f39baf1e..968feb87 100644 --- a/src/orchestration/func_infer_fetch/main.py +++ b/src/orchestration/func_infer_fetch/main.py @@ -5,10 +5,9 @@ import logging from typing import Any -from helpers import data_utils, decorator, env, keys +from helpers import data_utils, decorator, keys from orchestration import _source_io from sources.infer import InferSource -from utils import gcp logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -23,11 +22,9 @@ def driver(_: Any) -> None: source.api_key = keys.API_KEY_INFER dfq = data_utils.get_data_from_cloud_storage(SOURCE, return_question_data=True) - files_in_storage = gcp.storage.list_with_prefix( - bucket_name=env.QUESTION_BANK_BUCKET, prefix=SOURCE - ) + existing_resolution_ids = _source_io.list_existing_resolution_ids(SOURCE) - dff = source.fetch(dfq=dfq, files_in_storage=files_in_storage) + dff = source.fetch(dfq=dfq, existing_resolution_ids=existing_resolution_ids) _source_io.write_fetch_output(SOURCE, dff) logger.info("Done.") diff --git a/src/orchestration/func_manifold_update/main.py b/src/orchestration/func_manifold_update/main.py index a152a0dc..8279dc19 100644 --- a/src/orchestration/func_manifold_update/main.py +++ b/src/orchestration/func_manifold_update/main.py @@ -5,10 +5,9 @@ import logging from typing import Any -from helpers import data_utils, decorator, env +from helpers import data_utils, decorator from orchestration import _source_io from sources.manifold import ManifoldSource -from utils import gcp logging.basicConfig(level=logging.INFO) 
logger = logging.getLogger(__name__) @@ -32,15 +31,13 @@ def driver(_: Any) -> None: existing_resolution_files = _source_io.load_existing_resolution_files(SOURCE) logger.info(f"Loaded {len(existing_resolution_files)} resolution files") - files_in_storage = gcp.storage.list_with_prefix( - bucket_name=env.QUESTION_BANK_BUCKET, prefix=SOURCE - ) + existing_resolution_ids = _source_io.list_existing_resolution_ids(SOURCE) result = source.update( dfq, dff, existing_resolution_files=existing_resolution_files, - files_in_storage=files_in_storage, + existing_resolution_ids=existing_resolution_ids, ) logger.info("Uploading to GCP...") diff --git a/src/orchestration/func_metaculus_update/main.py b/src/orchestration/func_metaculus_update/main.py index b013829a..63baa0e3 100644 --- a/src/orchestration/func_metaculus_update/main.py +++ b/src/orchestration/func_metaculus_update/main.py @@ -5,10 +5,9 @@ import logging from typing import Any -from helpers import data_utils, decorator, env, keys +from helpers import data_utils, decorator, keys from orchestration import _source_io from sources.metaculus import MetaculusSource -from utils import gcp logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -26,14 +25,12 @@ def driver(_: Any) -> None: SOURCE, return_question_data=True, return_fetch_data=True ) - files_in_storage = gcp.storage.list_with_prefix( - bucket_name=env.QUESTION_BANK_BUCKET, prefix=SOURCE - ) + existing_resolution_ids = _source_io.list_existing_resolution_ids(SOURCE) result = source.update( dfq, dff, - files_in_storage=files_in_storage, + existing_resolution_ids=existing_resolution_ids, ) logger.info("Uploading to GCP...") diff --git a/src/sources/infer.py b/src/sources/infer.py index 7cf88c10..29c4fbd4 100644 --- a/src/sources/infer.py +++ b/src/sources/infer.py @@ -40,16 +40,17 @@ def fetch( self, *, dfq: DataFrame[QuestionFrame] | None = None, - files_in_storage: list[str] | None = None, + existing_resolution_ids: set[str] | None = 
None, ) -> DataFrame[InferFetchFrame]: """Fetch questions from the INFER API. Args: dfq (DataFrame[QuestionFrame] | None): Existing question bank. - files_in_storage (list[str] | None): Existing resolution file paths. + existing_resolution_ids (set[str] | None): Bare IDs that already have a resolution + file in storage. """ self._require_api_key() - files_in_storage = files_in_storage or [] + existing_resolution_ids = existing_resolution_ids or set() # Determine which existing questions need re-fetching resolved_ids: list[str] = [] @@ -62,7 +63,7 @@ def fetch( logger.info(f"Number unresolved_ids: {len(unresolved_ids)}") resolved_ids_without_files = [ - id for id in resolved_ids if f"{self.name}/{id}.jsonl" not in files_in_storage + id for id in resolved_ids if str(id) not in existing_resolution_ids ] logger.info(f"resolved_ids_without_resolution_files: {resolved_ids_without_files}") diff --git a/src/sources/manifold.py b/src/sources/manifold.py index 9adbdf01..d2aefdf7 100644 --- a/src/sources/manifold.py +++ b/src/sources/manifold.py @@ -112,7 +112,7 @@ def update( dff: DataFrame[ManifoldFetchFrame], *, existing_resolution_files: dict[str, DataFrame[ResolutionFrame]] | None = None, - files_in_storage: list[str] | None = None, + existing_resolution_ids: set[str] | None = None, ) -> UpdateResult: """Process fetched IDs into updated questions and resolution files. @@ -124,10 +124,11 @@ def update( dfq (DataFrame[QuestionFrame]): Existing questions. dff (DataFrame[ManifoldFetchFrame]): Freshly fetched market IDs. existing_resolution_files (dict | None): Per-question existing resolution data. - files_in_storage (list[str] | None): Existing resolution file paths in storage. + existing_resolution_ids (set[str] | None): Bare IDs that already have a resolution + file in storage. 
""" existing_resolution_files = existing_resolution_files or {} - files_in_storage = files_in_storage or [] + existing_resolution_ids = existing_resolution_ids or set() resolution_files: dict[str, pd.DataFrame] = {} # --- Append new IDs from dff to dfq --- @@ -182,8 +183,7 @@ def update( # --- Regenerate missing resolution files for resolved questions --- for _index, row in dfq[dfq["resolved"]].iterrows(): - filename = f"{self.name}/{row['id']}.jsonl" - if filename not in files_in_storage and row["id"] not in resolution_files: + if str(row["id"]) not in existing_resolution_ids and row["id"] not in resolution_files: market = self._get_market(row["id"]) df_res = self._build_resolution_file( market=market, diff --git a/src/sources/metaculus.py b/src/sources/metaculus.py index c0eec80d..6504a578 100644 --- a/src/sources/metaculus.py +++ b/src/sources/metaculus.py @@ -93,7 +93,7 @@ def update( dfq: DataFrame[QuestionFrame], dff: DataFrame[MetaculusFetchFrame], *, - files_in_storage: list[str] | None = None, + existing_resolution_ids: set[str] | None = None, ) -> UpdateResult: """Fetch full question data and build resolution files. @@ -106,8 +106,8 @@ def update( Args: dfq (DataFrame[QuestionFrame]): Existing questions. dff (DataFrame[MetaculusFetchFrame]): Discovered question IDs from fetch(). - files_in_storage (list[str] | None): Existing resolution file paths in storage, - used to decide which resolved questions need regenerating. + existing_resolution_ids (set[str] | None): Bare IDs that already have a resolution + file in storage, used to decide which resolved questions need regenerating. 
""" self._require_api_key() resolution_files: dict[str, pd.DataFrame] = {} @@ -167,11 +167,10 @@ def update( dfq.at[index, "freeze_datetime_value"] = "N/A" # Regenerate missing resolution files for resolved questions - files_in_storage = files_in_storage or [] + existing_resolution_ids = existing_resolution_ids or set() for index, row in dfq[dfq["resolved"]].iterrows(): question_id = str(row["id"]) - filename = f"{self.name}/{question_id}.jsonl" - if filename not in files_in_storage and question_id not in resolution_files: + if question_id not in existing_resolution_ids and question_id not in resolution_files: market = self._get_market(row["id"]) df_res = self._create_resolution_file(dfq, index, market) if df_res is not None: diff --git a/src/tests/test_infer.py b/src/tests/test_infer.py index e382c146..3443a467 100644 --- a/src/tests/test_infer.py +++ b/src/tests/test_infer.py @@ -337,7 +337,7 @@ def test_deduplication_active_wins(self, mock_api, infer_source): [make_infer_api_question(id=100, state="active")], # active fetch ] dfq = make_question_df([{"id": "100", "resolved": False}]) - dff = infer_source.fetch(dfq=dfq, files_in_storage=[]) + dff = infer_source.fetch(dfq=dfq, existing_resolution_ids=set()) assert len(dff) == 1 @@ -350,7 +350,7 @@ def test_resolved_without_files_refetched(self, mock_api, infer_source): ] dfq = make_question_df([{"id": "100", "resolved": True}]) # No resolution file in storage → should re-fetch - dff = infer_source.fetch(dfq=dfq, files_in_storage=[]) + dff = infer_source.fetch(dfq=dfq, existing_resolution_ids=set()) assert len(dff) == 1 mock_api.assert_any_call(status="all", question_ids=["100"]) @@ -361,7 +361,7 @@ def test_empty_dfq(self, mock_api, infer_source): mock_api.side_effect = [ [make_infer_api_question(id=300)], ] - dff = infer_source.fetch(dfq=None, files_in_storage=[]) + dff = infer_source.fetch(dfq=None, existing_resolution_ids=set()) assert len(dff) == 1 def test_api_key_required(self): diff --git 
a/src/tests/test_manifold.py b/src/tests/test_manifold.py index 58332a2e..9c1f85b9 100644 --- a/src/tests/test_manifold.py +++ b/src/tests/test_manifold.py @@ -514,7 +514,7 @@ def test_regenerates_missing_resolved_files(self, mock_market, mock_build, manif ) dff = make_manifold_fetch_df([{"id": "mkt_001"}]) - result = manifold_source.update(dfq, dff, files_in_storage=[]) + result = manifold_source.update(dfq, dff, existing_resolution_ids=set()) assert "mkt_001" in result.resolution_files @@ -533,7 +533,7 @@ def test_skips_resolved_already_in_storage(self, mock_market, mock_build, manifo ) dff = make_manifold_fetch_df([{"id": "mkt_001"}]) - result = manifold_source.update(dfq, dff, files_in_storage=["manifold/mkt_001.jsonl"]) + result = manifold_source.update(dfq, dff, existing_resolution_ids={"mkt_001"}) # _get_market should not be called for the resolved question mock_market.assert_not_called() diff --git a/src/tests/test_metaculus.py b/src/tests/test_metaculus.py index 26f4448b..8e0df704 100644 --- a/src/tests/test_metaculus.py +++ b/src/tests/test_metaculus.py @@ -912,7 +912,7 @@ def test_resolved_with_existing_file_not_regenerated( ) dff = make_metaculus_fetch_df([]) - metaculus_source.update(dfq, dff, files_in_storage=["metaculus/42472.jsonl"]) + metaculus_source.update(dfq, dff, existing_resolution_ids={"42472"}) mock_market.assert_not_called() mock_res.assert_not_called()