# Build/deploy helpers for the external-submissions onboarding Cloud Function.
# Project-specific settings (CLOUD_PROJECT, SUBMISSIONS_*, SMTP_*, ...) are read
# from the repository-level variables.mk when it exists.
-include ../../variables.mk

.PHONY: deploy-onboard setup-permissions clean

REGION := us-central1
SA_EMAIL ?= $(SUBMISSIONS_SA_EMAIL)
ROOT_DIR ?= ../../

FUNC_DIR = functions

# Copy of src/helpers/email.py staged into the function source for a deploy.
STAGED_EMAIL_HELPER = $(FUNC_DIR)/onboard/email_utils.py

comma := ,
empty :=
space := $(empty) $(empty)

# One KEY=VALUE pair per word. The pairs are joined with commas below instead of
# writing "A=1,\" continuation lines: make turns every backslash-newline into a
# space, which would split --set-env-vars into several shell words.
# NOTE: values must not contain spaces or commas.
ONBOARD_ENV_VAR_LIST = \
  RUN_MODE=$(RUN_MODE) \
  SMTP_USER=$(SMTP_USER) \
  SMTP_PASSWORD=$(SMTP_PASSWORD) \
  UPLOAD_BUCKET=$(SUBMISSIONS_BUCKET) \
  NEXT_DUE_DATE=$(NEXT_DUE_DATE)

ONBOARD_ENV_VARS = $(subst $(space),$(comma),$(strip $(ONBOARD_ENV_VAR_LIST)))

# Deploy the onboarding function. The staged helper is removed whether or not
# the deploy succeeds, and the deploy's exit status is what make sees.
deploy-onboard:
	cp $(ROOT_DIR)src/helpers/email.py $(STAGED_EMAIL_HELPER)
	gcloud functions deploy onboard-team \
		--gen2 \
		--project=$(CLOUD_PROJECT) \
		--region=$(REGION) \
		--runtime=python312 \
		--source=$(FUNC_DIR)/onboard \
		--entry-point=onboard \
		--trigger-http \
		--no-allow-unauthenticated \
		--service-account=$(SA_EMAIL) \
		--memory=256Mi \
		--timeout=120s \
		--set-env-vars="$(ONBOARD_ENV_VARS)"; \
	status=$$?; \
	rm -f $(STAGED_EMAIL_HELPER); \
	exit $$status

# Run the one-off IAM setup script with the variables.mk assignments exported
# into its environment.
setup-permissions:
	eval $$(cat ../../variables.mk | grep -v '^#' | xargs) \
		python $(FUNC_DIR)/setup_permissions.py

clean:
	rm -f $(STAGED_EMAIL_HELPER)
