Skip to content

Commit d040f11

Browse files
author
RTOSploit
committed
fix: use UC_HOOK_MEM_READ for peripheral MMIO instead of UC_HOOK_MEM_READ_UNMAPPED
The unmapped hook only fires once per page — after the first read maps the page, subsequent reads go to stale mapped memory. Now peripheral region is mapped as R+W and every read/write is intercepted via UC_HOOK_MEM_READ/WRITE hooks on the address range. Results: vuln-firmware.elf now shows 962+ PIP reads per execution (was 1 before). Fix also resolves circular import in fuzzing/__init__.py and corrects vector table section (address 0) being skipped in load_elf. 1550 tests passing.
1 parent 19983f8 commit d040f11

5 files changed

Lines changed: 95 additions & 107 deletions

File tree

rtosploit/fuzzing/__init__.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,19 @@
77
from rtosploit.fuzzing.fuzz_input import FuzzInputStream, InputExhausted
88
from rtosploit.fuzzing.input_injector import FuzzableInput, InputInjector
99
from rtosploit.fuzzing.mutator import Mutator
10-
from rtosploit.fuzzing.unicorn_worker import (
11-
UnicornFuzzEngine,
12-
UnicornFuzzStats,
13-
UnicornFuzzWorker,
14-
)
10+
11+
# Unicorn imports are lazy to avoid circular dependency:
12+
# fuzzing/__init__ -> unicorn_worker -> unicorn_engine -> fuzzing/execution -> fuzzing/__init__
13+
14+
15+
def _get_unicorn_classes():
16+
from rtosploit.fuzzing.unicorn_worker import (
17+
UnicornFuzzEngine,
18+
UnicornFuzzStats,
19+
UnicornFuzzWorker,
20+
)
21+
return UnicornFuzzEngine, UnicornFuzzStats, UnicornFuzzWorker
22+
1523

1624
__all__ = [
1725
"CorpusManager",
@@ -26,8 +34,5 @@
2634
"InputInjector",
2735
"Mutator",
2836
"StopReason",
29-
"UnicornFuzzEngine",
30-
"UnicornFuzzStats",
31-
"UnicornFuzzWorker",
3237
"make_result",
3338
]

rtosploit/peripherals/unicorn_engine.py

Lines changed: 61 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -201,17 +201,35 @@ def setup(self) -> None:
201201
# Raw binary: write entire firmware data to flash base
202202
uc.mem_write(self._firmware.base_address, self._firmware.data)
203203

204-
# 4. Peripheral region (0x40000000-0x5FFFFFFF): intentionally NOT mapped
205-
# MMIO accesses trigger unmapped hooks -> PIP/SVD/fallback
204+
# 4. Map peripheral region (0x40000000-0x5FFFFFFF) as R+W
205+
# We map it so reads don't trigger UC_HOOK_MEM_READ_UNMAPPED (which
206+
# only fires once per page). Instead, we use UC_HOOK_MEM_READ on the
207+
# range to intercept EVERY read for PIP/SVD routing.
208+
periph_size = _PERIPH_END - _PERIPH_START
209+
uc.mem_map(_PERIPH_START, periph_size, 3) # UC_PROT_READ | UC_PROT_WRITE
210+
for page in range(_PERIPH_START, _PERIPH_END, 0x1000):
211+
self._mapped_pages.add(page)
206212

207213
# 5. Map system registers region (0xE0000000-0xE00FFFFF) with R+W
208214
uc.mem_map(_SYSTEM_REG_START, _SYSTEM_REG_END - _SYSTEM_REG_START, 3)
209215
for page in range(_SYSTEM_REG_START, _SYSTEM_REG_END, 0x1000):
210216
self._mapped_pages.add(page)
211217

