Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,141 changes: 1,115 additions & 26 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions crates/ziggurat-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,26 @@ ziggurat-zigbee.workspace = true

abstract-bits = "0.2.0"
arbitrary-int = "2.1.1"
futures = { version = "0.3", default-features = false }
tracing = "0.1"
parking_lot = "0.12.4"
rand = "0.10.1"
thiserror = "2.0.12"
tokio = { version = "1.43.0", features = ["rt", "macros", "time", "sync", "io-util"] }

# The embassy runtime adapter, host-runnable via arch-std so it can stand in for tokio.
embassy-executor = { version = "0.7", features = [
"arch-std",
"executor-thread",
], optional = true }
embassy-time = { version = "0.4", features = ["std"], optional = true }
embassy-sync = { version = "0.6", optional = true }
spin = { version = "0.9", default-features = false, features = [
"spin_mutex",
], optional = true }

[features]
default = []
# Select the embassy runtime adapter (and its no_std-friendly sync primitives) instead of
# the default tokio one. Mutually overrides tokio at the `sync`/`runtime` seam.
embassy = ["dep:embassy-executor", "dep:embassy-time", "dep:embassy-sync", "dep:spin"]
3 changes: 3 additions & 0 deletions crates/ziggurat-driver/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod runtime;
pub mod signal;
pub mod sync;
pub mod zigbee_stack;

pub use ziggurat_ieee_802154;
Expand Down
244 changes: 244 additions & 0 deletions crates/ziggurat-driver/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
//! Async runtime abstraction layer.

use core::future::Future;
use core::ops::Add;
use core::pin::Pin;
use core::time::Duration;

/// A detached background task, boxed so one spawn path serves every runtime.
///
/// Tokio drops it into a tracked `JoinSet`; embassy (later) hands it to a static
/// task-pool runner. Our tasks capture only `Arc<ZigbeeStack>` and never hold a
/// `CoreGuard` across an `.await`, so they are genuinely `Send` — no
/// single-threaded-executor `unsafe` is needed.
pub type SpawnedTask = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Spawns the stack's background tasks.
///
/// A value, not a static method, because embassy spawning needs its `Spawner` token
/// (which tokio's global spawn doesn't) — so the stack is handed one at construction.
/// Reached via[`Runtime::Spawner`].
pub trait Spawn: Send + Sync + 'static {
/// Spawn a detached background task.
fn spawn(&self, task: SpawnedTask);

/// Stop every task spawned through this spawner and wait for them to finish, so a
/// replaced host stack provably stops before its successor runs. A no-op on
/// executors that cannot cancel tasks (embassy).
fn shutdown(&self) -> impl Future<Output = ()> + Send;
}

/// The instant type a [`Runtime`] measures time with. Bounded for exactly the
/// arithmetic the driver performs on deadlines.
pub trait RtInstant: Copy + Send + Sync + 'static + Add<Duration, Output = Self> {
/// Saturating `self - earlier`, never panicking when `earlier` is in the future.
fn saturating_duration_since(self, earlier: Self) -> Duration;
}

impl RtInstant for tokio::time::Instant {
fn saturating_duration_since(self, earlier: Self) -> Duration {
Self::saturating_duration_since(&self, earlier)
}
}

/// A deadline elapsed before the awaited future completed. Replaces
/// `tokio::time::error::Elapsed` so the stack's error type stays runtime-agnostic.
#[derive(Debug, thiserror::Error)]
#[error("deadline elapsed")]
pub struct Elapsed;

/// The async runtime the driver runs on. Implemented by [`TokioRuntime`] for the
/// host server and (later) an embassy runtime for the MCU.
pub trait Runtime: Send + Sync + 'static {
type Instant: RtInstant;

/// Spawns the stack's background tasks; see [`Spawn`].
type Spawner: Spawn;

/// The current monotonic instant.
fn now() -> Self::Instant;

/// Sleep for `duration`.
fn sleep(duration: Duration) -> impl Future<Output = ()> + Send;

/// Sleep until `deadline`.
fn sleep_until(deadline: Self::Instant) -> impl Future<Output = ()> + Send;

