# vln_n1.py
import logging
import warnings
import os
from datetime import datetime
from logging.handlers import RotatingFileHandler
from argparse import ArgumentParser
from pathlib import Path
# 0. Pre-parse raw_dir for log filename
temp_parser = ArgumentParser(add_help=False)
temp_parser.add_argument("--raw_dir", type=str, default="InternData-n1-demo")
known_args, _ = temp_parser.parse_known_args()
raw_dir_name = Path(known_args.raw_dir).name
# Clean up the name for filename safety
raw_dir_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in raw_dir_name)

# 1. Root logger: minimum level INFO
root = logging.getLogger()
root.setLevel(logging.INFO)

# 2. Console handler (INFO and above)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter(
    "%(asctime)s %(levelname)s %(message)s"
)
console_handler.setFormatter(console_formatter)

# 3. File handler (WARNING and above)
# Use a log filename containing the timestamp and PID so that concurrent
# processes never write to the same file.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
pid = os.getpid()
log_filename = f"warnings_{raw_dir_name}_{timestamp}_{pid}.log"
file_handler = RotatingFileHandler(
    log_filename,
    maxBytes=50 * 1024 * 1024,
    backupCount=5,
    encoding="utf-8",  # strongly recommended
)
file_handler.setLevel(logging.WARNING)
file_formatter = logging.Formatter(
    "%(asctime)s %(levelname)s %(message)s"
)
file_handler.setFormatter(file_formatter)

# 4. Attach handlers (clearing first avoids attaching duplicates)
root.handlers.clear()
root.addHandler(console_handler)
root.addHandler(file_handler)

# 5. Capture warnings.warn
logging.captureWarnings(True)
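
# With captureWarnings(True), messages from warnings.warn() are re-routed to
# the standard "py.warnings" logger at WARNING level, so they reach both the
# console and the rotating warnings file configured above.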

# These imports run after the logging setup above, so any warnings emitted at
# import time are already captured by the handlers.
import time
import json
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from utils import Trajectories, get_task_idx
from utils.vln_n1 import VLN_N1_Trajectories
from functools import partial
from utils.video import use_encoding

parser = ArgumentParser(description="Port VLN-N1 dataset to LeRobotDataset format")
parser.add_argument("--raw_dir", type=str, default="InternData-n1-demo", help="Path to the raw VLN-N1 dataset directory")
parser.add_argument("--output_dir", type=str, default=".", help="Path to the output LeRobotDataset directory")
parser.add_argument("--codec", type=str, default="h264", choices=["h264", "hevc", "libsvtav1"], help="Video codec to use for encoding")
parser.add_argument("--num_threads", type=int, default=4, help="Number of threads for image writing")
parser.add_argument("--num_processes", type=int, default=0, help="Number of processes for image writing")
parser.add_argument("--batch_size", type=int, default=50, help="Batch size for video encoding")
parser.add_argument("--roll_limit", type=float, default=5.0, help="Roll limit for filtering trajectories")
args = parser.parse_args()
use_encoding(args.codec)
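
# Example invocation (illustrative only; paths and values are placeholders):
#
#   python vln_n1.py --raw_dir /data/InternData-n1-demo --output_dir ./ports \
#       --codec h264 --num_threads 4 --num_processes 0 --batch_size 50 --roll_limit 5.0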


def port(
    raw_dir: str,
    repo_id: str,
    root: str,
    traj_cls: type[Trajectories],
    num_threads: int,
    num_processes: int,
    batch_size: int,
    roll_limit: float = 5.0,
    *args, **kwargs
):
    """Port a raw dataset to LeRobotDataset format."""
    logging.info(f"Porting raw dataset from {raw_dir} to LeRobotDataset repo {repo_id}")

    # Determine features dynamically from the raw data
    features = traj_cls.get_features(raw_dir)

    if root and Path(root).exists():
        logging.info(f"Loading existing dataset from {root}")
        lerobot_dataset = LeRobotDataset(repo_id, root=root, batch_encoding_size=batch_size)
        lerobot_dataset.start_image_writer(num_processes=num_processes, num_threads=num_threads)
    else:
        logging.info(f"Creating new dataset at {root}")
        lerobot_dataset = LeRobotDataset.create(
            repo_id=repo_id,
            root=root,
            robot_type=traj_cls.ROBOT_TYPE,
            fps=traj_cls.FPS,
            features=features,
            image_writer_processes=num_processes,
            image_writer_threads=num_threads,
            batch_encoding_size=batch_size,
        )
    filter_condition = {"roll_limit": roll_limit}
    trajectories = traj_cls(
        raw_dir,
        get_task_idx=partial(get_task_idx, lerobot_dataset),
        features=features,
        filter_condition=filter_condition,
    )

    start_time = time.time()
    num_episodes = len(trajectories)
    logging.info(f"Number of episodes: {num_episodes}")

    meta_dir = lerobot_dataset.root / "meta"
    meta_dir.mkdir(parents=True, exist_ok=True)
    extras_path = meta_dir / "episodes_extras.jsonl"
    # Open in append mode so reruns against an existing dataset extend the
    # extras file instead of overwriting it.
    with open(extras_path, "a", encoding="utf-8") as extras_file:
        for episode_index, episode in enumerate(trajectories):
            elapsed_time = time.time() - start_time
            logging.info(f"{episode_index + 1} / {num_episodes} episodes processed (after {elapsed_time:.3f} seconds)")
            for frame, task in episode:
                lerobot_dataset.add_frame(frame, task=task)
            lerobot_dataset.save_episode()
            logging.info("Episode saved")

            # Save extra per-episode metadata
            ep_idx = lerobot_dataset.num_episodes - 1
            extra_data = {"episode_index": ep_idx}
            # Merge with trajectory metadata if available
            extra_data.update(episode.metadata)
            extras_file.write(json.dumps(extra_data) + "\n")
            extras_file.flush()

    # Manually flush any remaining videos when batch_encoding_size > 1.
    if lerobot_dataset.batch_encoding_size > 1 and lerobot_dataset.episodes_since_last_encoding > 0:
        start_ep = lerobot_dataset.num_episodes - lerobot_dataset.episodes_since_last_encoding
        logging.info(
            f"Batch encoding remaining {lerobot_dataset.episodes_since_last_encoding} episodes "
            f"[{start_ep}, {lerobot_dataset.num_episodes})"
        )
        lerobot_dataset.batch_encode_videos(start_episode=start_ep, end_episode=lerobot_dataset.num_episodes)
        lerobot_dataset.episodes_since_last_encoding = 0

    # Ensure asynchronous image writer workers finish before validation.
    lerobot_dataset.stop_image_writer()
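
# Each line of meta/episodes_extras.jsonl written by port() holds one JSON
# object per episode. A record looks roughly like this (sketch only; any keys
# other than "episode_index" come from episode.metadata and depend on the raw
# data):
#   {"episode_index": 12, ...}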


def validate_dataset(repo_id: str, root: str):
    """Sanity check that ensures the metadata loads and all data/video files are present."""
    meta = LeRobotDatasetMetadata(repo_id, root=root)
    if meta.total_episodes == 0:
        raise ValueError("Number of episodes is 0.")
    for ep_idx in range(meta.total_episodes):
        data_path = meta.root / meta.get_data_file_path(ep_idx)
        if not data_path.exists():
            raise ValueError(f"Parquet file is missing: {data_path}")
        for vid_key in meta.video_keys:
            vid_path = meta.root / meta.get_video_file_path(ep_idx, vid_key)
            if not vid_path.exists():
                raise ValueError(f"Video file is missing: {vid_path}")


def main():
    raw_dir = Path(args.raw_dir)
    folder_name = raw_dir.name
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    root = output_dir / f"VLN-N1-{folder_name}"

    port(
        raw_dir=raw_dir,
        repo_id=f"VLN-N1-{folder_name}",
        root=root,
        traj_cls=VLN_N1_Trajectories,
        num_threads=args.num_threads,
        num_processes=args.num_processes,
        batch_size=args.batch_size,
        roll_limit=args.roll_limit,
    )
    validate_dataset(f"VLN-N1-{folder_name}", root=root)


if __name__ == "__main__":
    main()