| +| `emails` | Used for IAM and email notifications | +| `service_accounts` | GCP service accounts for automated uploads. No emails sent to these. | +| `anonymous` | bool | +| `created_at` | Firestore server timestamp | +| `active` | bool — set to false on removal | + +### Counter document + +`counters/teams` holds `{team_count: N, anon_count: M}` for atomic ID allocation. +Initialize before first deployment (set N and M to the current team and anon counts): + +```python +db.collection("counters").document("teams").set({"team_count": N, "anon_count": M}) +``` + +--- + +## Register a new team + +POST to the `onboard-team` Cloud Function: + +```json +{ + "organization": "Acme Corp", + "team_name": "acme-a", + "emails": ["alice@acme.com", "bob@acme.com"], + "service_accounts": ["submissions@acme.iam.gserviceaccount.com"], + "anonymous": false +} +``` + +Fields: + +- `organization` (required) — real org name +- `team_name` (optional) — internal label, must be unique +- `emails` (required) — list of member addresses; must be Gmail/Google Workspace for GCS access +- `service_accounts` (optional) — GCP SAs; always get GCS access +- `anonymous` (optional, default false) — if true, public name becomes `"Anonymous N"` + +The function: + +1. Allocates the next `teamN` ID atomically via `counters/teams` +2. Creates a `gs://<UPLOAD_BUCKET>/teamN/.keep` placeholder +3. Grants `roles/storage.objectUser` + `roles/storage.objectViewer` on the `teamN/` prefix +4. Writes the Firestore document +5. Sends a welcome email to `emails` + +If any email is not a Google account, registration succeeds but a warning is returned — those members won't be able to upload to GCS directly. + +--- + +## Remove a team + +DELETE to the `onboard-team` Cloud Function: + +```json +{ "team_id": "team7" } +``` + +Revokes GCS access and marks the team inactive. IAM removal failure returns a 500 — the team is **not** deactivated if permissions cannot be revoked. 
+ +--- + +## Deploy + +From `src/external-submissions/`: + +```bash +make deploy-onboard +``` + +Required variables in `variables.mk` (at repo root): + +```makefile +CLOUD_PROJECT=... +SUBMISSIONS_SA_EMAIL=... +SUBMISSIONS_BUCKET=... +SMTP_USER=... +SMTP_PASSWORD=... +NEXT_DUE_DATE=YYYY-MM-DD +``` diff --git a/src/external-submissions/functions/onboard/main.py b/src/external-submissions/functions/onboard/main.py new file mode 100644 index 00000000..8a843a5a --- /dev/null +++ b/src/external-submissions/functions/onboard/main.py @@ -0,0 +1,293 @@ +"""ForecastBench team onboarding handler. + +Each team gets a unique internal ID (team1, team2, ...) used as their GCS folder name. +One organization can have multiple teams — team_name is an optional internal label to +distinguish them (e.g. "GDM A", "GDM B"). It is never shown in emails. + +Accepts JSON POST with: + - organization: real org name (required, stored privately if anonymous) + - team_name: optional internal label (must be unique if provided) + - emails: list of member email addresses (required) + - service_accounts: list of GCP service accounts (optional, no emails sent) + - anonymous: bool (optional, default false) + +Environment variables: + - UPLOAD_BUCKET: GCS bucket name + - NEXT_DUE_DATE: next forecast due date (YYYY-MM-DD), included in welcome email +""" + +import json +import os +import traceback + +import dns.resolver +from email_utils import send_welcome +from google.cloud import firestore, storage + +UPLOAD_BUCKET = os.environ.get("UPLOAD_BUCKET", "") +NEXT_DUE_DATE = os.environ.get("NEXT_DUE_DATE", "") + +db = firestore.Client() +gcs = storage.Client() + + +@firestore.transactional +def _allocate_ids(transaction, counter_ref, anonymous): + """Atomically allocate the next team_id and (if anonymous) anon number.""" + snap = counter_ref.get(transaction=transaction) + data = snap.to_dict() if snap.exists else {"team_count": 0, "anon_count": 0} + team_n = data.get("team_count", 0) + 1 + anon_n = 
# Domains whose mail exchangers indicate a Google-hosted mailbox.
GOOGLE_MX_SUFFIXES = ("google.com", "googlemail.com")


def _is_google_account(email):
    """Return True if the email is a Gmail or Google Workspace account.

    Consumer Gmail addresses (``gmail.com`` / ``googlemail.com``) are accepted
    without a lookup. Any other domain counts as Google Workspace when at least
    one of its MX hosts is one of ``GOOGLE_MX_SUFFIXES`` or a subdomain of one.

    Args:
        email: Address to classify. Compared case-insensitively.

    Returns:
        bool: False for malformed addresses (no ``@``, empty local part or
        domain) and whenever the MX lookup fails for any reason.
    """
    local_part, at_sign, domain = email.strip().lower().rpartition("@")
    if not at_sign or not local_part or not domain:
        return False
    if domain in ("gmail.com", "googlemail.com"):
        return True
    try:
        records = dns.resolver.resolve(domain, "MX")
        hosts = [str(record.exchange).rstrip(".").lower() for record in records]
    except Exception:
        # Best effort: an unresolvable domain only produces a warning upstream.
        return False
    # Match on a label boundary so "mx.notgoogle.com" is not taken for Google.
    return any(
        host == suffix or host.endswith("." + suffix)
        for host in hosts
        for suffix in GOOGLE_MX_SUFFIXES
    )
