-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvideo_processor.py
More file actions
221 lines (174 loc) · 7.85 KB
/
video_processor.py
File metadata and controls
221 lines (174 loc) · 7.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
from __future__ import annotations
from pathlib import Path
import shutil
from typing import Callable, Optional
import av
import cv2
import numpy as np
import torch
# Try PyTorch detector first (GPU accelerated), fall back to DeepFace if not available
try:
    from face_detector_pytorch import RobustFaceCensor
    print("[INFO] Using InsightFace RetinaFace detector (GPU accelerated)")
    USE_PYTORCH = True
except ImportError:
    from face_detector import RobustFaceCensor
    print("[INFO] Using DeepFace detector (CPU only)")
    USE_PYTORCH = False
# Callback type aliases used by censor_video().
PreviewCallback = Callable[[np.ndarray], None]  # called with a censored BGR frame
ProgressCallback = Callable[[int, int], None]  # called with (frames_done, total_frames)
CancelCheckCallback = Callable[[], bool]  # returns True when the user requested cancellation
def _choose_speed_profile(width: int, height: int, fps: float) -> tuple[int, int, int, int]:
"""Choose speed profile based on video resolution and frame rate.
Since DeepFace/TensorFlow on CPU is very slow (~2-3 seconds per detection),
we need AGGRESSIVE frame skipping to make processing feasible.
"""
pixels = width * height
# Be very aggressive with frame skipping for CPU
# This trades quality for speed - detects only every N frames
max_detection_size = 1280
detect_every_n_frames = 1
# Adjust based on resolution - higher res = more skipping needed
if pixels >= 3840 * 2160: # 4K - skip a LOT
max_detection_size = 512
detect_every_n_frames = 16 # Detect only 2 times per second
elif pixels >= 2560 * 1440: # 2K
max_detection_size = 640
detect_every_n_frames = 12
elif pixels >= 1920 * 1080: # Full HD
max_detection_size = 768
detect_every_n_frames = 10 # Detect 3 times per second at 30fps
elif pixels >= 1280 * 720: # HD
max_detection_size = 896
detect_every_n_frames = 8 # Detect 3-4 times per second
else: # Lower res
max_detection_size = 1024
detect_every_n_frames = 6
# Update based on frame rate if very high
if fps > 60:
detect_every_n_frames = max(detect_every_n_frames, 12)
elif fps > 30:
pass # Keep as is
# Preview and progress updates
preview_every_n_frames = max(1, int(round(fps / 8.0)))
progress_every_n_frames = max(1, int(round(fps / 3.0)))
return max_detection_size, detect_every_n_frames, preview_every_n_frames, progress_every_n_frames
def build_output_path(input_path: str, suffix: str) -> str:
    """Return the input path with *suffix* appended to the file stem.

    A blank or whitespace-only suffix falls back to "censored".
    """
    source = Path(input_path)
    tag = suffix.strip()
    if not tag:
        tag = "censored"
    return str(source.with_name(f"{source.stem}_{tag}{source.suffix}"))
def _copy_file(src_path: str, dst_path: str) -> None:
src = Path(src_path)
dst = Path(dst_path)
if src.resolve() == dst.resolve():
return
shutil.copyfile(src, dst)
def _mux_original_audio(video_only_path: str, original_input_path: str, final_output_path: str) -> None:
    """Combine the processed (video-only) file with the original file's audio track.

    Packets are stream-copied (no re-encoding). If the original input has no
    audio streams, the processed video file is simply copied to the final path.
    """
    with av.open(original_input_path) as input_container:
        input_audio_streams = [stream for stream in input_container.streams if stream.type == "audio"]
        if not input_audio_streams:
            # Nothing to mux: the video-only file IS the final output.
            _copy_file(video_only_path, final_output_path)
            return
        # Only the first audio stream is preserved; any additional tracks are dropped.
        input_audio_stream = input_audio_streams[0]
        with av.open(video_only_path) as processed_video_container, av.open(final_output_path, mode="w") as output_container:
            processed_video_stream = processed_video_container.streams.video[0]
            # template= copies codec parameters from the source streams (stream copy).
            out_video_stream = output_container.add_stream(template=processed_video_stream)
            out_audio_stream = output_container.add_stream(template=input_audio_stream)
            # Packets without a DTS (e.g. codec flush packets) cannot be muxed - skip them.
            # NOTE(review): all video packets are written before any audio packets;
            # presumably the container muxer handles the interleaving - confirm
            # output plays correctly in strict players.
            for packet in processed_video_container.demux(processed_video_stream):
                if packet.dts is None:
                    continue
                packet.stream = out_video_stream
                output_container.mux(packet)
            for packet in input_container.demux(input_audio_stream):
                if packet.dts is None:
                    continue
                packet.stream = out_audio_stream
                output_container.mux(packet)
