Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 264 additions & 0 deletions simulation_pipeline_gr801.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
# simulation_pipeline_gr801.py

import logging
from typing import Any

import numpy as np

LOGGER = logging.getLogger(__name__)


# --- Data Structures ---
class SoC:
    """Simplified state container for the GR801 SoC model."""

    def __init__(self, num_cores: int, memory_size: int, accelerator_present: bool = True):
        # The model assumes 32 registers per core, kept in one flat list.
        register_count = 32 * num_cores

        self.num_cores = num_cores
        # Main memory: one uint8 element per address, initially all zero.
        self.memory = np.zeros(shape=memory_size, dtype=np.uint8)
        self.accelerator_present = accelerator_present
        self.registers = [0] * register_count
        # Simplified cache: a fixed 1024 uint8 elements, independent of memory_size.
        self.cache = np.zeros(shape=1024, dtype=np.uint8)
        # Outstanding error count and a generic performance metric.
        self.errors = 0
        self.performance = 0.0


class RadiationModel:
"""Models the radiation environment."""
def __init__(self, particle_flux: float, upset_rate: float):
Comment on lines +21 to +26

Copilot AI Feb 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Top-level class/function definitions need two blank lines between them to satisfy Ruff/pycodestyle (E302/E305). For example, there’s only one blank line between SoC and RadiationModel here; apply consistent 2-blank-line spacing throughout the module.

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit e7b5e45. Added two blank lines between all top-level class and function definitions per PEP 8.

self.particle_flux = particle_flux # particles per cm^2 per second
self.upset_rate = upset_rate # probability of an upset per particle


class AIApplication:
    """An AI workload for the SoC together with its latest result."""

    def __init__(self, task: str, input_data: np.ndarray):
        # Workload label (e.g. "image_classification") and the data it consumes.
        self.task, self.input_data = task, input_data
        # No output exists until something stores one here.
        self.output = None
        # Current accuracy of the application; starts at 1.0.
        self.accuracy = 1.0


class SimulationState:
    """Bundle of everything that describes the simulation at one instant."""

    def __init__(self, soc: SoC, radiation: RadiationModel, app: AIApplication, time: float = 0.0):
        # The three model objects are stored by reference, not copied.
        self.soc, self.radiation, self.app = soc, radiation, app
        # Elapsed simulation time.
        self.time = time
        # Fault counters, both starting at zero.
        self.faults_injected = 0
        self.faults_corrected = 0


# --- Initialization ---
def initialize_soc(config: dict[str, Any]) -> SoC:
    """Build an ``SoC`` from ``config``, using defaults for missing keys.

    Keys read: ``num_cores`` (default 4), ``memory_size`` (default
    ``1024 * 1024``, i.e. 1 MB) and ``accelerator`` (default ``True``).
    """
    return SoC(
        num_cores=config.get('num_cores', 4),
        memory_size=config.get('memory_size', 1024 * 1024),
        accelerator_present=config.get('accelerator', True),
    )
Comment on lines +54 to +57

Copilot AI Feb 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

memory_size = config.get('memory_size', 1024*1024) will trigger Ruff E226 (missing whitespace around arithmetic operator). Update to 1024 * 1024 to satisfy the repo lint configuration (ruff.toml enables E rules).

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit e7b5e45. Changed to 1024 * 1024 with proper spacing.



def initialize_radiation_model(config: dict[str, Any]) -> RadiationModel:
    """Build a ``RadiationModel`` from ``config``, using defaults for missing keys.

    Keys read: ``particle_flux`` in particles/cm^2/s (default 1.0) and
    ``upset_rate`` in upsets per particle (default 1e-5).
    """
    return RadiationModel(
        particle_flux=config.get('particle_flux', 1.0),
        upset_rate=config.get('upset_rate', 1e-5),
    )