def _principal(email):
    """Return the IAM member string (``serviceAccount:`` or ``user:``) for an address.

    Only addresses whose domain is ``gserviceaccount.com`` or a subdomain of it
    are service accounts. A substring test would misclassify user addresses
    such as ``gserviceaccount@corp.com``.
    """
    domain = email.rpartition("@")[2]
    if domain == "gserviceaccount.com" or domain.endswith(".gserviceaccount.com"):
        return f"serviceAccount:{email}"
    return f"user:{email}"


def _folder_prefix(bucket_name, team_id):
    """Return the IAM resource-name prefix covering every object in a team's folder."""
    return f"projects/_/buckets/{bucket_name}/objects/{team_id}/"


def _set_folder_permissions(bucket_name, team_id, emails, service_accounts):
    """Grant objectUser + objectViewer on the team's folder prefix only.

    Appends one conditional binding per role to the bucket's IAM policy
    (policy version 3) and writes the policy back.

    Args:
        bucket_name: Upload bucket name.
        team_id: Team folder name (``team1``, ``team2``, ...).
        emails: Member user addresses.
        service_accounts: Service-account addresses.
    """
    members = {_principal(address) for address in list(emails) + list(service_accounts)}
    if not members:
        # Nothing to grant; avoid writing bindings without members.
        return

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    policy = bucket.get_iam_policy(requested_policy_version=3)
    policy.version = 3  # the policy is read and written as version 3 because the bindings are conditional

    folder_prefix = _folder_prefix(bucket_name, team_id)
    for role in ("roles/storage.objectUser", "roles/storage.objectViewer"):
        policy.bindings.append(
            {
                "role": role,
                "members": set(members),
                "condition": {
                    "title": f"{team_id} folder access",
                    "expression": f'resource.name.startsWith("{folder_prefix}")',
                },
            }
        )

    bucket.set_iam_policy(policy)


def _remove_folder_permissions(bucket_name, team_id, emails, service_accounts):
    """Remove the team's conditional folder bindings from the bucket policy.

    A binding is dropped when its condition expression mentions the team's
    folder prefix and it names at least one of the team's principals.

    Args:
        bucket_name: Upload bucket name.
        team_id: Team folder name.
        emails: Member user addresses stored for the team.
        service_accounts: Service-account addresses stored for the team.
    """
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    policy = bucket.get_iam_policy(requested_policy_version=3)
    policy.version = 3

    folder_prefix = _folder_prefix(bucket_name, team_id)
    principals = {_principal(address) for address in list(emails) + list(service_accounts)}

    def _is_team_binding(binding):
        expression = (binding.get("condition") or {}).get("expression", "")
        return folder_prefix in expression and bool(principals & set(binding.get("members", [])))

    policy.bindings = [b for b in policy.bindings if not _is_team_binding(b)]
    bucket.set_iam_policy(policy)


def onboard(request):
    """Handle team onboarding (POST) and removal (DELETE).

    OPTIONS requests get a CORS preflight reply. DELETE is routed to
    ``_handle_remove``; every other method is treated as a registration.

    Returns:
        tuple: ``(body, status, headers)``.
    """
    if request.method == "OPTIONS":
        return (
            "",
            204,
            {
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Methods": "POST, DELETE",
                "Access-Control-Allow-Headers": "Content-Type",
            },
        )

    cors = {"Access-Control-Allow-Origin": "*"}

    if request.method == "DELETE":
        return _handle_remove(request, cors)
    return _handle_onboard(request, cors)


