-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathevaluate.py
More file actions
379 lines (332 loc) · 14.4 KB
/
evaluate.py
File metadata and controls
379 lines (332 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
"""
SynthesisBench Evaluation Harness.
Evaluates LLM agents on evidence synthesis tasks (risk of bias, eligibility)
using Cochrane review ground truth. Supports Anthropic and OpenAI models,
custom agents, resumable runs, and deterministic scoring.
Usage:
python3 evaluate.py --task-type rob --model claude-sonnet-4-5-20250929
python3 evaluate.py --task-type eligibility --model gpt-4o --limit 100
python3 evaluate.py --resume 20260208-143022-rob-claude-sonnet-4-5-20250929
python3 evaluate.py --score-only 20260208-rob-run-id
"""
import argparse
import asyncio
import json
import logging
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
from synthesisbench.evaluate.agents import get_agent, load_custom_agent
from synthesisbench.evaluate.runner import (
load_instances, load_papers, check_resume,
estimate_tokens, run_evaluation,
)
from synthesisbench.evaluate.tasks import TASK_TYPES
# On-disk layout, all relative to the current working directory.
DATA_DIR = Path("data")
# Passed to load_papers() when a run starts.
PAPERS_DIR = DATA_DIR.joinpath("paper_store")
# Where named subset manifests ("<name>.json") are looked up.
SUBSETS_DIR = DATA_DIR.joinpath("subsets")
# One sub-directory per run ID (config.json, run.log, responses/).
RUNS_DIR = DATA_DIR.joinpath("runs")
def load_subset(subset_arg: str, task_type: str, tier: str) -> set[str]:
    """Load a subset manifest and return its task IDs as a set.

    ``subset_arg`` is first tried as a path to a manifest file. If no such
    file exists it is treated as a manifest name and resolved to
    ``SUBSETS_DIR / "<subset_arg>.json"``.

    The manifest is a JSON object. Its optional ``task_type`` and ``tier``
    keys are checked against the requested run; its ``task_ids`` key must be
    a list.

    Args:
        subset_arg: Manifest path or manifest name.
        task_type: Task type of the run the subset will be applied to.
        tier: Requested tier (``"abstract"``, ``"fulltext"`` or ``"both"``).

    Returns:
        The manifest's task IDs, de-duplicated.

    Every validation failure prints an ``Error: ...`` line and terminates the
    process via ``sys.exit(1)``.
    """

    def fail(message: str):
        # Single exit point so every failure is reported the same way.
        print(f"Error: {message}")
        sys.exit(1)

    # is_file() rather than exists(): a directory that happens to share the
    # subset's name must not shadow the manifest stored under SUBSETS_DIR.
    path = Path(subset_arg)
    if not path.is_file():
        path = SUBSETS_DIR / f"{subset_arg}.json"
    if not path.is_file():
        fail(f"subset manifest not found: {subset_arg}")

    try:
        manifest = json.loads(path.read_text(encoding="utf-8"))
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError.
        fail(f"subset manifest is not valid JSON: {path}")
    if not isinstance(manifest, dict):
        fail(f"subset manifest must be a JSON object: {path}")

    name = manifest.get("name")

    if manifest.get("task_type") and manifest["task_type"] != task_type:
        fail(f"subset '{name}' is for task type "
             f"'{manifest['task_type']}', not '{task_type}'")

    manifest_tier = manifest.get("tier")
    if manifest_tier and tier != "both" and manifest_tier != tier:
        fail(f"subset '{name}' is for tier "
             f"'{manifest_tier}', not '{tier}'")

    if tier == "both":
        # task_id overlap exists across tiers (especially eligibility), so
        # tier-specific subsets are unsafe for --tier both.
        if manifest_tier is None:
            fail(f"subset '{name}' does not declare a tier, "
                 "so it cannot be safely used with --tier both")
        if manifest_tier != "both":
            fail(f"subset '{name}' is tier '{manifest_tier}' "
                 "and cannot be used with --tier both")

    task_ids = manifest.get("task_ids")
    if not isinstance(task_ids, list):
        fail(f"subset '{name}' has no 'task_ids' list")
    return set(task_ids)
def generate_run_id(model: str, task_type: str = "rob") -> str:
    """Return a run ID of the form ``<YYYYmmdd-HHMMSS>-<task_type>-<model>``.

    The timestamp is the current local time. The model name is cut to its
    first 40 characters, and every character outside ``A-Za-z0-9._-`` is then
    replaced with an underscore.
    """
    stamp = f"{datetime.now():%Y%m%d-%H%M%S}"
    safe_model = re.sub(r"[^A-Za-z0-9._-]", "_", model[:40])
    return "-".join((stamp, task_type, safe_model))
def _default_concurrency(model: str) -> int:
    """Return the default number of parallel API calls for *model*."""
    return 10 if model.startswith("gemini-") else 4


def build_config(args) -> dict:
    """Build the run configuration dict from parsed CLI arguments.

    Three cases are handled:

    * ``--resume`` and ``RUNS_DIR/<run_id>/config.json`` exists: the stored
      config is returned, with defaults filled in for keys it does not have.
      CLI values are not merged into it.
    * ``--resume`` without a stored config: a config is rebuilt from the CLI
      arguments under the resumed run ID. The model defaults to ``""`` so the
      caller can still detect that neither a model nor an agent was given.
    * Fresh run: the run ID is ``--run-id`` or a generated one, and the model
      defaults to ``"custom"``.

    Args:
        args: The ``argparse.Namespace`` produced by ``main()``.

    Returns:
        A JSON-serialisable configuration dict.
    """
    task_type = getattr(args, "task_type", "rob") or "rob"
    tier = args.tier or TASK_TYPES[task_type].DEFAULT_TIER

    if args.resume:
        run_id = args.resume
        config_path = RUNS_DIR / run_id / "config.json"
        if config_path.exists():
            existing = json.loads(config_path.read_text())
            # Fill in keys the stored config may not contain.
            stored_defaults = (
                ("task_type", "rob"),
                ("tier", "abstract"),
                ("concurrency", 4),
                ("batch", False),
                ("limit", None),
                ("extended_thinking", False),
                ("thinking_budget", 10_000),
            )
            for key, default in stored_defaults:
                existing.setdefault(key, default)
            return existing
        model = args.model or ""
    else:
        model = args.model or "custom"
        run_id = args.run_id or generate_run_id(model, task_type)

    concurrency = args.concurrency
    if concurrency is None:
        concurrency = _default_concurrency(model)

    # The resume-without-config fallback shares this dict with fresh runs, so
    # --agent, --seed, --subset and --batch are honoured there as well instead
    # of being silently dropped.
    config = {
        "run_id": run_id,
        "task_type": task_type,
        "model": model,
        "agent_path": getattr(args, "agent", None),
        "tier": tier,
        "limit": args.limit,
        "seed": getattr(args, "seed", None),
        "subset": getattr(args, "subset", None),
        "batch": getattr(args, "batch", False),
        "concurrency": concurrency,
        "started_at": datetime.now(timezone.utc).isoformat(),
    }
    if getattr(args, "extended_thinking", False):
        config["extended_thinking"] = True
        config["thinking_budget"] = getattr(args, "thinking_budget", 10_000)
    return config
def _load_responses_from_disk(run_dir: Path, tier: str) -> list[dict]:
"""Load all saved response files for scoring.
Each response is tagged with its tier so it can be joined correctly.
Corrupt JSON files are skipped with a warning.
"""
responses = []
tiers = ["abstract", "fulltext"] if tier == "both" else [tier]
for t in tiers:
resp_dir = run_dir / "responses" / t
if not resp_dir.exists():
continue
for path in sorted(resp_dir.glob("*.json")):
try:
resp = json.loads(path.read_text())
except (json.JSONDecodeError, ValueError):
logging.getLogger(__name__).warning(
f"Skipping corrupt response file: {path}")
continue
resp.setdefault("tier", t)
responses.append(resp)
return responses
def _score_and_report(task, responses, instances, config, out_dir, empty_message):
    """Score *responses* against *instances*, then print and save the report.

    Exits with status 1 after printing *empty_message* when no response can
    be joined to an instance.
    """
    # Responses are joined to ground truth on (tier, task_id).
    instances_by_key = {
        (inst.get("tier", "abstract"), inst["task_id"]): inst
        for inst in instances
    }
    records = task.build_scoring_records(responses, instances_by_key)
    if not records:
        print(empty_message)
        sys.exit(1)
    metrics = task.compute_metrics(records)
    token_usage = {
        "total_input": sum(r.get("input_tokens", 0) for r in responses),
        "total_output": sum(r.get("output_tokens", 0) for r in responses),
    }
    task.print_report(config, metrics, token_usage)
    task.write_report_json(out_dir, config, metrics, token_usage)


def _score_existing_run(args):
    """Re-score the saved responses of the run named by ``--score-only``."""
    run_id = args.score_only
    run_dir = RUNS_DIR / run_id
    config_path = run_dir / "config.json"
    if config_path.exists():
        config = json.loads(config_path.read_text())
        config.setdefault("task_type", "rob")
        tier = config.get("tier", "abstract")
    else:
        # Infer tier from which response subdirectories exist
        has_abstract = (run_dir / "responses" / "abstract").exists()
        has_fulltext = (run_dir / "responses" / "fulltext").exists()
        if has_abstract and has_fulltext:
            tier = "both"
        elif has_fulltext:
            tier = "fulltext"
        else:
            tier = "abstract"
        config = {
            "run_id": run_id, "model": "unknown",
            "tier": tier,
            "task_type": getattr(args, "task_type", "rob") or "rob",
        }

    task = TASK_TYPES[config["task_type"]]
    instances = load_instances(
        task.TASKS_DIR, tier,
        required_fields=task.REQUIRED_FIELDS,
        validate_ground_truth=task.validate_ground_truth,
    )
    responses = _load_responses_from_disk(run_dir, tier)
    _score_and_report(task, responses, instances, config, run_dir,
                      "No scoreable responses found.")


async def _main_async(args):
    """Async main entry point.

    With ``--score-only`` the saved responses of an existing run are scored
    and the function returns; no run directory is created in that mode.

    Otherwise the function builds the run config, creates the run directory
    and log file, loads instances and papers, creates the agent, optionally
    asks for confirmation of the token estimate, runs the evaluation, and
    finally scores every response stored on disk for the run.

    Fatal conditions print a message and terminate via ``sys.exit``.
    """
    # Score-only mode — handle before creating any run directory
    if args.score_only:
        _score_existing_run(args)
        return

    config = build_config(args)
    run_id = config["run_id"]
    tier = config["tier"]
    task = TASK_TYPES[config["task_type"]]

    run_dir = RUNS_DIR / run_id
    run_dir.mkdir(parents=True, exist_ok=True)

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=[
            logging.FileHandler(run_dir / "run.log"),
            logging.StreamHandler(),
        ],
    )

    # Load instances
    batch_mode = config.get("batch", False)
    if batch_mode and not task.supports_batch:
        print(f"Error: batch mode not supported for task type '{config['task_type']}'")
        sys.exit(1)

    subset_ids = None
    if config.get("subset"):
        subset_ids = load_subset(config["subset"], config["task_type"], tier)

    instances = load_instances(
        task.TASKS_DIR, tier, run_id=run_id,
        limit=config.get("limit"), seed=config.get("seed"),
        batch=batch_mode,
        required_fields=task.REQUIRED_FIELDS,
        validate_ground_truth=task.validate_ground_truth,
        subset=subset_ids,
    )
    if not instances:
        print("No task instances found. Run the data pipeline first.")
        sys.exit(1)

    config["task_count"] = len(instances)
    config_path = run_dir / "config.json"
    config_path.write_text(json.dumps(config, indent=2))

    papers = load_papers(PAPERS_DIR)

    # Create or load agent
    if config.get("agent_path"):
        agent = load_custom_agent(config["agent_path"])
        model_name = "custom"
    else:
        model_name = config["model"]
        if not model_name:
            print("Error: --model or --agent required")
            sys.exit(1)
        agent = get_agent(
            model_name,
            extended_thinking=config.get("extended_thinking", False),
            thinking_budget=config.get("thinking_budget", 10_000),
        )

    # Pre-run estimate (skipped with --yes or when a limit is set)
    if not args.yes and not config.get("limit"):
        sample = []
        for inst in instances[:10]:
            prompt = task.build_prompt(inst, papers)
            if prompt:
                sample.append(prompt)
        est = estimate_tokens(sample, len(instances))
        print(f"\nEstimated input tokens: ~{est:,} across {len(instances)} instances.")
        confirm = input("Proceed? [Y/n] ").strip().lower()
        if confirm and confirm != "y":
            print("Aborted.")
            sys.exit(0)

    # Check resume
    completed_ids = set()
    if args.resume:
        completed_ids = check_resume(run_dir, tier)

    mode_str = " | Mode: batch" if batch_mode else ""
    thinking_str = ""
    if config.get("extended_thinking"):
        if "opus-4-6" in model_name:
            thinking_str = " | Thinking: adaptive"
        else:
            thinking_str = f" | Thinking: {config.get('thinking_budget', 10_000):,} tokens"

    print(f"\nStarting evaluation: {run_id}")
    print(f"Task: {config['task_type']} | Model: {model_name} | Tier: {tier} | "
          f"Instances: {len(instances)} | Concurrency: {config['concurrency']}"
          f"{mode_str}{thinking_str}")

    try:
        await run_evaluation(
            agent=agent,
            instances=instances,
            papers=papers,
            run_dir=run_dir,
            model_name=model_name,
            tier=tier,
            concurrency=config["concurrency"],
            completed_ids=completed_ids,
            batch=batch_mode,
            task_module=task,
        )
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() on Python 3.11+, Ctrl-C cancels the main task,
        # so the coroutine sees CancelledError rather than KeyboardInterrupt.
        # Catch both so the resume hint is always printed.
        print(f"\nInterrupted. Resume with: python3 evaluate.py --task-type {config['task_type']} --resume {run_id}")
        sys.exit(1)

    # Score from disk rather than from the in-memory result.
    all_responses = _load_responses_from_disk(run_dir, tier)
    _score_and_report(task, all_responses, instances, config, run_dir,
                      "No scoreable responses.")
