From e999f811ade69e293321303d56f60320f8d1bd67 Mon Sep 17 00:00:00 2001 From: danielnovais-tech Date: Sat, 7 Feb 2026 23:03:58 -0300 Subject: [PATCH] Add GR801 simulation framework scaffold --- gr801_simulation_framework.py | 939 +++++++++++++++++++++++ tests/test_gr801_simulation_framework.py | 33 + 2 files changed, 972 insertions(+) create mode 100644 gr801_simulation_framework.py create mode 100644 tests/test_gr801_simulation_framework.py diff --git a/gr801_simulation_framework.py b/gr801_simulation_framework.py new file mode 100644 index 0000000..25bd85c --- /dev/null +++ b/gr801_simulation_framework.py @@ -0,0 +1,939 @@ +"""GR801 Radiation-Hardened AI SoC Simulation Framework. + +Gaisler Research Artificial Intelligence NOEL-V (GRAIN) Product Line +For space-based AI applications in harsh radiation environments. + +Note: This is a *simulation scaffold* intended to be lightweight and safe to import. +It avoids allocating multi-GB arrays or iterating per-CPU-cycle in Python. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from datetime import datetime +from enum import Enum, IntEnum +from typing import Any + +import numpy as np + +logger = logging.getLogger("GR801_Simulator") + + +class GR801Config: + """GR801 SoC Configuration Parameters.""" + + # Core Configuration + NUM_CORES = 4 # NOEL-V RISC-V cores + CORE_FREQUENCY = 100e6 # 100 MHz (radiation-hardened) + CACHE_LINE_SIZE = 64 # bytes + L1_CACHE_SIZE = 32 * 1024 # 32 KB per core + L2_CACHE_SIZE = 256 * 1024 # 256 KB shared + + # AI Accelerator + AI_ACCELERATOR_PRESENT = True + NEURAL_ENGINE_OPS = 8 # TOPS (Tera Operations Per Second) + MATRIX_UNITS = 64 + VECTOR_WIDTH = 512 # bits + + # Memory Configuration + DDR4_ECC_SIZE = 4 * 1024**3 # 4 GB address space with ECC + SCRUB_RATE = 1e6 # scrubs per second + + # Radiation Hardening + SEU_THRESHOLD = 37 # MeV-cm²/mg (LET threshold) + TID_TOLERANCE = 300 # krad(Si) + LATCHUP_IMMUNE = True + + # Power Management + NOMINAL_POWER = 15 # Watts + LOW_POWER_MODE = 5 # Watts + PEAK_POWER = 25 # Watts + + # Thermal Limits + MAX_TEMP = 125 # °C + MIN_TEMP = -55 # °C + OP_TEMP_RANGE = (-40, 100) # °C + + +class RadiationEnvironment(Enum): + """Space radiation environments.""" + + LEO = "low_earth_orbit" + GEO = "geostationary_orbit" + DEEP_SPACE = "deep_space" + SOLAR_MAX = "solar_maximum" + JUPITER = "jupiter_radiation_belts" + + +@dataclass(frozen=True) +class RadiationFlux: + """Models radiation flux in a space environment.""" + + proton_flux: float # protons/cm²/s + heavy_ion_flux: float # ions/cm²/s + electron_flux: float # electrons/cm²/s + total_ionizing_dose_rate: float # rad(Si)/s + let_spectrum: dict[float, float] # LET vs flux + + @classmethod + def for_environment(cls, environment: RadiationEnvironment) -> RadiationFlux: + """Get radiation flux for specific space environment.""" + + fluxes: dict[RadiationEnvironment, RadiationFlux] = { + RadiationEnvironment.LEO: cls( + proton_flux=1e4, + heavy_ion_flux=1e2, + electron_flux=1e5, + total_ionizing_dose_rate=1e-3, + let_spectrum={0.1: 1e5, 1.0: 1e4, 10.0: 1e3, 100.0: 1e2}, + ), + RadiationEnvironment.GEO: cls( + proton_flux=1e5, + heavy_ion_flux=1e3, + electron_flux=1e6, + total_ionizing_dose_rate=1e-2, + let_spectrum={0.1: 1e6, 1.0: 1e5, 10.0: 1e4, 100.0: 1e3}, + ), + RadiationEnvironment.JUPITER: cls( + proton_flux=1e8, + heavy_ion_flux=1e6, + electron_flux=1e9, + total_ionizing_dose_rate=1.0, + let_spectrum={0.1: 1e9, 1.0: 1e8, 10.0: 1e7, 100.0: 1e6}, + ), + } + return fluxes.get(environment, fluxes[RadiationEnvironment.LEO]) + + +class SEUType(IntEnum): + """Types of Single Event Effects.""" + + SEU = 1 # Single Event Upset (bit flip) + SET = 2 # Single Event Transient + SEFI = 3 # Single Event Functional Interrupt + SEL = 4 # Single Event Latchup (mitigated in GR801) + SEB = 5 # Single Event Burnout + + +@dataclass +class SEUEvent: + """Single Event Upset event data.""" + + timestamp: float + location: tuple[int, int, int] # (core, memory_type, address) + bit_position: int + particle_let: float # MeV-cm²/mg + effect_type: SEUType + corrected: bool = False + critical: bool = False + + +class GR801Core: + """NOEL-V RISC-V radiation-hardened core model (lightweight).""" + + def __init__(self, core_id: int, frequency: float = GR801Config.CORE_FREQUENCY): + self.core_id = int(core_id) + self.frequency = float(frequency) + self.power_state = "ACTIVE" + self.temperature = 25.0 # °C + self.error_count = 0 + self.corrected_errors = 0 + + # Register file (32 registers) + self.registers = np.zeros(32, dtype=np.uint32) + + # Cache with ECC protection (modeled, not fully implemented) + self.cache_data = np.zeros(GR801Config.L1_CACHE_SIZE, dtype=np.uint8) + self.cache_ecc = np.zeros(GR801Config.L1_CACHE_SIZE // 8, dtype=np.uint8) + self.cache_tags = np.zeros(GR801Config.L1_CACHE_SIZE // 64, dtype=np.uint64) + + # Performance counters + self.instructions_executed = 0 + self.cache_hits = 0 + self.cache_misses = 0 + + # RNG (seedable per instance if needed) + self._rng = np.random.default_rng() + + def execute_cycle(self, radiation_flux: RadiationFlux | None = None, *, cycles: int = 1): + """Execute one or more CPU cycles with potential SEU injection. + + This method batches work to avoid per-cycle Python loops. + """ + + cycles_i = int(max(1, cycles)) + self.instructions_executed += cycles_i + + if radiation_flux is not None: + self._simulate_radiation_effects(radiation_flux, cycles=cycles_i) + + self._update_temperature(cycles=cycles_i) + + def _simulate_radiation_effects(self, flux: RadiationFlux, *, cycles: int): + """Simulate radiation effects on this core.""" + + # Simplified cross-section model: expected upsets scale with cycles. + # Keep lambda small; this is a toy model. + seu_probability_per_cycle = flux.heavy_ion_flux * 1e-10 + expected_events = float(cycles) * float(seu_probability_per_cycle) + num_events = int(self._rng.poisson(lam=max(0.0, expected_events))) + + for _ in range(num_events): + location_type = self._rng.choice(["REGISTER", "CACHE", "CONTROL"]) + self._inject_seu(str(location_type), flux) + + def _inject_seu(self, location: str, flux: RadiationFlux): + """Inject a simulated SEU at specified location.""" + + bit_to_flip = int(self._rng.integers(0, 32)) + + if location == "REGISTER": + reg_idx = int(self._rng.integers(0, 32)) + bit_mask = np.uint32(1) << np.uint32(bit_to_flip) + self.registers[reg_idx] ^= bit_mask + self.error_count += 1 + return + + if location == "CACHE": + addr = int(self._rng.integers(0, len(self.cache_data))) + bit_mask = np.uint8(1) << np.uint8(bit_to_flip % 8) + self.cache_data[addr] ^= bit_mask + self.error_count += 1 + + if self._ecc_can_correct(addr): + self.corrected_errors += 1 + self.cache_data[addr] ^= bit_mask + return + + # CONTROL / other: count as a (potentially) critical event. + self.error_count += 1 + _ = flux + + def _ecc_can_correct(self, address: int) -> bool: + """Check if ECC can correct the error (simplified).""" + + _ = address + return bool(self._rng.random() < 0.95) + + def _update_temperature(self, *, cycles: int): + """Update core temperature based on activity.""" + + # Scale heating with cycles. + base_temp_increase = 0.001 if self.power_state == "ACTIVE" else 0.0001 + self.temperature += base_temp_increase * float(cycles) + + # Simple cooling model. + cooling_rate = 0.0005 * float(cycles) + self.temperature -= cooling_rate * (self.temperature - 25.0) + + self.temperature = float(np.clip(self.temperature, -55, 125)) + + +class AIAccelerator: + """GR801 Neural Network Accelerator Model (approximate).""" + + def __init__(self): + self.active = True + self.temperature = 25.0 + self.power_consumption = 0.0 + self.throughput = float(GR801Config.NEURAL_ENGINE_OPS) * 1e12 # TOPS to OPS + + self.matrix_units = np.zeros( + (GR801Config.MATRIX_UNITS, GR801Config.VECTOR_WIDTH // 32), + dtype=np.float32, + ) + + # Keep these lightweight; treat as capacity rather than fully modeled ECC. + self.weight_memory_size = 16 * 1024 * 1024 + self.activation_memory_size = 4 * 1024 * 1024 + + self.operations_completed = 0 + self.mac_utilization = 0.0 + self.error_rate = 0.0 + + self._rng = np.random.default_rng() + + def execute_inference(self, model_ops: int, input_data: np.ndarray) -> np.ndarray: + """Execute neural network inference.""" + + if not self.active: + raise RuntimeError("AI Accelerator not active") + + model_ops_i = int(model_ops) + _execution_time = float(model_ops_i) / self.throughput + + self.power_consumption = GR801Config.NOMINAL_POWER * 1.5 + + result = self._simulate_matrix_operations(input_data) + + self.operations_completed += model_ops_i + self.mac_utilization = float(self._rng.uniform(0.7, 0.95)) + + self._update_temperature() + return result + + def _simulate_matrix_operations(self, input_data: np.ndarray) -> np.ndarray: + """Simulate matrix operations without allocating huge weight matrices.""" + + if input_data.ndim < 2: + raise ValueError("input_data must include a batch dimension") + + batch_size = int(input_data.shape[0]) + output_size = 1000 + + flat = input_data.reshape(batch_size, -1).astype(np.float32, copy=False) + + # Downsample features to keep this fast. + max_features = 1024 + if flat.shape[1] > max_features: + stride = max(1, flat.shape[1] // max_features) + flat = flat[:, ::stride][:, :max_features] + + # Deterministic-ish random projection per instance. + weights = self._rng.standard_normal((flat.shape[1], output_size), dtype=np.float32) + result = flat @ weights + return np.tanh(result).astype(np.float32, copy=False) + + def _update_temperature(self): + """Update accelerator temperature.""" + + temp_increase = float(self.power_consumption) * 0.1 + self.temperature += temp_increase + + cooling_rate = 0.05 + self.temperature -= cooling_rate * (self.temperature - 25.0) + + self.temperature = float(np.clip(self.temperature, -55, 125)) + + +class _PagedMemory: + """Sparse paged byte-addressable memory to avoid multi-GB allocations.""" + + def __init__(self, size: int, *, page_size: int = 4096): + self.size = int(size) + self.page_size = int(page_size) + if self.size <= 0: + raise ValueError("size must be positive") + if self.page_size <= 0: + raise ValueError("page_size must be positive") + + self._pages: dict[int, np.ndarray] = {} + + def read(self, address: int, size: int) -> np.ndarray: + address_i = int(address) + size_i = int(size) + if address_i < 0 or size_i < 0 or address_i + size_i > self.size: + raise ValueError("Memory access out of bounds") + + out = np.zeros(size_i, dtype=np.uint8) + if size_i == 0: + return out + + start = address_i + end = address_i + size_i + page_size = self.page_size + + pos = 0 + while start < end: + page_index = start // page_size + page_offset = start % page_size + take = min(end - start, page_size - page_offset) + + page = self._pages.get(page_index) + if page is not None: + out[pos : pos + take] = page[page_offset : page_offset + take] + + pos += take + start += take + + return out + + def write(self, address: int, data: np.ndarray): + address_i = int(address) + data_u8 = np.asarray(data, dtype=np.uint8) + size_i = int(data_u8.size) + + if address_i < 0 or address_i + size_i > self.size: + raise ValueError("Memory access out of bounds") + if size_i == 0: + return + + start = address_i + end = address_i + size_i + page_size = self.page_size + + pos = 0 + while start < end: + page_index = start // page_size + page_offset = start % page_size + take = min(end - start, page_size - page_offset) + + page = self._pages.get(page_index) + if page is None: + page = np.zeros(page_size, dtype=np.uint8) + self._pages[page_index] = page + + page[page_offset : page_offset + take] = data_u8[pos : pos + take] + + pos += take + start += take + + @property + def allocated_pages(self) -> int: + return len(self._pages) + + +class MemoryController: + """ECC-protected DDR4 Memory Controller (sparse address-space model).""" + + def __init__(self, size: int = GR801Config.DDR4_ECC_SIZE, *, page_size: int = 4096): + self.size = int(size) + self._mem = _PagedMemory(self.size, page_size=page_size) + + self.scrub_pointer = 0 + self.scrub_rate = float(GR801Config.SCRUB_RATE) + self.errors_detected = 0 + self.errors_corrected = 0 + + self.seu_events: list[SEUEvent] = [] + self._rng = np.random.default_rng() + + def read(self, address: int, size: int) -> np.ndarray: + """Read memory with ECC checking (simplified).""" + + data = self._mem.read(address, size) + + # Simulate occasional ECC detection/correction. We don't store ECC bits; + # we simulate the outcomes and counters. + for i in range(0, int(size), 8): + chunk_addr = int(address) + i + if self._check_ecc(chunk_addr): + if self._correct_ecc_error(chunk_addr): + self.errors_corrected += 1 + else: + self.errors_detected += 1 + self._log_memory_error(chunk_addr, "UNCORRECTABLE") + + return data + + def write(self, address: int, data: np.ndarray): + """Write memory (ECC generation is modeled, not stored).""" + + self._mem.write(address, data) + + def scrub_cycle(self, radiation_flux: RadiationFlux | None = None, *, cycles: int = 1): + """Execute one or more scrubbing cycles.""" + + cycles_i = int(max(1, cycles)) + chunk_size = 64 + + # Advance pointer efficiently. + self.scrub_pointer = int((self.scrub_pointer + chunk_size * cycles_i) % self.size) + + if radiation_flux is not None: + self._simulate_radiation_errors(radiation_flux, cycles=cycles_i) + + def _check_ecc(self, address: int) -> bool: + _ = address + return bool(self._rng.random() < 0.001) + + def _correct_ecc_error(self, address: int) -> bool: + _ = address + return bool(self._rng.random() < 0.999) + + def _simulate_radiation_errors(self, flux: RadiationFlux, *, cycles: int): + """Simulate radiation-induced memory errors.""" + + error_probability_per_cycle = flux.heavy_ion_flux * 1e-12 + expected = float(cycles) * float(error_probability_per_cycle) + num_events = int(self._rng.poisson(lam=max(0.0, expected))) + + for _ in range(num_events): + error_addr = int(self._rng.integers(0, self.size)) + error_bit = int(self._rng.integers(0, 8)) + + # Flip the bit in sparse memory: read-modify-write one byte. + byte = self._mem.read(error_addr, 1) + byte[0] ^= np.uint8(1 << error_bit) + self._mem.write(error_addr, byte) + + self.seu_events.append( + SEUEvent( + timestamp=datetime.now().timestamp(), + location=(0, 1, error_addr), + bit_position=error_bit, + particle_let=float(self._rng.uniform(1, 100)), + effect_type=SEUType.SEU, + ) + ) + + def _log_memory_error(self, address: int, error_type: str): + logger.warning("Memory error at 0x%08X: %s", int(address), str(error_type)) + + @property + def allocated_pages(self) -> int: + return self._mem.allocated_pages + + +class AIWorkload(Enum): + """Types of AI workloads for space applications.""" + + IMAGE_CLASSIFICATION = "image_classification" + OBJECT_DETECTION = "object_detection" + ANOMALY_DETECTION = "anomaly_detection" + NAVIGATION = "navigation" + COMMUNICATIONS = "communications" + + +@dataclass(frozen=True) +class NeuralNetworkModel: + """Neural network model for space AI applications.""" + + name: str + input_shape: tuple[int, ...] + output_shape: tuple[int, ...] + operations: int + parameters: int + memory_footprint: int + accuracy: float + radiation_tolerance: float + + @classmethod + def get_preset_model(cls, model_type: AIWorkload) -> NeuralNetworkModel: + models: dict[AIWorkload, NeuralNetworkModel] = { + AIWorkload.IMAGE_CLASSIFICATION: cls( + name="EfficientNet-Space", + input_shape=(224, 224, 3), + output_shape=(1000,), + operations=int(4e9), + parameters=int(20e6), + memory_footprint=80 * 1024 * 1024, + accuracy=0.85, + radiation_tolerance=0.95, + ), + AIWorkload.OBJECT_DETECTION: cls( + name="YOLO-Space", + input_shape=(416, 416, 3), + output_shape=(13, 13, 425), + operations=int(10e9), + parameters=int(60e6), + memory_footprint=240 * 1024 * 1024, + accuracy=0.78, + radiation_tolerance=0.90, + ), + AIWorkload.ANOMALY_DETECTION: cls( + name="AutoEncoder-Space", + input_shape=(100,), + output_shape=(100,), + operations=int(1e6), + parameters=int(1e5), + memory_footprint=400 * 1024, + accuracy=0.92, + radiation_tolerance=0.98, + ), + } + return models.get(model_type, models[AIWorkload.IMAGE_CLASSIFICATION]) + + +class GR801System: + """Complete GR801 SoC System Simulation.""" + + def __init__(self, environment: RadiationEnvironment = RadiationEnvironment.LEO): + self.environment = environment + self.radiation_flux = RadiationFlux.for_environment(environment) + + self.cores = [GR801Core(i) for i in range(GR801Config.NUM_CORES)] + self.ai_accelerator = AIAccelerator() if GR801Config.AI_ACCELERATOR_PRESENT else None + self.memory = MemoryController() + + self.uptime = 0.0 + self.total_power = 0.0 + self.system_temperature = 25.0 + self.total_ionizing_dose = 0.0 + + self.inference_count = 0 + self.total_operations = 0 + self.system_errors = 0 + self.corrected_errors = 0 + + self.current_workload: AIWorkload | None = None + self.current_model: NeuralNetworkModel | None = None + + self.seu_log: list[SEUEvent] = [] + self.last_seu_check = 0.0 + + def execute_timestep(self, dt: float = 1e-3): + """Execute one simulation timestep.""" + + dt_f = float(dt) + if dt_f <= 0: + raise ValueError("dt must be positive") + + self.total_ionizing_dose += float(self.radiation_flux.total_ionizing_dose_rate) * dt_f + + # Batch CPU core work (no per-cycle loops). + cycles = int(max(1, GR801Config.CORE_FREQUENCY * dt_f)) + for core in self.cores: + core.execute_cycle(self.radiation_flux, cycles=cycles) + + # Batch memory scrubbing. + scrub_cycles = int(max(1, GR801Config.SCRUB_RATE * dt_f)) + self.memory.scrub_cycle(self.radiation_flux, cycles=scrub_cycles) + + self.uptime += dt_f + self._update_power_consumption() + self._update_temperature() + self._check_radiation_limits() + + if int(self.uptime * 100) % 10 == 0: + self._log_system_state() + + def run_inference(self, workload: AIWorkload, input_data: np.ndarray) -> dict[str, Any]: + """Run AI inference on the GR801 system.""" + + self.current_workload = workload + self.current_model = NeuralNetworkModel.get_preset_model(workload) + + if not self._system_operational(): + raise RuntimeError("System not operational due to radiation damage") + + model = self.current_model + if model is None: + raise RuntimeError("No model selected") + + if not self._input_shape_matches(input_data, model.input_shape): + raise ValueError(f"Input shape mismatch: expected {model.input_shape} (optionally batched)") + + start_time = self.uptime + + if self.ai_accelerator is not None: + result = self.ai_accelerator.execute_inference(model.operations, input_data) + self.total_power += GR801Config.NOMINAL_POWER * 0.5 + else: + result = self._cpu_inference(input_data) + + execution_time = self.uptime - start_time + accuracy = self._calculate_radiation_accuracy() + + self.inference_count += 1 + self.total_operations += int(model.operations) + + return { + "result": result, + "execution_time": execution_time, + "estimated_accuracy": accuracy, + "power_consumed": self.total_power * max(0.0, execution_time), + "radiation_effects": len([e for e in self.seu_log if not e.corrected]), + } + + def _input_shape_matches(self, input_data: np.ndarray, expected: tuple[int, ...]) -> bool: + if tuple(input_data.shape) == tuple(expected): + return True + if input_data.ndim == len(expected) + 1 and tuple(input_data.shape[1:]) == tuple(expected): + return True + return False + + def _cpu_inference(self, input_data: np.ndarray) -> np.ndarray: + """Fallback CPU-based inference (toy).""" + + _ = input_data + model = self.current_model + if model is None: + raise RuntimeError("No model selected") + + results = [np.random.standard_normal(model.output_shape).astype(np.float32) for _core in self.cores] + return np.mean(results, axis=0) + + def _calculate_radiation_accuracy(self) -> float: + model = self.current_model + if model is None: + return 0.0 + + base_accuracy = float(model.accuracy) + radiation_tolerance = float(model.radiation_tolerance) + + uncorrected_critical = len([e for e in self.seu_log if (not e.corrected and e.critical)]) + error_impact = float(uncorrected_critical) * 0.01 + + tid_impact = min(1.0, float(self.total_ionizing_dose) / float(GR801Config.TID_TOLERANCE * 1000)) + + accuracy = base_accuracy * radiation_tolerance * (1.0 - error_impact) * (1.0 - tid_impact * 0.1) + return float(max(0.0, min(1.0, accuracy))) + + def _system_operational(self) -> bool: + if self.system_temperature > GR801Config.MAX_TEMP: + logger.error("System temperature too high: %s°C", self.system_temperature) + return False + + if self.system_temperature < GR801Config.MIN_TEMP: + logger.error("System temperature too low: %s°C", self.system_temperature) + return False + + if self.total_ionizing_dose > GR801Config.TID_TOLERANCE * 1000: + logger.error("Total ionizing dose exceeded: %s rad", self.total_ionizing_dose) + return False + + critical_errors = len([e for e in self.seu_log if (not e.corrected and e.critical)]) + if critical_errors > 10: + logger.error("Too many critical errors: %s", critical_errors) + return False + + return True + + def _update_power_consumption(self): + base_power = float(GR801Config.NOMINAL_POWER) + + active_cores = sum(1 for core in self.cores if core.power_state == "ACTIVE") + core_power = float(active_cores) * 2.0 + + accelerator_power = 0.0 + if self.ai_accelerator is not None and self.ai_accelerator.active: + accelerator_power = float(self.ai_accelerator.power_consumption) + + memory_power = 1.0 + temp_factor = 1.0 + 0.01 * (self.system_temperature - 25.0) + + self.total_power = (base_power + core_power + accelerator_power + memory_power) * temp_factor + + def _update_temperature(self): + heat_generated = float(self.total_power) * 0.8 + radiation_cooling = 0.1 * (self.system_temperature**4 - 3.0**4) * 5.67e-8 + conduction_cooling = 0.05 * (self.system_temperature - 20.0) + + delta_t = (heat_generated - radiation_cooling - conduction_cooling) * 0.01 + self.system_temperature = float( + np.clip(self.system_temperature + delta_t, GR801Config.MIN_TEMP, GR801Config.MAX_TEMP) + ) + + def _check_radiation_limits(self): + high_let_events = [e for e in self.seu_log if e.particle_let > GR801Config.SEU_THRESHOLD] + + if high_let_events and (self.uptime - self.last_seu_check > 1.0): + logger.warning("High LET particle detected: %s events", len(high_let_events)) + self.last_seu_check = self.uptime + + if len(high_let_events) > 5: + self._activate_radiation_mitigation() + + def _activate_radiation_mitigation(self): + logger.info("Activating radiation mitigation strategies") + + for core in self.cores: + core.frequency *= 0.8 + + self.memory.scrub_rate *= 2.0 + + if self.ai_accelerator is not None: + self.ai_accelerator.power_consumption *= 0.7 + + def _log_system_state(self): + state = { + "uptime": self.uptime, + "temperature": f"{self.system_temperature:.1f}°C", + "power": f"{self.total_power:.1f}W", + "tid": f"{self.total_ionizing_dose:.1f} rad", + "inferences": self.inference_count, + "total_operations": f"{self.total_operations:.1e}", + "errors": self.system_errors, + "corrected_errors": self.corrected_errors, + "seu_rate": len(self.seu_log) / max(1.0, self.uptime), + } + + logger.info("System State: %s", state) + + +class SpaceMissionSimulator: + """Simulates a complete space mission with GR801.""" + + def __init__(self, mission_duration: float = 86400.0): + self.mission_duration = float(mission_duration) + self.simulation_time = 0.0 + self.gr801_system = GR801System(RadiationEnvironment.LEO) + + self.mission_phase = "LAUNCH" + self.mission_success = True + self.science_data_collected = 0.0 + + self.mission_workloads: dict[str, AIWorkload] = { + "LAUNCH": AIWorkload.ANOMALY_DETECTION, + "ORBIT_INSERTION": AIWorkload.NAVIGATION, + "SCIENCE_OPS": AIWorkload.IMAGE_CLASSIFICATION, + "COMMUNICATIONS": AIWorkload.COMMUNICATIONS, + "DEORBIT": AIWorkload.OBJECT_DETECTION, + } + + def run_mission(self): + logger.info("Starting space mission simulation for %.0f seconds", self.mission_duration) + + dt = 0.1 + timesteps = int(max(1, self.mission_duration / dt)) + + for step in range(timesteps): + self.simulation_time += dt + self._update_mission_phase() + + try: + self.gr801_system.execute_timestep(dt) + self._execute_mission_workload() + except Exception as exc: # noqa: BLE001 + logger.error("Mission failure at t=%.1fs: %s", self.simulation_time, exc) + self.mission_success = False + break + + if not self._mission_objectives_met(): + self.mission_success = False + break + + if step % 1000 == 0: + self._mission_status_report() + + self._generate_mission_report() + + def _update_mission_phase(self): + phase_thresholds: dict[str, float] = { + "LAUNCH": 300, + "ORBIT_INSERTION": 1800, + "SCIENCE_OPS": 3600, + "COMMUNICATIONS": 7200, + "DEORBIT": self.mission_duration - 1800, + } + + for phase, threshold in phase_thresholds.items(): + if self.simulation_time <= threshold: + self.mission_phase = phase + break + + def _execute_mission_workload(self): + workload = self.mission_workloads.get(self.mission_phase, AIWorkload.ANOMALY_DETECTION) + + if workload == AIWorkload.IMAGE_CLASSIFICATION: + input_data = np.random.standard_normal((1, 224, 224, 3)).astype(np.float32) + elif workload == AIWorkload.OBJECT_DETECTION: + input_data = np.random.standard_normal((1, 416, 416, 3)).astype(np.float32) + else: + input_data = np.random.standard_normal((1, 100)).astype(np.float32) + + result = self.gr801_system.run_inference(workload, input_data) + + if workload == AIWorkload.IMAGE_CLASSIFICATION: + confidence = float(np.max(result["result"])) + self.science_data_collected += confidence * 10.0 + elif workload == AIWorkload.ANOMALY_DETECTION: + if float(result["estimated_accuracy"]) < 0.8: + logger.warning( + "Low anomaly detection accuracy: %.2f", + float(result["estimated_accuracy"]), + ) + + def _mission_objectives_met(self) -> bool: + if not self.gr801_system._system_operational(): + logger.error("GR801 system failure") + return False + + if self.mission_phase == "SCIENCE_OPS" and self.science_data_collected < 1000: + return False + + return True + + def _mission_status_report(self): + status = { + "mission_time": f"{self.simulation_time:.1f}s", + "mission_phase": self.mission_phase, + "system_operational": self.gr801_system._system_operational(), + "science_data": f"{self.science_data_collected:.0f} units", + "inferences_completed": self.gr801_system.inference_count, + "radiation_dose": f"{self.gr801_system.total_ionizing_dose:.1f} rad", + "temperature": f"{self.gr801_system.system_temperature:.1f}°C", + } + + logger.info("Mission Status: %s", status) + + def _generate_mission_report(self): + report: dict[str, Any] = { + "mission_duration": self.simulation_time, + "mission_success": self.mission_success, + "final_system_state": { + "temperature": self.gr801_system.system_temperature, + "total_ionizing_dose": self.gr801_system.total_ionizing_dose, + "seu_events": len(self.gr801_system.seu_log), + "uncorrected_errors": len([e for e in self.gr801_system.seu_log if not e.corrected]), + "inferences_completed": self.gr801_system.inference_count, + "total_operations": self.gr801_system.total_operations, + }, + "science_return": self.science_data_collected, + "performance_metrics": { + "inference_rate": self.gr801_system.inference_count / max(1.0, self.simulation_time), + "error_rate": self.gr801_system.system_errors + / max(1.0, float(self.gr801_system.total_operations)), + "power_efficiency": self.gr801_system.total_operations + / max(1.0, self.gr801_system.uptime * self.gr801_system.total_power), + }, + } + + logger.info("%s", "=" * 50) + logger.info("MISSION REPORT") + logger.info("%s", "=" * 50) + for key, value in report.items(): + if isinstance(value, dict): + logger.info("%s:", key) + for subkey, subvalue in value.items(): + logger.info(" %s: %s", subkey, subvalue) + else: + logger.info("%s: %s", key, value) + + logger.info("✓ MISSION ACCOMPLISHED" if self.mission_success else "✗ MISSION FAILED") + + +def simulate_gr801_mission() -> SpaceMissionSimulator: + """Example: Simulate a 12-hour space mission with GR801.""" + + simulator = SpaceMissionSimulator(mission_duration=12 * 3600) + simulator.run_mission() + return simulator + + +def test_gr801_radiation_tolerance() -> dict[str, Any]: + """Test GR801 radiation tolerance in different environments (quick toy run).""" + + environments = [RadiationEnvironment.LEO, RadiationEnvironment.GEO, RadiationEnvironment.JUPITER] + + results: dict[str, Any] = {} + for env in environments: + logger.info("Testing GR801 in %s environment", env.value) + system = GR801System(environment=env) + + # 10 seconds simulated time in 100 ms steps. + for _ in range(100): + system.execute_timestep(dt=0.1) + + model = NeuralNetworkModel.get_preset_model(AIWorkload.IMAGE_CLASSIFICATION) + input_data = np.random.standard_normal((1, *model.input_shape)).astype(np.float32) + + result = system.run_inference(AIWorkload.IMAGE_CLASSIFICATION, input_data) + + results[env.value] = { + "final_tid": system.total_ionizing_dose, + "seu_count": len(system.seu_log), + "inference_accuracy": result["estimated_accuracy"], + "system_operational": system._system_operational(), + } + + return results + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + + logger.info("GR801 Radiation-Hardened AI SoC Simulation Framework") + logger.info("%s", "=" * 60) + + results = test_gr801_radiation_tolerance() + logger.info("Radiation Tolerance Test Results:") + for env, result in results.items(): + logger.info("%s: %s", env, result) + + logger.info("%s", "=" * 60) + logger.info("Starting Full Space Mission Simulation") + + _mission = simulate_gr801_mission() diff --git a/tests/test_gr801_simulation_framework.py b/tests/test_gr801_simulation_framework.py new file mode 100644 index 0000000..d991edf --- /dev/null +++ b/tests/test_gr801_simulation_framework.py @@ -0,0 +1,33 @@ +import numpy as np + +import gr801_simulation_framework as gr + + +def test_memory_controller_sparse_does_not_allocate_full_space(): + mem = gr.MemoryController(size=gr.GR801Config.DDR4_ECC_SIZE) + assert mem.size == gr.GR801Config.DDR4_ECC_SIZE + assert mem.allocated_pages == 0 + + payload = np.arange(256, dtype=np.uint8) + mem.write(12345, payload) + out = mem.read(12345, payload.size) + + assert np.array_equal(out, payload) + assert mem.allocated_pages > 0 + + +def test_system_inference_accepts_batched_inputs_and_returns_result(): + np.random.seed(0) + + system = gr.GR801System(environment=gr.RadiationEnvironment.LEO) + model = gr.NeuralNetworkModel.get_preset_model(gr.AIWorkload.IMAGE_CLASSIFICATION) + + input_data = np.random.standard_normal((1, *model.input_shape)).astype(np.float32) + result = system.run_inference(gr.AIWorkload.IMAGE_CLASSIFICATION, input_data) + + assert "result" in result + assert "estimated_accuracy" in result + + y = result["result"] + assert isinstance(y, np.ndarray) + assert y.shape[0] == 1