-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbatch.py
More file actions
100 lines (85 loc) · 3.89 KB
/
Copy pathbatch.py
File metadata and controls
100 lines (85 loc) · 3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python3
"""
batch.py — 把 exam.json 裡每一題都跑過 v0 pipeline 產生 MP4
使用: python3 batch.py <exam.json> [output_dir]
"""
import argparse
import json
import subprocess
import sys
from pathlib import Path
# v0 pipeline 的位置(同目錄)
PIPELINE = Path(__file__).parent / "pipeline.py"
def problem_to_v0_json(exam_title: str, prob: dict) -> dict:
"""把 v1 的 problem 轉成 v0 pipeline 吃的格式"""
v0_dict = {
"title": f"{exam_title} — {prob['number']}",
"subtitle": prob["problem"][:30] + ("..." if len(prob["problem"]) > 30 else ""),
"problem": prob["problem"],
"steps": prob["steps"],
}
if "image" in prob:
v0_dict["image"] = prob["image"]
return v0_dict
def main():
from core.runtime import setup_utf8_stdout
setup_utf8_stdout()
ap = argparse.ArgumentParser()
ap.add_argument("exam_json", help="v1 exam.json 路徑")
ap.add_argument("output_dir", nargs="?", default="./videos",
help="輸出根目錄 (預設 ./videos),實際寫到 <output_dir>/<exam_stem>/")
ap.add_argument("--only", nargs="+", help="只跑特定題目 id,例如 --only q1 q3")
ap.add_argument("--step", type=int, help="從第幾個步驟(0-indexed)開始重新渲染(僅用於 --only 單題時)")
args = ap.parse_args()
exam_path = Path(args.exam_json)
# 各考卷獨立 subfolder,避免多份 exam 都有 q1.mp4 會互相覆蓋
out_dir = Path(args.output_dir) / exam_path.stem
out_dir.mkdir(parents=True, exist_ok=True)
data = json.loads(exam_path.read_text(encoding="utf-8"))
problems = data["problems"]
if args.only:
problems = [p for p in problems if p["id"] in args.only]
if not problems:
sys.exit(f"❌ 找不到題目 id: {args.only}")
print(f"📦 準備生成 {len(problems)} 支影片,輸出到 {out_dir}")
if not PIPELINE.exists():
sys.exit(f"❌ 找不到 v0 pipeline: {PIPELINE}")
results = []
exam_stem = exam_path.stem # 用考卷檔名當 work/output 的命名空間, 避免跨考卷串檔
for i, prob in enumerate(problems):
pid = prob["id"]
v0_json_path = out_dir / f"{pid}.json"
v0_data = problem_to_v0_json(data["exam_title"], prob)
v0_json_path.write_text(
json.dumps(v0_data, ensure_ascii=False, indent=2), encoding="utf-8"
)
# pipeline 的 out_name 用「考卷__題號」避免不同考卷的 q1 撞檔
unique_name = f"{exam_stem}__{pid}"
print(f"\n[{i+1}/{len(problems)}] 處理 {pid}: {prob['number']} (work: {unique_name})")
try:
cmd = [sys.executable, str(PIPELINE), str(v0_json_path.resolve()), unique_name]
if args.step is not None and len(problems) == 1:
# pipeline.py 內部的 --step 是 1-indexed (第 1 步開始)
cmd += ["--step", str(args.step + 1)]
subprocess.run(cmd, check=True)
# pipeline.py 輸出到 output/<unique_name>.mp4, 搬到 videos/<exam_stem>/<pid>.mp4
pipeline_out = Path(__file__).parent / "output"
src_mp4 = pipeline_out / f"{unique_name}.mp4"
src_srt = pipeline_out / f"{unique_name}.srt"
dst_mp4 = out_dir / f"{pid}.mp4"
dst_srt = out_dir / f"{pid}.srt"
if src_mp4.exists():
src_mp4.replace(dst_mp4)
if src_srt.exists():
src_srt.replace(dst_srt)
results.append((pid, True, dst_mp4))
except subprocess.CalledProcessError as e:
print(f" ❌ 失敗: {e}")
results.append((pid, False, None))
print(f"\n{'=' * 50}")
print(f"完成: {sum(1 for _, ok, _ in results if ok)}/{len(results)} 成功")
for pid, ok, path in results:
status = "✅" if ok else "❌"
print(f" {status} {pid}: {path if ok else '失敗'}")
if __name__ == "__main__":
main()