def initialize_ai_application(config: dict[str, Any]) -> AIApplication:
    """Build an ``AIApplication`` from ``config``.

    Keys read: ``task`` (default ``'image_classification'``) and
    ``input_data``. When ``input_data`` is absent, a random 100x100 array is
    generated with NumPy's global random state.
    """
    task = config.get('task', 'image_classification')
    if 'input_data' in config:
        input_data = config['input_data']
    else:
        # Build the random default only when it is needed. Passing it as the
        # fallback argument of ``dict.get`` would evaluate it on every call,
        # wasting work and advancing the global RNG even for supplied data.
        input_data = np.random.rand(100, 100)
    return AIApplication(task, input_data)


# --- Core Steps ---
def run_ai_application(soc: SoC, app: AIApplication) -> None:
    """Simulate one run of the AI application on the SoC.

    No neural network is executed; a cheap stand-in computation is performed:
    with an accelerator, the dot product of the flattened input with itself;
    without one, the sum of all input elements. The result is stored in
    ``app.output`` and its value modulo 256 is written to ``soc.memory[0]``.
    """
    data = app.input_data
    if not soc.accelerator_present:
        # CPU path: plain reduction over the input.
        result = np.sum(data)
    else:
        # Accelerator path: dot product as a stand-in for a convolution/matmul.
        flat = data.flatten()
        result = np.dot(flat, flat)

    # Simplified write-back: only the low byte of the result reaches memory.
    soc.memory[0] = result % 256
    app.output = result


def inject_faults(soc: SoC, radiation: RadiationModel, dt: float) -> int:
    """Inject radiation-induced bit flips into the SoC for one time step.

    The number of particles hitting the chip during ``dt`` is drawn from a
    Poisson distribution with mean ``particle_flux * chip_area * dt``. Each
    particle independently causes an upset with probability
    ``radiation.upset_rate``, so the upset count is drawn with a single
    binomial sample. The Python-level loop then runs once per upset rather
    than once per particle, which keeps high-flux runs cheap.

    Each upset picks one of memory, registers or cache with equal probability
    and a random index within it. Memory and cache upsets flip a random bit
    (0-7) of the chosen element; register upsets toggle bit 0 of the chosen
    register. ``soc.errors`` is increased by the number of upsets.

    All random draws use NumPy's global random state.

    Args:
        soc: SoC whose memory, registers, cache and error counter are mutated.
        radiation: Supplies ``particle_flux`` and ``upset_rate``.
        dt: Exposure duration, in the time unit used by ``particle_flux``.

    Returns:
        The number of faults injected during this step.
    """
    # Calculate expected number of particles hitting the chip
    chip_area = 1.0  # cm^2 (simplified)
    expected_particles = radiation.particle_flux * chip_area * dt

    # Poisson distribution for number of particles
    num_particles = np.random.poisson(expected_particles)

    # Turn the rate into a valid probability. Matches a per-particle
    # ``random() < rate`` test: rate >= 1 always upsets, rate <= 0 or NaN never.
    rate = radiation.upset_rate
    if rate >= 1.0:
        upset_probability = 1.0
    elif rate > 0.0:
        upset_probability = rate
    else:
        upset_probability = 0.0

    # One binomial draw replaces ``num_particles`` individual Bernoulli trials.
    faults = int(np.random.binomial(num_particles, upset_probability))

    targets = ['memory', 'register', 'cache']
    for _ in range(faults):
        # Choose a random location to flip a bit
        fault_type = np.random.choice(targets)
        if fault_type == 'memory':
            address = np.random.randint(0, len(soc.memory))
            bit = np.random.randint(0, 8)
            soc.memory[address] ^= (1 << bit)
        elif fault_type == 'register':
            reg = np.random.randint(0, len(soc.registers))
            soc.registers[reg] ^= 1
        else:  # cache
            address = np.random.randint(0, len(soc.cache))
            bit = np.random.randint(0, 8)
            soc.cache[address] ^= (1 << bit)

    soc.errors += faults
    return faults


