-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaugment.py
More file actions
112 lines (89 loc) · 3.64 KB
/
augment.py
File metadata and controls
112 lines (89 loc) · 3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import os
import numpy as np
from pathlib import Path
import shutil
import random
import multiprocessing
from tqdm import tqdm
# --- Configuration ---
# Directory holding the clean training goodware samples (read-only input).
SOURCE_DIR = Path("data_filtered", "32bit", "train", "goodware")
# Directory that receives the augmented copies.
DEST_DIR = Path("data_filtered", "32bit", "train", "goodware_augmented")

# Overlay size range in MiB. Both bounds are inclusive because the size is
# drawn with random.randint(MIN_ADD_MB, MAX_ADD_MB). The range is intended to
# cover everything from lightly padded files up to very large installers.
MIN_ADD_MB = 1
MAX_ADD_MB = 65

# Probability that a file gets a high-entropy (random bytes) overlay; the
# remaining files get a low-entropy (all zero) overlay so that both kinds of
# appended data are represented in the augmented set.
HIGH_ENTROPY_RATIO = 0.7
def create_augmented_file(file_path, *, dest_dir=None, add_size_bytes=None,
                          is_high_entropy=None):
    """Write a copy of *file_path* with a synthetic overlay appended.

    The output is named ``{high_ent|low_ent}_{N}MB_{original name}`` and is
    placed in the destination directory. The source file is never modified.

    Args:
        file_path: Path of the file to augment.
        dest_dir: Output directory. Defaults to the module-level ``DEST_DIR``.
        add_size_bytes: Number of overlay bytes to append. Defaults to a
            random whole number of MiB in ``[MIN_ADD_MB, MAX_ADD_MB]``.
        is_high_entropy: ``True`` appends random bytes, ``False`` appends
            zero bytes. Defaults to ``True`` with probability
            ``HIGH_ENTROPY_RATIO``.

    Returns:
        ``"success"``, or ``"error: <message>"`` if anything went wrong. The
        function never raises, so one bad file cannot abort a worker pool.
    """
    mib = 1024 * 1024
    # Stream in fixed-size chunks so peak memory per worker stays small
    # instead of holding the whole file plus a multi-MiB overlay at once.
    chunk_size = 4 * mib
    dest_path = None
    dest_opened = False
    try:
        file_path = Path(file_path)
        out_dir = DEST_DIR if dest_dir is None else Path(dest_dir)

        # Decide mode first, then size (same draw order as before).
        if is_high_entropy is None:
            is_high_entropy = random.random() < HIGH_ENTROPY_RATIO
        if add_size_bytes is None:
            add_size_bytes = random.randint(MIN_ADD_MB, MAX_ADD_MB) * mib

        # Encode what was done in the file name.
        type_str = "high_ent" if is_high_entropy else "low_ent"
        size_str = f"{add_size_bytes // mib}MB"
        dest_path = out_dir / f"{type_str}_{size_str}_{file_path.name}"

        out_dir.mkdir(parents=True, exist_ok=True)

        # The source is opened first so an unreadable source (missing file,
        # directory, ...) fails before any output file is created.
        with open(file_path, 'rb') as src:
            with open(dest_path, 'wb') as dst:
                dest_opened = True

                # 1. Copy the original content unchanged.
                shutil.copyfileobj(src, dst)

                # 2. Append the overlay.
                # High entropy uses os.urandom rather than np.random.bytes:
                # NumPy's global generator state is inherited unchanged by
                # forked workers, so different workers could otherwise emit
                # identical "random" overlays.
                zero_chunk = None if is_high_entropy else bytes(chunk_size)
                remaining = add_size_bytes
                while remaining > 0:
                    n = min(chunk_size, remaining)
                    if is_high_entropy:
                        dst.write(os.urandom(n))
                    else:
                        dst.write(zero_chunk[:n])
                    remaining -= n
        return "success"
    except Exception as e:
        # Worker boundary: report the failure as a string instead of raising.
        # Remove a half-written output so it cannot end up in the dataset.
        if dest_opened:
            try:
                dest_path.unlink()
            except OSError:
                pass
        return f"error: {e}"
def main():
    """Augment every file in SOURCE_DIR in parallel and print a summary.

    Each regular file directly inside ``SOURCE_DIR`` is passed to
    ``create_augmented_file`` in a worker pool. Results are tallied by
    comparing each worker's return value with ``"success"``. Returns ``None``;
    exits early (after printing a message) if the source directory is missing
    or contains no files.
    """
    # 1. Setup
    if not SOURCE_DIR.exists():
        print(f"[Error] Source directory not found: {SOURCE_DIR}")
        return

    # Create the destination if needed. Existing contents are NOT removed, so
    # re-running adds new augmented files next to those of earlier runs.
    DEST_DIR.mkdir(parents=True, exist_ok=True)

    print(f"[Augment] Reading from: {SOURCE_DIR}")
    print(f"[Augment] Writing to: {DEST_DIR}")
    print(f"[Config] Overlay Size: {MIN_ADD_MB}MB - {MAX_ADD_MB}MB")
    print(f"[Config] High Entropy Ratio: {HIGH_ENTROPY_RATIO * 100}%")

    # 2. Collect Files
    # Only regular files: sub-directories cannot be augmented and would
    # otherwise just show up as errors in the report.
    files = [p for p in SOURCE_DIR.glob("*") if p.is_file()]
    print(f"[Augment] Found {len(files)} base files.")
    if not files:
        print("No files to augment.")
        return

    # 3. Run Multiprocessing
    # We use CPU count - 1 to keep system responsive
    num_workers = max(1, multiprocessing.cpu_count() - 1)
    print(f"[Augment] Generating augmented dataset with {num_workers} workers...")
    # The context manager shuts the pool down on exit; all results are
    # consumed by list() before the block ends, so no work is cut short.
    with multiprocessing.Pool(processes=num_workers) as pool:
        results = list(tqdm(pool.imap_unordered(create_augmented_file, files), total=len(files)))

    # 4. Report
    success = results.count("success")
    failures = [r for r in results if r != "success"]
    errors = len(failures)
    print("\n" + "="*30)
    print(" AUGMENTATION COMPLETE")
    print("="*30)
    print(f"Original Files : {len(files)}")
    print(f"New Files : {success}")
    print(f"Errors : {errors}")
    print(f"Total Goodware : {len(files) + success} (Original + Augmented)")
    if failures:
        # Show a few messages so failures can be diagnosed, not just counted.
        print("Sample errors:")
        for message in failures[:5]:
            print(f"  {message}")
    print("="*30)
    print("Next Steps:")
    print("1. Run preprocessing script (512x512 OOM Safe) to convert these new files.")
    print("2. Run training script. The model will now see Goodware of all sizes/entropies.")
# Script entry point: run the augmentation only when this file is executed
# directly, not when it is imported (e.g. by multiprocessing worker processes
# that re-import the module).
if __name__ == "__main__":
    # Required when the script is frozen into a Windows executable; the
    # multiprocessing documentation states it has no effect otherwise.
    multiprocessing.freeze_support()
    main()