212218
# Set up hooks
213-
uc.hook_add(UC_HOOK_MEM_READ_UNMAPPED, self._hook_mem_read_unmapped)
214-
uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED, self._hook_mem_write_unmapped)
219+
# Use UC_HOOK_MEM_READ on peripheral range to intercept EVERY MMIO read
220+
# (not just unmapped — the region is mapped but we override values via hook)
221+
uc.hook_add(UC_HOOK_MEM_READ, self._hook_periph_read,
222+
begin=_PERIPH_START, end=_PERIPH_END - 1)
223+
uc.hook_add(UC_HOOK_MEM_WRITE, self._hook_periph_write,
224+
begin=_PERIPH_START, end=_PERIPH_END - 1)
225+
# System register reads/writes
226+
uc.hook_add(UC_HOOK_MEM_READ, self._hook_sysreg_read,
227+
begin=_SYSTEM_REG_START, end=_SYSTEM_REG_END - 1)
228+
uc.hook_add(UC_HOOK_MEM_WRITE, self._hook_sysreg_write,
229+
begin=_SYSTEM_REG_START, end=_SYSTEM_REG_END - 1)
230+
# Unmapped access outside peripheral/system = crash
231+
uc.hook_add(UC_HOOK_MEM_READ_UNMAPPED, self._hook_unmapped_access)
232+
uc.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED, self._hook_unmapped_access)
215233
uc.hook_add(UC_HOOK_BLOCK, self._hook_block)
216234
uc.hook_add(UC_HOOK_CODE, self._hook_code)
217235

@@ -384,88 +402,54 @@ def restore_snapshot(self, snapshot: UnicornSnapshot) -> None:
384402
# MMIO hooks with PIP routing
385403
# ------------------------------------------------------------------
386404