/// Run `future`, returning [`Elapsed`] if `duration` passes first.
fn timeout<F>(
duration: Duration,
future: F,
) -> impl Future<Output = Result<F::Output, Elapsed>> + Send
where
F: Future + Send,
F::Output: Send,
{
async move {
let future = core::pin::pin!(future);
let sleep = core::pin::pin!(Self::sleep(duration));
match futures::future::select(future, sleep).await {
futures::future::Either::Left((output, _)) => Ok(output),
futures::future::Either::Right(((), _)) => Err(Elapsed),
}
}
}

/// Run `future`, returning [`Elapsed`] if `deadline` passes first.
fn timeout_at<F>(
deadline: Self::Instant,
future: F,
) -> impl Future<Output = Result<F::Output, Elapsed>> + Send
where
F: Future + Send,
F::Output: Send,
{
async move {
let future = core::pin::pin!(future);
let sleep = core::pin::pin!(Self::sleep_until(deadline));
match futures::future::select(future, sleep).await {
futures::future::Either::Left((output, _)) => Ok(output),
futures::future::Either::Right(((), _)) => Err(Elapsed),
}
}
}
}

/// The tokio runtime: the host server's executor.
#[derive(Debug, Clone, Copy)]
pub struct TokioRuntime;

impl Runtime for TokioRuntime {
type Instant = tokio::time::Instant;
type Spawner = TokioSpawner;

fn now() -> Self::Instant {
tokio::time::Instant::now()
}

fn sleep(duration: Duration) -> impl Future<Output = ()> + Send {
tokio::time::sleep(duration)
}

fn sleep_until(deadline: Self::Instant) -> impl Future<Output = ()> + Send {
tokio::time::sleep_until(deadline)
}
}

/// The tokio spawner: tasks go into a `JoinSet` so a replaced stack can abort them.
#[derive(Default)]
pub struct TokioSpawner {
tasks: parking_lot::Mutex<tokio::task::JoinSet<()>>,
}

impl Spawn for TokioSpawner {
fn spawn(&self, task: SpawnedTask) {
let mut tasks = self.tasks.lock();

// A completed task's cell lingers until reaped; drain here so the set tracks live
// tasks instead of growing by one dead entry per spawn.
while let Some(result) = tasks.try_join_next() {
if let Err(e) = result
&& e.is_panic()
{
tracing::error!("Background task panicked: {e}");
}
}

tasks.spawn(task);
}

async fn shutdown(&self) {
let mut tasks = core::mem::take(&mut *self.tasks.lock());
tasks.abort_all();
while tasks.join_next().await.is_some() {}
}
}

/// The embassy runtime adapter. Host-runnable through `arch-std` so it can stand in for
/// tokio, and the same impl drives the MCU once an esp PHY backs it.
#[cfg(feature = "embassy")]
pub use embassy_impl::{EmbassyRuntime, EmbassySpawner};

