Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/orchestration/_source_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,24 @@ def write_fetch_output(source: str, dff: pd.DataFrame) -> None:
)


def list_existing_resolution_ids(source: str) -> set[str]:
"""Return the set of bare question IDs that already have a resolution file.

Lists <source>/*.jsonl and strips to bare IDs. This is a pure existence
listing: it includes files whose contents are empty, mirroring the old
inline list_with_prefix check. Do NOT derive this from
load_existing_resolution_files, which drops empty frames.

Args:
source (str): Source name (e.g. "infer").

Returns:
set of bare question IDs present in storage for this source.
"""
paths = gcp.storage.list_with_prefix(bucket_name=env.QUESTION_BANK_BUCKET, prefix=f"{source}/")
return {os.path.basename(p).removesuffix(".jsonl") for p in paths if p.endswith(".jsonl")}


def load_existing_resolution_files(
source: str,
ids: Iterable[str] | None = None,
Expand All @@ -52,12 +70,7 @@ def load_existing_resolution_files(
dict mapping question_id to its resolution DataFrame.
"""
if ids is None:
paths = gcp.storage.list_with_prefix(
bucket_name=env.QUESTION_BANK_BUCKET, prefix=f"{source}/"
)
question_ids = [
os.path.basename(p).removesuffix(".jsonl") for p in paths if p.endswith(".jsonl")
]
question_ids = list(list_existing_resolution_ids(source))
else:
question_ids = [str(qid) for qid in ids]

Expand Down
9 changes: 3 additions & 6 deletions src/orchestration/func_infer_fetch/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
import logging
from typing import Any

from helpers import data_utils, decorator, env, keys
from helpers import data_utils, decorator, keys
from orchestration import _source_io
from sources.infer import InferSource
from utils import gcp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -23,11 +22,9 @@ def driver(_: Any) -> None:
source.api_key = keys.API_KEY_INFER

dfq = data_utils.get_data_from_cloud_storage(SOURCE, return_question_data=True)
files_in_storage = gcp.storage.list_with_prefix(
bucket_name=env.QUESTION_BANK_BUCKET, prefix=SOURCE
)
existing_resolution_ids = _source_io.list_existing_resolution_ids(SOURCE)

dff = source.fetch(dfq=dfq, files_in_storage=files_in_storage)
dff = source.fetch(dfq=dfq, existing_resolution_ids=existing_resolution_ids)

_source_io.write_fetch_output(SOURCE, dff)
logger.info("Done.")
Expand Down
9 changes: 3 additions & 6 deletions src/orchestration/func_manifold_update/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
import logging
from typing import Any

from helpers import data_utils, decorator, env
from helpers import data_utils, decorator
from orchestration import _source_io
from sources.manifold import ManifoldSource
from utils import gcp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -32,15 +31,13 @@ def driver(_: Any) -> None:
existing_resolution_files = _source_io.load_existing_resolution_files(SOURCE)
logger.info(f"Loaded {len(existing_resolution_files)} resolution files")

files_in_storage = gcp.storage.list_with_prefix(
bucket_name=env.QUESTION_BANK_BUCKET, prefix=SOURCE
)
existing_resolution_ids = _source_io.list_existing_resolution_ids(SOURCE)

result = source.update(
dfq,
dff,
existing_resolution_files=existing_resolution_files,
files_in_storage=files_in_storage,
existing_resolution_ids=existing_resolution_ids,
)

logger.info("Uploading to GCP...")
Expand Down
9 changes: 3 additions & 6 deletions src/orchestration/func_metaculus_update/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
import logging
from typing import Any

from helpers import data_utils, decorator, env, keys
from helpers import data_utils, decorator, keys
from orchestration import _source_io
from sources.metaculus import MetaculusSource
from utils import gcp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -26,14 +25,12 @@ def driver(_: Any) -> None:
SOURCE, return_question_data=True, return_fetch_data=True
)

files_in_storage = gcp.storage.list_with_prefix(
bucket_name=env.QUESTION_BANK_BUCKET, prefix=SOURCE
)
existing_resolution_ids = _source_io.list_existing_resolution_ids(SOURCE)

result = source.update(
dfq,
dff,
files_in_storage=files_in_storage,
existing_resolution_ids=existing_resolution_ids,
)

logger.info("Uploading to GCP...")
Expand Down
17 changes: 9 additions & 8 deletions src/sources/infer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,17 @@ def fetch(
self,
*,
dfq: DataFrame[QuestionFrame] | None = None,
files_in_storage: list[str] | None = None,
existing_resolution_ids: set[str] | None = None,
) -> DataFrame[InferFetchFrame]:
"""Fetch questions from the INFER API.

Args:
dfq (DataFrame[QuestionFrame] | None): Existing question bank.
files_in_storage (list[str] | None): Existing resolution file paths.
existing_resolution_ids (set[str] | None): Bare IDs that already have a resolution
file in storage.
"""
self._require_api_key()
files_in_storage = files_in_storage or []
existing_resolution_ids = existing_resolution_ids or set()

# Determine which existing questions need re-fetching
resolved_ids: list[str] = []
Expand All @@ -62,7 +63,7 @@ def fetch(
logger.info(f"Number unresolved_ids: {len(unresolved_ids)}")

resolved_ids_without_files = [
id for id in resolved_ids if f"{self.name}/{id}.jsonl" not in files_in_storage
id for id in resolved_ids if str(id) not in existing_resolution_ids
]
logger.info(f"resolved_ids_without_resolution_files: {resolved_ids_without_files}")

Expand Down Expand Up @@ -128,7 +129,7 @@ def update(

# Build/update resolution file
existing_df = existing_resolution_files.get(question_id)
df_res = self._build_resolution_file(
df_res = self._build_resolution_df(
question=question,
resolved=question["resolved"],
existing_df=existing_df,
Expand Down Expand Up @@ -319,7 +320,7 @@ def _get_historical_forecasts(
# Private: resolution file building
# ------------------------------------------------------------------

def _build_resolution_file(
def _build_resolution_df(
self,
question: dict,
resolved: bool,
Expand Down Expand Up @@ -386,14 +387,14 @@ def _build_resolution_file(

@staticmethod
def _finalize_resolution_df(df: pd.DataFrame) -> DataFrame[ResolutionFrame]:
"""Apply date filtering and return as validated ResolutionFrame.
"""Apply date filtering and select resolution columns.

Args:
df (pd.DataFrame): Raw resolution data with id, date, value columns.
"""
df["date"] = pd.to_datetime(df["date"])
df = df[df["date"].dt.date >= constants.BENCHMARK_START_DATE_DATETIME_DATE]
return ResolutionFrame.validate(df[["id", "date", "value"]])
return df[["id", "date", "value"]].astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE)

# ------------------------------------------------------------------
# Private: question transformation
Expand Down
24 changes: 9 additions & 15 deletions src/sources/manifold.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def update(
dff: DataFrame[ManifoldFetchFrame],
*,
existing_resolution_files: dict[str, DataFrame[ResolutionFrame]] | None = None,
files_in_storage: list[str] | None = None,
existing_resolution_ids: set[str] | None = None,
) -> UpdateResult:
"""Process fetched IDs into updated questions and resolution files.

Expand All @@ -124,10 +124,11 @@ def update(
dfq (DataFrame[QuestionFrame]): Existing questions.
dff (DataFrame[ManifoldFetchFrame]): Freshly fetched market IDs.
existing_resolution_files (dict | None): Per-question existing resolution data.
files_in_storage (list[str] | None): Existing resolution file paths in storage.
existing_resolution_ids (set[str] | None): Bare IDs that already have a resolution
file in storage.
"""
existing_resolution_files = existing_resolution_files or {}
files_in_storage = files_in_storage or []
existing_resolution_ids = existing_resolution_ids or set()
resolution_files: dict[str, pd.DataFrame] = {}

# --- Append new IDs from dff to dfq ---
Expand Down Expand Up @@ -166,7 +167,7 @@ def update(

# Build resolution file
existing_df = existing_resolution_files.get(row["id"])
df_res = self._build_resolution_file(
df_res = self._build_resolution_df(
market=market,
market_info_resolution_datetime=dfq.at[index, "market_info_resolution_datetime"],
existing_df=existing_df,
Expand All @@ -182,10 +183,9 @@ def update(

# --- Regenerate missing resolution files for resolved questions ---
for _index, row in dfq[dfq["resolved"]].iterrows():
filename = f"{self.name}/{row['id']}.jsonl"
if filename not in files_in_storage and row["id"] not in resolution_files:
if str(row["id"]) not in existing_resolution_ids and row["id"] not in resolution_files:
market = self._get_market(row["id"])
df_res = self._build_resolution_file(
df_res = self._build_resolution_df(
market=market,
market_info_resolution_datetime=row["market_info_resolution_datetime"],
existing_df=None,
Expand Down Expand Up @@ -331,7 +331,7 @@ def _get_market_bets(self, market_id: str) -> list[dict]:
# Private: resolution file building
# ------------------------------------------------------------------

def _build_resolution_file(
def _build_resolution_df(
self,
market: dict,
market_info_resolution_datetime: str,
Expand Down Expand Up @@ -403,15 +403,9 @@ def _build_resolution_file(
df = df.ffill()

df["id"] = market_id
return self._finalize_resolution_df(df)

@staticmethod
def _finalize_resolution_df(df: pd.DataFrame) -> DataFrame[ResolutionFrame]:
"""Filter to benchmark period and validate as ResolutionFrame."""
df["date"] = pd.to_datetime(df["date"])
df = df[df["date"].dt.date >= constants.BENCHMARK_START_DATE_DATETIME_DATE]
df = df[["id", "date", "value"]].astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE)
return ResolutionFrame.validate(df)
return df[["id", "date", "value"]].astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE)

@staticmethod
def _get_resolved_market_value(market: dict) -> float:
Expand Down
36 changes: 10 additions & 26 deletions src/sources/metaculus.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def update(
dfq: DataFrame[QuestionFrame],
dff: DataFrame[MetaculusFetchFrame],
*,
files_in_storage: list[str] | None = None,
existing_resolution_ids: set[str] | None = None,
) -> UpdateResult:
"""Fetch full question data and build resolution files.

Expand All @@ -106,8 +106,8 @@ def update(
Args:
dfq (DataFrame[QuestionFrame]): Existing questions.
dff (DataFrame[MetaculusFetchFrame]): Discovered question IDs from fetch().
files_in_storage (list[str] | None): Existing resolution file paths in storage,
used to decide which resolved questions need regenerating.
existing_resolution_ids (set[str] | None): Bare IDs that already have a resolution
file in storage, used to decide which resolved questions need regenerating.
"""
self._require_api_key()
resolution_files: dict[str, pd.DataFrame] = {}
Expand Down Expand Up @@ -157,7 +157,7 @@ def update(
dfq.at[index, "forecast_horizons"] = "N/A"

# Build resolution file
df_res = self._create_resolution_file(dfq, index, market)
df_res = self._build_resolution_df(dfq, index, market)
if df_res is not None:
resolution_files[str(row["id"])] = df_res
dfq.at[index, "freeze_datetime_value"] = (
Expand All @@ -167,13 +167,12 @@ def update(
dfq.at[index, "freeze_datetime_value"] = "N/A"

# Regenerate missing resolution files for resolved questions
files_in_storage = files_in_storage or []
existing_resolution_ids = existing_resolution_ids or set()
for index, row in dfq[dfq["resolved"]].iterrows():
question_id = str(row["id"])
filename = f"{self.name}/{question_id}.jsonl"
if filename not in files_in_storage and question_id not in resolution_files:
if question_id not in existing_resolution_ids and question_id not in resolution_files:
market = self._get_market(row["id"])
df_res = self._create_resolution_file(dfq, index, market)
df_res = self._build_resolution_df(dfq, index, market)
if df_res is not None:
resolution_files[question_id] = df_res

Expand Down Expand Up @@ -325,12 +324,12 @@ def _get_resolved_market_value(market: dict) -> float:
return 0
return np.nan

def _create_resolution_file(
def _build_resolution_df(
self,
dfq: pd.DataFrame,
index: int,
market: dict,
) -> pd.DataFrame | None:
) -> DataFrame[ResolutionFrame] | None:
"""Build the resolution file for a market from its aggregation history.

Overwrites the resolution file entirely on each run (Metaculus returns the
Expand Down Expand Up @@ -440,20 +439,5 @@ def set_date(end_datetime):
}

df["id"] = str(market["id"])
df = df[["id", "date", "value"]]

return self._finalize_resolution_df(df)

@staticmethod
def _finalize_resolution_df(df: pd.DataFrame) -> DataFrame[ResolutionFrame]:
"""Cast types and return as a validated ResolutionFrame.

Unlike infer/manifold, Metaculus does not filter to the benchmark start date:
the aggregation history is already bounded by the question's open window, and
this preserves the legacy job's output exactly.

Args:
df (pd.DataFrame): Raw resolution data with id, date, value columns.
"""
df = df[["id", "date", "value"]].astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE)
return ResolutionFrame.validate(df)
return df[["id", "date", "value"]].astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE)
7 changes: 3 additions & 4 deletions src/sources/polymarket.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def update(
question_id = str(question["id"])

# Build the resolution file from the embedded historical prices.
resolution_files[question_id] = self._build_resolution_file(question)
resolution_files[question_id] = self._build_resolution_df(question)

# Strip transient fields (not part of QuestionFrame).
del question["fetch_datetime"]
Expand Down Expand Up @@ -538,13 +538,12 @@ def _transform_question(
# Private: resolution file building
# ------------------------------------------------------------------

def _build_resolution_file(self, question: dict) -> DataFrame[ResolutionFrame]:
def _build_resolution_df(self, question: dict) -> DataFrame[ResolutionFrame]:
"""Build a resolution file from a fetched question's embedded historical prices.

Args:
question (dict): A PolymarketFetchFrame row with a ``historical_prices`` list.
"""
df = pd.DataFrame(question["historical_prices"])
df["id"] = question["id"]
df = df[["id", "date", "value"]].astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE)
return ResolutionFrame.validate(df)
return df[["id", "date", "value"]].astype(dtype=constants.RESOLUTION_FILE_COLUMN_DTYPE)
Loading
Loading