def apply_fault_tolerance(soc: SoC, correction_rate: float = 0.8) -> int:
    """Apply fault tolerance mechanisms to correct errors.

    Simplified model: a fixed fraction of the outstanding errors is assumed to
    be correctable (e.g. by ECC in memory and cache). The corrected count is
    ``soc.errors * correction_rate`` truncated towards zero, and it is
    subtracted from ``soc.errors``.

    Args:
        soc: The SoC instance whose ``errors`` counter is reduced.
        correction_rate: Fraction of errors that can be corrected, in
            ``[0, 1]`` (default: 0.8).

    Returns:
        The number of faults corrected.

    Raises:
        ValueError: If ``correction_rate`` is outside ``[0, 1]``.
    """
    # A rate above 1 would push the error count negative and a negative rate
    # would add errors. Both corrupt metrics derived from ``soc.errors``, so
    # reject them before touching the counter.
    if not 0.0 <= correction_rate <= 1.0:
        raise ValueError(
            f"correction_rate must be between 0 and 1, got {correction_rate!r}"
        )

    corrected = int(soc.errors * correction_rate)
    soc.errors -= corrected
    return corrected
Comment on lines +137 to +140

Copilot AI Feb 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correction_rate is hard-coded to 0.8 here, but the PR description calls out a configurable ECC correction rate. Consider reading this from config (or making it a parameter with a documented default) so callers can tune the simulation without editing code.

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made configurable in commit e7b5e45. Added correction_rate parameter (default: 0.8) to apply_fault_tolerance() and reads from config in run_simulation().



def update_radiation_model(radiation: RadiationModel, dt: float) -> None:
    """Advance the radiation model by ``dt`` (currently a no-op).

    The radiation environment is kept constant for simplicity, so neither
    argument is read or modified. A more realistic simulation might vary the
    flux here based on orbit, solar activity, etc.
    """


def monitor_state(state: SimulationState) -> dict[str, Any]:
    """Snapshot the simulation state as a flat metrics dictionary.

    The returned dict holds the current time, the SoC's error count and
    performance, the fault counters from ``state`` (reported under
    ``total_``-prefixed keys) and the application's accuracy.
    """
    soc = state.soc
    return {
        'time': state.time,
        'errors': soc.errors,
        'performance': soc.performance,
        'total_faults_injected': state.faults_injected,
        'total_faults_corrected': state.faults_corrected,
        'application_accuracy': state.app.accuracy,
    }


def log_state(metrics: dict[str, Any]) -> None:
    """Emit an INFO log line with the time, error count and performance.

    ``metrics`` must contain the keys ``'time'``, ``'errors'`` and
    ``'performance'``; any other keys are ignored.
    """
    elapsed = metrics['time']
    error_count = metrics['errors']
    performance = metrics['performance']
    LOGGER.info("Time: %.2fs, Errors: %d, Performance: %.2f", elapsed, error_count, performance)


def safety_violation_detected(state: SimulationState, error_threshold: int = 1000) -> bool:
    """Report whether the SoC's error count exceeds the allowed threshold.

    A warning is logged when a violation is found.

    Args:
        state: Current simulation state; only ``state.soc.errors`` is read.
        error_threshold: Largest error count still considered safe
            (default: 1000). A count equal to the threshold is not a violation.

    Returns:
        ``True`` if the error count is strictly greater than
        ``error_threshold``, otherwise ``False``.
    """
    error_count = state.soc.errors
    violated = bool(error_count > error_threshold)
    if violated:
        LOGGER.warning("Safety violation: Too many errors (%d)", error_count)
    return violated


def trigger_safe_shutdown(state: SimulationState) -> None:
    """Announce a safe shutdown of the system.

    Only a warning is logged at present; ``state`` is accepted but not used.
    Saving critical data, powering down, etc. would be added here.
    """
    message = "Triggering safe shutdown"
    LOGGER.warning(message)
Comment on lines +163 to +187