def _handle_remove(request, cors):
    """Deactivate a team: revoke GCS access, then mark inactive in Firestore.

    Responds 400 when ``team_id`` is missing or not a string, 404 when no team
    document matches, and 500 on any failure. IAM is revoked before the
    document is updated, so a team is never marked inactive while it still has
    access.
    """
    try:
        data = request.get_json(force=True, silent=True)
        raw_team_id = data.get("team_id") if isinstance(data, dict) else None
        team_id = raw_team_id.strip() if isinstance(raw_team_id, str) else ""
        if not team_id:
            return (json.dumps({"success": False, "error": "'team_id' is required"}), 400, cors)

        team_docs = list(db.collection("teams").where("team_id", "==", team_id).stream())
        if not team_docs:
            return (
                json.dumps({"success": False, "error": f"Team '{team_id}' not found"}),
                404,
                cors,
            )

        team = team_docs[0].to_dict()
        emails = team.get("emails", [])
        service_accounts = team.get("service_accounts", [])

        _remove_folder_permissions(UPLOAD_BUCKET, team_id, emails, service_accounts)
        team_docs[0].reference.update({"active": False})

        return (
            json.dumps(
                {
                    "success": True,
                    "team_id": team_id,
                    "message": "Team deactivated and GCS access revoked.",
                }
            ),
            200,
            cors,
        )

    except Exception as e:
        print(traceback.format_exc())
        return (json.dumps({"success": False, "error": f"Internal error: {str(e)}"}), 500, cors)


def _handle_onboard(request, cors):
    """Register a new team.

    Validates the JSON body and responds 400 with a list of errors before
    anything is written. On success it allocates the team ID, creates the
    folder placeholder, grants folder access, stores the team document and
    sends the welcome email. A failed email is logged and does not fail the
    registration.

    Returns:
        tuple: ``(body, status, headers)``. Status is 201 on success, 400 on
        invalid input and 500 on unexpected errors.
    """
    try:
        data = request.get_json(force=True, silent=True)
        if not isinstance(data, dict):
            # Covers a missing/invalid body as well as JSON that is not an object.
            return (
                json.dumps({"success": False, "error": "Invalid or missing JSON body"}),
                400,
                cors,
            )

        organization = data.get("organization", "")
        team_name = data.get("team_name") or ""  # JSON null means "no label"
        emails = data.get("emails", [])
        service_accounts = data.get("service_accounts", [])
        anonymous = data.get("anonymous", False)

        errors = []
        if isinstance(organization, str) and organization.strip():
            organization = organization.strip()
        else:
            errors.append("'organization' is required")

        if isinstance(team_name, str):
            team_name = team_name.strip()
        else:
            errors.append("'team_name' must be a string")
            team_name = ""

        if not emails or not isinstance(emails, list):
            errors.append("'emails' must be a non-empty list")
        else:
            emails = [e.strip().lower() for e in emails if isinstance(e, str) and e.strip()]
            if not emails:
                errors.append("'emails' must contain at least one valid address")

        if not isinstance(service_accounts, list):
            errors.append("'service_accounts' must be a list")
        else:
            service_accounts = [
                e.strip().lower() for e in service_accounts if isinstance(e, str) and e.strip()
            ]

        if not isinstance(anonymous, bool):
            # A string such as "false" is truthy and would anonymize the team.
            errors.append("'anonymous' must be a boolean")

        if team_name and _team_name_taken(db, team_name):
            errors.append(f"Team name '{team_name}' is already taken.")

        if errors:
            return (json.dumps({"success": False, "errors": errors}), 400, cors)

        counter_ref = db.collection("counters").document("teams")
        team_id, anon_number = _allocate_ids(db.transaction(), counter_ref, anonymous)
        display_org = f"Anonymous {anon_number}" if anonymous else organization

        bucket = gcs.bucket(UPLOAD_BUCKET)
        bucket.blob(f"{team_id}/.keep").upload_from_string("", content_type="application/x-empty")

        # Users and service accounts are passed separately, as the helper expects.
        _set_folder_permissions(UPLOAD_BUCKET, team_id, emails, service_accounts)

        db.collection("teams").add(
            {
                "team_id": team_id,
                "team_name": team_name or None,
                "organization": display_org,
                "deanonymized_organization": organization,
                "emails": emails,
                "service_accounts": service_accounts,
                "anonymous": anonymous,
                "created_at": firestore.SERVER_TIMESTAMP,
                "active": True,
            }
        )

        email_warning = _warn_non_google_emails(emails)

        try:
            send_welcome(emails, team_id, display_org, UPLOAD_BUCKET, anonymous, NEXT_DUE_DATE)
        except Exception as e:
            # Best effort: the team is already registered at this point.
            print(f"send_welcome failed: {e}")

        response = {
            "success": True,
            "team_id": team_id,
            "team_name": team_name or None,
            "organization": display_org,
            "upload_folder": f"gs://{UPLOAD_BUCKET}/{team_id}/",
            "instructions": (
                f"Upload forecast files to gs://{UPLOAD_BUCKET}/{team_id}/ "
                f"using gsutil or the GCP Console. "
                f"Name files: {{forecast_due_date}}.{{organization}}.{{N}}.json. "
                f"Deadline: 23:59:59 UTC."
            ),
        }

        if anonymous:
            response["note"] = (
                f"Public name is '{display_org}'. Use this as 'organization' in forecast files."
            )
        if email_warning:
            response["warning"] = email_warning

        return (json.dumps(response, indent=2), 201, cors)

    except Exception as e:
        print(traceback.format_exc())
        return (json.dumps({"success": False, "error": f"Internal error: {str(e)}"}), 500, cors)
