@@ -299,6 +299,8 @@ struct NodeState {
299299 latest_vitals : VitalSigns ,
300300 last_frame_time : Option < std:: time:: Instant > ,
301301 edge_vitals : Option < Esp32VitalsPacket > ,
302+ /// Latest extracted features for cross-node fusion.
303+ latest_features : Option < FeatureInfo > ,
302304 // ── RuVector Phase 2: Temporal smoothing & coherence gating ──
303305 /// Previous frame's smoothed keypoint positions for EMA temporal smoothing.
304306 prev_keypoints : Option < Vec < [ f64 ; 3 ] > > ,
@@ -344,6 +346,7 @@ impl NodeState {
344346 latest_vitals : VitalSigns :: default ( ) ,
345347 last_frame_time : None ,
346348 edge_vitals : None ,
349+ latest_features : None ,
347350 prev_keypoints : None ,
348351 motion_energy_history : VecDeque :: with_capacity ( COHERENCE_WINDOW ) ,
349352 coherence_score : 1.0 , // assume stable initially
@@ -1988,6 +1991,61 @@ async fn latest(State(state): State<SharedState>) -> Json<serde_json::Value> {
19881991/// with a stride-swing pattern applied to arms and legs.
19891992// ── Multi-person estimation (issue #97) ──────────────────────────────────────
19901993
1994+ /// Fuse features across all active nodes for higher SNR.
1995+ ///
1996+ /// When multiple ESP32 nodes observe the same room, their CSI features
1997+ /// can be combined:
1998+ /// - Variance: use max (most sensitive node dominates)
1999+ /// - Motion/breathing/spectral power: weighted average by RSSI (closer node = higher weight)
2000+ /// - Dominant frequency: weighted average
2001+ /// - Change points: keep current node's value (not meaningful to average)
2002+ /// - Mean RSSI: use max (best signal)
2003+ fn fuse_multi_node_features (
2004+ current_features : & FeatureInfo ,
2005+ node_states : & HashMap < u8 , NodeState > ,
2006+ ) -> FeatureInfo {
2007+ let now = std:: time:: Instant :: now ( ) ;
2008+ let active: Vec < ( & FeatureInfo , f64 ) > = node_states. values ( )
2009+ . filter ( |ns| ns. last_frame_time . map_or ( false , |t| now. duration_since ( t) . as_secs ( ) < 10 ) )
2010+ . filter_map ( |ns| {
2011+ let feat = ns. latest_features . as_ref ( ) ?;
2012+ let rssi = ns. rssi_history . back ( ) . copied ( ) . unwrap_or ( -80.0 ) ;
2013+ Some ( ( feat, rssi) )
2014+ } )
2015+ . collect ( ) ;
2016+
2017+ if active. len ( ) <= 1 {
2018+ return current_features. clone ( ) ;
2019+ }
2020+
2021+ // RSSI-based weights: higher RSSI = closer to person = more weight.
2022+ // Map RSSI relative to best node into [0.1, 1.0].
2023+ let max_rssi = active. iter ( ) . map ( |( _, r) | * r) . fold ( f64:: NEG_INFINITY , f64:: max) ;
2024+ let weights: Vec < f64 > = active. iter ( )
2025+ . map ( |( _, r) | ( 1.0 + ( r - max_rssi + 20.0 ) / 20.0 ) . clamp ( 0.1 , 1.0 ) )
2026+ . collect ( ) ;
2027+ let w_sum: f64 = weights. iter ( ) . sum :: < f64 > ( ) . max ( 1e-9 ) ;
2028+
2029+ FeatureInfo {
2030+ // Weighted average variance (not max — max inflates person score
2031+ // and causes count flips between 1↔2 persons).
2032+ variance : active. iter ( ) . zip ( & weights)
2033+ . map ( |( ( f, _) , w) | f. variance * w) . sum :: < f64 > ( ) / w_sum,
2034+ // Weighted average for motion/breathing/spectral
2035+ motion_band_power : active. iter ( ) . zip ( & weights)
2036+ . map ( |( ( f, _) , w) | f. motion_band_power * w) . sum :: < f64 > ( ) / w_sum,
2037+ breathing_band_power : active. iter ( ) . zip ( & weights)
2038+ . map ( |( ( f, _) , w) | f. breathing_band_power * w) . sum :: < f64 > ( ) / w_sum,
2039+ spectral_power : active. iter ( ) . zip ( & weights)
2040+ . map ( |( ( f, _) , w) | f. spectral_power * w) . sum :: < f64 > ( ) / w_sum,
2041+ dominant_freq_hz : active. iter ( ) . zip ( & weights)
2042+ . map ( |( ( f, _) , w) | f. dominant_freq_hz * w) . sum :: < f64 > ( ) / w_sum,
2043+ change_points : current_features. change_points , // keep current node's value
2044+ // Best RSSI across nodes
2045+ mean_rssi : active. iter ( ) . map ( |( f, _) | f. mean_rssi ) . fold ( f64:: NEG_INFINITY , f64:: max) ,
2046+ }
2047+ }
2048+
19912049/// Estimate person count from CSI features using a weighted composite heuristic.
19922050///
19932051/// Single ESP32 link limitations: variance-based detection can reliably detect
@@ -3248,13 +3306,31 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
32483306 change_points : 0 ,
32493307 spectral_power : vitals. motion_energy as f64 ,
32503308 } ;
3251- let classification = ClassificationInfo {
3309+
3310+ // Store latest features on node for cross-node fusion.
3311+ s. node_states . get_mut ( & node_id)
3312+ . map ( |ns| ns. latest_features = Some ( features. clone ( ) ) ) ;
3313+
3314+ // Cross-node fusion: combine features from all active nodes.
3315+ let fused_features = fuse_multi_node_features ( & features, & s. node_states ) ;
3316+
3317+ let mut classification = ClassificationInfo {
32523318 motion_level : motion_level. to_string ( ) ,
32533319 presence : vitals. presence ,
32543320 confidence : vitals. presence_score as f64 ,
32553321 } ;
3322+
3323+ // Boost classification confidence with multi-node coverage.
3324+ let n_active = s. node_states . values ( )
3325+ . filter ( |ns| ns. last_frame_time . map_or ( false , |t| now. duration_since ( t) . as_secs ( ) < 10 ) )
3326+ . count ( ) ;
3327+ if n_active > 1 {
3328+ classification. confidence = ( classification. confidence
3329+ * ( 1.0 + 0.15 * ( n_active as f64 - 1.0 ) ) ) . clamp ( 0.0 , 1.0 ) ;
3330+ }
3331+
32563332 let signal_field = generate_signal_field (
3257- vitals . rssi as f64 , motion_score, vitals. breathing_rate_bpm / 60.0 ,
3333+ fused_features . mean_rssi , motion_score, vitals. breathing_rate_bpm / 60.0 ,
32583334 ( vitals. presence_score as f64 ) . min ( 1.0 ) , & [ ] ,
32593335 ) ;
32603336
@@ -3264,7 +3340,7 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
32643340 source : "esp32" . to_string ( ) ,
32653341 tick,
32663342 nodes : active_nodes,
3267- features : features . clone ( ) ,
3343+ features : fused_features . clone ( ) ,
32683344 classification,
32693345 signal_field,
32703346 vital_signs : Some ( VitalSigns {
@@ -3398,14 +3474,19 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
33983474 ns. latest_vitals = vitals. clone ( ) ;
33993475
34003476 let raw_score = compute_person_score ( & features) ;
3401- ns. smoothed_person_score = ns. smoothed_person_score * 0.90 + raw_score * 0.10 ;
3477+ // Slower EMA (0.05) for person score to prevent count flips
3478+ // from frame-to-frame variance oscillation in fused features.
3479+ ns. smoothed_person_score = ns. smoothed_person_score * 0.95 + raw_score * 0.05 ;
34023480 if classification. presence {
34033481 let count = score_to_person_count ( ns. smoothed_person_score , ns. prev_person_count ) ;
34043482 ns. prev_person_count = count;
34053483 } else {
34063484 ns. prev_person_count = 0 ;
34073485 }
34083486
3487+ // Store latest features on node for cross-node fusion.
3488+ ns. latest_features = Some ( features. clone ( ) ) ;
3489+
34093490 // Done with per-node mutable borrow; now read aggregated
34103491 // state from all nodes (the borrow of `ns` ends here).
34113492 // (We re-borrow node_states immutably via `s` below.)
@@ -3416,6 +3497,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
34163497 }
34173498 s. latest_vitals = vitals. clone ( ) ;
34183499
3500+ // Cross-node fusion: combine features from all active nodes.
3501+ let fused_features = fuse_multi_node_features ( & features, & s. node_states ) ;
3502+
34193503 s. tick += 1 ;
34203504 let tick = s. tick ;
34213505
@@ -3433,6 +3517,15 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
34333517 . max ( )
34343518 . unwrap_or ( 0 ) ;
34353519
3520+ // Boost classification confidence with multi-node coverage.
3521+ let n_active = s. node_states . values ( )
3522+ . filter ( |ns| ns. last_frame_time . map_or ( false , |t| now. duration_since ( t) . as_secs ( ) < 10 ) )
3523+ . count ( ) ;
3524+ if n_active > 1 {
3525+ classification. confidence = ( classification. confidence
3526+ * ( 1.0 + 0.15 * ( n_active as f64 - 1.0 ) ) ) . clamp ( 0.0 , 1.0 ) ;
3527+ }
3528+
34363529 // Build nodes array with all active nodes.
34373530 let active_nodes: Vec < NodeInfo > = s. node_states . iter ( )
34383531 . filter ( |( _, n) | n. last_frame_time . map_or ( false , |t| now. duration_since ( t) . as_secs ( ) < 10 ) )
@@ -3453,11 +3546,11 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
34533546 source : "esp32" . to_string ( ) ,
34543547 tick,
34553548 nodes : active_nodes,
3456- features : features . clone ( ) ,
3549+ features : fused_features . clone ( ) ,
34573550 classification,
34583551 signal_field : generate_signal_field (
3459- features . mean_rssi , motion_score, breathing_rate_hz,
3460- features . variance . min ( 1.0 ) , & sub_variances,
3552+ fused_features . mean_rssi , motion_score, breathing_rate_hz,
3553+ fused_features . variance . min ( 1.0 ) , & sub_variances,
34613554 ) ,
34623555 vital_signs : Some ( vitals) ,
34633556 enhanced_motion : None ,
0 commit comments