def censor_video(
    input_path: str,
    output_path: str,
    on_preview: Optional[PreviewCallback] = None,
    on_progress: Optional[ProgressCallback] = None,
    pixelation_strength: float = 5.0,
    detect_every_n_frames: Optional[int] = None,
    cancel_check: Optional[CancelCheckCallback] = None,
) -> str:
    """Pixelate faces in a video and write the result to *output_path*.

    Frames are first written to a temporary video-only file next to the final
    output; the original audio track is then muxed back in (best effort - on
    mux failure the video-only result is used and a warning is printed).

    Args:
        input_path: Path of the source video.
        output_path: Path of the censored output video.
        on_preview: Optional callback receiving censored frames for UI preview.
        on_progress: Optional callback receiving (frames_done, total_frames).
        pixelation_strength: Passed through to RobustFaceCensor.
        detect_every_n_frames: Detection cadence override; when None, an
            automatic profile based on resolution/FPS is used.
        cancel_check: Optional callback; returning True aborts processing.

    Returns:
        output_path (also returned after a user cancellation).

    Raises:
        RuntimeError: If the input cannot be opened or the output cannot be created.
    """
    final_output = Path(output_path)
    temp_video_only_output = str(final_output.with_name(f"{final_output.stem}.video_only.tmp{final_output.suffix}"))
    capture = cv2.VideoCapture(input_path)
    if not capture.isOpened():
        raise RuntimeError("Could not open the selected video.")
    fps = capture.get(cv2.CAP_PROP_FPS)
    if fps <= 0:
        # Some containers report no/invalid FPS; fall back to a sane default.
        fps = 30.0
    width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"[DEBUG] Video: {width}x{height} @ {fps} FPS, {total_frames} frames")
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(temp_video_only_output, fourcc, fps, (width, height))
    if not writer.isOpened():
        capture.release()
        raise RuntimeError("Could not create the output MP4 file.")
    cv2.setUseOptimized(True)
    max_detection_size, auto_detect_every_n_frames, preview_every_n_frames, progress_every_n_frames = _choose_speed_profile(
        width,
        height,
        fps,
    )
    # Use the user-specified cadence if provided, otherwise the automatic profile.
    if detect_every_n_frames is None:
        detect_every_n_frames = auto_detect_every_n_frames
    print(f"[DEBUG] Speed profile: max_detection={max_detection_size}, detect_every={detect_every_n_frames}, preview_every={preview_every_n_frames}")
    detector = RobustFaceCensor(
        mode="pixel",
        detect_every_n_frames=detect_every_n_frames,
        max_detection_size=max_detection_size,
        pixelation_strength=pixelation_strength,
    )
    frame_index = 0
    detection_log_interval = max(1, total_frames // 20)  # Log detection stats every ~5%
    try:
        while True:
            # Check for cancellation before doing any per-frame work.
            if cancel_check is not None and cancel_check():
                print("[DEBUG] Processing cancelled by user")
                break
            ok, frame = capture.read()
            if not ok:
                break
            censored_frame, face_count = detector.censor_frame(frame)
            # BUGFIX: cv2.VideoWriter.write() is void (returns None), so its
            # return value cannot signal per-frame failures - the previous
            # "write failed" check fired on every frame.
            writer.write(censored_frame)
            frame_index += 1
            # Log detection stats periodically, plus the first few frames.
            if frame_index % detection_log_interval == 0 or frame_index <= 3:
                print(f"[DEBUG] Frame {frame_index}/{total_frames}: {face_count} faces detected")
            if on_preview is not None and (frame_index % preview_every_n_frames == 0):
                on_preview(censored_frame)
            should_emit_progress = (
                frame_index == 1
                or frame_index == total_frames
                or (frame_index % progress_every_n_frames == 0)
            )
            if on_progress is not None and should_emit_progress:
                on_progress(frame_index, total_frames)
    finally:
        detector.close()
        writer.release()
        capture.release()
    try:
        _mux_original_audio(temp_video_only_output, input_path, output_path)
    except Exception as mux_error:
        # Best effort: keep the censored (silent) video rather than failing outright.
        print(f"[WARNING] Could not preserve audio track: {mux_error}")
        _copy_file(temp_video_only_output, output_path)
    finally:
        temp_path = Path(temp_video_only_output)
        if temp_path.exists():
            temp_path.unlink()
    return output_path