def _add_bucket_role(gcs: "storage.Client", bucket_name: str, member: str, role: str) -> None:
    """Grant ``role`` on a bucket to ``member`` through an unconditional binding.

    The policy is read and written as version 3. Only a binding for ``role``
    that has no condition is reused; conditional bindings for the same role are
    left untouched, so the member is never added to a grant that is scoped to
    something narrower than the bucket.

    Args:
        gcs: Storage client used to look up the bucket.
        bucket_name: Bucket whose IAM policy is updated.
        member: IAM member string, e.g. ``serviceAccount:...``.
        role: Role to grant, e.g. ``roles/storage.objectCreator``.
    """
    bucket = gcs.bucket(bucket_name)
    policy = bucket.get_iam_policy(requested_policy_version=3)
    policy.version = 3

    unconditional = next(
        (b for b in policy.bindings if b["role"] == role and not b.get("condition")),
        None,
    )
    if unconditional is None:
        policy.bindings.append({"role": role, "members": {member}})
    else:
        unconditional["members"].add(member)

    bucket.set_iam_policy(policy)
    print(f"  gs://{bucket_name}: granted {role} to {member}")
_require("SUBMISSIONS_SA_EMAIL") + upload_bucket = _require("SUBMISSIONS_BUCKET") + deployer = _require("SUBMISSIONS_DEPLOYER") + + interstitial_bucket = os.environ.get("SUBMISSIONS_INTERSTITIAL_BUCKET", "") + forecast_sets_bucket = os.environ.get("FORECAST_SETS_BUCKET", "") + history_bucket = os.environ.get("SUBMISSIONS_HISTORY_BUCKET", "") + + member = f"serviceAccount:{sa_email}" + gcs = storage.Client() + rm = resourcemanager_v3.ProjectsClient() + + print(f"Configuring permissions for {sa_email} in project {project}...") + + # Upload bucket: storage.admin — onboarding sets per-team conditional IAM bindings + _add_bucket_role(gcs, upload_bucket, member, "roles/storage.admin") + + # Interstitial: objectAdmin — post-round may overwrite files on reruns + if interstitial_bucket: + _add_bucket_role(gcs, interstitial_bucket, member, "roles/storage.objectAdmin") + + # History + forecast-sets: write-only; paths include round_date, no collision risk + for bucket in filter(None, [history_bucket, forecast_sets_bucket]): + _add_bucket_role(gcs, bucket, member, "roles/storage.objectCreator") + + # Derive project number for Eventarc and GCS SAs + project_obj = rm.get_project(name=f"projects/{project}") + project_number = project_obj.name.split("/")[-1] + + # Eventarc SA: needs legacyBucketReader to validate the GCS trigger at deploy time + eventarc_sa = f"serviceAccount:service-{project_number}@gcp-sa-eventarc.iam.gserviceaccount.com" + _add_bucket_role(gcs, upload_bucket, eventarc_sa, "roles/storage.legacyBucketReader") + + # GCS SA: needs pubsub.publisher to emit object-finalized events via Eventarc + gcs_sa = f"serviceAccount:service-{project_number}@gs-project-accounts.iam.gserviceaccount.com" + _add_project_role(rm, project, gcs_sa, "roles/pubsub.publisher") + + # Project-level roles for the submissions SA + for role in ( + "roles/datastore.user", + "roles/eventarc.eventReceiver", + "roles/run.invoker", + ): + _add_project_role(rm, project, member, role) + + # Allow the 
def _send(to_emails, subject, body):
    """Send a plain-text email to ``to_emails`` over SMTP with STARTTLS.

    Does nothing when there are no recipients or when SMTP_USER /
    SMTP_PASSWORD are not configured.

    Args:
        to_emails: Recipient addresses; all are listed in the ``To`` header.
        subject: Subject line.
        body: Plain-text message body.

    Raises:
        smtplib.SMTPException, OSError, ssl.SSLError: on connection, TLS or
        authentication failure.
    """
    import ssl  # local import: only needed when a message is actually sent

    if not to_emails:
        return
    if not SMTP_USER or not SMTP_PASSWORD:
        return

    msg = MIMEMultipart()
    msg["From"] = SMTP_USER
    msg["To"] = ", ".join(to_emails)
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))

    with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=15) as server:
        # The default context verifies the server certificate and hostname
        # before the password is sent.
        server.starttls(context=ssl.create_default_context())
        server.login(SMTP_USER, SMTP_PASSWORD)
        server.sendmail(SMTP_USER, to_emails, msg.as_string())
