From 5f3c90bf1c31277448d9468311ed70554e096e02 Mon Sep 17 00:00:00 2001
From: rUv
Date: Mon, 23 Mar 2026 21:37:52 -0400
Subject: [PATCH 01/12] fix(sensing-server): add real hysteresis to person
count estimation (#295)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
The person-count heuristic was causing widespread flickering (#237, #249,
#280, #292) because:
1. Threshold 0.50 for 2-persons was too low — multipath reflections in
small rooms easily exceeded it
2. No actual hysteresis despite the comment claiming asymmetric thresholds
3. EMA smoothing (α=0.15) was too responsive to transient spikes
Changes:
- Raise up-thresholds: 1→2 persons at 0.65 (was 0.50), 2→3 at 0.85 (was 0.80)
- Add true hysteresis with asymmetric down-thresholds: 2→1 at 0.45, 3→2 at 0.70
- Track prev_person_count in SensingState for state-aware transitions
- Increase EMA smoothing to α=0.10 (~2s time constant at 20 Hz)
- Update all 4 call sites (ESP32, Windows WiFi, multi-BSSID, simulated)
Fixes #292, #280, #237
Co-authored-by: Reuven
---
.../wifi-densepose-sensing-server/src/main.rs | 90 ++++++++++++++-----
1 file changed, 66 insertions(+), 24 deletions(-)
diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
index 7497c95a0..7c074bf84 100644
--- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
+++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
@@ -304,6 +304,8 @@ struct AppStateInner {
model_loaded: bool,
/// Smoothed person count (EMA) for hysteresis — prevents frame-to-frame jumping.
smoothed_person_score: f64,
+ /// Previous person count for hysteresis (asymmetric up/down thresholds).
+ prev_person_count: usize,
// ── Motion smoothing & adaptive baseline (ADR-047 tuning) ────────────
/// EMA-smoothed motion score (alpha ~0.15 for ~10 FPS → ~1s time constant).
smoothed_motion: f64,
@@ -1247,12 +1249,15 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) {
let feat_variance = features.variance;
- // Multi-person estimation with temporal smoothing (EMA α=0.15).
+ // Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
- s.smoothed_person_score = s.smoothed_person_score * 0.85 + raw_score * 0.15;
+ s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
- score_to_person_count(s.smoothed_person_score)
+ let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
+ s.prev_person_count = count;
+ count
} else {
+ s.prev_person_count = 0;
0
};
@@ -1377,12 +1382,15 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) {
let feat_variance = features.variance;
- // Multi-person estimation with temporal smoothing.
+ // Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
- s.smoothed_person_score = s.smoothed_person_score * 0.85 + raw_score * 0.15;
+ s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
- score_to_person_count(s.smoothed_person_score)
+ let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
+ s.prev_person_count = count;
+ count
} else {
+ s.prev_person_count = 0;
0
};
@@ -1724,18 +1732,45 @@ fn compute_person_score(feat: &FeatureInfo) -> f64 {
/// Convert smoothed person score to discrete count with hysteresis.
///
-/// Uses asymmetric thresholds: higher threshold to add a person, lower to remove.
-/// This prevents flickering at the boundary.
-fn score_to_person_count(smoothed_score: f64) -> usize {
- // Thresholds chosen conservatively for single-ESP32 link:
- // score > 0.50 → 2 persons (needs sustained high variance + change points)
- // score > 0.80 → 3 persons (very high activity, rare with single link)
- if smoothed_score > 0.80 {
- 3
- } else if smoothed_score > 0.50 {
- 2
- } else {
- 1
+/// Uses asymmetric thresholds: higher threshold to *add* a person, lower to
+/// *drop* one. This prevents flickering when the score hovers near a boundary
+/// (the #1 user-reported issue — see #237, #249, #280, #292).
+fn score_to_person_count(smoothed_score: f64, prev_count: usize) -> usize {
+ // Up-thresholds (must exceed to increase count):
+ // 1→2: 0.65 (raised from 0.50 — multipath in small rooms hit 0.50 easily)
+ // 2→3: 0.85 (raised from 0.80 — 3 persons needs strong sustained signal)
+ // Down-thresholds (must drop below to decrease count):
+ // 2→1: 0.45 (hysteresis gap of 0.20)
+ // 3→2: 0.70 (hysteresis gap of 0.15)
+ match prev_count {
+ 0 | 1 => {
+ if smoothed_score > 0.85 {
+ 3
+ } else if smoothed_score > 0.65 {
+ 2
+ } else {
+ 1
+ }
+ }
+ 2 => {
+ if smoothed_score > 0.85 {
+ 3
+ } else if smoothed_score < 0.45 {
+ 1
+ } else {
+ 2 // hold — within hysteresis band
+ }
+ }
+ _ => {
+ // prev_count >= 3
+ if smoothed_score < 0.45 {
+ 1
+ } else if smoothed_score < 0.70 {
+ 2
+ } else {
+ 3 // hold
+ }
+ }
}
}
@@ -2824,12 +2859,15 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
let vitals = smooth_vitals(&mut s, &raw_vitals);
s.latest_vitals = vitals.clone();
- // Multi-person estimation with temporal smoothing.
+ // Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
- s.smoothed_person_score = s.smoothed_person_score * 0.85 + raw_score * 0.15;
+ s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
- score_to_person_count(s.smoothed_person_score)
+ let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
+ s.prev_person_count = count;
+ count
} else {
+ s.prev_person_count = 0;
0
};
@@ -2929,12 +2967,15 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) {
let frame_amplitudes = frame.amplitudes.clone();
let frame_n_sub = frame.n_subcarriers;
- // Multi-person estimation with temporal smoothing.
+ // Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
- s.smoothed_person_score = s.smoothed_person_score * 0.85 + raw_score * 0.15;
+ s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
- score_to_person_count(s.smoothed_person_score)
+ let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
+ s.prev_person_count = count;
+ count
} else {
+ s.prev_person_count = 0;
0
};
@@ -3577,6 +3618,7 @@ async fn main() {
active_sona_profile: None,
model_loaded,
smoothed_person_score: 0.0,
+ prev_person_count: 0,
smoothed_motion: 0.0,
current_motion_level: "absent".to_string(),
debounce_counter: 0,
From 6c98c989208e80d9d3e2d53ebe7897a30a7ab2ad Mon Sep 17 00:00:00 2001
From: Reuven
Date: Mon, 23 Mar 2026 21:51:43 -0400
Subject: [PATCH 02/12] docs(adr): ADR-067 RuVector v2.0.5 upgrade + new crate
adoption plan
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
4-phase plan to upgrade core ruvector dependencies and adopt new crates:
- Phase 1: Bump 5 core crates 2.0.4→2.0.5 (10-30% mincut perf, security fixes)
- Phase 2: Add ruvector-coherence for spectral multi-node CSI coherence
- Phase 3: Add SONA adaptive learning to replace manual logistic regression
- Phase 4: Evaluate ruvector-core ONNX embeddings for CSI pattern matching
Co-Authored-By: claude-flow
---
docs/adr/ADR-067-ruvector-v2.0.5-upgrade.md | 151 ++++++++++++++++++++
1 file changed, 151 insertions(+)
create mode 100644 docs/adr/ADR-067-ruvector-v2.0.5-upgrade.md
diff --git a/docs/adr/ADR-067-ruvector-v2.0.5-upgrade.md b/docs/adr/ADR-067-ruvector-v2.0.5-upgrade.md
new file mode 100644
index 000000000..a01a5f817
--- /dev/null
+++ b/docs/adr/ADR-067-ruvector-v2.0.5-upgrade.md
@@ -0,0 +1,151 @@
+# ADR-067: RuVector v2.0.4 to v2.0.5 Upgrade + New Crate Adoption
+
+**Status:** Proposed
+**Date:** 2026-03-23
+**Deciders:** @ruvnet
+**Related:** ADR-016 (RuVector training pipeline integration), ADR-017 (RuVector signal + MAT integration), ADR-029 (RuvSense multistatic sensing)
+
+## Context
+
+RuView currently pins all five core RuVector crates at **v2.0.4** (from crates.io) plus a vendored `ruvector-crv` v0.1.1 and optional `ruvector-gnn` v2.0.5. The upstream RuVector workspace has moved to **v2.0.5** with meaningful improvements to the crates we depend on, and has introduced new crates that could benefit RuView's detection pipeline.
+
+### Current Integration Map
+
+| RuView Module | RuVector Crate | Current Version | Purpose |
+|---------------|----------------|-----------------|---------|
+| `signal/subcarrier.rs` | ruvector-mincut | 2.0.4 | Graph min-cut subcarrier partitioning |
+| `signal/spectrogram.rs` | ruvector-attn-mincut | 2.0.4 | Attention-gated spectrogram denoising |
+| `signal/bvp.rs` | ruvector-attention | 2.0.4 | Attention-weighted BVP aggregation |
+| `signal/fresnel.rs` | ruvector-solver | 2.0.4 | Fresnel geometry estimation |
+| `mat/triangulation.rs` | ruvector-solver | 2.0.4 | TDoA survivor localization |
+| `mat/breathing.rs` | ruvector-temporal-tensor | 2.0.4 | Tiered compressed breathing buffer |
+| `mat/heartbeat.rs` | ruvector-temporal-tensor | 2.0.4 | Tiered compressed heartbeat spectrogram |
+| `viewpoint/*` (4 files) | ruvector-attention | 2.0.4 | Cross-viewpoint fusion with geometric bias |
+| `crv/` (optional) | ruvector-crv | 0.1.1 (vendored) | CRV protocol integration |
+| `crv/` (optional) | ruvector-gnn | 2.0.5 | GNN graph topology |
+
+### What Changed Upstream (v2.0.4 → v2.0.5 → HEAD)
+
+**ruvector-mincut:**
+- Flat capacity matrix + allocation reuse — **10-30% faster** for all min-cut operations
+- Tier 2-3 Dynamic MinCut (ADR-124): Gomory-Hu tree construction for fast global min-cut, incremental edge insert/delete without full recomputation
+- Source-anchored canonical min-cut with SHA-256 witness hashing
+- Fixed: unsafe indexing removed, WASM Node.js panic from `std::time`
+
+**ruvector-attention / ruvector-attn-mincut:**
+- Migrated to workspace versioning (no API changes)
+- Documentation improvements
+
+**ruvector-temporal-tensor:**
+- Formatting fixes only (no API changes)
+
+**ruvector-gnn:**
+- Panic replaced with `Result` in `MultiHeadAttention` and `RuvectorLayer` constructors (breaking improvement — safer)
+- Bumped to v2.0.5
+
+**sona (new — Self-Optimizing Neural Architecture):**
+- v0.1.6 → v0.1.8: state persistence (`loadState`/`saveState`), trajectory counter fix
+- Micro-LoRA and Base-LoRA for instant and background learning
+- EWC++ (Elastic Weight Consolidation) to prevent catastrophic forgetting
+- ReasoningBank pattern extraction and similarity search
+- WASM support for edge devices
+
+**ruvector-coherence (new):**
+- Spectral coherence scoring for graph index health
+- Fiedler eigenvalue estimation, effective resistance sampling
+- HNSW health monitoring with alerts
+- Batch evaluation of attention mechanism quality
+
+**ruvector-core (new):**
+- ONNX embedding support for real semantic embeddings
+- HNSW index with SIMD-accelerated distance metrics
+- Quantization (4-32x memory reduction)
+- Arena allocator for cache-optimized operations
+
+## Decision
+
+### Phase 1: Version Bump (Low Risk)
+
+Bump the 5 core crates from v2.0.4 to v2.0.5 in the workspace `Cargo.toml`:
+
+```toml
+ruvector-mincut = "2.0.5" # was 2.0.4 — 10-30% faster, safer
+ruvector-attn-mincut = "2.0.5" # was 2.0.4 — workspace versioning
+ruvector-temporal-tensor = "2.0.5" # was 2.0.4 — fmt only
+ruvector-solver = "2.0.5" # was 2.0.4 — workspace versioning
+ruvector-attention = "2.0.5" # was 2.0.4 — workspace versioning
+```
+
+**Expected impact:** The mincut performance improvement directly benefits `signal/subcarrier.rs` which runs subcarrier graph partitioning every tick. 10-30% faster partitioning reduces per-frame CPU cost.
+
+### Phase 2: Add ruvector-coherence (Medium Value)
+
+Add `ruvector-coherence` with `spectral` feature to `wifi-densepose-ruvector`:
+
+**Use case:** Replace or augment the custom phase coherence logic in `viewpoint/coherence.rs` with spectral graph coherence scoring. The current implementation uses phasor magnitude for phase coherence — spectral Fiedler estimation would provide a more robust measure of multi-node CSI consistency, especially for detecting when a node's signal quality degrades.
+
+**Integration point:** `viewpoint/coherence.rs` — add `SpectralCoherenceScore` as a secondary coherence metric alongside existing phase phasor coherence. Use spectral gap estimation to detect structural changes in the multi-node CSI graph (e.g., a node dropping out or a new reflector appearing).
+
+### Phase 3: Add SONA for Adaptive Learning (High Value)
+
+Replace the logistic regression adaptive classifier in the sensing server with a SONA-backed learning engine:
+
+**Current state:** The sensing server's adaptive training (`POST /api/v1/adaptive/train`) uses a hand-rolled logistic regression on 15 CSI features. It requires explicit labeled recordings and provides no cross-session persistence.
+
+**Proposed improvement:** Use `sona::SonaEngine` to:
+1. **Learn from implicit feedback** — trajectory tracking on person-count decisions (was the count stable? did the user correct it?)
+2. **Persist across sessions** — `saveState()`/`loadState()` replaces the current `adaptive_model.json`
+3. **Pattern matching** — `find_patterns()` enables "this CSI signature looks like room X where we learned Y"
+4. **Prevent forgetting** — EWC++ ensures learning in a new room doesn't overwrite patterns from previous rooms
+
+**Integration point:** New `adaptive_sona.rs` module in `wifi-densepose-sensing-server`, behind a `sona` feature flag. The existing logistic regression remains the default.
+
+### Phase 4: Evaluate ruvector-core for CSI Embeddings (Exploratory)
+
+**Current state:** The person detection pipeline uses hand-crafted features (variance, change_points, motion_band_power, spectral_power) with fixed normalization ranges.
+
+**Potential:** Use `ruvector-core`'s ONNX embedding support to generate learned CSI embeddings that capture room geometry, person count, and activity patterns in a single vector. This would enable:
+- Similarity search: "is this CSI frame similar to known 2-person patterns?"
+- Transfer learning: embeddings learned in one room partially transfer to similar rooms
+- Quantized storage: 4-32x memory reduction for pattern databases
+
+**Status:** Exploratory — requires training data collection and embedding model design. Not a near-term target.
+
+## Consequences
+
+### Positive
+- **Phase 1:** Free 10-30% performance gain in subcarrier partitioning. Security fixes (unsafe indexing, WASM panic). Zero API changes required.
+- **Phase 2:** More robust multi-node coherence detection. Helps with the "flickering persons" issue (#292) by providing a second opinion on signal quality.
+- **Phase 3:** Fundamentally improves the adaptive learning pipeline. Users no longer need to manually record labeled data — the system learns from ongoing use.
+- **Phase 4:** Path toward real ML-based detection instead of heuristic thresholds.
+
+### Negative
+- **Phase 1:** Minimal risk — semver minor bump, no API breaks.
+- **Phase 2:** Adds a dependency. Spectral computation has O(n) cost per tick for Fiedler estimation (n = number of subcarriers, typically 56-128). Acceptable.
+- **Phase 3:** SONA adds ~200KB to the binary. The learning loop needs careful tuning to avoid adapting to noise.
+- **Phase 4:** Requires significant research and training data. Not guaranteed to outperform tuned heuristics for WiFi CSI.
+
+### Risks
+- `ruvector-gnn` v2.0.5 changed constructors from panic to `Result` — existing users of the `crv` feature need to handle the `Result`. Our vendored `ruvector-crv` may need updates.
+- SONA's WASM support is experimental — keep it behind a feature flag until validated.
+
+## Implementation Plan
+
+| Phase | Scope | Effort | Priority |
+|-------|-------|--------|----------|
+| 1 | Bump 5 crates to v2.0.5 | 1 hour | High — free perf + security |
+| 2 | Add ruvector-coherence | 1 day | Medium — improves multi-node stability |
+| 3 | SONA adaptive learning | 3 days | Medium — replaces manual training workflow |
+| 4 | CSI embeddings via ruvector-core | 1-2 weeks | Low — exploratory research |
+
+## Vendor Submodule
+
+The `vendor/ruvector` git submodule has been updated from commit `f8f2c60` (v2.0.4 era) to `51a3557` (latest `origin/main`). This provides local reference for the full upstream source when developing Phases 2-4.
+
+## References
+
+- Upstream repo: https://github.com/ruvnet/ruvector
+- ADR-124 (Dynamic MinCut): `vendor/ruvector/docs/adr/ADR-124*.md`
+- SONA docs: `vendor/ruvector/crates/sona/src/lib.rs`
+- ruvector-coherence spectral: `vendor/ruvector/crates/ruvector-coherence/src/spectral.rs`
+- ruvector-core embeddings: `vendor/ruvector/crates/ruvector-core/src/embeddings.rs`
From 7a13877fa3bd7f2766666f4674ccf9bfe93d5997 Mon Sep 17 00:00:00 2001
From: rUv
Date: Tue, 24 Mar 2026 08:00:18 -0400
Subject: [PATCH 03/12] fix(sensing-server): detect ESP32 offline after 5s
frame timeout (#300)
The source field was set to "esp32" on the first UDP frame but never
reverted when frames stopped arriving. This caused the UI to show
"Real hardware connected" indefinitely after powering off all nodes.
Changes:
- Add last_esp32_frame timestamp to AppStateInner
- Add effective_source() method with 5-second timeout
- Source becomes "esp32:offline" when no frames received within 5s
- Health endpoint shows "degraded" instead of "healthy" when offline
- All 6 status/health/info API endpoints use effective_source()
Fixes #297
Co-authored-by: Reuven
---
.../wifi-densepose-sensing-server/src/main.rs | 44 +++++++++++++++----
1 file changed, 35 insertions(+), 9 deletions(-)
diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
index 7c074bf84..4bd84c066 100644
--- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
+++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
@@ -285,6 +285,8 @@ struct AppStateInner {
frame_history: VecDeque>,
tick: u64,
source: String,
+ /// Instant of the last ESP32 UDP frame received (for offline detection).
+ last_esp32_frame: Option<std::time::Instant>,
tx: broadcast::Sender,
total_detections: u64,
start_time: std::time::Instant,
@@ -364,6 +366,25 @@ struct AppStateInner {
adaptive_model: Option,
}
+/// If no ESP32 frame arrives within this duration, source reverts to offline.
+const ESP32_OFFLINE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
+
+impl AppStateInner {
+ /// Return the effective data source, accounting for ESP32 frame timeout.
+ /// If the source is "esp32" but no frame has arrived in 5 seconds, returns
+ /// "esp32:offline" so the UI can distinguish active vs stale connections.
+ fn effective_source(&self) -> String {
+ if self.source == "esp32" {
+ if let Some(last) = self.last_esp32_frame {
+ if last.elapsed() > ESP32_OFFLINE_TIMEOUT {
+ return "esp32:offline".to_string();
+ }
+ }
+ }
+ self.source.clone()
+ }
+}
+
/// Number of frames retained in `frame_history` for temporal analysis.
/// At 500 ms ticks this covers ~50 seconds; at 100 ms ticks ~10 seconds.
const FRAME_HISTORY_CAPACITY: usize = 100;
@@ -1669,7 +1690,7 @@ async fn health(State(state): State) -> Json {
let s = state.read().await;
Json(serde_json::json!({
"status": "ok",
- "source": s.source,
+ "source": s.effective_source(),
"tick": s.tick,
"clients": s.tx.receiver_count(),
}))
@@ -1977,7 +1998,7 @@ async fn health_ready(State(state): State) -> Json) -> Json 0 { "healthy" } else { "idle" },
"message": format!("{} client(s)", s.tx.receiver_count()) },
@@ -2028,7 +2052,7 @@ async fn api_info(State(state): State) -> Json {
"version": env!("CARGO_PKG_VERSION"),
"environment": "production",
"backend": "rust",
- "source": s.source,
+ "source": s.effective_source(),
"features": {
"wifi_sensing": true,
"pose_estimation": true,
@@ -2049,7 +2073,7 @@ async fn pose_current(State(state): State) -> Json) -> Json
"total_detections": s.total_detections,
"average_confidence": 0.87,
"frames_processed": s.tick,
- "source": s.source,
+ "source": s.effective_source(),
}))
}
@@ -2083,7 +2107,7 @@ async fn stream_status(State(state): State) -> Json 1 { 10u64 } else { 0u64 },
- "source": s.source,
+ "source": s.effective_source(),
}))
}
@@ -2619,7 +2643,7 @@ async fn vital_signs_endpoint(State(state): State) -> Json
Date: Wed, 25 Mar 2026 21:21:58 -0400
Subject: [PATCH 04/12] Enhance README with Cognitum.One reference
Updated project description to include Cognitum.One.
---
README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/README.md b/README.md
index 10860dd0f..b17ca3d12 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@
Instead of relying on cameras or cloud models, it observes whatever signals exist in a space such as WiFi, radio waves across the spectrum, motion patterns, vibration, sound, or other sensory inputs and builds an understanding of what is happening locally.
-Built on top of [RuVector](https://github.com/ruvnet/ruvector/), the project became widely known for its implementation of WiFi DensePose — a sensing technique first explored in academic research such as Carnegie Mellon University's *DensePose From WiFi* work. That research demonstrated that WiFi signals can be used to reconstruct human pose.
+Built on top of [RuVector](https://github.com/ruvnet/ruvector/) Self Learning Vector Memory system and [Cognitum.One](https://Cognitum.One), the project became widely known for its implementation of WiFi DensePose — a sensing technique first explored in academic research such as Carnegie Mellon University's *DensePose From WiFi* work. That research demonstrated that WiFi signals can be used to reconstruct human pose.
RuView extends that concept into a practical edge system. By analyzing Channel State Information (CSI) disturbances caused by human movement, RuView reconstructs body position, breathing rate, heart rate, and presence in real time using physics-based signal processing and machine learning.
From 022499b2f56d6d780cb04d8e4748ca7d1151c991 Mon Sep 17 00:00:00 2001
From: rUv
Date: Fri, 27 Mar 2026 17:31:03 -0400
Subject: [PATCH 05/12] fix: add wifi_densepose package for correct module
import (#314)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
The README Quick Start tells users to `pip install wifi-densepose` and then
`from wifi_densepose import WiFiDensePose`, but no `wifi_densepose` Python
package existed — only `v1/src`. This adds a top-level `wifi_densepose/`
package with a WiFiDensePose facade class matching the documented API, and
updates pyproject.toml to include it in the distribution.
Closes #314
---
pyproject.toml | 2 +-
wifi_densepose/__init__.py | 137 +++++++++++++++++++++++++++++++++++++
2 files changed, 138 insertions(+), 1 deletion(-)
create mode 100644 wifi_densepose/__init__.py
diff --git a/pyproject.toml b/pyproject.toml
index bb44b2a67..aa03506b8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -185,7 +185,7 @@ package-dir = {"" = "."}
[tool.setuptools.packages.find]
where = ["."]
-include = ["src*"]
+include = ["wifi_densepose*", "src*"]
exclude = ["tests*", "docs*", "scripts*"]
[tool.setuptools.package-data]
diff --git a/wifi_densepose/__init__.py b/wifi_densepose/__init__.py
new file mode 100644
index 000000000..83d6e204f
--- /dev/null
+++ b/wifi_densepose/__init__.py
@@ -0,0 +1,137 @@
+"""
+WiFi-DensePose — WiFi-based human pose estimation using CSI data.
+
+Usage:
+ from wifi_densepose import WiFiDensePose
+
+ system = WiFiDensePose()
+ system.start()
+ poses = system.get_latest_poses()
+ system.stop()
+"""
+
+__version__ = "1.2.0"
+
+import sys
+import os
+import logging
+
+logger = logging.getLogger(__name__)
+
+# Allow importing the v1 src package when installed from the repo
+_v1_src = os.path.join(os.path.dirname(os.path.dirname(__file__)), "v1")
+if os.path.isdir(_v1_src) and _v1_src not in sys.path:
+ sys.path.insert(0, _v1_src)
+
+
+class WiFiDensePose:
+ """High-level facade for the WiFi-DensePose sensing system.
+
+ This is the primary entry point documented in the README Quick Start.
+ It wraps the underlying ServiceOrchestrator and exposes a simple
+ start / get_latest_poses / stop interface.
+ """
+
+ def __init__(self, host: str = "0.0.0.0", port: int = 3000, **kwargs):
+ self.host = host
+ self.port = port
+ self._config = kwargs
+ self._orchestrator = None
+ self._server_task = None
+ self._poses = []
+ self._running = False
+
+ # ------------------------------------------------------------------
+ # Public API (matches README Quick Start)
+ # ------------------------------------------------------------------
+
+ def start(self):
+ """Start the sensing system (blocking until ready)."""
+ import asyncio
+
+ loop = _get_or_create_event_loop()
+ loop.run_until_complete(self._async_start())
+
+ async def _async_start(self):
+ try:
+ from src.config.settings import get_settings
+ from src.services.orchestrator import ServiceOrchestrator
+
+ settings = get_settings()
+ self._orchestrator = ServiceOrchestrator(settings)
+ await self._orchestrator.initialize()
+ await self._orchestrator.start()
+ self._running = True
+ logger.info("WiFiDensePose system started on %s:%s", self.host, self.port)
+ except ImportError:
+ raise ImportError(
+ "Core dependencies not found. Make sure you installed "
+ "from the repository root:\n"
+ " cd wifi-densepose && pip install -e .\n"
+ "Or install the v1 package:\n"
+ " cd wifi-densepose/v1 && pip install -e ."
+ )
+
+ def stop(self):
+ """Stop the sensing system."""
+ import asyncio
+
+ if self._orchestrator is not None:
+ loop = _get_or_create_event_loop()
+ loop.run_until_complete(self._orchestrator.shutdown())
+ self._running = False
+ logger.info("WiFiDensePose system stopped")
+
+ def get_latest_poses(self):
+ """Return the most recent list of detected pose dicts."""
+ if self._orchestrator is None:
+ return []
+ try:
+ import asyncio
+
+ loop = _get_or_create_event_loop()
+ return loop.run_until_complete(self._fetch_poses())
+ except Exception:
+ return []
+
+ async def _fetch_poses(self):
+ try:
+ pose_svc = self._orchestrator.pose_service
+ if pose_svc and hasattr(pose_svc, "get_latest"):
+ return await pose_svc.get_latest()
+ except Exception:
+ pass
+ return []
+
+ # ------------------------------------------------------------------
+ # Context-manager support
+ # ------------------------------------------------------------------
+
+ def __enter__(self):
+ self.start()
+ return self
+
+ def __exit__(self, *exc):
+ self.stop()
+
+ # ------------------------------------------------------------------
+ # Convenience re-exports
+ # ------------------------------------------------------------------
+
+ @staticmethod
+ def version():
+ return __version__
+
+
+def _get_or_create_event_loop():
+ import asyncio
+
+ try:
+ return asyncio.get_event_loop()
+ except RuntimeError:
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ return loop
+
+
+__all__ = ["WiFiDensePose", "__version__"]
From 40f19622af9109127e3ca894f2141ee8613e0bef Mon Sep 17 00:00:00 2001
From: rUv
Date: Fri, 27 Mar 2026 17:31:06 -0400
Subject: [PATCH 06/12] fix(firmware,server): watchdog crash + no detection
from edge vitals (#321, #323)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
* fix(firmware,server): watchdog crash on busy LANs + no detection from edge vitals (#321, #323)
**Firmware (#321):** edge_dsp task now batch-limits frame processing to 4
frames before a 10ms yield. On corporate LANs with high CSI frame rates,
the previous 1-tick-per-frame yield wasn't enough to prevent IDLE1
starvation and task watchdog triggers.
**Sensing server (#323):** When ESP32 runs the edge DSP pipeline (Tier 2+),
it sends vitals packets (magic 0xC5110002) instead of raw CSI frames.
Previously, the server broadcast these as raw edge_vitals but never
generated a sensing_update, so the UI showed "connected" but "0 persons".
Now synthesizes a full sensing_update from vitals data including
classification, person count, and pose generation.
Closes #321
Closes #323
Co-Authored-By: claude-flow
* fix(firmware): address review findings — idle busy-spin and observability
- Fix pdMS_TO_TICKS(5)==0 at 100Hz causing busy-spin in idle path (use
vTaskDelay(1) instead)
- Post-batch yield now 2 ticks (20ms) for genuinely longer pause
- Add s_ring_drops counter to ring_push for diagnosing frame drops
- Expose drop count in periodic vitals log line
Co-Authored-By: claude-flow
* fix(server): set breathing_band_power for skeleton animation from vitals
When presence is detected via edge vitals, set breathing_band_power to
0.5 so the UI's torso breathing animation works. Previously hardcoded
to 0.0 which made the skeleton appear static even when breathing rate
was being reported.
Co-Authored-By: claude-flow
---
.../esp32-csi-node/main/edge_processing.c | 37 +++++---
.../wifi-densepose-sensing-server/src/main.rs | 84 +++++++++++++++++++
2 files changed, 111 insertions(+), 10 deletions(-)
diff --git a/firmware/esp32-csi-node/main/edge_processing.c b/firmware/esp32-csi-node/main/edge_processing.c
index 0911f3831..1cd74a65d 100644
--- a/firmware/esp32-csi-node/main/edge_processing.c
+++ b/firmware/esp32-csi-node/main/edge_processing.c
@@ -41,12 +41,14 @@ static const char *TAG = "edge_proc";
* ====================================================================== */
static edge_ring_buf_t s_ring;
+static uint32_t s_ring_drops; /* Frames dropped due to full ring buffer. */
static inline bool ring_push(const uint8_t *iq, uint16_t len,
int8_t rssi, uint8_t channel)
{
uint32_t next = (s_ring.head + 1) % EDGE_RING_SLOTS;
if (next == s_ring.tail) {
+ s_ring_drops++;
return false; /* Full — drop frame. */
}
@@ -788,12 +790,13 @@ static void process_frame(const edge_ring_slot_t *slot)
if ((s_frame_count % 200) == 0) {
ESP_LOGI(TAG, "Vitals: br=%.1f hr=%.1f motion=%.4f pres=%s "
- "fall=%s persons=%u frames=%lu",
+ "fall=%s persons=%u frames=%lu drops=%lu",
s_breathing_bpm, s_heartrate_bpm, s_motion_energy,
s_presence_detected ? "YES" : "no",
s_fall_detected ? "YES" : "no",
(unsigned)s_latest_pkt.n_persons,
- (unsigned long)s_frame_count);
+ (unsigned long)s_frame_count,
+ (unsigned long)s_ring_drops);
}
}
@@ -831,18 +834,32 @@ static void edge_task(void *arg)
edge_ring_slot_t slot;
+ /* Maximum frames to process before a longer yield. On busy LANs
+ * (corporate networks, many APs), the ring buffer fills continuously.
+ * Without a batch limit the task processes frames back-to-back with
+ * only 1-tick yields, which on high frame rates can still starve
+ * IDLE1 enough to trip the 5-second task watchdog. See #266, #321. */
+ const uint8_t BATCH_LIMIT = 4;
+
while (1) {
- if (ring_pop(&slot)) {
+ uint8_t processed = 0;
+
+ while (processed < BATCH_LIMIT && ring_pop(&slot)) {
process_frame(&slot);
- /* Yield after every frame to feed the Core 1 watchdog.
- * process_frame() is CPU-intensive (biquad filters, Welford stats,
- * BPM estimation, multi-person vitals) and can take several ms.
- * Without this yield, edge_dsp at priority 5 starves IDLE1 at
- * priority 0, triggering the task watchdog. See issue #266. */
+ processed++;
+ /* 1-tick yield between frames within a batch. */
vTaskDelay(1);
+ }
+
+ if (processed > 0) {
+ /* Post-batch yield: 2 ticks (~20 ms at 100 Hz) so IDLE1 can
+ * run and feed the Core 1 watchdog even under sustained load.
+ * This is intentionally longer than the 1-tick inter-frame yield. */
+ vTaskDelay(2);
} else {
- /* No frames available — yield briefly. */
- vTaskDelay(pdMS_TO_TICKS(1));
+ /* No frames available — sleep one full tick.
+ * NOTE: pdMS_TO_TICKS(5) == 0 at 100 Hz, which would busy-spin. */
+ vTaskDelay(1);
}
}
}
diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
index 4bd84c066..1ae12c87c 100644
--- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
+++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
@@ -2820,6 +2820,90 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
})) {
let _ = s.tx.send(json);
}
+
+ // Issue #323: Also emit a sensing_update so the UI renders
+ // detections for ESP32 nodes running the edge DSP pipeline
+ // (Tier 2+). Without this, vitals arrive but the UI shows
+ // "no detection" because it only renders sensing_update msgs.
+ s.source = "esp32".to_string();
+ s.last_esp32_frame = Some(std::time::Instant::now());
+ s.tick += 1;
+ let tick = s.tick;
+
+ let motion_level = if vitals.motion { "present_moving" }
+ else if vitals.presence { "present_still" }
+ else { "absent" };
+ let motion_score = if vitals.motion { 0.8 }
+ else if vitals.presence { 0.3 }
+ else { 0.05 };
+ let est_persons = if vitals.presence {
+ (vitals.n_persons as usize).max(1)
+ } else {
+ 0
+ };
+
+ let features = FeatureInfo {
+ mean_rssi: vitals.rssi as f64,
+ variance: vitals.motion_energy as f64,
+ motion_band_power: vitals.motion_energy as f64,
+ breathing_band_power: if vitals.presence { 0.5 } else { 0.0 },
+ dominant_freq_hz: vitals.breathing_rate_bpm / 60.0,
+ change_points: 0,
+ spectral_power: vitals.motion_energy as f64,
+ };
+ let classification = ClassificationInfo {
+ motion_level: motion_level.to_string(),
+ presence: vitals.presence,
+ confidence: vitals.presence_score as f64,
+ };
+ let signal_field = generate_signal_field(
+ vitals.rssi as f64, motion_score, vitals.breathing_rate_bpm / 60.0,
+ (vitals.presence_score as f64).min(1.0), &[],
+ );
+
+ let mut update = SensingUpdate {
+ msg_type: "sensing_update".to_string(),
+ timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
+ source: "esp32".to_string(),
+ tick,
+ nodes: vec![NodeInfo {
+ node_id: vitals.node_id,
+ rssi_dbm: vitals.rssi as f64,
+ position: [2.0, 0.0, 1.5],
+ amplitude: vec![],
+ subcarrier_count: 0,
+ }],
+ features: features.clone(),
+ classification,
+ signal_field,
+ vital_signs: Some(VitalSigns {
+ breathing_rate_bpm: if vitals.breathing_rate_bpm > 0.0 { Some(vitals.breathing_rate_bpm) } else { None },
+ heart_rate_bpm: if vitals.heartrate_bpm > 0.0 { Some(vitals.heartrate_bpm) } else { None },
+ breathing_confidence: if vitals.presence { 0.7 } else { 0.0 },
+ heartbeat_confidence: if vitals.presence { 0.7 } else { 0.0 },
+ signal_quality: vitals.presence_score as f64,
+ }),
+ enhanced_motion: None,
+ enhanced_breathing: None,
+ posture: None,
+ signal_quality_score: None,
+ quality_verdict: None,
+ bssid_count: None,
+ pose_keypoints: None,
+ model_status: None,
+ persons: None,
+ estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
+ };
+
+ let persons = derive_pose_from_sensing(&update);
+ if !persons.is_empty() {
+ update.persons = Some(persons);
+ }
+
+ if let Ok(json) = serde_json::to_string(&update) {
+ let _ = s.tx.send(json);
+ }
+ s.latest_update = Some(update);
s.edge_vitals = Some(vitals);
continue;
}
From c2e564a9f480fa08431f44d40e46ecc9b474ec44 Mon Sep 17 00:00:00 2001
From: ruv
Date: Fri, 27 Mar 2026 17:40:39 -0400
Subject: [PATCH 07/12] docs(readme): expand alpha notice with known
limitations
List specific known issues (multi-node detection, training plateau,
no pre-trained weights, hardware compatibility) to set expectations
for new users.
Co-Authored-By: claude-flow
---
README.md | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/README.md b/README.md
index b17ca3d12..6f635b1b3 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,15 @@
+> **Alpha Software** — This project is under active development. APIs, firmware behavior, and documentation may change. Known limitations:
+> - Multi-node person counting may show identical output regardless of the number of people (#249)
+> - Training pipeline on MM-Fi dataset may plateau at low PCK (#318) — hyperparameter tuning in progress
+> - No pre-trained model weights are provided; training from scratch is required
+> - ESP32-C3 and original ESP32 are not supported (single-core, insufficient for CSI DSP)
+> - Single ESP32 deployments have limited spatial resolution
+>
+> Contributions and bug reports welcome at [Issues](https://github.com/ruvnet/RuView/issues).
+
## **See through walls with WiFi + Ai** ##
**Perceive the world through signals.** No cameras. No wearables. No Internet. Just physics.
From 23dedecf0c5a2221f1496cdd432fba0f52076ec3 Mon Sep 17 00:00:00 2001
From: ruv
Date: Fri, 27 Mar 2026 17:45:23 -0400
Subject: [PATCH 08/12] docs(adr): ADR-068 per-node state pipeline for
multi-node sensing (#249)
Documents the architectural change from single shared state to per-node
HashMap in the sensing server. Includes scaling analysis
(256 nodes < 13 MB), QEMU validation plan, and aggregation strategy.
Also links README hero image to the explainer video.
Co-Authored-By: claude-flow
---
README.md | 2 +-
docs/adr/ADR-068-per-node-state-pipeline.md | 182 ++++++++++++++++++++
2 files changed, 183 insertions(+), 1 deletion(-)
create mode 100644 docs/adr/ADR-068-per-node-state-pipeline.md
diff --git a/README.md b/README.md
index 6f635b1b3..de0546e09 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
# π RuView
diff --git a/docs/adr/ADR-068-per-node-state-pipeline.md b/docs/adr/ADR-068-per-node-state-pipeline.md
new file mode 100644
index 000000000..4438b714a
--- /dev/null
+++ b/docs/adr/ADR-068-per-node-state-pipeline.md
@@ -0,0 +1,182 @@
+# ADR-068: Per-Node State Pipeline for Multi-Node Sensing
+
+| Field | Value |
+|------------|-------------------------------------|
+| Status | Accepted |
+| Date | 2026-03-27 |
+| Authors | rUv, claude-flow |
+| Drivers | #249, #237, #276, #282 |
+| Supersedes | — |
+
+## Context
+
+The sensing server (`wifi-densepose-sensing-server`) was originally designed for
+single-node operation. When multiple ESP32 nodes send CSI frames simultaneously,
+all data is mixed into a single shared pipeline:
+
+- **One** `frame_history` VecDeque for all nodes
+- **One** `smoothed_person_score` / `smoothed_motion` / vital sign buffers
+- **One** baseline and debounce state
+
+This means the classification, person count, and vital signs reported to the UI
+are an uncontrolled aggregate of all nodes' data. The result: the detection
+window shows identical output regardless of how many nodes are deployed, where
+people stand, or how many people are in the room (#249 — 24 comments, the most
+reported issue).
+
+### Root Cause Verified
+
+Investigation of `AppStateInner` (main.rs lines 279-367) confirmed:
+
+| Shared field | Impact |
+|---------------------------|--------------------------------------------|
+| `frame_history` | Temporal analysis mixes all nodes' CSI data |
+| `smoothed_person_score` | Person count aggregates all nodes |
+| `smoothed_motion` | Motion classification undifferentiated |
+| `smoothed_hr` / `br` | Vital signs are global, not per-node |
+| `baseline_motion` | Adaptive baseline learned from mixed data |
+| `debounce_counter` | All nodes share debounce state |
+
+## Decision
+
+Introduce **per-node state tracking** via a `HashMap<u8, NodeState>` in
+`AppStateInner`. Each ESP32 node (identified by its `node_id` byte) gets an
+independent sensing pipeline with its own temporal history, smoothing buffers,
+baseline, and classification state.
+
+### Architecture
+
+```
+ ┌─────────────────────────────────────────┐
+ UDP frames │ AppStateInner │
+ ───────────► │ │
+ node_id=1 ──► │ node_states: HashMap │
+ node_id=2 ──► │ ├── 1: NodeState { frame_history, │
+ node_id=3 ──► │ │ smoothed_motion, vitals, ... }│
+ │ ├── 2: NodeState { ... } │
+ │ └── 3: NodeState { ... } │
+ │ │
+ │ ┌── Per-Node Pipeline ──┐ │
+ │ │ extract_features() │ │
+ │ │ smooth_and_classify() │ │
+ │ │ smooth_vitals() │ │
+ │ │ score_to_person_count()│ │
+ │ └────────────────────────┘ │
+ │ │
+ │ ┌── Multi-Node Fusion ──┐ │
+ │ │ Aggregate person count │ │
+ │ │ Per-node classification│ │
+ │ │ All-nodes WebSocket msg│ │
+ │ └────────────────────────┘ │
+ │ │
+ │ ──► WebSocket broadcast (sensing_update) │
+ └─────────────────────────────────────────┘
+```
+
+### NodeState Struct
+
+```rust
+struct NodeState {
+    frame_history: VecDeque<Vec<f64>>,
+    smoothed_person_score: f64,
+    prev_person_count: usize,
+    smoothed_motion: f64,
+    current_motion_level: String,
+    debounce_counter: u32,
+    debounce_candidate: String,
+    baseline_motion: f64,
+    baseline_frames: u64,
+    smoothed_hr: f64,
+    smoothed_br: f64,
+    smoothed_hr_conf: f64,
+    smoothed_br_conf: f64,
+    hr_buffer: VecDeque<f64>,
+    br_buffer: VecDeque<f64>,
+    rssi_history: VecDeque<f64>,
+    vital_detector: VitalSignDetector,
+    latest_vitals: VitalSigns,
+    last_frame_time: Option<Instant>,
+    edge_vitals: Option<EdgeVitals>,
+}
+```
+
+### Multi-Node Aggregation
+
+- **Person count**: Sum of per-node `prev_person_count` for active nodes
+ (seen within last 10 seconds).
+- **Classification**: Per-node classification included in `SensingUpdate.nodes`.
+- **Vital signs**: Per-node vital signs; UI can render per-node or aggregate.
+- **Signal field**: Generated from the most-recently-updated node's features.
+- **Stale nodes**: Nodes with no frame for >10 seconds are excluded from
+ aggregation and marked offline (consistent with PR #300).
+
+### Backward Compatibility
+
+- The simulated data path (`simulated_data_task`) continues using global state.
+- Single-node deployments behave identically (HashMap has one entry).
+- The WebSocket message format (`sensing_update`) remains the same but the
+ `nodes` array now contains all active nodes, and `estimated_persons` reflects
+ the cross-node aggregate.
+- The edge vitals path (#323 fix) also uses per-node state.
+
+## Scaling Characteristics
+
+| Nodes | Per-Node Memory | Total Overhead | Notes |
+|-------|----------------|----------------|-------|
+| 1 | ~50 KB | ~50 KB | Identical to current |
+| 3 | ~50 KB | ~150 KB | Typical home setup |
+| 10 | ~50 KB | ~500 KB | Small office |
+| 50 | ~50 KB | ~2.5 MB | Building floor |
+| 100 | ~50 KB | ~5 MB | Large deployment |
+| 256 | ~50 KB | ~12.8 MB | Max (u8 node_id) |
+
+Memory is dominated by `frame_history` (100 frames x ~500 bytes each = ~50 KB
+per node). This scales linearly and fits comfortably in server memory even at
+256 nodes.
+
+## QEMU Validation
+
+The existing QEMU swarm infrastructure (ADR-062, `scripts/qemu_swarm.py`)
+supports multi-node simulation with configurable topologies:
+
+- `star`: Central coordinator + sensor nodes
+- `mesh`: Fully connected peer network
+- `line`: Sequential chain
+- `ring`: Circular topology
+
+Each QEMU instance runs with a unique `node_id` via NVS provisioning. The
+swarm health validator (`scripts/swarm_health.py`) checks per-node UART output.
+
+Validation plan:
+1. QEMU swarm with 3-5 nodes in mesh topology
+2. Verify server produces distinct per-node classifications
+3. Verify aggregate person count reflects multi-node contributions
+4. Verify stale-node eviction after timeout
+
+## Consequences
+
+### Positive
+- Each node's CSI data is processed independently — no cross-contamination
+- Person count scales with the number of deployed nodes
+- Vital signs are per-node, enabling room-level health monitoring
+- Foundation for spatial localization (per-node positions + triangulation)
+- Scales to 256 nodes with <13 MB memory overhead
+
+### Negative
+- Slightly more memory per node (~50 KB each)
+- `smooth_and_classify_node` function duplicates some logic from global version
+- Per-node `VitalSignDetector` instances add CPU cost proportional to node count
+
+### Risks
+- Node ID collisions (mitigated by NVS persistence since v0.5.0)
+- HashMap growth without cleanup (mitigated by stale-node eviction)
+
+## References
+
+- Issue #249: Detection window same regardless (24 comments)
+- Issue #237: Same display for 0/1/2 people (12 comments)
+- Issue #276: Only one can be detected (8 comments)
+- Issue #282: Detection fail (5 comments)
+- PR #295: Hysteresis smoothing (partial mitigation)
+- PR #300: ESP32 offline detection after 5s
+- ADR-062: QEMU Swarm Configurator
From 3c02f6cfb05391fd1d54648aad921c8ea6c3fb55 Mon Sep 17 00:00:00 2001
From: rUv
Date: Fri, 27 Mar 2026 17:52:51 -0400
Subject: [PATCH 09/12] feat(server): per-node state pipeline for multi-node
sensing (#249)
* docs(adr): ADR-068 per-node state pipeline for multi-node sensing (#249)
Documents the architectural change from single shared state to per-node
HashMap in the sensing server. Includes scaling analysis
(256 nodes < 13 MB), QEMU validation plan, and aggregation strategy.
Also links README hero image to the explainer video.
Co-Authored-By: claude-flow
* feat(server): per-node state pipeline for multi-node sensing (ADR-068, #249)
Replaces the single shared state pipeline with per-node HashMap.
Each ESP32 node now gets independent:
- frame_history (temporal analysis)
- smoothed_person_score / prev_person_count
- smoothed_motion / baseline / debounce state
- vital sign detector + smoothing buffers
- RSSI history
Multi-node aggregation:
- Person count = sum of per-node counts for active nodes (seen <10s)
- SensingUpdate.nodes includes all active nodes
- estimated_persons reflects cross-node aggregate
Single-node deployments behave identically (HashMap has one entry).
Simulated data path unchanged for backward compatibility.
Closes #249
Refs #237, #276, #282
Co-Authored-By: claude-flow
---
.../wifi-densepose-sensing-server/src/main.rs | 326 +++++++++++++++---
1 file changed, 279 insertions(+), 47 deletions(-)
diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
index 1ae12c87c..b0c16803a 100644
--- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
+++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
@@ -16,7 +16,7 @@ mod vital_signs;
// Training pipeline modules (exposed via lib.rs)
use wifi_densepose_sensing_server::{graph_transformer, trainer, dataset, embedding};
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
@@ -275,6 +275,59 @@ struct BoundingBox {
height: f64,
}
+/// Per-node sensing state for multi-node deployments (issue #249).
+/// Each ESP32 node gets its own frame history, smoothing buffers, and vital
+/// sign detector so that data from different nodes is never mixed.
+struct NodeState {
+    frame_history: VecDeque<Vec<f64>>,
+    smoothed_person_score: f64,
+    prev_person_count: usize,
+    smoothed_motion: f64,
+    current_motion_level: String,
+    debounce_counter: u32,
+    debounce_candidate: String,
+    baseline_motion: f64,
+    baseline_frames: u64,
+    smoothed_hr: f64,
+    smoothed_br: f64,
+    smoothed_hr_conf: f64,
+    smoothed_br_conf: f64,
+    hr_buffer: VecDeque<f64>,
+    br_buffer: VecDeque<f64>,
+    rssi_history: VecDeque<f64>,
+    vital_detector: VitalSignDetector,
+    latest_vitals: VitalSigns,
+    last_frame_time: Option<std::time::Instant>,
+    edge_vitals: Option<EdgeVitals>,
+}
+
+impl NodeState {
+ fn new() -> Self {
+ Self {
+ frame_history: VecDeque::new(),
+ smoothed_person_score: 0.0,
+ prev_person_count: 0,
+ smoothed_motion: 0.0,
+ current_motion_level: "absent".to_string(),
+ debounce_counter: 0,
+ debounce_candidate: "absent".to_string(),
+ baseline_motion: 0.0,
+ baseline_frames: 0,
+ smoothed_hr: 0.0,
+ smoothed_br: 0.0,
+ smoothed_hr_conf: 0.0,
+ smoothed_br_conf: 0.0,
+ hr_buffer: VecDeque::with_capacity(8),
+ br_buffer: VecDeque::with_capacity(8),
+ rssi_history: VecDeque::new(),
+ vital_detector: VitalSignDetector::new(10.0),
+ latest_vitals: VitalSigns::default(),
+ last_frame_time: None,
+ edge_vitals: None,
+ }
+ }
+}
+
/// Shared application state
struct AppStateInner {
latest_update: Option<SensingUpdate>,
@@ -364,6 +417,10 @@ struct AppStateInner {
// ── Adaptive classifier (environment-tuned) ──────────────────────────
/// Trained adaptive model (loaded from data/adaptive_model.json or trained at runtime).
adaptive_model: Option,
+ // ── Per-node state (issue #249) ─────────────────────────────────────
+ /// Per-node sensing state for multi-node deployments.
+ /// Keyed by `node_id` from the ESP32 frame header.
+ node_states: HashMap<u8, NodeState>,
}
/// If no ESP32 frame arrives within this duration, source reverts to offline.
@@ -964,6 +1021,44 @@ fn smooth_and_classify(state: &mut AppStateInner, raw: &mut ClassificationInfo,
raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0);
}
+/// Per-node variant of `smooth_and_classify` that operates on a `NodeState`
+/// instead of `AppStateInner` (issue #249).
+fn smooth_and_classify_node(ns: &mut NodeState, raw: &mut ClassificationInfo, raw_motion: f64) {
+ ns.baseline_frames += 1;
+ if ns.baseline_frames < BASELINE_WARMUP {
+ ns.baseline_motion = ns.baseline_motion * 0.9 + raw_motion * 0.1;
+ } else if raw_motion < ns.smoothed_motion + 0.05 {
+ ns.baseline_motion = ns.baseline_motion * (1.0 - BASELINE_EMA_ALPHA)
+ + raw_motion * BASELINE_EMA_ALPHA;
+ }
+
+ let adjusted = (raw_motion - ns.baseline_motion * 0.7).max(0.0);
+
+ ns.smoothed_motion = ns.smoothed_motion * (1.0 - MOTION_EMA_ALPHA)
+ + adjusted * MOTION_EMA_ALPHA;
+ let sm = ns.smoothed_motion;
+
+ let candidate = raw_classify(sm);
+
+ if candidate == ns.current_motion_level {
+ ns.debounce_counter = 0;
+ ns.debounce_candidate = candidate;
+ } else if candidate == ns.debounce_candidate {
+ ns.debounce_counter += 1;
+ if ns.debounce_counter >= DEBOUNCE_FRAMES {
+ ns.current_motion_level = candidate;
+ ns.debounce_counter = 0;
+ }
+ } else {
+ ns.debounce_candidate = candidate;
+ ns.debounce_counter = 1;
+ }
+
+ raw.motion_level = ns.current_motion_level.clone();
+ raw.presence = sm > 0.03;
+ raw.confidence = (0.4 + sm * 0.6).clamp(0.0, 1.0);
+}
+
/// If an adaptive model is loaded, override the classification with the
/// model's prediction. Uses the full 15-feature vector for higher accuracy.
fn adaptive_override(state: &AppStateInner, features: &FeatureInfo, classification: &mut ClassificationInfo) {
@@ -1064,6 +1159,55 @@ fn smooth_vitals(state: &mut AppStateInner, raw: &VitalSigns) -> VitalSigns {
}
}
+/// Per-node variant of `smooth_vitals` that operates on a `NodeState` (issue #249).
+fn smooth_vitals_node(ns: &mut NodeState, raw: &VitalSigns) -> VitalSigns {
+ let raw_hr = raw.heart_rate_bpm.unwrap_or(0.0);
+ let raw_br = raw.breathing_rate_bpm.unwrap_or(0.0);
+
+ let hr_ok = ns.smoothed_hr < 1.0 || (raw_hr - ns.smoothed_hr).abs() < HR_MAX_JUMP;
+ let br_ok = ns.smoothed_br < 1.0 || (raw_br - ns.smoothed_br).abs() < BR_MAX_JUMP;
+
+ if hr_ok && raw_hr > 0.0 {
+ ns.hr_buffer.push_back(raw_hr);
+ if ns.hr_buffer.len() > VITAL_MEDIAN_WINDOW { ns.hr_buffer.pop_front(); }
+ }
+ if br_ok && raw_br > 0.0 {
+ ns.br_buffer.push_back(raw_br);
+ if ns.br_buffer.len() > VITAL_MEDIAN_WINDOW { ns.br_buffer.pop_front(); }
+ }
+
+ let trimmed_hr = trimmed_mean(&ns.hr_buffer);
+ let trimmed_br = trimmed_mean(&ns.br_buffer);
+
+ if trimmed_hr > 0.0 {
+ if ns.smoothed_hr < 1.0 {
+ ns.smoothed_hr = trimmed_hr;
+ } else if (trimmed_hr - ns.smoothed_hr).abs() > HR_DEAD_BAND {
+ ns.smoothed_hr = ns.smoothed_hr * (1.0 - VITAL_EMA_ALPHA)
+ + trimmed_hr * VITAL_EMA_ALPHA;
+ }
+ }
+ if trimmed_br > 0.0 {
+ if ns.smoothed_br < 1.0 {
+ ns.smoothed_br = trimmed_br;
+ } else if (trimmed_br - ns.smoothed_br).abs() > BR_DEAD_BAND {
+ ns.smoothed_br = ns.smoothed_br * (1.0 - VITAL_EMA_ALPHA)
+ + trimmed_br * VITAL_EMA_ALPHA;
+ }
+ }
+
+ ns.smoothed_hr_conf = ns.smoothed_hr_conf * 0.92 + raw.heartbeat_confidence * 0.08;
+ ns.smoothed_br_conf = ns.smoothed_br_conf * 0.92 + raw.breathing_confidence * 0.08;
+
+ VitalSigns {
+ breathing_rate_bpm: if ns.smoothed_br > 1.0 { Some(ns.smoothed_br) } else { None },
+ heart_rate_bpm: if ns.smoothed_hr > 1.0 { Some(ns.smoothed_hr) } else { None },
+ breathing_confidence: ns.smoothed_br_conf,
+ heartbeat_confidence: ns.smoothed_hr_conf,
+ signal_quality: raw.signal_quality,
+ }
+}
+
/// Trimmed mean: sort, drop top/bottom 25%, average the middle 50%.
/// More robust than median (uses more data) and less noisy than raw mean.
fn trimmed_mean(buf: &VecDeque<f64>) -> f64 {
@@ -2827,6 +2971,23 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
// "no detection" because it only renders sensing_update msgs.
s.source = "esp32".to_string();
s.last_esp32_frame = Some(std::time::Instant::now());
+
+ // ── Per-node state for edge vitals (issue #249) ──────
+ let node_id = vitals.node_id;
+ let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
+ ns.last_frame_time = Some(std::time::Instant::now());
+ ns.edge_vitals = Some(vitals.clone());
+ ns.rssi_history.push_back(vitals.rssi as f64);
+ if ns.rssi_history.len() > 60 { ns.rssi_history.pop_front(); }
+
+ // Store per-node person count from edge vitals.
+ let node_est = if vitals.presence {
+ (vitals.n_persons as usize).max(1)
+ } else {
+ 0
+ };
+ ns.prev_person_count = node_est;
+
s.tick += 1;
let tick = s.tick;
@@ -2836,11 +2997,25 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
let motion_score = if vitals.motion { 0.8 }
else if vitals.presence { 0.3 }
else { 0.05 };
- let est_persons = if vitals.presence {
- (vitals.n_persons as usize).max(1)
- } else {
- 0
- };
+
+ // Aggregate person count across all active nodes.
+ let now = std::time::Instant::now();
+ let total_persons: usize = s.node_states.values()
+ .filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
+ .map(|n| n.prev_person_count)
+ .sum();
+
+ // Build nodes array with all active nodes.
+ let active_nodes: Vec<NodeInfo> = s.node_states.iter()
+ .filter(|(_, n)| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
+ .map(|(&id, n)| NodeInfo {
+ node_id: id,
+ rssi_dbm: n.rssi_history.back().copied().unwrap_or(0.0),
+ position: [2.0, 0.0, 1.5],
+ amplitude: vec![],
+ subcarrier_count: 0,
+ })
+ .collect();
let features = FeatureInfo {
mean_rssi: vitals.rssi as f64,
@@ -2866,13 +3041,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
source: "esp32".to_string(),
tick,
- nodes: vec![NodeInfo {
- node_id: vitals.node_id,
- rssi_dbm: vitals.rssi as f64,
- position: [2.0, 0.0, 1.5],
- amplitude: vec![],
- subcarrier_count: 0,
- }],
+ nodes: active_nodes,
features: features.clone(),
classification,
signal_field,
@@ -2892,7 +3061,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
pose_keypoints: None,
model_status: None,
persons: None,
- estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
+ estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
};
let persons = derive_pose_from_sensing(&update);
@@ -2935,24 +3104,90 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
s.source = "esp32".to_string();
s.last_esp32_frame = Some(std::time::Instant::now());
- // Append current amplitudes to history before extracting features so
- // that temporal analysis includes the most recent frame.
+ // Also maintain global frame_history for backward compat
+ // (simulation path, REST endpoints, etc.).
s.frame_history.push_back(frame.amplitudes.clone());
if s.frame_history.len() > FRAME_HISTORY_CAPACITY {
s.frame_history.pop_front();
}
- let sample_rate_hz = 1000.0 / 500.0_f64; // default tick; ESP32 frames arrive as fast as they come
+ // ── Per-node processing (issue #249) ──────────────────
+ // Process entirely within per-node state so different
+ // ESP32 nodes never mix their smoothing/vitals buffers.
+ // We scope the mutable borrow of node_states so we can
+ // access other AppStateInner fields afterward.
+ let node_id = frame.node_id;
+ let adaptive_model_ref = s.adaptive_model.as_ref().map(|m| m as *const _);
+ let ns = s.node_states.entry(node_id).or_insert_with(NodeState::new);
+ ns.last_frame_time = Some(std::time::Instant::now());
+
+ ns.frame_history.push_back(frame.amplitudes.clone());
+ if ns.frame_history.len() > FRAME_HISTORY_CAPACITY {
+ ns.frame_history.pop_front();
+ }
+
+ let sample_rate_hz = 1000.0 / 500.0_f64;
let (features, mut classification, breathing_rate_hz, sub_variances, raw_motion) =
- extract_features_from_frame(&frame, &s.frame_history, sample_rate_hz);
- smooth_and_classify(&mut s, &mut classification, raw_motion);
- adaptive_override(&s, &features, &mut classification);
+ extract_features_from_frame(&frame, &ns.frame_history, sample_rate_hz);
+ smooth_and_classify_node(ns, &mut classification, raw_motion);
+
+ // SAFETY: adaptive_model_ref points into s which we hold
+ // via write lock; the model is not mutated here. We use a
+ // raw pointer to break the borrow-checker deadlock between
+ // node_states and adaptive_model (both inside s).
+ if let Some(model_ptr) = adaptive_model_ref {
+ let model: &adaptive_classifier::AdaptiveModel = unsafe { &*model_ptr };
+ let amps = ns.frame_history.back()
+ .map(|v| v.as_slice())
+ .unwrap_or(&[]);
+ let feat_arr = adaptive_classifier::features_from_runtime(
+ &serde_json::json!({
+ "variance": features.variance,
+ "motion_band_power": features.motion_band_power,
+ "breathing_band_power": features.breathing_band_power,
+ "spectral_power": features.spectral_power,
+ "dominant_freq_hz": features.dominant_freq_hz,
+ "change_points": features.change_points,
+ "mean_rssi": features.mean_rssi,
+ }),
+ amps,
+ );
+ let (label, conf) = model.classify(&feat_arr);
+ classification.motion_level = label.to_string();
+ classification.presence = label != "absent";
+ classification.confidence = (conf * 0.7 + classification.confidence * 0.3).clamp(0.0, 1.0);
+ }
+
+ ns.rssi_history.push_back(features.mean_rssi);
+ if ns.rssi_history.len() > 60 {
+ ns.rssi_history.pop_front();
+ }
+
+ let raw_vitals = ns.vital_detector.process_frame(
+ &frame.amplitudes,
+ &frame.phases,
+ );
+ let vitals = smooth_vitals_node(ns, &raw_vitals);
+ ns.latest_vitals = vitals.clone();
+
+ let raw_score = compute_person_score(&features);
+ ns.smoothed_person_score = ns.smoothed_person_score * 0.90 + raw_score * 0.10;
+ if classification.presence {
+ let count = score_to_person_count(ns.smoothed_person_score, ns.prev_person_count);
+ ns.prev_person_count = count;
+ } else {
+ ns.prev_person_count = 0;
+ }
+
+ // Done with per-node mutable borrow; now read aggregated
+ // state from all nodes (the borrow of `ns` ends here).
+ // (We re-borrow node_states immutably via `s` below.)
- // Update RSSI history
s.rssi_history.push_back(features.mean_rssi);
if s.rssi_history.len() > 60 {
s.rssi_history.pop_front();
}
+ s.latest_vitals = vitals.clone();
s.tick += 1;
let tick = s.tick;
@@ -2961,37 +3196,33 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
else if classification.motion_level == "present_still" { 0.3 }
else { 0.05 };
- let raw_vitals = s.vital_detector.process_frame(
- &frame.amplitudes,
- &frame.phases,
- );
- let vitals = smooth_vitals(&mut s, &raw_vitals);
- s.latest_vitals = vitals.clone();
-
- // Multi-person estimation with temporal smoothing (EMA α=0.10).
- let raw_score = compute_person_score(&features);
- s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
- let est_persons = if classification.presence {
- let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
- s.prev_person_count = count;
- count
- } else {
- s.prev_person_count = 0;
- 0
- };
+ // Aggregate person count across all active nodes.
+ let now = std::time::Instant::now();
+ let total_persons: usize = s.node_states.values()
+ .filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
+ .map(|n| n.prev_person_count)
+ .sum();
+
+ // Build nodes array with all active nodes.
+ let active_nodes: Vec<NodeInfo> = s.node_states.iter()
+ .filter(|(_, n)| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
+ .map(|(&id, n)| NodeInfo {
+ node_id: id,
+ rssi_dbm: n.rssi_history.back().copied().unwrap_or(0.0),
+ position: [2.0, 0.0, 1.5],
+ amplitude: n.frame_history.back()
+ .map(|a| a.iter().take(56).cloned().collect())
+ .unwrap_or_default(),
+ subcarrier_count: n.frame_history.back().map_or(0, |a| a.len()),
+ })
+ .collect();
let mut update = SensingUpdate {
msg_type: "sensing_update".to_string(),
timestamp: chrono::Utc::now().timestamp_millis() as f64 / 1000.0,
source: "esp32".to_string(),
tick,
- nodes: vec![NodeInfo {
- node_id: frame.node_id,
- rssi_dbm: features.mean_rssi,
- position: [2.0, 0.0, 1.5],
- amplitude: frame.amplitudes.iter().take(56).cloned().collect(),
- subcarrier_count: frame.n_subcarriers as usize,
- }],
+ nodes: active_nodes,
features: features.clone(),
classification,
signal_field: generate_signal_field(
@@ -3008,7 +3239,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
pose_keypoints: None,
model_status: None,
persons: None,
- estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
+ estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
};
let persons = derive_pose_from_sensing(&update);
@@ -3760,6 +3991,7 @@ async fn main() {
m.trained_frames, m.training_accuracy * 100.0);
m
}),
+ node_states: HashMap::new(),
}));
// Start background tasks based on source
From d88994816f7bcec69f687405cab57605983b6e73 Mon Sep 17 00:00:00 2001
From: Taylor Dawson
Date: Fri, 27 Mar 2026 21:21:15 -0700
Subject: [PATCH 10/12] feat: dynamic classifier classes, per-node UI, XSS fix,
RSSI fix
Complements #326 (per-node state pipeline) with additional features:
- Dynamic adaptive classifier: discover activity classes from training
data filenames instead of hardcoded array. Users add classes via
filename convention (train_<class>_<session>.jsonl), no code changes.
- Per-node UI cards: SensingTab shows individual node status with
color-coded markers, RSSI, variance, and classification per node.
- Colored node markers in 3D gaussian splat view (8-color palette).
- Per-node RSSI history tracking in sensing service.
- XSS fix: UI uses createElement/textContent instead of innerHTML.
- RSSI sign fix: ensure dBm values are always negative.
- GET /api/v1/nodes endpoint for per-node health monitoring.
- node_features field in WebSocket SensingUpdate messages.
- Firmware watchdog fix: yield after every frame to prevent IDLE1 starvation.
Addresses #237, #276, #282
Co-Authored-By: Claude Opus 4.6 (1M context)
---
.../esp32-csi-node/main/edge_processing.c | 37 +---
.../src/adaptive_classifier.rs | 182 ++++++++++++------
.../wifi-densepose-sensing-server/src/main.rs | 54 +++++-
ui/components/SensingTab.js | 67 ++++++-
ui/components/gaussian-splats.js | 42 +++-
ui/services/sensing.service.js | 19 ++
6 files changed, 310 insertions(+), 91 deletions(-)
diff --git a/firmware/esp32-csi-node/main/edge_processing.c b/firmware/esp32-csi-node/main/edge_processing.c
index 1cd74a65d..0911f3831 100644
--- a/firmware/esp32-csi-node/main/edge_processing.c
+++ b/firmware/esp32-csi-node/main/edge_processing.c
@@ -41,14 +41,12 @@ static const char *TAG = "edge_proc";
* ====================================================================== */
static edge_ring_buf_t s_ring;
-static uint32_t s_ring_drops; /* Frames dropped due to full ring buffer. */
static inline bool ring_push(const uint8_t *iq, uint16_t len,
int8_t rssi, uint8_t channel)
{
uint32_t next = (s_ring.head + 1) % EDGE_RING_SLOTS;
if (next == s_ring.tail) {
- s_ring_drops++;
return false; /* Full — drop frame. */
}
@@ -790,13 +788,12 @@ static void process_frame(const edge_ring_slot_t *slot)
if ((s_frame_count % 200) == 0) {
ESP_LOGI(TAG, "Vitals: br=%.1f hr=%.1f motion=%.4f pres=%s "
- "fall=%s persons=%u frames=%lu drops=%lu",
+ "fall=%s persons=%u frames=%lu",
s_breathing_bpm, s_heartrate_bpm, s_motion_energy,
s_presence_detected ? "YES" : "no",
s_fall_detected ? "YES" : "no",
(unsigned)s_latest_pkt.n_persons,
- (unsigned long)s_frame_count,
- (unsigned long)s_ring_drops);
+ (unsigned long)s_frame_count);
}
}
@@ -834,32 +831,18 @@ static void edge_task(void *arg)
edge_ring_slot_t slot;
- /* Maximum frames to process before a longer yield. On busy LANs
- * (corporate networks, many APs), the ring buffer fills continuously.
- * Without a batch limit the task processes frames back-to-back with
- * only 1-tick yields, which on high frame rates can still starve
- * IDLE1 enough to trip the 5-second task watchdog. See #266, #321. */
- const uint8_t BATCH_LIMIT = 4;
-
while (1) {
- uint8_t processed = 0;
-
- while (processed < BATCH_LIMIT && ring_pop(&slot)) {
+ if (ring_pop(&slot)) {
process_frame(&slot);
- processed++;
- /* 1-tick yield between frames within a batch. */
+ /* Yield after every frame to feed the Core 1 watchdog.
+ * process_frame() is CPU-intensive (biquad filters, Welford stats,
+ * BPM estimation, multi-person vitals) and can take several ms.
+ * Without this yield, edge_dsp at priority 5 starves IDLE1 at
+ * priority 0, triggering the task watchdog. See issue #266. */
vTaskDelay(1);
- }
-
- if (processed > 0) {
- /* Post-batch yield: 2 ticks (~20 ms at 100 Hz) so IDLE1 can
- * run and feed the Core 1 watchdog even under sustained load.
- * This is intentionally longer than the 1-tick inter-frame yield. */
- vTaskDelay(2);
} else {
- /* No frames available — sleep one full tick.
- * NOTE: pdMS_TO_TICKS(5) == 0 at 100 Hz, which would busy-spin. */
- vTaskDelay(1);
+ /* No frames available — yield briefly. */
+ vTaskDelay(pdMS_TO_TICKS(1));
}
}
}
diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs
index 80d2364d5..b89cb58cf 100644
--- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs
+++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/adaptive_classifier.rs
@@ -10,6 +10,10 @@
//!
//! The trained model is serialised as JSON and hot-loaded at runtime so that
//! the classification thresholds adapt to the specific room and ESP32 placement.
+//!
+//! Classes are discovered dynamically from training data filenames instead of
+//! being hardcoded, so new activity classes can be added just by recording data
+//! with the appropriate filename convention.
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -20,9 +24,8 @@ use std::path::{Path, PathBuf};
/// Extended feature vector: 7 server features + 8 subcarrier-derived features = 15.
const N_FEATURES: usize = 15;
-/// Activity classes we recognise.
-pub const CLASSES: &[&str] = &["absent", "present_still", "present_moving", "active"];
-const N_CLASSES: usize = 4;
+/// Default class names for backward compatibility with old saved models.
+const DEFAULT_CLASSES: &[&str] = &["absent", "present_still", "present_moving", "active"];
/// Extract extended feature vector from a JSONL frame (features + raw amplitudes).
pub fn features_from_frame(frame: &serde_json::Value) -> [f64; N_FEATURES] {
@@ -124,8 +127,9 @@ pub struct ClassStats {
pub struct AdaptiveModel {
/// Per-class feature statistics (centroid + spread).
pub class_stats: Vec<ClassStats>,
- /// Logistic regression weights: [N_CLASSES x (N_FEATURES + 1)] (last = bias).
- pub weights: Vec<[f64; N_FEATURES + 1]>,
+ /// Logistic regression weights: [n_classes x (N_FEATURES + 1)] (last = bias).
+ /// Dynamic: the outer Vec length equals the number of discovered classes.
+ pub weights: Vec<Vec<f64>>,
/// Global feature normalisation: mean and stddev across all training data.
pub global_mean: [f64; N_FEATURES],
pub global_std: [f64; N_FEATURES],
@@ -133,27 +137,38 @@ pub struct AdaptiveModel {
pub trained_frames: usize,
pub training_accuracy: f64,
pub version: u32,
+ /// Dynamically discovered class names (in index order).
+ #[serde(default = "default_class_names")]
+ pub class_names: Vec<String>,
+}
+
+/// Backward-compatible fallback for models saved without class_names.
+fn default_class_names() -> Vec<String> {
+ DEFAULT_CLASSES.iter().map(|s| s.to_string()).collect()
}
impl Default for AdaptiveModel {
fn default() -> Self {
+ let n_classes = DEFAULT_CLASSES.len();
Self {
class_stats: Vec::new(),
- weights: vec![[0.0; N_FEATURES + 1]; N_CLASSES],
+ weights: vec![vec![0.0; N_FEATURES + 1]; n_classes],
global_mean: [0.0; N_FEATURES],
global_std: [1.0; N_FEATURES],
trained_frames: 0,
training_accuracy: 0.0,
version: 1,
+ class_names: default_class_names(),
}
}
}
impl AdaptiveModel {
/// Classify a raw feature vector. Returns (class_label, confidence).
- pub fn classify(&self, raw_features: &[f64; N_FEATURES]) -> (&'static str, f64) {
- if self.weights.is_empty() || self.class_stats.is_empty() {
- return ("present_still", 0.5);
+ pub fn classify(&self, raw_features: &[f64; N_FEATURES]) -> (String, f64) {
+ let n_classes = self.weights.len();
+ if n_classes == 0 || self.class_stats.is_empty() {
+ return ("present_still".to_string(), 0.5);
}
// Normalise features.
@@ -163,8 +178,8 @@ impl AdaptiveModel {
}
// Compute logits: w·x + b for each class.
- let mut logits = [0.0f64; N_CLASSES];
- for c in 0..N_CLASSES.min(self.weights.len()) {
+ let mut logits: Vec<f64> = vec![0.0; n_classes];
+ for c in 0..n_classes {
let w = &self.weights[c];
let mut z = w[N_FEATURES]; // bias
for i in 0..N_FEATURES {
@@ -176,8 +191,8 @@ impl AdaptiveModel {
// Softmax.
let max_logit = logits.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
let exp_sum: f64 = logits.iter().map(|z| (z - max_logit).exp()).sum();
- let mut probs = [0.0f64; N_CLASSES];
- for c in 0..N_CLASSES {
+ let mut probs: Vec<f64> = vec![0.0; n_classes];
+ for c in 0..n_classes {
probs[c] = ((logits[c] - max_logit).exp()) / exp_sum;
}
@@ -185,7 +200,11 @@ impl AdaptiveModel {
let (best_c, best_p) = probs.iter().enumerate()
.max_by(|a, b| a.1.partial_cmp(b.1).unwrap())
.unwrap();
- let label = if best_c < CLASSES.len() { CLASSES[best_c] } else { "present_still" };
+ let label = if best_c < self.class_names.len() {
+ self.class_names[best_c].clone()
+ } else {
+ "present_still".to_string()
+ };
(label, *best_p)
}
@@ -228,48 +247,88 @@ fn load_recording(path: &Path, class_idx: usize) -> Vec {
}).collect()
}
-/// Map a recording filename to a class index.
-fn classify_recording_name(name: &str) -> Option<usize> {
+/// Map a recording filename to a class name (String).
+/// Returns the discovered class name for the file, or None if it cannot be determined.
+fn classify_recording_name(name: &str) -> Option<String> {
let lower = name.to_lowercase();
- if lower.contains("empty") || lower.contains("absent") { Some(0) }
- else if lower.contains("still") || lower.contains("sitting") || lower.contains("standing") { Some(1) }
- else if lower.contains("walking") || lower.contains("moving") { Some(2) }
- else if lower.contains("active") || lower.contains("exercise") || lower.contains("running") { Some(3) }
- else { None }
+ // Strip "train_" prefix and ".jsonl" suffix, then extract the class label.
+ // Convention: train_<class>_<description>.jsonl
+ // The class is the first segment after "train_" that matches a known pattern,
+ // or the entire middle portion if no pattern matches.
+
+ // Check common patterns first for backward compat
+ if lower.contains("empty") || lower.contains("absent") { return Some("absent".into()); }
+ if lower.contains("still") || lower.contains("sitting") || lower.contains("standing") { return Some("present_still".into()); }
+ if lower.contains("walking") || lower.contains("moving") { return Some("present_moving".into()); }
+ if lower.contains("active") || lower.contains("exercise") || lower.contains("running") { return Some("active".into()); }
+
+ // Fallback: extract class from filename structure train_<class>_*.jsonl
+ let stem = lower.trim_start_matches("train_").trim_end_matches(".jsonl");
+ let class_name = stem.split('_').next().unwrap_or(stem);
+ if !class_name.is_empty() {
+ Some(class_name.to_string())
+ } else {
+ None
+ }
}
/// Train a model from labeled JSONL recordings in a directory.
///
-/// Recordings are matched to classes by filename pattern:
-/// - `*empty*` / `*absent*` → absent (0)
-/// - `*still*` / `*sitting*` → present_still (1)
-/// - `*walking*` / `*moving*` → present_moving (2)
-/// - `*active*` / `*exercise*`→ active (3)
+/// Recordings are matched to classes by filename pattern. Classes are discovered
+/// dynamically from the training data filenames:
+/// - `*empty*` / `*absent*` → absent
+/// - `*still*` / `*sitting*` → present_still
+/// - `*walking*` / `*moving*` → present_moving
+/// - `*active*` / `*exercise*`→ active
+/// - Any other `train_<class>_*.jsonl` → <class>
pub fn train_from_recordings(recordings_dir: &Path) -> Result<AdaptiveModel, String> {
- // Scan for train_* files.
- let mut samples: Vec = Vec::new();
- let entries = std::fs::read_dir(recordings_dir)
- .map_err(|e| format!("Cannot read {}: {}", recordings_dir.display(), e))?;
-
- for entry in entries.flatten() {
+ // First pass: scan filenames to discover all unique class names.
+ let entries: Vec<_> = std::fs::read_dir(recordings_dir)
+ .map_err(|e| format!("Cannot read {}: {}", recordings_dir.display(), e))?
+ .flatten()
+ .collect();
+
+ let mut class_map: HashMap<String, usize> = HashMap::new();
+ let mut class_names: Vec<String> = Vec::new();
+
+ // Collect (entry, class_name) pairs for files that match.
+ let mut file_classes: Vec<(PathBuf, String, String)> = Vec::new(); // (path, fname, class_name)
+ for entry in &entries {
let fname = entry.file_name().to_string_lossy().to_string();
if !fname.starts_with("train_") || !fname.ends_with(".jsonl") {
continue;
}
- if let Some(class_idx) = classify_recording_name(&fname) {
- let loaded = load_recording(&entry.path(), class_idx);
- eprintln!(" Loaded {}: {} frames → class '{}'",
- fname, loaded.len(), CLASSES[class_idx]);
- samples.extend(loaded);
+ if let Some(class_name) = classify_recording_name(&fname) {
+ if !class_map.contains_key(&class_name) {
+ let idx = class_names.len();
+ class_map.insert(class_name.clone(), idx);
+ class_names.push(class_name.clone());
+ }
+ file_classes.push((entry.path(), fname, class_name));
}
}
+ let n_classes = class_names.len();
+ if n_classes == 0 {
+ return Err("No training samples found. Record data with train_* prefix.".into());
+ }
+
+ // Second pass: load recordings with the discovered class indices.
+ let mut samples: Vec = Vec::new();
+ for (path, fname, class_name) in &file_classes {
+ let class_idx = class_map[class_name];
+ let loaded = load_recording(path, class_idx);
+ eprintln!(" Loaded {}: {} frames → class '{}'",
+ fname, loaded.len(), class_name);
+ samples.extend(loaded);
+ }
+
if samples.is_empty() {
return Err("No training samples found. Record data with train_* prefix.".into());
}
let n = samples.len();
- eprintln!("Total training samples: {n}");
+ eprintln!("Total training samples: {n} across {n_classes} classes: {:?}", class_names);
// ── Compute global normalisation stats ──
let mut global_mean = [0.0f64; N_FEATURES];
@@ -289,9 +348,9 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result Result Result> = vec![vec![0.0f64; N_FEATURES + 1]; n_classes];
let lr = 0.1;
let epochs = 200;
let batch_size = 32;
@@ -348,19 +407,19 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result> = vec![vec![0.0f64; N_FEATURES + 1]; n_classes];
for (x, target) in batch {
// Forward: softmax.
- let mut logits = [0.0f64; N_CLASSES];
- for c in 0..N_CLASSES {
+ let mut logits: Vec<f64> = vec![0.0; n_classes];
+ for c in 0..n_classes {
logits[c] = weights[c][N_FEATURES]; // bias
for i in 0..N_FEATURES {
logits[c] += weights[c][i] * x[i];
@@ -368,8 +427,8 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result = vec![0.0; n_classes];
+ for c in 0..n_classes {
probs[c] = ((logits[c] - max_l).exp()) / exp_sum;
}
@@ -377,7 +436,7 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result Result = vec![0.0; n_classes];
+ for c in 0..n_classes {
logits[c] = weights[c][N_FEATURES];
for i in 0..N_FEATURES {
logits[c] += weights[c][i] * x[i];
@@ -422,12 +481,12 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result = vec![0.0; n_classes];
+ for c in 0..n_classes {
logits[c] = weights[c][N_FEATURES];
for i in 0..N_FEATURES {
logits[c] += weights[c][i] * x[i];
@@ -438,9 +497,9 @@ pub fn train_from_recordings(recordings_dir: &Path) -> Result Result,
+ /// Per-node feature breakdown for multi-node deployments.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ node_features: Option<Vec<PerNodeFeatureInfo>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -328,6 +331,18 @@ impl NodeState {
}
}
+/// Per-node feature info for WebSocket broadcasts (multi-node support).
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct PerNodeFeatureInfo {
+ node_id: u8,
+ features: FeatureInfo,
+ classification: ClassificationInfo,
+ rssi_dbm: f64,
+ last_seen_ms: u64,
+ frame_rate_hz: f64,
+ stale: bool,
+}
+
/// Shared application state
struct AppStateInner {
latest_update: Option,
@@ -570,7 +585,9 @@ fn parse_esp32_frame(buf: &[u8]) -> Option {
let n_subcarriers = buf[6];
let freq_mhz = u16::from_le_bytes([buf[8], buf[9]]);
let sequence = u32::from_le_bytes([buf[10], buf[11], buf[12], buf[13]]);
- let rssi = buf[14] as i8;
+ let rssi_raw = buf[14] as i8;
+ // Fix RSSI sign: ensure it's always negative (dBm convention).
+ let rssi = if rssi_raw > 0 { rssi_raw.saturating_neg() } else { rssi_raw };
let noise_floor = buf[15] as i8;
let iq_start = 20;
@@ -1455,6 +1472,7 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) {
model_status: None,
persons: None,
estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
+ node_features: None,
};
// Populate persons from the sensing update.
@@ -1588,6 +1606,7 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) {
model_status: None,
persons: None,
estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
+ node_features: None,
};
let persons = derive_pose_from_sensing(&update);
@@ -2907,6 +2926,34 @@ async fn sona_activate(
}
}
+/// GET /api/v1/nodes — per-node health and feature info.
+async fn nodes_endpoint(State(state): State<SharedState>) -> Json<serde_json::Value> {
+ let s = state.read().await;
+ let now = std::time::Instant::now();
+ let nodes: Vec<serde_json::Value> = s.node_states.iter()
+ .map(|(&id, ns)| {
+ let elapsed_ms = ns.last_frame_time
+ .map(|t| now.duration_since(t).as_millis() as u64)
+ .unwrap_or(999999);
+ let stale = elapsed_ms > 5000;
+ let status = if stale { "stale" } else { "active" };
+ let rssi = ns.rssi_history.back().copied().unwrap_or(-90.0);
+ serde_json::json!({
+ "node_id": id,
+ "status": status,
+ "last_seen_ms": elapsed_ms,
+ "rssi_dbm": rssi,
+ "motion_level": &ns.current_motion_level,
+ "person_count": ns.prev_person_count,
+ })
+ })
+ .collect();
+ Json(serde_json::json!({
+ "nodes": nodes,
+ "total": nodes.len(),
+ }))
+}
+
async fn info_page() -> Html {
Html(format!(
"\
@@ -3062,6 +3109,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
model_status: None,
persons: None,
estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
+ node_features: None,
};
let persons = derive_pose_from_sensing(&update);
@@ -3240,6 +3288,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
model_status: None,
persons: None,
estimated_persons: if total_persons > 0 { Some(total_persons) } else { None },
+ node_features: None,
};
let persons = derive_pose_from_sensing(&update);
@@ -3358,6 +3407,7 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) {
},
persons: None,
estimated_persons: if est_persons > 0 { Some(est_persons) } else { None },
+ node_features: None,
};
// Populate persons from the sensing update.
@@ -4045,6 +4095,8 @@ async fn main() {
.route("/api/v1/metrics", get(health_metrics))
// Sensing endpoints
.route("/api/v1/sensing/latest", get(latest))
+ // Per-node health endpoint
+ .route("/api/v1/nodes", get(nodes_endpoint))
// Vital sign endpoints
.route("/api/v1/vital-signs", get(vital_signs_endpoint))
.route("/api/v1/edge-vitals", get(edge_vitals_endpoint))
diff --git a/ui/components/SensingTab.js b/ui/components/SensingTab.js
index 6c3115c12..33387eefe 100644
--- a/ui/components/SensingTab.js
+++ b/ui/components/SensingTab.js
@@ -110,12 +110,18 @@ export class SensingTab {
About This Data
Metrics are computed from WiFi Channel State Information (CSI).
- With 1 ESP32 you get presence detection, breathing
+ With <span id="sensingNodeCount">0</span> ESP32 node(s) you get presence detection, breathing
estimation, and gross motion. Add 3-4+ ESP32 nodes
around the room for spatial resolution and limb-level tracking.
+
+
+
NODE STATUS
+
+
+
Details
@@ -193,6 +199,9 @@ export class SensingTab {
// Update HUD
this._updateHUD(data);
+
+ // Update per-node panels
+ this._updateNodePanels(data);
}
_onStateChange(state) {
@@ -233,6 +242,11 @@ export class SensingTab {
const f = data.features || {};
const c = data.classification || {};
+ // Node count
+ const nodeCount = (data.nodes || []).length;
+ const countEl = this.container.querySelector('#sensingNodeCount');
+ if (countEl) countEl.textContent = String(nodeCount);
+
// RSSI
this._setText('sensingRssi', `${(f.mean_rssi || -80).toFixed(1)} dBm`);
this._setText('sensingSource', data.source || '');
@@ -309,6 +323,57 @@ export class SensingTab {
ctx.stroke();
}
+ // ---- Per-node panels ---------------------------------------------------
+
+ _updateNodePanels(data) {
+ const container = this.container.querySelector('#nodeStatusContainer');
+ if (!container) return;
+ const nodeFeatures = data.node_features || [];
+ if (nodeFeatures.length === 0) {
+ container.textContent = '';
+ const msg = document.createElement('div');
+ msg.style.cssText = 'color:#888;font-size:12px;padding:8px;';
+ msg.textContent = 'No nodes detected';
+ container.appendChild(msg);
+ return;
+ }
+ const NODE_COLORS = ['#00ccff', '#ff6600', '#00ff88', '#ff00cc', '#ffcc00', '#8800ff', '#00ffcc', '#ff0044'];
+ container.textContent = '';
+ for (const nf of nodeFeatures) {
+ const color = NODE_COLORS[nf.node_id % NODE_COLORS.length];
+ const statusColor = nf.stale ? '#888' : '#0f0';
+
+ const row = document.createElement('div');
+ row.style.cssText = `display:flex;align-items:center;gap:8px;padding:6px 8px;margin-bottom:4px;background:rgba(255,255,255,0.03);border-radius:6px;border-left:3px solid ${color};`;
+
+ const idCol = document.createElement('div');
+ idCol.style.minWidth = '50px';
+ const nameEl = document.createElement('div');
+ nameEl.style.cssText = `font-size:11px;font-weight:600;color:${color};`;
+ nameEl.textContent = 'Node ' + nf.node_id;
+ const statusEl = document.createElement('div');
+ statusEl.style.cssText = `font-size:9px;color:${statusColor};`;
+ statusEl.textContent = nf.stale ? 'STALE' : 'ACTIVE';
+ idCol.appendChild(nameEl);
+ idCol.appendChild(statusEl);
+
+ const metricsCol = document.createElement('div');
+ metricsCol.style.cssText = 'flex:1;font-size:10px;color:#aaa;';
+ metricsCol.textContent = (nf.rssi_dbm || -80).toFixed(0) + ' dBm · var ' + (nf.features?.variance || 0).toFixed(1);
+
+ const classCol = document.createElement('div');
+ classCol.style.cssText = 'font-size:10px;font-weight:600;color:#ccc;';
+ const motion = (nf.classification?.motion_level || 'absent').toUpperCase();
+ const conf = ((nf.classification?.confidence || 0) * 100).toFixed(0);
+ classCol.textContent = motion + ' ' + conf + '%';
+
+ row.appendChild(idCol);
+ row.appendChild(metricsCol);
+ row.appendChild(classCol);
+ container.appendChild(row);
+ }
+ }
+
// ---- Resize ------------------------------------------------------------
_setupResize() {
diff --git a/ui/components/gaussian-splats.js b/ui/components/gaussian-splats.js
index ecab6e481..5f7227fa3 100644
--- a/ui/components/gaussian-splats.js
+++ b/ui/components/gaussian-splats.js
@@ -66,6 +66,10 @@ function valueToColor(v) {
return [r, g, b];
}
+// ---- Node marker color palette -------------------------------------------
+
+const NODE_MARKER_COLORS = [0x00ccff, 0xff6600, 0x00ff88, 0xff00cc, 0xffcc00, 0x8800ff, 0x00ffcc, 0xff0044];
+
// ---- GaussianSplatRenderer -----------------------------------------------
export class GaussianSplatRenderer {
@@ -108,6 +112,10 @@ export class GaussianSplatRenderer {
// Node markers (ESP32 / router positions)
this._createNodeMarkers(THREE);
+ // Dynamic per-node markers (multi-node support)
+ this.nodeMarkers = new Map(); // nodeId -> THREE.Mesh
+ this._THREE = THREE;
+
// Body disruption blob
this._createBodyBlob(THREE);
@@ -369,11 +377,43 @@ export class GaussianSplatRenderer {
bGeo.attributes.splatSize.needsUpdate = true;
}
- // -- Update node positions ---------------------------------------------
+ // -- Update node positions (legacy single-node) ------------------------
if (nodes.length > 0 && nodes[0].position) {
const pos = nodes[0].position;
this.nodeMarker.position.set(pos[0], 0.5, pos[2]);
}
+
+ // -- Update dynamic per-node markers (multi-node support) --------------
+ if (nodes && nodes.length > 0 && this.scene) {
+ const THREE = this._THREE || window.THREE;
+ if (THREE) {
+ const activeIds = new Set();
+ for (const node of nodes) {
+ activeIds.add(node.node_id);
+ if (!this.nodeMarkers.has(node.node_id)) {
+ const geo = new THREE.SphereGeometry(0.25, 16, 16);
+ const mat = new THREE.MeshBasicMaterial({
+ color: NODE_MARKER_COLORS[node.node_id % NODE_MARKER_COLORS.length],
+ transparent: true,
+ opacity: 0.8,
+ });
+ const marker = new THREE.Mesh(geo, mat);
+ this.scene.add(marker);
+ this.nodeMarkers.set(node.node_id, marker);
+ }
+ const marker = this.nodeMarkers.get(node.node_id);
+ const pos = node.position || [0, 0, 0];
+ marker.position.set(pos[0], 0.5, pos[2]);
+ }
+ // Remove stale markers
+ for (const [id, marker] of this.nodeMarkers) {
+ if (!activeIds.has(id)) {
+ this.scene.remove(marker);
+ this.nodeMarkers.delete(id);
+ }
+ }
+ }
+ }
}
// ---- Render loop -------------------------------------------------------
diff --git a/ui/services/sensing.service.js b/ui/services/sensing.service.js
index 4931e86e2..0992483bc 100644
--- a/ui/services/sensing.service.js
+++ b/ui/services/sensing.service.js
@@ -84,6 +84,11 @@ class SensingService {
return [...this._rssiHistory];
}
+ /** Get per-node RSSI history (object keyed by node_id). */
+ getPerNodeRssiHistory() {
+ return { ...(this._perNodeRssiHistory || {}) };
+ }
+
/** Current connection state. */
get state() {
return this._state;
@@ -327,6 +332,20 @@ class SensingService {
}
}
+ // Per-node RSSI tracking
+ if (!this._perNodeRssiHistory) this._perNodeRssiHistory = {};
+ if (data.node_features) {
+ for (const nf of data.node_features) {
+ if (!this._perNodeRssiHistory[nf.node_id]) {
+ this._perNodeRssiHistory[nf.node_id] = [];
+ }
+ this._perNodeRssiHistory[nf.node_id].push(nf.rssi_dbm);
+ if (this._perNodeRssiHistory[nf.node_id].length > this._maxHistory) {
+ this._perNodeRssiHistory[nf.node_id].shift();
+ }
+ }
+ }
+
// Notify all listeners
for (const cb of this._listeners) {
try {
From 74e0ebbd41492beac69623d2f9aeb988d278c48a Mon Sep 17 00:00:00 2001
From: rUv
Date: Mon, 30 Mar 2026 15:04:30 +0000
Subject: [PATCH 11/12] =?UTF-8?q?feat(server):=20accuracy=20sprint=20001?=
=?UTF-8?q?=20=E2=80=94=20Kalman=20tracker,=20multi-node=20fusion,=20eigen?=
=?UTF-8?q?value=20counting?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Wire three existing signal-crate components into the live sensing path:
Step 1 — Kalman Tracker (tracker_bridge.rs):
- PoseTracker from wifi-densepose-signal wired into all 5 mutable
derive_pose_from_sensing call sites
- Stable TrackId-based person IDs replace ephemeral 0-based indices
- Greedy Mahalanobis assignment with proper lifecycle transitions
(Tentative → Active → Lost → Terminated)
- Kalman-smoothed keypoint positions reduce frame-to-frame jitter
Step 2 — Multi-Node Fusion (multistatic_bridge.rs):
- MultistaticFuser replaces naive .sum() aggregation at both ESP32 paths
- Attention-weighted CSI fusion across nodes with cosine-similarity weights
- Fallback uses max (not sum) to avoid double-counting overlapping coverage
- Node positions configurable via --node-positions CLI arg
- Single-node passthrough preserved (min_nodes=1)
Step 3 — Eigenvalue Person Counting (field_model.rs upgrade):
- Full covariance matrix accumulation (replaces diagonal variance approx)
- True eigendecomposition via ndarray-linalg Eigh (Marcenko-Pastur threshold)
- estimate_occupancy() for runtime eigenvalue-based counting
- Calibration API: POST /calibration/start|stop, GET /calibration/status
- Graceful fallback to score_to_person_count when uncalibrated
New files: tracker_bridge.rs, multistatic_bridge.rs, field_bridge.rs
Modified: sensing-server main.rs, Cargo.toml; signal field_model.rs, Cargo.toml
Refs: .swarm/plans/accuracy-sprint-001.md
Co-Authored-By: claude-flow
---
.../wifi-densepose-sensing-server/Cargo.toml | 3 +
.../src/field_bridge.rs | 142 +++++
.../wifi-densepose-sensing-server/src/main.rs | 279 ++++++++--
.../src/multistatic_bridge.rs | 263 ++++++++++
.../src/tracker_bridge.rs | 397 ++++++++++++++
.../crates/wifi-densepose-signal/Cargo.toml | 1 +
.../src/ruvsense/field_model.rs | 495 ++++++++++++++++--
7 files changed, 1499 insertions(+), 81 deletions(-)
create mode 100644 rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/field_bridge.rs
create mode 100644 rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs
create mode 100644 rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/tracker_bridge.rs
diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml
index ee3ce0bef..a76e6f1c1 100644
--- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml
+++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/Cargo.toml
@@ -43,5 +43,8 @@ clap = { workspace = true }
# Multi-BSSID WiFi scanning pipeline (ADR-022 Phase 3)
wifi-densepose-wifiscan = { version = "0.3.0", path = "../wifi-densepose-wifiscan" }
+# Signal processing with RuvSense pose tracker (accuracy sprint)
+wifi-densepose-signal = { version = "0.3.0", path = "../wifi-densepose-signal" }
+
[dev-dependencies]
tempfile = "3.10"
diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/field_bridge.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/field_bridge.rs
new file mode 100644
index 000000000..001f933cd
--- /dev/null
+++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/field_bridge.rs
@@ -0,0 +1,142 @@
+//! Bridge between sensing-server frame data and signal crate FieldModel
+//! for eigenvalue-based person counting.
+//!
+//! The FieldModel decomposes CSI observations into environmental drift and
+//! body perturbation via SVD eigenmodes. When calibrated, perturbation energy
+//! provides a physics-grounded occupancy estimate that supplements the
+//! score-based heuristic in `score_to_person_count`.
+
+use std::collections::VecDeque;
+use wifi_densepose_signal::ruvsense::field_model::{CalibrationStatus, FieldModel};
+
+use super::score_to_person_count;
+
+/// Number of recent frames to feed into perturbation extraction.
+const OCCUPANCY_WINDOW: usize = 50;
+
+/// Perturbation energy threshold for detecting a second person.
+const ENERGY_THRESH_2: f64 = 12.0;
+/// Perturbation energy threshold for detecting a third person.
+const ENERGY_THRESH_3: f64 = 25.0;
+
+/// Estimate occupancy using the FieldModel when calibrated, falling back
+/// to the score-based heuristic otherwise.
+///
+/// When the field model is `Fresh` or `Stale`, we extract body perturbation
+/// from the most recent frames and map total energy to a person count.
+/// On any error or when uncalibrated, we fall through to `score_to_person_count`.
+pub fn occupancy_or_fallback(
+ field: &FieldModel,
+ frame_history: &VecDeque>,
+ smoothed_score: f64,
+ prev_count: usize,
+) -> usize {
+ match field.status() {
+ CalibrationStatus::Fresh | CalibrationStatus::Stale => {
+ let frames: Vec<Vec<f64>> = frame_history
+ .iter()
+ .rev()
+ .take(OCCUPANCY_WINDOW)
+ .cloned()
+ .collect();
+
+ if frames.is_empty() {
+ return score_to_person_count(smoothed_score, prev_count);
+ }
+
+ // Use the most recent frame as the observation for perturbation
+ // extraction. The FieldModel expects [n_links][n_subcarriers],
+ // so we wrap the single frame as a single-link observation.
+ let observation = vec![frames[0].clone()];
+ match field.extract_perturbation(&observation) {
+ Ok(perturbation) => {
+ if perturbation.total_energy > ENERGY_THRESH_3 {
+ 3
+ } else if perturbation.total_energy > ENERGY_THRESH_2 {
+ 2
+ } else {
+ 1
+ }
+ }
+ Err(e) => {
+ tracing::warn!("FieldModel perturbation failed, using fallback: {e}");
+ score_to_person_count(smoothed_score, prev_count)
+ }
+ }
+ }
+ _ => score_to_person_count(smoothed_score, prev_count),
+ }
+}
+
+/// Feed the latest frame to the FieldModel during calibration collection.
+///
+/// Only acts when the model status is `Collecting`. Wraps the latest frame
+/// as a single-link observation and feeds it; errors are logged and ignored.
+pub fn maybe_feed_calibration(field: &mut FieldModel, frame_history: &VecDeque<Vec<f64>>) {
+ if field.status() != CalibrationStatus::Collecting {
+ return;
+ }
+ if let Some(latest) = frame_history.back() {
+ let observations = vec![latest.clone()];
+ if let Err(e) = field.feed_calibration(&observations) {
+ tracing::warn!("FieldModel calibration feed error: {e}");
+ }
+ }
+}
+
+/// Parse node positions from a semicolon-delimited string.
+///
+/// Format: `"x,y,z;x,y,z;..."` where each coordinate is an `f32`.
+/// Entries that fail to parse are silently skipped.
+pub fn parse_node_positions(input: &str) -> Vec<[f32; 3]> {
+ if input.is_empty() {
+ return Vec::new();
+ }
+ input
+ .split(';')
+ .filter_map(|triplet| {
+ let parts: Vec<&str> = triplet.split(',').collect();
+ if parts.len() != 3 {
+ return None;
+ }
+ let x = parts[0].parse::<f32>().ok()?;
+ let y = parts[1].parse::<f32>().ok()?;
+ let z = parts[2].parse::<f32>().ok()?;
+ Some([x, y, z])
+ })
+ .collect()
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_node_positions() {
+ let positions = parse_node_positions("0,0,1.5;3,0,1.5;1.5,3,1.5");
+ assert_eq!(positions.len(), 3);
+ assert_eq!(positions[0], [0.0, 0.0, 1.5]);
+ assert_eq!(positions[1], [3.0, 0.0, 1.5]);
+ assert_eq!(positions[2], [1.5, 3.0, 1.5]);
+ }
+
+ #[test]
+ fn test_parse_node_positions_empty() {
+ let positions = parse_node_positions("");
+ assert!(positions.is_empty());
+ }
+
+ #[test]
+ fn test_parse_node_positions_invalid() {
+ let positions = parse_node_positions("abc;1,2,3");
+ assert_eq!(positions.len(), 1);
+ assert_eq!(positions[0], [1.0, 2.0, 3.0]);
+ }
+
+ #[test]
+ fn test_parse_node_positions_partial_triplet() {
+ let positions = parse_node_positions("1,2;3,4,5");
+ assert_eq!(positions.len(), 1);
+ assert_eq!(positions[0], [3.0, 4.0, 5.0]);
+ }
+}
diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
index f4835fc3e..e323fb46d 100644
--- a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
+++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/main.rs
@@ -9,8 +9,11 @@
//! Replaces both ws_server.py and the Python HTTP server.
mod adaptive_classifier;
+mod field_bridge;
+mod multistatic_bridge;
mod rvf_container;
mod rvf_pipeline;
+mod tracker_bridge;
mod vital_signs;
// Training pipeline modules (exposed via lib.rs)
@@ -52,6 +55,11 @@ use wifi_densepose_wifiscan::{
};
use wifi_densepose_wifiscan::parse_netsh_output as parse_netsh_bssid_output;
+// Accuracy sprint: Kalman tracker, multistatic fusion, field model
+use wifi_densepose_signal::ruvsense::pose_tracker::PoseTracker;
+use wifi_densepose_signal::ruvsense::multistatic::{MultistaticFuser, MultistaticConfig};
+use wifi_densepose_signal::ruvsense::field_model::{FieldModel, FieldModelConfig, CalibrationStatus};
+
// ── CLI ──────────────────────────────────────────────────────────────────────
#[derive(Parser, Debug)]
@@ -144,6 +152,14 @@ struct Args {
/// Build fingerprint index from embeddings (env|activity|temporal|person)
#[arg(long, value_name = "TYPE")]
+ build_index: Option<String>,
+
+ /// Node positions for multistatic fusion (format: "x,y,z;x,y,z;...")
+ #[arg(long, env = "SENSING_NODE_POSITIONS")]
+ node_positions: Option<String>,
+
+ /// Start field model calibration on boot (empty room required)
+ #[arg(long)]
+ calibrate: bool,
}
// ── Data types ───────────────────────────────────────────────────────────────
@@ -282,9 +298,9 @@ struct BoundingBox {
/// Each ESP32 node gets its own frame history, smoothing buffers, and vital
/// sign detector so that data from different nodes is never mixed.
struct NodeState {
- frame_history: VecDeque<Vec<f64>>,
+ pub(crate) frame_history: VecDeque<Vec<f64>>,
smoothed_person_score: f64,
- prev_person_count: usize,
+ pub(crate) prev_person_count: usize,
smoothed_motion: f64,
current_motion_level: String,
debounce_counter: u32,
@@ -300,12 +316,12 @@ struct NodeState {
rssi_history: VecDeque,
vital_detector: VitalSignDetector,
latest_vitals: VitalSigns,
- last_frame_time: Option<Instant>,
+ pub(crate) last_frame_time: Option<Instant>,
edge_vitals: Option,
}
impl NodeState {
- fn new() -> Self {
+ pub(crate) fn new() -> Self {
Self {
frame_history: VecDeque::new(),
smoothed_person_score: 0.0,
@@ -436,6 +452,15 @@ struct AppStateInner {
/// Per-node sensing state for multi-node deployments.
/// Keyed by `node_id` from the ESP32 frame header.
+ node_states: HashMap<u8, NodeState>,
+ // ── Accuracy sprint: Kalman tracker, multistatic fusion, eigenvalue counting ──
+ /// Global Kalman-based pose tracker for stable person IDs and smoothed keypoints.
+ pose_tracker: PoseTracker,
+ /// Instant of last tracker update (for computing dt).
+ last_tracker_instant: Option<Instant>,
+ /// Attention-weighted multi-node CSI fusion engine.
+ multistatic_fuser: MultistaticFuser,
+ /// SVD-based room field model for eigenvalue person counting (None until calibration).
+ field_model: Option<FieldModel>,
}
/// If no ESP32 frame arrives within this duration, source reverts to offline.
@@ -445,6 +470,31 @@ impl AppStateInner {
/// Return the effective data source, accounting for ESP32 frame timeout.
/// If the source is "esp32" but no frame has arrived in 5 seconds, returns
/// "esp32:offline" so the UI can distinguish active vs stale connections.
+ /// Person count: eigenvalue-based if field model is calibrated, else heuristic.
+ /// Uses global frame_history if populated, otherwise the freshest per-node history.
+ fn person_count(&self) -> usize {
+ match self.field_model.as_ref() {
+ Some(fm) => {
+ // Prefer global frame_history (populated by wifi/simulate paths).
+ // Fall back to freshest per-node history (populated by ESP32 paths).
+ let history = if !self.frame_history.is_empty() {
+ &self.frame_history
+ } else {
+ // Find the node with the most recent frame
+ self.node_states.values()
+ .filter(|ns| !ns.frame_history.is_empty())
+ .max_by_key(|ns| ns.last_frame_time)
+ .map(|ns| &ns.frame_history)
+ .unwrap_or(&self.frame_history)
+ };
+ field_bridge::occupancy_or_fallback(
+ fm, history, self.smoothed_person_score, self.prev_person_count,
+ )
+ }
+ None => score_to_person_count(self.smoothed_person_score, self.prev_person_count),
+ }
+ }
+
fn effective_source(&self) -> String {
if self.source == "esp32" {
if let Some(last) = self.last_esp32_frame {
@@ -1435,7 +1485,7 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) {
let raw_score = compute_person_score(&features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
- let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
+ let count = s.person_count();
s.prev_person_count = count;
count
} else {
@@ -1475,10 +1525,13 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) {
node_features: None,
};
- // Populate persons from the sensing update.
- let persons = derive_pose_from_sensing(&update);
- if !persons.is_empty() {
- update.persons = Some(persons);
+ // Populate persons from the sensing update (Kalman-smoothed via tracker).
+ let raw_persons = derive_pose_from_sensing(&update);
+ let tracked = tracker_bridge::tracker_update(
+ &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons,
+ );
+ if !tracked.is_empty() {
+ update.persons = Some(tracked);
}
if let Ok(json) = serde_json::to_string(&update) {
@@ -1569,7 +1622,7 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) {
let raw_score = compute_person_score(&features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
- let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
+ let count = s.person_count();
s.prev_person_count = count;
count
} else {
@@ -1609,9 +1662,12 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) {
node_features: None,
};
- let persons = derive_pose_from_sensing(&update);
- if !persons.is_empty() {
- update.persons = Some(persons);
+ let raw_persons = derive_pose_from_sensing(&update);
+ let tracked = tracker_bridge::tracker_update(
+ &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons,
+ );
+ if !tracked.is_empty() {
+ update.persons = Some(tracked);
}
if let Ok(json) = serde_json::to_string(&update) {
@@ -1788,9 +1844,13 @@ async fn handle_ws_pose_client(mut socket: WebSocket, state: SharedState) {
keypoints,
zone: "zone_1".into(),
}]
- }).unwrap_or_else(|| derive_pose_from_sensing(&sensing))
+ }).unwrap_or_else(|| {
+ // Prefer tracked persons from broadcast if available
+ sensing.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(&sensing))
+ })
} else {
- derive_pose_from_sensing(&sensing)
+ // Prefer tracked persons from broadcast if available
+ sensing.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(&sensing))
};
let pose_msg = serde_json::json!({
@@ -2229,7 +2289,7 @@ async fn api_info(State(state): State<SharedState>) -> Json<serde_json::Value> {
async fn pose_current(State(state): State<SharedState>) -> Json<serde_json::Value> {
let s = state.read().await;
let persons = match &s.latest_update {
- Some(update) => derive_pose_from_sensing(update),
+ Some(update) => update.persons.clone().unwrap_or_else(|| derive_pose_from_sensing(update)),
None => vec![],
};
Json(serde_json::json!({
@@ -2780,6 +2840,79 @@ async fn adaptive_unload(State(state): State<SharedState>) -> Json<serde_json::Value> {
+async fn calibration_start(State(state): State<SharedState>) -> Json<serde_json::Value> {
+ let mut s = state.write().await;
+ // Guard: don't discard an in-progress calibration
+ if let Some(ref fm) = s.field_model {
+ if fm.status() == CalibrationStatus::Collecting {
+ return Json(serde_json::json!({
+ "success": false,
+ "error": "Calibration already in progress. Call /calibration/stop first.",
+ "frame_count": fm.calibration_frame_count(),
+ }));
+ }
+ }
+ match FieldModel::new(FieldModelConfig::default()) {
+ Ok(fm) => {
+ s.field_model = Some(fm);
+ Json(serde_json::json!({
+ "success": true,
+ "message": "Calibration started — keep room empty while frames accumulate.",
+ }))
+ }
+ Err(e) => Json(serde_json::json!({
+ "success": false,
+ "error": format!("{e}"),
+ })),
+ }
+}
+
+async fn calibration_stop(State(state): State<SharedState>) -> Json<serde_json::Value> {
+ let mut s = state.write().await;
+ if let Some(ref mut fm) = s.field_model {
+ let ts = chrono::Utc::now().timestamp_micros() as u64;
+ match fm.finalize_calibration(ts, 0) {
+ Ok(modes) => {
+ let baseline = modes.baseline_eigenvalue_count;
+ let variance_explained = modes.variance_explained;
+ info!("Field model calibrated: baseline_eigenvalues={baseline}, variance_explained={variance_explained:.2}");
+ Json(serde_json::json!({
+ "success": true,
+ "baseline_eigenvalue_count": baseline,
+ "variance_explained": variance_explained,
+ "frame_count": fm.calibration_frame_count(),
+ }))
+ }
+ Err(e) => Json(serde_json::json!({
+ "success": false,
+ "error": format!("{e}"),
+ })),
+ }
+ } else {
+ Json(serde_json::json!({
+ "success": false,
+ "error": "No field model active — call /calibration/start first.",
+ }))
+ }
+}
+
+async fn calibration_status(State(state): State<SharedState>) -> Json<serde_json::Value> {
+ let s = state.read().await;
+ match s.field_model.as_ref() {
+ Some(fm) => Json(serde_json::json!({
+ "active": true,
+ "status": format!("{:?}", fm.status()),
+ "frame_count": fm.calibration_frame_count(),
+ })),
+ None => Json(serde_json::json!({
+ "active": false,
+ "status": "none",
+ })),
+ }
+}
+
/// Generate a simple timestamp string (epoch seconds) for recording IDs.
fn chrono_timestamp() -> u64 {
std::time::SystemTime::now()
@@ -3045,12 +3178,30 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
else if vitals.presence { 0.3 }
else { 0.05 };
- // Aggregate person count across all active nodes.
+ // Aggregate person count: attention-weighted fusion or max-per-node fallback.
let now = std::time::Instant::now();
- let total_persons: usize = s.node_states.values()
- .filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
- .map(|n| n.prev_person_count)
- .sum();
+ let total_persons = {
+ let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback(
+ &s.multistatic_fuser, &s.node_states,
+ );
+ match fused {
+ Some(ref f) => {
+ let score = multistatic_bridge::compute_person_score_from_amplitudes(&f.fused_amplitude);
+ s.smoothed_person_score = s.smoothed_person_score * 0.90 + score * 0.10;
+ let count = s.person_count();
+ s.prev_person_count = count;
+ count
+ }
+ None => fallback_count,
+ }
+ };
+
+ // Feed field model calibration if active (use per-node history for ESP32).
+ if let Some(ref mut fm) = s.field_model {
+ if let Some(ns) = s.node_states.get(&node_id) {
+ field_bridge::maybe_feed_calibration(fm, &ns.frame_history);
+ }
+ }
// Build nodes array with all active nodes.
let active_nodes: Vec<serde_json::Value> = s.node_states.iter()
@@ -3112,9 +3263,12 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
node_features: None,
};
- let persons = derive_pose_from_sensing(&update);
- if !persons.is_empty() {
- update.persons = Some(persons);
+ let raw_persons = derive_pose_from_sensing(&update);
+ let tracked = tracker_bridge::tracker_update(
+ &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons,
+ );
+ if !tracked.is_empty() {
+ update.persons = Some(tracked);
}
if let Ok(json) = serde_json::to_string(&update) {
@@ -3244,12 +3398,30 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
else if classification.motion_level == "present_still" { 0.3 }
else { 0.05 };
- // Aggregate person count across all active nodes.
+ // Aggregate person count: attention-weighted fusion or naive sum fallback.
let now = std::time::Instant::now();
- let total_persons: usize = s.node_states.values()
- .filter(|n| n.last_frame_time.map_or(false, |t| now.duration_since(t).as_secs() < 10))
- .map(|n| n.prev_person_count)
- .sum();
+ let total_persons = {
+ let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback(
+ &s.multistatic_fuser, &s.node_states,
+ );
+ match fused {
+ Some(ref f) => {
+ let score = multistatic_bridge::compute_person_score_from_amplitudes(&f.fused_amplitude);
+ s.smoothed_person_score = s.smoothed_person_score * 0.90 + score * 0.10;
+ let count = s.person_count();
+ s.prev_person_count = count;
+ count
+ }
+ None => fallback_count,
+ }
+ };
+
+ // Feed field model calibration if active (use per-node history for ESP32).
+ if let Some(ref mut fm) = s.field_model {
+ if let Some(ns) = s.node_states.get(&node_id) {
+ field_bridge::maybe_feed_calibration(fm, &ns.frame_history);
+ }
+ }
// Build nodes array with all active nodes.
let active_nodes: Vec<serde_json::Value> = s.node_states.iter()
@@ -3291,9 +3463,12 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
node_features: None,
};
- let persons = derive_pose_from_sensing(&update);
- if !persons.is_empty() {
- update.persons = Some(persons);
+ let raw_persons = derive_pose_from_sensing(&update);
+ let tracked = tracker_bridge::tracker_update(
+ &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons,
+ );
+ if !tracked.is_empty() {
+ update.persons = Some(tracked);
}
if let Ok(json) = serde_json::to_string(&update) {
@@ -3360,7 +3535,7 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) {
let raw_score = compute_person_score(&features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
- let count = score_to_person_count(s.smoothed_person_score, s.prev_person_count);
+ let count = s.person_count();
s.prev_person_count = count;
count
} else {
@@ -3410,10 +3585,13 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) {
node_features: None,
};
- // Populate persons from the sensing update.
- let persons = derive_pose_from_sensing(&update);
- if !persons.is_empty() {
- update.persons = Some(persons);
+ // Populate persons from the sensing update (Kalman-smoothed via tracker).
+ let raw_persons = derive_pose_from_sensing(&update);
+ let tracked = tracker_bridge::tracker_update(
+ &mut s.pose_tracker, &mut s.last_tracker_instant, raw_persons,
+ );
+ if !tracked.is_empty() {
+ update.persons = Some(tracked);
}
if update.classification.presence {
@@ -4042,6 +4220,29 @@ async fn main() {
m
}),
node_states: HashMap::new(),
+ // Accuracy sprint
+ pose_tracker: PoseTracker::new(),
+ last_tracker_instant: None,
+ multistatic_fuser: {
+ let mut fuser = MultistaticFuser::with_config(MultistaticConfig {
+ min_nodes: 1, // single-node passthrough
+ ..Default::default()
+ });
+ if let Some(ref pos_str) = args.node_positions {
+ let positions = field_bridge::parse_node_positions(pos_str);
+ if !positions.is_empty() {
+ info!("Configured {} node positions for multistatic fusion", positions.len());
+ fuser.set_node_positions(positions);
+ }
+ }
+ fuser
+ },
+ field_model: if args.calibrate {
+ info!("Field model calibration enabled — room should be empty during startup");
+ FieldModel::new(FieldModelConfig::default()).ok()
+ } else {
+ None
+ },
}));
// Start background tasks based on source
@@ -4138,6 +4339,10 @@ async fn main() {
.route("/api/v1/adaptive/train", post(adaptive_train))
.route("/api/v1/adaptive/status", get(adaptive_status))
.route("/api/v1/adaptive/unload", post(adaptive_unload))
+ // Field model calibration (eigenvalue-based person counting)
+ .route("/api/v1/calibration/start", post(calibration_start))
+ .route("/api/v1/calibration/stop", post(calibration_stop))
+ .route("/api/v1/calibration/status", get(calibration_status))
// Static UI files
.nest_service("/ui", ServeDir::new(&ui_path))
.layer(SetResponseHeaderLayer::overriding(
diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs
new file mode 100644
index 000000000..98d89dae2
--- /dev/null
+++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/multistatic_bridge.rs
@@ -0,0 +1,263 @@
+//! Bridge between sensing-server per-node state and the signal crate's
+//! `MultistaticFuser` for attention-weighted CSI fusion across ESP32 nodes.
+//!
+//! This module converts the server's `NodeState` (f64 amplitude history) into
+//! `MultiBandCsiFrame`s that the multistatic fusion pipeline expects, then
+//! drives `MultistaticFuser::fuse` with a graceful fallback when fusion fails
+//! (e.g. insufficient nodes or timestamp spread).
+
+use std::collections::HashMap;
+use std::time::{Duration, Instant};
+
+use wifi_densepose_signal::hardware_norm::{CanonicalCsiFrame, HardwareType};
+use wifi_densepose_signal::ruvsense::multiband::MultiBandCsiFrame;
+use wifi_densepose_signal::ruvsense::multistatic::{FusedSensingFrame, MultistaticFuser};
+
+use super::NodeState;
+
+/// Maximum age for a node frame to be considered active (10 seconds).
+const STALE_THRESHOLD: Duration = Duration::from_secs(10);
+
+/// Default WiFi channel frequency (MHz) used for single-channel frames.
+const DEFAULT_FREQ_MHZ: u32 = 2437; // Channel 6
+
+/// Convert a single `NodeState` into a `MultiBandCsiFrame` suitable for
+/// multistatic fusion.
+///
+/// Returns `None` when the node has no frame history or no recorded
+/// `last_frame_time`.
+pub fn node_frame_from_state(node_id: u8, ns: &NodeState) -> Option<MultiBandCsiFrame> {
+ let last_time = ns.last_frame_time.as_ref()?;
+ let latest = ns.frame_history.back()?;
+ if latest.is_empty() {
+ return None;
+ }
+
+ let amplitude: Vec<f32> = latest.iter().map(|&v| v as f32).collect();
+ let n_sub = amplitude.len();
+ let phase = vec![0.0_f32; n_sub];
+
+ // Derive a monotonic timestamp: use wall-clock time minus elapsed since
+ // last frame to approximate when the frame was actually received.
+ let wall_us = std::time::SystemTime::now()
+ .duration_since(std::time::UNIX_EPOCH)
+ .map(|d| d.as_micros() as u64)
+ .unwrap_or(0);
+ let age_us = last_time.elapsed().as_micros() as u64;
+ let timestamp_us = wall_us.saturating_sub(age_us);
+
+ let canonical = CanonicalCsiFrame {
+ amplitude,
+ phase,
+ hardware_type: HardwareType::Esp32S3,
+ };
+
+ Some(MultiBandCsiFrame {
+ node_id,
+ timestamp_us,
+ channel_frames: vec![canonical],
+ frequencies_mhz: vec![DEFAULT_FREQ_MHZ],
+ coherence: 1.0, // single-channel, perfect self-coherence
+ })
+}
+
+/// Collect `MultiBandCsiFrame`s from all active nodes.
+///
+/// A node is considered active if its `last_frame_time` is within
+/// [`STALE_THRESHOLD`] of `now`.
+pub fn node_frames_from_states(node_states: &HashMap<u8, NodeState>) -> Vec<MultiBandCsiFrame> {
+ let now = Instant::now();
+ let mut frames = Vec::with_capacity(node_states.len());
+
+ for (&node_id, ns) in node_states {
+ // Skip stale nodes
+ if let Some(ref t) = ns.last_frame_time {
+ if now.duration_since(*t) > STALE_THRESHOLD {
+ continue;
+ }
+ } else {
+ continue;
+ }
+
+ if let Some(frame) = node_frame_from_state(node_id, ns) {
+ frames.push(frame);
+ }
+ }
+
+ frames
+}
+
+/// Attempt multistatic fusion; fall back to max per-node person count on failure.
+///
+/// Returns `(fused_frame, fallback_person_count)`. When fusion succeeds, the
+/// caller should compute person count from the fused amplitudes (the returned
+/// fallback count is 0 as a sentinel). On failure, returns the maximum
+/// per-node count (not the sum, to avoid double-counting overlapping coverage).
+pub fn fuse_or_fallback(
+ fuser: &MultistaticFuser,
+ node_states: &HashMap<u8, NodeState>,
+) -> (Option<FusedSensingFrame>, usize) {
+ let frames = node_frames_from_states(node_states);
+ if frames.is_empty() {
+ return (None, 0);
+ }
+
+ match fuser.fuse(&frames) {
+ Ok(fused) => {
+ // Return 0 as sentinel — caller must compute count from fused amplitudes.
+ (Some(fused), 0)
+ }
+ Err(e) => {
+ tracing::debug!("Multistatic fusion failed ({e}), using per-node max fallback");
+ // Use max (not sum) to avoid double-counting when nodes have overlapping coverage.
+ let max_count: usize = node_states
+ .values()
+ .filter(|ns| {
+ ns.last_frame_time
+ .map(|t| t.elapsed() <= STALE_THRESHOLD)
+ .unwrap_or(false)
+ })
+ .map(|ns| ns.prev_person_count)
+ .max()
+ .unwrap_or(0);
+ (None, max_count)
+ }
+ }
+}
+
+/// Compute a person-presence score from fused amplitude data.
+///
+/// Uses the squared coefficient of variation (variance / mean^2) as a
+/// lightweight proxy for body-induced CSI perturbation. A flat amplitude
+/// vector (no person) yields a score near zero; a vector with high variance
+/// relative to its mean (person moving) yields a score approaching 1.0.
+pub fn compute_person_score_from_amplitudes(amplitudes: &[f32]) -> f64 {
+ if amplitudes.is_empty() {
+ return 0.0;
+ }
+
+ let n = amplitudes.len() as f64;
+ let sum: f64 = amplitudes.iter().map(|&a| a as f64).sum();
+ let mean = sum / n;
+
+ let variance: f64 = amplitudes.iter().map(|&a| {
+ let diff = (a as f64) - mean;
+ diff * diff
+ }).sum::<f64>() / n;
+
+ let score = variance / (mean * mean + 1e-10);
+ score.clamp(0.0, 1.0)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::collections::VecDeque;
+
+ /// Helper: build a minimal NodeState for testing. Uses `NodeState::new()`
+ /// then mutates the `pub(crate)` fields the bridge needs.
+ fn make_node_state(
+ frame_history: VecDeque<Vec<f64>>,
+ last_frame_time: Option<Instant>,
+ prev_person_count: usize,
+ ) -> NodeState {
+ let mut ns = NodeState::new();
+ ns.frame_history = frame_history;
+ ns.last_frame_time = last_frame_time;
+ ns.prev_person_count = prev_person_count;
+ ns
+ }
+
+ #[test]
+ fn test_node_frame_from_empty_state() {
+ let ns = make_node_state(VecDeque::new(), Some(Instant::now()), 0);
+ assert!(node_frame_from_state(1, &ns).is_none());
+ }
+
+ #[test]
+ fn test_node_frame_from_state_no_time() {
+ let mut history = VecDeque::new();
+ history.push_back(vec![1.0, 2.0, 3.0]);
+ let ns = make_node_state(history, None, 0);
+ assert!(node_frame_from_state(1, &ns).is_none());
+ }
+
+ #[test]
+ fn test_node_frame_conversion() {
+ let mut history = VecDeque::new();
+ history.push_back(vec![10.0, 20.0, 30.5]);
+ let ns = make_node_state(history, Some(Instant::now()), 0);
+
+ let frame = node_frame_from_state(42, &ns).expect("should produce a frame");
+ assert_eq!(frame.node_id, 42);
+ assert_eq!(frame.channel_frames.len(), 1);
+
+ let ch = &frame.channel_frames[0];
+ assert_eq!(ch.amplitude.len(), 3);
+ assert!((ch.amplitude[0] - 10.0_f32).abs() < f32::EPSILON);
+ assert!((ch.amplitude[1] - 20.0_f32).abs() < f32::EPSILON);
+ assert!((ch.amplitude[2] - 30.5_f32).abs() < f32::EPSILON);
+ // Phase should be all zeros
+ assert!(ch.phase.iter().all(|&p| p == 0.0));
+ assert_eq!(ch.hardware_type, HardwareType::Esp32S3);
+ }
+
+ #[test]
+ fn test_stale_node_excluded() {
+ let mut states: HashMap<u8, NodeState> = HashMap::new();
+
+ // Active node: frame just received
+ let mut active_history = VecDeque::new();
+ active_history.push_back(vec![1.0, 2.0]);
+ states.insert(1, make_node_state(active_history, Some(Instant::now()), 1));
+
+ // Stale node: frame 20 seconds ago
+ let mut stale_history = VecDeque::new();
+ stale_history.push_back(vec![3.0, 4.0]);
+ let stale_time = Instant::now() - Duration::from_secs(20);
+ states.insert(2, make_node_state(stale_history, Some(stale_time), 1));
+
+ let frames = node_frames_from_states(&states);
+ assert_eq!(frames.len(), 1, "stale node should be excluded");
+ assert_eq!(frames[0].node_id, 1);
+ }
+
+ #[test]
+ fn test_compute_person_score_empty() {
+ assert!((compute_person_score_from_amplitudes(&[]) - 0.0).abs() < f64::EPSILON);
+ }
+
+ #[test]
+ fn test_compute_person_score_flat() {
+ // Constant amplitude => variance = 0 => score ~ 0
+ let flat = vec![5.0_f32; 64];
+ let score = compute_person_score_from_amplitudes(&flat);
+ assert!(score < 0.001, "flat signal should have near-zero score, got {score}");
+ }
+
+ #[test]
+ fn test_compute_person_score_varied() {
+ // High variance relative to mean should produce a positive score
+ let varied: Vec<f32> = (0..64).map(|i| if i % 2 == 0 { 1.0 } else { 10.0 }).collect();
+ let score = compute_person_score_from_amplitudes(&varied);
+ assert!(score > 0.1, "varied signal should have positive score, got {score}");
+ assert!(score <= 1.0, "score should be clamped to 1.0, got {score}");
+ }
+
+ #[test]
+ fn test_compute_person_score_clamped() {
+ // Near-zero mean with non-zero variance => would blow up without clamp
+ let vals = vec![0.0_f32, 0.0, 0.0, 0.001];
+ let score = compute_person_score_from_amplitudes(&vals);
+ assert!(score <= 1.0, "score must be clamped to 1.0");
+ }
+
+ #[test]
+ fn test_fuse_or_fallback_empty() {
+ let fuser = MultistaticFuser::new();
+ let states: HashMap<u8, NodeState> = HashMap::new();
+ let (fused, count) = fuse_or_fallback(&fuser, &states);
+ assert!(fused.is_none());
+ assert_eq!(count, 0);
+ }
+}
diff --git a/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/tracker_bridge.rs b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/tracker_bridge.rs
new file mode 100644
index 000000000..cdddc043f
--- /dev/null
+++ b/rust-port/wifi-densepose-rs/crates/wifi-densepose-sensing-server/src/tracker_bridge.rs
@@ -0,0 +1,397 @@
+//! Bridge between sensing-server PersonDetection types and signal crate PoseTracker.
+//!
+//! The sensing server uses f64 types (PersonDetection, PoseKeypoint, BoundingBox)
+//! while the signal crate's PoseTracker operates on f32 Kalman states. This module
+//! provides conversion functions and a single `tracker_update` entry point that
+//! accepts server-side detections and returns tracker-smoothed results.
+
+use std::time::Instant;
+use wifi_densepose_signal::ruvsense::{
+ self, KeypointState, PoseTrack, TrackLifecycleState, TrackId, NUM_KEYPOINTS,
+};
+use wifi_densepose_signal::ruvsense::pose_tracker::PoseTracker;
+
+use super::{BoundingBox, PersonDetection, PoseKeypoint};
+
+/// COCO-17 keypoint names in index order.
+const COCO_NAMES: [&str; 17] = [
+ "nose",
+ "left_eye",
+ "right_eye",
+ "left_ear",
+ "right_ear",
+ "left_shoulder",
+ "right_shoulder",
+ "left_elbow",
+ "right_elbow",
+ "left_wrist",
+ "right_wrist",
+ "left_hip",
+ "right_hip",
+ "left_knee",
+ "right_knee",
+ "left_ankle",
+ "right_ankle",
+];
+
+/// Map a lowercase keypoint name to its COCO-17 index.
+fn keypoint_name_to_coco_index(name: &str) -> Option<usize> {
+ COCO_NAMES.iter().position(|&n| n.eq_ignore_ascii_case(name))
+}
+
+/// Convert server-side PersonDetection slices into tracker-compatible keypoint arrays.
+///
+/// For each person, maps named keypoints to COCO-17 positions. Unmapped slots are
+/// filled with the centroid of the mapped keypoints so the Kalman filter has a
+/// reasonable initial value rather than zeros.
+fn detections_to_tracker_keypoints(persons: &[PersonDetection]) -> Vec<[[f32; 3]; 17]> {
+ persons
+ .iter()
+ .map(|person| {
+ let mut kps = [[0.0_f32; 3]; 17];
+ let mut mapped_count = 0u32;
+ let mut cx = 0.0_f32;
+ let mut cy = 0.0_f32;
+ let mut cz = 0.0_f32;
+
+ // First pass: place mapped keypoints and accumulate centroid
+ for kp in &person.keypoints {
+ if let Some(idx) = keypoint_name_to_coco_index(&kp.name) {
+ kps[idx] = [kp.x as f32, kp.y as f32, kp.z as f32];
+ cx += kp.x as f32;
+ cy += kp.y as f32;
+ cz += kp.z as f32;
+ mapped_count += 1;
+ }
+ }
+
+ // Compute centroid of mapped keypoints
+ let centroid = if mapped_count > 0 {
+ let n = mapped_count as f32;
+ [cx / n, cy / n, cz / n]
+ } else {
+ [0.0, 0.0, 0.0]
+ };
+
+ // Second pass: fill unmapped slots with centroid
+ // Build a set of mapped indices
+ let mut mapped = [false; 17];
+ for kp in &person.keypoints {
+ if let Some(idx) = keypoint_name_to_coco_index(&kp.name) {
+ mapped[idx] = true;
+ }
+ }
+ for i in 0..17 {
+ if !mapped[i] {
+ kps[i] = centroid;
+ }
+ }
+
+ kps
+ })
+ .collect()
+}
+
+/// Convert active PoseTracker tracks back into server-side PersonDetection values.
+///
+/// Only tracks whose lifecycle `is_alive()` are included.
+pub fn tracker_to_person_detections(tracker: &PoseTracker) -> Vec<PersonDetection> {
+ tracker
+ .active_tracks()
+ .into_iter()
+ .map(|track| {
+ let id = track.id.0 as u32;
+
+ let confidence = match track.lifecycle {
+ TrackLifecycleState::Active => 0.9,
+ TrackLifecycleState::Tentative => 0.5,
+ TrackLifecycleState::Lost => 0.3,
+ TrackLifecycleState::Terminated => 0.0,
+ };
+
+ // Build keypoints from Kalman state
+ let keypoints: Vec<PoseKeypoint> = (0..NUM_KEYPOINTS)
+ .map(|i| {
+ let pos = track.keypoints[i].position();
+ PoseKeypoint {
+ name: COCO_NAMES[i].to_string(),
+ x: pos[0] as f64,
+ y: pos[1] as f64,
+ z: pos[2] as f64,
+ confidence: track.keypoints[i].confidence as f64,
+ }
+ })
+ .collect();
+
+ // Compute bounding box from keypoint min/max
+ let mut min_x = f64::MAX;
+ let mut min_y = f64::MAX;
+ let mut max_x = f64::MIN;
+ let mut max_y = f64::MIN;
+ for kp in &keypoints {
+ if kp.x < min_x { min_x = kp.x; }
+ if kp.y < min_y { min_y = kp.y; }
+ if kp.x > max_x { max_x = kp.x; }
+ if kp.y > max_y { max_y = kp.y; }
+ }
+
+ let bbox = BoundingBox {
+ x: min_x,
+ y: min_y,
+ width: max_x - min_x,
+ height: max_y - min_y,
+ };
+
+ PersonDetection {
+ id,
+ confidence,
+ keypoints,
+ bbox,
+ zone: "tracked".to_string(),
+ }
+ })
+ .collect()
+}
+
+/// Run one tracker cycle: predict, match detections, update, prune.
+///
+/// This is the main entry point called each sensing frame. It:
+/// 1. Computes dt from the previous call instant
+/// 2. Predicts all existing tracks forward
+/// 3. Greedily assigns detections to tracks by Mahalanobis cost
+/// 4. Updates matched tracks, creates new tracks for unmatched detections
+/// 5. Prunes terminated tracks
+/// 6. Returns smoothed PersonDetection values from the tracker state
+pub fn tracker_update(
+ tracker: &mut PoseTracker,
+ last_instant: &mut Option<Instant>,
+ persons: Vec<PersonDetection>,
+) -> Vec<PersonDetection> {
+ let now = Instant::now();
+ let dt = last_instant.map_or(0.1_f32, |prev| now.duration_since(prev).as_secs_f32());
+ *last_instant = Some(now);
+
+ // Predict all tracks forward
+ tracker.predict_all(dt);
+
+ if persons.is_empty() {
+ tracker.prune_terminated();
+ return tracker_to_person_detections(tracker);
+ }
+
+ // Convert detections to f32 keypoint arrays
+ let all_keypoints = detections_to_tracker_keypoints(&persons);
+
+ // Compute centroids for each detection
+ let centroids: Vec<[f32; 3]> = all_keypoints
+ .iter()
+ .map(|kps| {
+ let mut c = [0.0_f32; 3];
+ for kp in kps {
+ c[0] += kp[0];
+ c[1] += kp[1];
+ c[2] += kp[2];
+ }
+ let n = NUM_KEYPOINTS as f32;
+ c[0] /= n;
+ c[1] /= n;
+ c[2] /= n;
+ c
+ })
+ .collect();
+
+ // Greedy assignment: for each detection, find the best matching active track.
+ // Collect tracks once to avoid re-borrowing tracker per detection.
+ let active: Vec<(TrackId, [f32; 3])> = tracker.active_tracks().iter().map(|t| {
+ let centroid = {
+ let mut c = [0.0_f32; 3];
+ for kp in &t.keypoints {
+ let p = kp.position();
+ c[0] += p[0]; c[1] += p[1]; c[2] += p[2];
+ }
+ let n = NUM_KEYPOINTS as f32;
+ [c[0] / n, c[1] / n, c[2] / n]
+ };
+ (t.id, centroid)
+ }).collect();
+
+ let mut used_tracks: Vec<bool> = vec![false; active.len()];
+ let mut matched: Vec