387-
def _map_page_if_needed(self, uc, address: int) -> bool:
388-
"""Map a 4KB page if not already mapped. Returns True if mapped."""
389-
page_base = address & ~0xFFF
390-
if page_base in self._mapped_pages:
391-
return True
392-
try:
393-
uc.mem_map(page_base, 0x1000, 3) # R+W
394-
self._mapped_pages.add(page_base)
395-
return True
396-
except Exception:
397-
return False
405+
def _hook_periph_read(self, uc, access, address, size, value, user_data):
406+
"""Intercept EVERY read in the peripheral MMIO range.
398407
399-
def _hook_mem_read_unmapped(self, uc, access, address, size, value, user_data):
400-
"""Handle unmapped memory reads with PIP routing.
401-
402-
Routing:
403-
- Peripheral range (0x40000000-0x5FFFFFFF): SVD -> PIP -> fallback
404-
- System registers (0xE0000000-0xE00FFFFF): system reg handler
405-
- Other: crash (unmapped non-peripheral access)
408+
Routes through CompositeMMIOHandler (SVD -> PIP -> fallback).
409+
The peripheral region is mapped as R+W so this hook fires on every
410+
access, not just the first (unlike UC_HOOK_MEM_READ_UNMAPPED).
406411
"""
407-
if _PERIPH_START <= address < _PERIPH_END:
408-
# Peripheral MMIO range -> route through composite handler
409-
try:
410-
result = self._mmio_handler.read(address, size)
411-
except InputExhausted:
412-
self._stopped = True
413-
self._stop_reason = "input_exhausted"
414-
self._stop_reason_enum = StopReason.INPUT_EXHAUSTED
415-
uc.emu_stop()
416-
return False
417-
self._map_page_if_needed(uc, address)
418-
uc.mem_write(address, struct.pack("<I", result & 0xFFFFFFFF)[:size])
419-
return True
420-
421-
elif _SYSTEM_REG_START <= address < _SYSTEM_REG_END:
422-
# System registers -> composite handler routes to CortexMSystemRegisters
412+
try:
423413
result = self._mmio_handler.read(address, size)
424-
self._map_page_if_needed(uc, address)
425-
uc.mem_write(address, struct.pack("<I", result & 0xFFFFFFFF)[:size])
426-
return True
427-
428-
else:
429-
# Non-peripheral unmapped access -> crash
414+
except InputExhausted:
430415
self._stopped = True
431-
self._stop_reason = "unmapped_access"
432-
self._stop_reason_enum = StopReason.UNMAPPED_ACCESS
433-
self._crash_address = address
434-
self._crash_type = f"unmapped_read at 0x{address:08X}"
435-
logger.debug("Unmapped non-peripheral read at 0x%08X", address)
416+
self._stop_reason = "input_exhausted"
417+
self._stop_reason_enum = StopReason.INPUT_EXHAUSTED
436418
uc.emu_stop()
437-
return False
419+
return
420+
# Write the PIP/SVD result into mapped memory so the CPU reads it
421+
uc.mem_write(address, struct.pack("<I", result & 0xFFFFFFFF)[:size])
438422

439-
def _hook_mem_write_unmapped(self, uc, access, address, size, value, user_data):
440-
"""Handle unmapped memory writes with PIP routing."""
441-
if _PERIPH_START <= address < _PERIPH_END:
442-
# Peripheral MMIO write
443-
try:
444-
self._mmio_handler.write(address, value, size)
445-
except InputExhausted:
446-
self._stopped = True
447-
self._stop_reason = "input_exhausted"
448-
self._stop_reason_enum = StopReason.INPUT_EXHAUSTED
449-
uc.emu_stop()
450-
return False
451-
self._map_page_if_needed(uc, address)
452-
return True
453-
454-
elif _SYSTEM_REG_START <= address < _SYSTEM_REG_END:
423+
def _hook_periph_write(self, uc, access, address, size, value, user_data):
424+
"""Intercept EVERY write in the peripheral MMIO range."""
425+
try:
455426
self._mmio_handler.write(address, value, size)
456-
self._map_page_if_needed(uc, address)
457-
return True
458-
459-
else:
460-
# Non-peripheral unmapped access -> crash
427+
except InputExhausted:
461428
self._stopped = True
462-
self._stop_reason = "unmapped_access"
463-
self._stop_reason_enum = StopReason.UNMAPPED_ACCESS
464-
self._crash_address = address
465-
self._crash_type = f"unmapped_write at 0x{address:08X}"
466-
logger.debug("Unmapped non-peripheral write at 0x%08X", address)
429+
self._stop_reason = "input_exhausted"
430+
self._stop_reason_enum = StopReason.INPUT_EXHAUSTED
467431
uc.emu_stop()
468-
return False
432+
433+
def _hook_sysreg_read(self, uc, access, address, size, value, user_data):
434+
"""Intercept reads in the system register range (0xE0000000+)."""
435+
result = self._mmio_handler.read(address, size)
436+
uc.mem_write(address, struct.pack("<I", result & 0xFFFFFFFF)[:size])
437+
438+
def _hook_sysreg_write(self, uc, access, address, size, value, user_data):
439+
"""Intercept writes in the system register range."""
440+
self._mmio_handler.write(address, value, size)
441+
442+
def _hook_unmapped_access(self, uc, access, address, size, value, user_data):
443+
"""Handle unmapped access outside peripheral/system ranges = crash."""
444+
is_write = access in (2, 3, 4, 5) # UC_MEM_WRITE variants
445+
self._stopped = True
446+
self._stop_reason = "unmapped_access"
447+
self._stop_reason_enum = StopReason.UNMAPPED_ACCESS
448+
self._crash_address = address
449+
self._crash_type = f"unmapped_{'write' if is_write else 'read'} at 0x{address:08X}"
450+
logger.debug("Unmapped access at 0x%08X (crash)", address)
451+
uc.emu_stop()
452+
return False
469453

470454
# ------------------------------------------------------------------
471455
# Block hook (coverage + interrupts)

rtosploit/utils/binary.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,12 @@ def load_elf(path: Path) -> FirmwareImage:
225225
base_address = addr
226226

227227
for section in elf.iter_sections():
228-
if section.header.sh_addr == 0:
229-
continue
230228
flags = section.header.sh_flags
229+
# Skip sections without ALLOC flag (debug, metadata, comments)
230+
# Do NOT skip address-0 sections — Cortex-M vector table lives there
231+
if not (flags & 0x2): # SHF_ALLOC = 0x2
232+
if section.header.sh_addr == 0:
233+
continue
231234
perms = ""
232235
if flags & 0x4:
233236
perms += "r"

tests/unit/test_unicorn_engine.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,37 +96,35 @@ def test_setup_loads_firmware_sections(self) -> None:
9696
assert engine.execution_count == 0
9797

9898
def test_mmio_read_routes_to_handler(self) -> None:
99-
"""Unmapped MMIO reads are routed to the CompositeMMIOHandler."""
99+
"""Peripheral MMIO reads are routed to the CompositeMMIOHandler."""
100100
from rtosploit.peripherals.models.mmio_fallback import CompositeMMIOHandler
101101

102102
handler = CompositeMMIOHandler()
103103
fw = _make_firmware(with_sections=False)
104104
engine = UnicornRehostEngine(fw, mmio_handler=handler)
105105
engine.setup()
106106

107-
# Directly invoke the hook to verify routing
108-
# Peripheral region (0x40000000+) is unmapped on purpose
109-
uc_mock = MagicMock()
110-
result = engine._hook_mem_read_unmapped(
107+
# Directly invoke the peripheral read hook
108+
uc_mock = engine._uc # Use the real Unicorn instance
109+
engine._hook_periph_read(
111110
uc_mock, None, 0x40000000, 4, 0, None,
112111
)
113-
assert result is True
114112
# The handler should have been called (fallback returns 1 for first read)
113+
assert handler.get_coverage_stats()["total"] > 0
115114

116115
def test_mmio_write_routes_to_handler(self) -> None:
117-
"""Unmapped MMIO writes are routed to the CompositeMMIOHandler."""
116+
"""Peripheral MMIO writes are routed to the CompositeMMIOHandler."""
118117
from rtosploit.peripherals.models.mmio_fallback import CompositeMMIOHandler
119118

120119
handler = CompositeMMIOHandler()
121120
fw = _make_firmware(with_sections=False)
122121
engine = UnicornRehostEngine(fw, mmio_handler=handler)
123122
engine.setup()
124123

125-
uc_mock = MagicMock()
126-
result = engine._hook_mem_write_unmapped(
127-
uc_mock, None, 0x40000000, 4, 0xDEADBEEF, None,
124+
engine._hook_periph_write(
125+
engine._uc, None, 0x40000000, 4, 0xDEADBEEF, None,
128126
)
129-
assert result is True
127+
assert handler.get_coverage_stats()["total"] > 0
130128

131129
def test_hal_hook_registration(self) -> None:
132130
"""add_hal_hook stores the handler for the given address."""

tests/unit/test_unicorn_pip_integration.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -226,14 +226,14 @@ def test_unmapped_non_peripheral_access_is_crash(self) -> None:
226226
engine = UnicornRehostEngine(fw, mmio_handler=handler)
227227
engine.setup()
228228

229-
# Directly test the hook with a non-peripheral address
229+
# Directly test the unmapped access hook with a non-peripheral address
230230
uc_mock = MagicMock()
231-
result = engine._hook_mem_read_unmapped(
232-
uc_mock, None, 0x00000004, 4, 0, None,
231+
result = engine._hook_unmapped_access(
232+
uc_mock, 0, 0x70000004, 4, 0, None, # Address outside all mapped regions
233233
)
234234
assert result is False
235235
assert engine._stop_reason_enum == StopReason.UNMAPPED_ACCESS
236-
assert engine._crash_address == 0x00000004
236+
assert engine._crash_address == 0x70000004
237237

238238
def test_peripheral_read_routes_through_pip(self) -> None:
239239
"""Peripheral MMIO reads route through PIP when configured."""
@@ -245,12 +245,10 @@ def test_peripheral_read_routes_through_pip(self) -> None:
245245
# Set up fuzz input with enough data for PIP
246246
engine.set_fuzz_input(b"\xFF" * 64)
247247

248-
# Directly test hook with a peripheral address
249-
uc_mock = MagicMock()
250-
result = engine._hook_mem_read_unmapped(
251-
uc_mock, None, 0x40000100, 4, 0, None,
248+
# Directly test peripheral read hook
249+
engine._hook_periph_read(
250+
engine._uc, None, 0x40000100, 4, 0, None,
252251
)
253-
assert result is True
254252
# PIP should have been invoked (pip_handled counter incremented)
255253
stats = handler.get_coverage_stats()
256254
assert stats["pip_handled"] > 0

0 commit comments

Comments
 (0)