Copilot AI Feb 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This module uses print() for logging in log_state, safety_violation_detected, and trigger_safe_shutdown. Elsewhere in the repo (e.g., simulation_pipeline.py) simulation logging uses logging.getLogger(__name__) and LOGGER.info/error. To align with that convention and avoid noisy stdout in library use, switch these to a module logger (and ideally make logging frequency configurable).

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched to logging.getLogger(__name__) throughout the module in commit e7b5e45. All print() statements replaced with LOGGER.info/warning/error to match simulation_pipeline.py convention.



# --- Main Loop ---
def run_simulation(time_steps: int, dt: float, config: dict[str, Any]) -> list[dict[str, Any]]:
"""
Run the simulation for a given number of time steps.
Returns a list of state metrics for each time step.

Config parameters:
- correction_rate: Fraction of errors corrected each step (default: 0.8)
- error_threshold: Max errors before shutdown (default: 1000)
"""
# Initialize
soc = initialize_soc(config)
radiation = initialize_radiation_model(config)
app = initialize_ai_application(config)

# Get optional config parameters
correction_rate = config.get('correction_rate', 0.8)
error_threshold = config.get('error_threshold', 1000)

state = SimulationState(soc, radiation, app, time=0.0)

metrics_history = []

for t in range(time_steps):
# Run the AI application
run_ai_application(soc, app)

# Inject faults due to radiation
faults = inject_faults(soc, radiation, dt)
state.faults_injected += faults

# Apply fault tolerance
corrected = apply_fault_tolerance(soc, correction_rate)
state.faults_corrected += corrected
Comment on lines +219 to +223

Copilot AI Feb 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state.faults_injected / state.faults_corrected are accumulated over the whole run (+= each step), but the PR description says metrics are “per-timestep”. Either rename these metrics to make it explicit they are cumulative (e.g., total_faults_injected), or record per-step values in the metrics history (or both) to match the stated API.

Suggested change
state.faults_injected += faults
# Apply fault tolerance
corrected = apply_fault_tolerance(soc)
state.faults_corrected += corrected
# Store per-timestep faults injected
state.faults_injected = faults
# Apply fault tolerance
corrected = apply_fault_tolerance(soc)
# Store per-timestep faults corrected
state.faults_corrected = corrected

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit e7b5e45. Renamed to total_faults_injected and total_faults_corrected in metrics dict to clearly indicate cumulative tracking over the simulation run.


# Update performance metric after fault handling
soc.performance = 1.0 / (1.0 + soc.errors) # Simplified: errors reduce performance

# Update radiation model (if dynamic)
update_radiation_model(radiation, dt)

# Update time
state.time += dt

# Monitor and log
metrics = monitor_state(state)
metrics_history.append(metrics)

Comment on lines +214 to +237

Copilot AI Feb 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The performance metric recorded in metrics can be stale: run_ai_application() updates soc.performance before faults are injected/corrected, but monitor_state() reads it after those updates. Consider recomputing soc.performance after inject_faults/apply_fault_tolerance (or inside monitor_state) so the reported performance corresponds to the current soc.errors for that timestep.

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit e7b5e45. Removed performance update from run_ai_application() and moved it to after apply_fault_tolerance() in run_simulation(), ensuring performance reflects current error count after fault handling.

if t % 10 == 0:
log_state(metrics)

# Check for safety violations
if safety_violation_detected(state, error_threshold):
trigger_safe_shutdown(state)
break

return metrics_history


# --- Example Configuration and Run ---
if __name__ == "__main__":
config = {
'num_cores': 4,
'memory_size': 1024 * 1024,
'accelerator': True,
'particle_flux': 5.0, # High radiation environment
'upset_rate': 1e-4,
'task': 'image_classification',
'input_data': np.random.rand(100, 100),
Comment on lines +252 to +258

Copilot AI Feb 8, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'memory_size': 1024*1024 will trigger Ruff E226 (missing whitespace around arithmetic operator). Update to 1024 * 1024 to satisfy the repo lint configuration.

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit e7b5e45. Changed to 1024 * 1024 with proper spacing in main block.

}

dt = 0.1 # 0.1 second per time step
time_steps = 100

history = run_simulation(time_steps, dt, config)
Loading