Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/orchestration/_source_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ def write_fetch_output(source: str, dff: pd.DataFrame) -> None:
)


def list_existing_resolution_ids(source: str) -> set[str]:
"""Return the set of bare question IDs that already have a resolution file.

Lists <source>/*.jsonl and strips to bare IDs. This is a pure existence
listing: it includes files whose contents are empty, mirroring the old
inline list_with_prefix check. Do NOT derive this from
load_existing_resolution_files, which drops empty frames.

Args:
source (str): Source name (e.g. "infer").

Returns:
set of bare question IDs present in storage for this source.
"""
paths = gcp.storage.list_with_prefix(bucket_name=env.QUESTION_BANK_BUCKET, prefix=f"{source}/")
return {os.path.basename(p).removesuffix(".jsonl") for p in paths if p.endswith(".jsonl")}


def load_existing_resolution_files(
source: str,
ids: Iterable[str] | None = None,
Expand All @@ -52,12 +70,7 @@ def load_existing_resolution_files(
dict mapping question_id to its resolution DataFrame.
"""
if ids is None:
paths = gcp.storage.list_with_prefix(
bucket_name=env.QUESTION_BANK_BUCKET, prefix=f"{source}/"
)
question_ids = [
os.path.basename(p).removesuffix(".jsonl") for p in paths if p.endswith(".jsonl")
]
question_ids = list(list_existing_resolution_ids(source))
else:
question_ids = [str(qid) for qid in ids]

Expand Down
9 changes: 3 additions & 6 deletions src/orchestration/func_infer_fetch/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
import logging
from typing import Any

from helpers import data_utils, decorator, env, keys
from helpers import data_utils, decorator, keys
from orchestration import _source_io
from sources.infer import InferSource
from utils import gcp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -23,11 +22,9 @@ def driver(_: Any) -> None:
source.api_key = keys.API_KEY_INFER

dfq = data_utils.get_data_from_cloud_storage(SOURCE, return_question_data=True)
files_in_storage = gcp.storage.list_with_prefix(
bucket_name=env.QUESTION_BANK_BUCKET, prefix=SOURCE
)
existing_resolution_ids = _source_io.list_existing_resolution_ids(SOURCE)

dff = source.fetch(dfq=dfq, files_in_storage=files_in_storage)
dff = source.fetch(dfq=dfq, existing_resolution_ids=existing_resolution_ids)

_source_io.write_fetch_output(SOURCE, dff)
logger.info("Done.")
Expand Down
9 changes: 3 additions & 6 deletions src/orchestration/func_manifold_update/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
import logging
from typing import Any

from helpers import data_utils, decorator, env
from helpers import data_utils, decorator
from orchestration import _source_io
from sources.manifold import ManifoldSource
from utils import gcp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -32,15 +31,13 @@ def driver(_: Any) -> None:
existing_resolution_files = _source_io.load_existing_resolution_files(SOURCE)
logger.info(f"Loaded {len(existing_resolution_files)} resolution files")

files_in_storage = gcp.storage.list_with_prefix(
bucket_name=env.QUESTION_BANK_BUCKET, prefix=SOURCE
)
existing_resolution_ids = _source_io.list_existing_resolution_ids(SOURCE)

result = source.update(
dfq,
dff,
existing_resolution_files=existing_resolution_files,
files_in_storage=files_in_storage,
existing_resolution_ids=existing_resolution_ids,
)

logger.info("Uploading to GCP...")
Expand Down
9 changes: 3 additions & 6 deletions src/orchestration/func_metaculus_update/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
import logging
from typing import Any

from helpers import data_utils, decorator, env, keys
from helpers import data_utils, decorator, keys
from orchestration import _source_io
from sources.metaculus import MetaculusSource
from utils import gcp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -26,14 +25,12 @@ def driver(_: Any) -> None:
SOURCE, return_question_data=True, return_fetch_data=True
)

files_in_storage = gcp.storage.list_with_prefix(
bucket_name=env.QUESTION_BANK_BUCKET, prefix=SOURCE
)
existing_resolution_ids = _source_io.list_existing_resolution_ids(SOURCE)

result = source.update(
dfq,
dff,
files_in_storage=files_in_storage,
existing_resolution_ids=existing_resolution_ids,
)

logger.info("Uploading to GCP...")
Expand Down
9 changes: 5 additions & 4 deletions src/sources/infer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,17 @@ def fetch(
self,
*,
dfq: DataFrame[QuestionFrame] | None = None,
files_in_storage: list[str] | None = None,
existing_resolution_ids: set[str] | None = None,
) -> DataFrame[InferFetchFrame]:
"""Fetch questions from the INFER API.

Args:
dfq (DataFrame[QuestionFrame] | None): Existing question bank.
files_in_storage (list[str] | None): Existing resolution file paths.
existing_resolution_ids (set[str] | None): Bare IDs that already have a resolution
file in storage.
"""
self._require_api_key()
files_in_storage = files_in_storage or []
existing_resolution_ids = existing_resolution_ids or set()

# Determine which existing questions need re-fetching
resolved_ids: list[str] = []
Expand All @@ -62,7 +63,7 @@ def fetch(
logger.info(f"Number unresolved_ids: {len(unresolved_ids)}")

resolved_ids_without_files = [
id for id in resolved_ids if f"{self.name}/{id}.jsonl" not in files_in_storage
id for id in resolved_ids if str(id) not in existing_resolution_ids
]
logger.info(f"resolved_ids_without_resolution_files: {resolved_ids_without_files}")

Expand Down
10 changes: 5 additions & 5 deletions src/sources/manifold.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def update(
dff: DataFrame[ManifoldFetchFrame],
*,
existing_resolution_files: dict[str, DataFrame[ResolutionFrame]] | None = None,
files_in_storage: list[str] | None = None,
existing_resolution_ids: set[str] | None = None,
) -> UpdateResult:
"""Process fetched IDs into updated questions and resolution files.

Expand All @@ -124,10 +124,11 @@ def update(
dfq (DataFrame[QuestionFrame]): Existing questions.
dff (DataFrame[ManifoldFetchFrame]): Freshly fetched market IDs.
existing_resolution_files (dict | None): Per-question existing resolution data.
files_in_storage (list[str] | None): Existing resolution file paths in storage.
existing_resolution_ids (set[str] | None): Bare IDs that already have a resolution
file in storage.
"""
existing_resolution_files = existing_resolution_files or {}
files_in_storage = files_in_storage or []
existing_resolution_ids = existing_resolution_ids or set()
resolution_files: dict[str, pd.DataFrame] = {}

# --- Append new IDs from dff to dfq ---
Expand Down Expand Up @@ -182,8 +183,7 @@ def update(

# --- Regenerate missing resolution files for resolved questions ---
for _index, row in dfq[dfq["resolved"]].iterrows():
filename = f"{self.name}/{row['id']}.jsonl"
if filename not in files_in_storage and row["id"] not in resolution_files:
if str(row["id"]) not in existing_resolution_ids and row["id"] not in resolution_files:
market = self._get_market(row["id"])
df_res = self._build_resolution_file(
market=market,
Expand Down
11 changes: 5 additions & 6 deletions src/sources/metaculus.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def update(
dfq: DataFrame[QuestionFrame],
dff: DataFrame[MetaculusFetchFrame],
*,
files_in_storage: list[str] | None = None,
existing_resolution_ids: set[str] | None = None,
) -> UpdateResult:
"""Fetch full question data and build resolution files.

Expand All @@ -106,8 +106,8 @@ def update(
Args:
dfq (DataFrame[QuestionFrame]): Existing questions.
dff (DataFrame[MetaculusFetchFrame]): Discovered question IDs from fetch().
files_in_storage (list[str] | None): Existing resolution file paths in storage,
used to decide which resolved questions need regenerating.
existing_resolution_ids (set[str] | None): Bare IDs that already have a resolution
file in storage, used to decide which resolved questions need regenerating.
"""
self._require_api_key()
resolution_files: dict[str, pd.DataFrame] = {}
Expand Down Expand Up @@ -167,11 +167,10 @@ def update(
dfq.at[index, "freeze_datetime_value"] = "N/A"

# Regenerate missing resolution files for resolved questions
files_in_storage = files_in_storage or []
existing_resolution_ids = existing_resolution_ids or set()
for index, row in dfq[dfq["resolved"]].iterrows():
question_id = str(row["id"])
filename = f"{self.name}/{question_id}.jsonl"
if filename not in files_in_storage and question_id not in resolution_files:
if question_id not in existing_resolution_ids and question_id not in resolution_files:
market = self._get_market(row["id"])
df_res = self._create_resolution_file(dfq, index, market)
if df_res is not None:
Expand Down
6 changes: 3 additions & 3 deletions src/tests/test_infer.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ def test_deduplication_active_wins(self, mock_api, infer_source):
[make_infer_api_question(id=100, state="active")], # active fetch
]
dfq = make_question_df([{"id": "100", "resolved": False}])
dff = infer_source.fetch(dfq=dfq, files_in_storage=[])
dff = infer_source.fetch(dfq=dfq, existing_resolution_ids=set())

assert len(dff) == 1

Expand All @@ -350,7 +350,7 @@ def test_resolved_without_files_refetched(self, mock_api, infer_source):
]
dfq = make_question_df([{"id": "100", "resolved": True}])
# No resolution file in storage → should re-fetch
dff = infer_source.fetch(dfq=dfq, files_in_storage=[])
dff = infer_source.fetch(dfq=dfq, existing_resolution_ids=set())

assert len(dff) == 1
mock_api.assert_any_call(status="all", question_ids=["100"])
Expand All @@ -361,7 +361,7 @@ def test_empty_dfq(self, mock_api, infer_source):
mock_api.side_effect = [
[make_infer_api_question(id=300)],
]
dff = infer_source.fetch(dfq=None, files_in_storage=[])
dff = infer_source.fetch(dfq=None, existing_resolution_ids=set())
assert len(dff) == 1

def test_api_key_required(self):
Expand Down
4 changes: 2 additions & 2 deletions src/tests/test_manifold.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ def test_regenerates_missing_resolved_files(self, mock_market, mock_build, manif
)
dff = make_manifold_fetch_df([{"id": "mkt_001"}])

result = manifold_source.update(dfq, dff, files_in_storage=[])
result = manifold_source.update(dfq, dff, existing_resolution_ids=set())

assert "mkt_001" in result.resolution_files

Expand All @@ -533,7 +533,7 @@ def test_skips_resolved_already_in_storage(self, mock_market, mock_build, manifo
)
dff = make_manifold_fetch_df([{"id": "mkt_001"}])

result = manifold_source.update(dfq, dff, files_in_storage=["manifold/mkt_001.jsonl"])
result = manifold_source.update(dfq, dff, existing_resolution_ids={"mkt_001"})

# _get_market should not be called for the resolved question
mock_market.assert_not_called()
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_metaculus.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ def test_resolved_with_existing_file_not_regenerated(
)
dff = make_metaculus_fetch_df([])

metaculus_source.update(dfq, dff, files_in_storage=["metaculus/42472.jsonl"])
metaculus_source.update(dfq, dff, existing_resolution_ids={"42472"})

mock_market.assert_not_called()
mock_res.assert_not_called()
Expand Down
Loading