+ team_id (str): Internal team ID (team1, team2, ...). + organization (str): Public-facing organization name shown to the team. + upload_bucket (str): GCS bucket name. + anonymous (bool): Whether the team is anonymous. + next_due_date (str): Next forecast due date (YYYY-MM-DD), or empty to omit. + """ + subject = "ForecastBench — Your team has been registered" + + due_date_line = f"\nNext forecast due date: {next_due_date}\n" if next_due_date else "" + + body = f"""Hi, + +Your team has been registered on ForecastBench. + +Team: {organization} +Upload folder: gs://{upload_bucket}/{team_id}/ +{due_date_line} +To submit a forecast: +1. Download the question set at 0:00 UTC on the forecast due date: + https://github.com/forecastingresearch/forecastbench-datasets +2. Generate your forecasts. +3. Name your file: {{forecast_due_date}}.{{organization}}.{{N}}.json +4. Upload it to your folder: + gsutil cp your-file.json gs://{upload_bucket}/{team_id}/ + gcloud storage cp your-file.json gs://{upload_bucket}/{team_id}/ + +To test your upload permissions before the due date: + gcloud storage cp test.json gs://{upload_bucket}/{team_id}/test/ + +Deadline: 23:59:59 UTC on the forecast due date. +Max submissions: 3 per round (one per model). + +Submission instructions: +https://github.com/forecastingresearch/forecastbench/wiki/How-to-submit-to-ForecastBench + +If you have any questions, reply to this email. + +ForecastBench team +""" + if anonymous: + body += ( + f"\nNote: Your public name is '{organization}'." 
def is_past_deadline(round_date_str: str) -> bool:
    """Return True if the current UTC time is past 23:59:59 on round_date_str.

    Args:
        round_date_str: Round date formatted as ``YYYY-MM-DD``.

    Returns:
        bool: True once ``get_datetime_now_utc()`` is later than 23:59:59 UTC on
        that date. False when the value is not a string or is not a valid date
        in that format.
    """
    try:
        due = datetime.strptime(round_date_str, "%Y-%m-%d")
    except (TypeError, ValueError):
        # NOTE(review): an unparsable date is reported as "not past deadline";
        # confirm callers want that rather than a rejection.
        return False
    deadline = due.replace(hour=23, minute=59, second=59, tzinfo=timezone.utc)
    return get_datetime_now_utc() > deadline