def main():
    """Parse the command line, validate flag combinations, and run the harness.

    ``--task-type`` may be omitted only with ``--resume`` or ``--score-only``.
    One of ``--model``, ``--agent``, ``--resume`` or ``--score-only`` must be
    given. Violations are reported through ``ArgumentParser.error``.
    """
    cli = argparse.ArgumentParser(
        description="SynthesisBench Evaluation Harness")

    # (flag, add_argument keyword arguments), in help-output order.
    option_specs = [
        ("--task-type", dict(
            choices=sorted(TASK_TYPES.keys()),
            help="Task type to evaluate (required unless --score-only or --resume)")),
        ("--model", dict(
            help="LLM model identifier (e.g. claude-sonnet-4-5-20250929, gpt-4o)")),
        ("--agent", dict(help="Path to custom agent Python file")),
        ("--tier", dict(
            default=None,
            choices=["abstract", "fulltext", "both"],
            help="Evaluation tier (inferred from task type if omitted)")),
        ("--limit", dict(type=int, help="Max instances to evaluate")),
        ("--resume", dict(help="Run ID to resume")),
        ("--run-id", dict(help="Custom run ID")),
        ("--score-only", dict(help="Score existing run without re-running")),
        ("--seed", dict(
            type=int,
            help="Shuffle seed for reproducible task selection across runs")),
        ("--subset", dict(
            help="Subset manifest name or path (e.g. 'eligibility-abstract-public-v1')")),
        ("--batch", dict(
            action="store_true",
            help="Batch mode: group items by study for fewer API calls")),
        ("--concurrency", dict(
            type=int, default=None, choices=range(1, 65),
            metavar="N",
            help="Parallel API calls, 1-64 (default: 4, 10 for Gemini)")),
        ("--extended-thinking", dict(
            action="store_true",
            help="Enable extended thinking (Claude models only)")),
        ("--thinking-budget", dict(
            type=int, default=10_000,
            metavar="N",
            help="Token budget for thinking (default: 10000, ignored for Opus 4.6 adaptive)")),
        ("--yes", dict(
            action="store_true",
            help="Skip token estimate confirmation")),
    ]
    for flag, options in option_specs:
        cli.add_argument(flag, **options)

    args = cli.parse_args()

    # --resume and --score-only both operate on an existing run.
    reuses_existing_run = bool(args.resume or args.score_only)
    if not (args.task_type or reuses_existing_run):
        cli.error("--task-type is required unless --score-only or --resume is used")
    if not (args.model or args.agent or reuses_existing_run):
        cli.error("One of --model, --agent, --resume, or --score-only is required")

    asyncio.run(_main_async(args))
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()