-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcot_steps.py
More file actions
422 lines (355 loc) · 15.2 KB
/
cot_steps.py
File metadata and controls
422 lines (355 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
import json
from pathlib import Path
from tqdm import tqdm
import numpy as np
import os
# lmdeploy for model inference
from lmdeploy import pipeline, TurbomindEngineConfig, GenerationConfig
from lmdeploy.vl.constants import IMAGE_TOKEN
# nuscenes-devkit for loading data
from nuscenes.nuscenes import NuScenes
import argparse
from datasets import load_dataset
NUM_SAMPLED_FRAMES = 4
def initialize_model(model_path):
"""Initializes and returns the InternVL model pipeline."""
print(f"Initializing model: {model_path}...")
backend_config = TurbomindEngineConfig(session_len=32768, tp=1)
pipe = pipeline(
model_path,
backend_config=backend_config
)
print("Model initialized successfully.")
return pipe
def initialize_nuscenes(nuscenes_dataroot, nuscenes_version):
"""Initializes and returns the NuScenes dataset object."""
print(f"Initializing NuScenes (version: {nuscenes_version})...")
if not nuscenes_dataroot.exists():
raise FileNotFoundError(
f"NuScenes dataroot not found at: {nuscenes_dataroot}\n"
"Please update the NUSCENES_DATAROOT variable in the script."
)
nusc = NuScenes(version=nuscenes_version, dataroot=str(nuscenes_dataroot), verbose=False)
print("NuScenes initialized successfully.")
return nusc
def load_data(file_path):
"""Loads a .jsonl file into a list of dictionaries."""
if not file_path.exists():
raise FileNotFoundError(f"Input file not found: {file_path}")
with open(file_path, 'r') as f:
return [json.loads(line) for line in f]
def create_scene_segment_map(keysegments_data):
"""
Creates a nested dictionary for efficient lookup: scene_token -> segment_key_token -> [frame_tokens].
Uses the 'keyframes_token_formatted' field created during processing.
"""
scene_map = {}
print("Pre-processing keysegments data for efficient lookup...")
# Iterate over the dataset
for item in keysegments_data:
scene_token = item['scene_token']
segments_for_scene = {}
# Structure is list of dicts: [{'segment_key': '...', 'frames': [...]}, ...]
formatted_segments = item.get('keyframes_token_formatted', [])
for segment_item in formatted_segments:
s_key = segment_item['segment_key']
frames = segment_item['frames']
segments_for_scene[s_key] = frames
scene_map[scene_token] = segments_for_scene
return scene_map
def sample_frames(frame_tokens: list, num_frames: int) -> list:
"""
Uniformly samples a specified number of frames from a list of frame tokens.
If the list has fewer frames than required, it returns all of them.
"""
if len(frame_tokens) <= num_frames:
return frame_tokens
indices = np.linspace(0, len(frame_tokens) - 1, num_frames, dtype=int)
sampled_tokens = [frame_tokens[i] for i in indices]
return sampled_tokens
def get_image_paths_for_scene(nusc, sample_tokens):
"""Gets the CAM_FRONT image paths for a list of sample_tokens."""
image_paths = []
for token in sample_tokens:
sample_record = nusc.get('sample', token)
cam_front_token = sample_record['data']['CAM_FRONT']
image_path = nusc.get_sample_data_path(cam_front_token)
image_paths.append(image_path)
return image_paths
# --- Chain-of-Thought Prompting Functions ---
def format_step1_scene_description_prompt(num_images: int) -> str:
"""Formats the prompt for Step 1: General Scene Description."""
image_placeholders = [f'Frame{i+1}: {IMAGE_TOKEN}' for i in range(num_images)]
image_str = '\n'.join(image_placeholders)
prompt = """You are an expert in autonomous driving scene understanding. You are given a sequence of frames from the front camera of a vehicle.
Your first task is to provide a concise, high-level description of the overall scene including the nearby vehicles with distinguishable descriptions.
Describe the scene in a few sentences. Focus only on what you can see."""
full_prompt = f"{image_str}\n\n{prompt}"
return full_prompt.strip()
def format_step2_ego_motion_prompt(step1_output: str) -> str:
"""Formats the prompt for Step 2: Ego Vehicle Motion Analysis."""
prompt = f"""Based on the provided frames and your previous scene description:
---
{step1_output}
---
Now, focus *only* on the ego vehicle's motion. Analyze how the background and lane markings move across the frames. Is the ego vehicle moving forward, turning, changing lanes, starting from a stop, or stopping?
Describe your reasoning in one or two sentences, and then conclude with the most likely motion type.
**Your final conclusion must be one of these exact phrases**:
- Stopped
- Turn left
- Turn right
- Change lane to the left
- Change lane to the right
- Starting
- Stopping
- Straight, constant speed"""
return prompt.strip()
def format_step3_nearby_vehicles_prompt(step1_output, step2_output: str) -> str:
"""Formats the prompt for Step 3: Nearby Vehicles Motion Analysis."""
prompt = f"""Excellent. Your analysis of the scene and the ego vehicle's motion was:
---
{step1_output}
{step2_output}
---
Now, analyze nearby vehicles (cars, trucks, buses, bicycles, motorcycles, trailer, construction vehicle) in front of or near the ego vehicle. For each distinct vehicle, provide a brief reasoning and conclude with its most likely motion type (including stopped) from the following list:
- Stopped
- Turn left
- Turn right
- Change lane to the left
- Change lane to the right
- Starting
- Stopping
- Straight, constant speed
If there are no other vehicles or their motion is unclear, state that."""
return prompt.strip()
def format_step4_final_json_prompt(step2_output: str, step3_output: str) -> str:
"""Formats the prompt for Step 4: Final JSON Generation."""
classification_to_phrase = {
"Stopped": "stopped.",
"Turn left": "turning to the left.",
"Turn right": "turning to the right.",
"Change lane to the left": "changing lane to the left.",
"Change lane to the right": "changing lane to the right.",
"Starting": "starting from a stop.",
"Stopping": "stopping.",
"Straight, constant speed": "moving forward at a relatively constant speed."
}
prompt = f"""Based on all the previous analysis:
---
Ego Vehicle Analysis Summary:
{step2_output}
---
Nearby Vehicles Analysis Summary:
{step3_output}
---
Your final task is to consolidate this information into a single JSON object.
- First, extract the final motion classification for the ego vehicle and each nearby vehicle from your previous responses.
- Second, map each classification to the corresponding descriptive phrase using the provided dictionary.
- Finally, construct the JSON object.
**Motion Phrases Dictionary**:
{json.dumps(classification_to_phrase, indent=2)}
**Output Format**:
{{
"ego_vehicle_motion": "<phrase from dictionary>",
"nearby_vehicles_motion": [
{{
"vehicle_id": "<a brief, unique description, e.g., 'white SUV in front'>",
"motion": "<phrase from dictionary>"
}},
...
]
}}
**Important**: Respond with *only* the raw JSON object and nothing else. Do not wrap it in markdown or add any explanations."""
return prompt.strip()
def insert_description(caption_text, seg_num):
description_line = f"Description of video keysegment {seg_num}:"
# Split the string into a list of lines
lines = caption_text.splitlines()
# Find the index of the opening "```json" line
try:
json_start_index = lines.index("```json")
except ValueError:
raise ValueError("No JSON block found in caption text.")
# Insert the description line before the JSON block
lines.insert(json_start_index, description_line)
# Join the list back into a single string
return "\n".join(lines)
def ensure_json_block(s: str) -> str:
"""
Ensures the given string starts with a ```json code block indicator.
If it's missing, it will be added.
"""
s = s.strip()
if not s.startswith("```json"):
# Add the json code block markers
s = f"```json\n{s}\n```"
return s
def main(args):
NUSCENES_VERSION = args.nuscenes_version
NUSCENES_DATAROOT = args.nuscenes_dataroot
BENCHMARK_FILES = args.benchmark_files
OUTPUT_DIR = args.output_dir
# --- Model Configuration ---
MODEL_PATH = args.model_path
MAX_PATCH = args.max_patch
# Initialization
pipe = initialize_model(MODEL_PATH)
nusc = initialize_nuscenes(NUSCENES_DATAROOT, NUSCENES_VERSION)
# HF load
keysegments_data = load_dataset(os.path.join(BENCHMARK_FILES, 'keysegments'), split='train')
scene_segment_map = create_scene_segment_map(keysegments_data)
model_name_tag = Path(MODEL_PATH).name
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
output_file = OUTPUT_DIR / f"{model_name_tag}_captions.jsonl"
print(f"Temporal captions will be saved to: {output_file}")
with open(output_file, 'w') as f:
pass
tasks = []
for scene_token, segments in scene_segment_map.items():
for segment_key_token, frame_tokens in segments.items():
tasks.append((scene_token, segment_key_token, frame_tokens))
previous_scene_token = "previous token"
segmet_counter = 1
# --- Main processing loop with Chain-of-Thought ---
for scene_token, segment_key_token, all_frame_tokens in tqdm(tasks, desc="Generating Temporal Captions"):
if previous_scene_token in scene_token:
segmet_counter += 1
else:
segmet_counter = 1
try:
sampled_frame_tokens = sample_frames(all_frame_tokens, num_frames=NUM_SAMPLED_FRAMES)
if not sampled_frame_tokens:
print(f"Warning: No frames to process for segment '{segment_key_token}'. Skipping.")
continue
image_paths = get_image_paths_for_scene(nusc, sampled_frame_tokens)
messages = []
gen_config = GenerationConfig(max_new_tokens=1024, temperature=0.0)
prompt1 = format_step1_scene_description_prompt(len(image_paths))
content1 = [dict(type='text', text=prompt1)]
min_pixels = args.min_pixels #256 * 28 * 28
max_pixels = args.max_pixels #512 * 28 * 28
for image_path in image_paths:
content1.append(dict(
type='image_url',
image_url=dict(
max_dynamic_patch=MAX_PATCH,
min_pixels=min_pixels,
max_pixels=max_pixels,
url=image_path
)
))
messages.append(dict(role='user', content=content1))
response1 = pipe(messages, gen_config=gen_config)
step1_output = response1.text.strip()
messages.append(dict(role='assistant', content=step1_output))
prompt2 = format_step2_ego_motion_prompt(step1_output)
messages.append(dict(role='user', content=prompt2))
response2 = pipe(messages, gen_config=gen_config)
step2_output = response2.text.strip()
messages.append(dict(role='assistant', content=step2_output))
# --- STEP 3: Nearby Vehicles Motion ---
prompt3 = format_step3_nearby_vehicles_prompt(step1_output, step2_output)
messages.append(dict(role='user', content=prompt3))
response3 = pipe(messages, gen_config=gen_config)
step3_output = response3.text.strip()
messages.append(dict(role='assistant', content=step3_output))
prompt4 = format_step4_final_json_prompt(step2_output, step3_output)
messages.append(dict(role='user', content=prompt4))
final_response = pipe(messages, gen_config=gen_config)
final_json_output = final_response.text.strip()
#print(final_json_output)
final_json_output = ensure_json_block(final_json_output)
#print(final_json_output)
final_json_output = insert_description(final_json_output,segmet_counter)
# 6. Assemble and append the final result to the output file
result = {
"scene_token": scene_token,
"segment_token": segment_key_token,
"caption": final_json_output,
"chain_of_thought_history": {
"step1_scene_description": step1_output,
"step2_ego_motion_analysis": step2_output,
"step3_nearby_vehicles_analysis": step3_output,
}
}
previous_scene_token = scene_token
with open(output_file, 'a') as f:
f.write(json.dumps(result) + '\n')
except Exception as e:
print(f"\nAn error occurred while processing segment {segment_key_token} for scene {scene_token}: {e}")
error_result = {
"scene_token": scene_token,
"segment_token": segment_key_token,
"caption": f"ERROR: {str(e)}"
}
previous_scene_token = scene_token
with open(output_file, 'a') as f:
f.write(json.dumps(error_result) + '\n')
print("\n--- Temporal captioning complete! ---")
print(f"All captions have been saved to {output_file}")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="NuScenes inference configuration")
parser.add_argument(
"--gpu-id",
type=str,
default="5",
help="Comma-separated list of GPU device IDs to make visible to CUDA"
)
parser.add_argument(
"--nuscenes-version",
type=str,
default="v1.0-trainval",
help="NuScenes dataset version"
)
parser.add_argument(
"--nuscenes-dataroot",
type=str,
default="/home/ma-user/work/kevin/data/AD/NuScenes",
help="Path to NuScenes dataset root"
)
parser.add_argument(
"--benchmark-files",
type=str,
default="/home/ma-user/work/saeed/TAD_code_data_submission/TAD/TAD_HF",
help="Path to benchmark files"
)
parser.add_argument(
"--output-dir",
type=str,
default="scene_cot_captions/",
help="Directory to save predictions"
)
# --- Model Configuration ---
parser.add_argument(
"--model-path",
type=str,
default="/home/ma-user/work/pretrained_models/InternVL3-8B/",
help="Path to the pretrained model"
)
parser.add_argument(
"--max-patch",
type=int,
default=1,
help="Maximum patch size"
)
parser.add_argument(
"--min-pixels",
type=int,
default=256 * 28 * 28,
help="Minimum number of pixels"
)
parser.add_argument(
"--max-pixels",
type=int,
default=512 * 28 * 28,
help="Maximum number of pixels"
)
args = parser.parse_args()
args.nuscenes_dataroot = Path(args.nuscenes_dataroot)
args.output_dir = Path(args.output_dir)
# Apply GPU visibility
os.environ["CUDA_VISIBLE_DEVICES"] = args.gpu_id
print("### Running on GPU", os.environ["CUDA_VISIBLE_DEVICES"])
# Print configuration summary
print("Model path:", args.model_path)
main(args)