#[cfg(feature = "embassy")]
mod embassy_impl {
use super::{RtInstant, Runtime, Spawn, SpawnedTask};
use core::future::Future;
use core::ops::Add;
use core::time::Duration;

const fn to_embassy(duration: Duration) -> embassy_time::Duration {
embassy_time::Duration::from_micros(duration.as_micros() as u64)
}

const fn from_embassy(duration: embassy_time::Duration) -> Duration {
Duration::from_micros(duration.as_micros())
}

/// Wraps `embassy_time::Instant` so the trait's `core::time::Duration` arithmetic
/// works against embassy's own `Duration` type.
#[derive(Copy, Clone)]
pub struct EmbassyInstant(embassy_time::Instant);

impl Add<Duration> for EmbassyInstant {
type Output = Self;

fn add(self, rhs: Duration) -> Self {
Self(self.0 + to_embassy(rhs))
}
}

impl RtInstant for EmbassyInstant {
fn saturating_duration_since(self, earlier: Self) -> Duration {
from_embassy(self.0.saturating_duration_since(earlier.0))
}
}

pub struct EmbassyRuntime;

impl Runtime for EmbassyRuntime {
type Instant = EmbassyInstant;
type Spawner = EmbassySpawner;

fn now() -> Self::Instant {
EmbassyInstant(embassy_time::Instant::now())
}

fn sleep(duration: Duration) -> impl Future<Output = ()> + Send {
embassy_time::Timer::after(to_embassy(duration))
}

fn sleep_until(deadline: Self::Instant) -> impl Future<Output = ()> + Send {
embassy_time::Timer::at(deadline.0)
}
}

/// Each detached task runs in one slot of this fixed pool — embassy has no dynamic
/// spawn, so the size bounds the stack's concurrent background tasks (long-lived
/// reactors plus the transient ZDP/indirect/route-request ones).
#[embassy_executor::task(pool_size = 24)]
async fn task_runner(task: SpawnedTask) {
task.await;
}

/// Spawns into the embassy executor. Holds a [`SendSpawner`](embassy_executor::SendSpawner)
/// so it is `Send + Sync`; obtained from the executor at startup.
pub struct EmbassySpawner(embassy_executor::SendSpawner);

impl EmbassySpawner {
pub const fn new(spawner: embassy_executor::SendSpawner) -> Self {
Self(spawner)
}
}

impl Spawn for EmbassySpawner {
fn spawn(&self, task: SpawnedTask) {
if self.0.spawn(task_runner(task)).is_err() {
tracing::error!("embassy task pool exhausted; background task dropped");
}
}

// Embassy cannot cancel spawned tasks; the MCU stack is never replaced, so there
// is nothing to stop.
async fn shutdown(&self) {}
}
}
114 changes: 114 additions & 0 deletions crates/ziggurat-driver/src/signal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//! `Signal` primitive: effectively a `Mutex` plus a `Notify`.

use crate::sync::{Mutex, Notify};
use core::fmt;
use std::sync::Arc;

/// The producer was dropped without ever signalling a value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Closed;

enum State<T> {
/// No value yet, producer still alive.
Pending,
/// A value was signalled and not yet taken.
Ready(T),
/// The producer was dropped without signalling.
Closed,
}

struct Inner<T> {
slot: Mutex<State<T>>,
ready: Notify,
}

/// The producer half. Signalling (or dropping) it wakes the [`SignalWaiter`].
pub struct Signal<T> {
inner: Arc<Inner<T>>,
}

/// The consumer half. [`wait`](SignalWaiter::wait) resolves once the producer signals a
/// value or is dropped.
pub struct SignalWaiter<T> {
inner: Arc<Inner<T>>,
}

/// Create a producer/waiter pair sharing a single-value slot.
pub fn channel<T>() -> (Signal<T>, SignalWaiter<T>) {
let inner = Arc::new(Inner {
slot: Mutex::new(State::Pending),
ready: Notify::new(),
});
(
Signal {
inner: inner.clone(),
},
SignalWaiter { inner },
)
}

impl<T> Signal<T> {
/// Hand `value` to the waiter. A dropped waiter just discards it.
pub fn signal(self, value: T) {
*self.inner.slot.lock() = State::Ready(value);
self.inner.ready.notify_one();
// `self` drops here; `Drop` sees `Ready` and leaves the value in place.
}
}

impl<T> Drop for Signal<T> {
fn drop(&mut self) {
let closed = {
let mut state = self.inner.slot.lock();
if matches!(*state, State::Pending) {
*state = State::Closed;
true
} else {
false
}
};
if closed {
self.inner.ready.notify_one();
}
}
}

impl<T> SignalWaiter<T> {
/// Wait for the producer to signal a value, or `Err(Closed)` if it was dropped first.
pub async fn wait(&self) -> Result<T, Closed> {
loop {
// `notify_one` stores a permit when no waiter is registered, so a signal that
// lands between the check and the await is not lost.
if let Some(result) = self.take() {
return result;
}
self.inner.ready.notified().await;
}
}

fn take(&self) -> Option<Result<T, Closed>> {
let mut state = self.inner.slot.lock();
let result = match core::mem::replace(&mut *state, State::Pending) {
State::Pending => None,
State::Ready(value) => Some(Ok(value)),
State::Closed => {
*state = State::Closed;
Some(Err(Closed))
}
};
drop(state);
result
}
}

impl<T> fmt::Debug for Signal<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Signal")
}
}

impl<T> fmt::Debug for SignalWaiter<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("SignalWaiter")
}
}
Loading
Loading