diff --git a/Cargo.lock b/Cargo.lock index a5b5246..1f7c501 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5214,7 +5214,7 @@ dependencies = [ [[package]] name = "saorsa-core" version = "0.24.2" -source = "git+https://github.com/saorsa-labs/saorsa-core?branch=fix%2Fstability-improvements#5586795740eafd9818e827baa2eb2677ff3ba942" +source = "git+https://github.com/grumbach/saorsa-core?branch=plan1%2Fbad-node-eviction#97161f8e53411431c3fd1ccb117ebfed2e0de65a" dependencies = [ "anyhow", "async-trait", @@ -5328,7 +5328,8 @@ dependencies = [ [[package]] name = "saorsa-transport" version = "0.34.1" -source = "git+https://github.com/saorsa-labs/saorsa-transport?branch=fix%2Fstability-improvements#55f423ca5e3312e31ba22475b68a76f4ffd50285" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31ebc09fb51e2c325e61ff09892f1256c60ef4f3beb881fd2980befe3e53e9bc" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index b3bbb58..736bbc1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,18 @@ resolver = "2" # `ChunkPutRequest::content` on `main` but no crates.io release yet exists # that includes it. Without this patch, the transitive `ant-node 0.11.2` # fails to build under `cargo clippy --all-targets` / `cargo test`. +# +# Temporary patch: route saorsa-core through the fork branch carrying PR #114 +# (saorsa-labs/saorsa-core#114), which adds the `P2PNode::report_application_failure` +# method this PR depends on. Both `[patch.crates-io]` (for ant-protocol's +# saorsa-core dep, which crates.io publishes) and the git-URL patch below (for +# ant-core's direct git dep) are needed so all transitive resolutions land +# on the same source. Once #114 merges into saorsa-labs/saorsa-core +# rc-2026.4.4, REMOVE the saorsa-core entries before merging the present PR. [patch.crates-io] ant-protocol = { git = "https://github.com/WithAutonomi/ant-protocol.git", rev = "93e63b8a41a97c37c24d1164a3ee5525e002ddcd" } ant-node = { git = "https://github.com/WithAutonomi/ant-node.git", rev = "8b68b2d7f4662faf67ed7812dc6cb37de0c74a8b" } +saorsa-core = { git = "https://github.com/grumbach/saorsa-core", branch = "plan1/bad-node-eviction" } +[patch."https://github.com/saorsa-labs/saorsa-core"] +saorsa-core = { git = "https://github.com/grumbach/saorsa-core", branch = "plan1/bad-node-eviction" } diff --git a/ant-core/src/data/client/quote.rs b/ant-core/src/data/client/quote.rs index 51b3cb3..192c906 100644 --- a/ant-core/src/data/client/quote.rs +++ b/ant-core/src/data/client/quote.rs @@ -7,15 +7,52 @@ use crate::data::client::peer_cache::record_peer_outcome; use crate::data::client::Client; use crate::data::error::{Error, Result}; use ant_protocol::evm::{Amount, PaymentQuote}; -use ant_protocol::transport::{MultiAddr, PeerId}; +use ant_protocol::transport::{MultiAddr, P2PNode, PeerId}; use ant_protocol::{ compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody, ChunkQuoteRequest, ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE, }; use futures::stream::{FuturesUnordered, StreamExt}; +use std::sync::Arc; use std::time::{Duration, Instant}; use tracing::{debug, info, warn}; +/// Trust weight applied for a verifiable bad-binding detection. +/// +/// Sized to push a peer from neutral 0.5 to below the production swap-out +/// threshold (`saorsa_core::adaptive::DEFAULT_SWAP_THRESHOLD = 0.35`) in a +/// single event. `saorsa-core` clamps the consumer-reported weight to its +/// `MAX_CONSUMER_WEIGHT = 5.0`; using exactly the cap means one mismatch is +/// enough to evict the peer from the next lookup. See plan-1-bad-node-eviction +/// for the math. Mirrors the storer-side weight in +/// `ant-node/src/payment/verifier.rs`. +const BAD_BINDING_TRUST_WEIGHT: f64 = 5.0; + +/// Reports a verifiable bad-binding event for a peer. +/// +/// Abstracted into a trait so the cheap, deterministic call sites +/// (`classify_quote_response`, `drop_quotes_with_bad_bindings`) can be +/// unit-tested without spinning up a `P2PNode` — tests pass a mock reporter +/// that records the events. Production code passes a `&Arc` whose +/// blanket impl forwards to `P2PNode::report_application_failure`. +/// +/// Trait method returns a boxed future (rather than an `async fn`) so we +/// don't pull in `async-trait` for one tiny call site. The cost — one +/// allocation per bad-binding event — is dwarfed by the network round-trip +/// that just produced the bad quote. +trait TrustReporter: Send + Sync { + fn report_bad_binding<'a>(&'a self, peer_id: &'a PeerId) -> futures::future::BoxFuture<'a, ()>; +} + +impl TrustReporter for Arc { + fn report_bad_binding<'a>(&'a self, peer_id: &'a PeerId) -> futures::future::BoxFuture<'a, ()> { + Box::pin(async move { + self.report_application_failure(peer_id, BAD_BINDING_TRUST_WEIGHT) + .await; + }) + } +} + /// Compute XOR distance between a peer's ID bytes and a target address. /// /// Uses the first 32 bytes of the peer ID (or fewer if shorter) XORed with @@ -128,6 +165,37 @@ fn classify_quote_response( Ok((payment_quote, price)) } +/// Report a trust-engine penalty if the per-peer classifier produced a +/// `BadQuoteBinding` verdict. No-op for every other outcome — including +/// network/timeout/protocol/serialization failures which are diagnosed +/// elsewhere (saorsa-core's own `ConnectionFailed`/`ConnectionTimeout` +/// signals already cover transport-level misbehaviour). +/// +/// Pulled out of the per-peer closure so the bad-binding → trust-event +/// edge can be unit-tested with a mock reporter (see B1 in +/// `notes/plan-1-bad-node-eviction.md`). +async fn report_if_bad_binding( + reporter: &R, + peer_id: &PeerId, + result: &std::result::Result<(PaymentQuote, Amount), Error>, +) { + if matches!(result, Err(Error::BadQuoteBinding { .. })) { + reporter.report_bad_binding(peer_id).await; + } +} + +/// Report trust-engine penalties for every peer that the post-collection +/// defensive filter dropped. Mirrors the per-peer reporting above for the +/// secondary defence in `drop_quotes_with_bad_bindings`. +async fn report_dropped_bad_bindings( + reporter: &R, + dropped_peers: &[PeerId], +) { + for peer_id in dropped_peers { + reporter.report_bad_binding(peer_id).await; + } +} + /// Map a per-peer quote-collection outcome to the AIMD-cache success flag. /// /// `Ok(_)` and `AlreadyStored` are both *benign* outcomes — the peer is @@ -143,11 +211,12 @@ fn quote_outcome_is_success(result: &std::result::Result<(PaymentQuote, Amount), } /// Drop quotes whose `pub_key` does not BLAKE3-hash to the peer that supplied -/// them. Logs each dropped quote at WARN. +/// them. Logs each dropped quote at WARN. Returns the `PeerId`s that were +/// dropped so the caller can report a trust-engine penalty per offender. fn drop_quotes_with_bad_bindings( quotes: &mut Vec<(PeerId, Vec, PaymentQuote, Amount)>, -) -> usize { - let before = quotes.len(); +) -> Vec { + let mut dropped = Vec::new(); quotes.retain(|(peer_id, _, quote, _)| { if quote_binding_is_valid(peer_id, quote) { true @@ -157,10 +226,11 @@ fn drop_quotes_with_bad_bindings( (peer is signing quotes with another peer's key); the storer would \ reject this proof" ); + dropped.push(*peer_id); false } }); - before - quotes.len() + dropped } impl Client { @@ -276,6 +346,11 @@ impl Client { record_peer_outcome(&node_clone, peer_id_clone, &addrs_clone, success, rtt_ms) .await; + // Trust-engine penalty for bad-binding peers. Sized to drop a + // peer below the swap-out threshold in a single event so the + // next `find_closest_peers` call routes around them. + report_if_bad_binding(&node_clone, &peer_id_clone, &result).await; + (peer_id_clone, addrs_clone, result) }; @@ -339,17 +414,24 @@ impl Client { // Defensive double-check: the per-peer handler already filters // bad-binding responses into `failures`, but if any path slipped a bad // quote into `quotes` (e.g. a future refactor) this catches it before - // we sort by distance and return. `bad_dropped` should be 0 in normal - // operation; non-zero indicates an upstream regression worth investigating. - let bad_dropped = drop_quotes_with_bad_bindings(&mut quotes); - if bad_dropped > 0 { + // we sort by distance and return. `bad_dropped_peers` should be empty + // in normal operation; non-empty indicates an upstream regression + // worth investigating. + let bad_dropped_peers = drop_quotes_with_bad_bindings(&mut quotes); + if !bad_dropped_peers.is_empty() { warn!( - "Defensive filter dropped {bad_dropped} quotes with mismatched peer bindings \ + "Defensive filter dropped {} quotes with mismatched peer bindings \ for address {} — the per-peer handler should have caught these earlier \ (this indicates an upstream regression)", + bad_dropped_peers.len(), hex::encode(address), ); - bad_quote_count += bad_dropped; + bad_quote_count += bad_dropped_peers.len(); + // Report a trust-engine penalty for each peer the defensive + // filter dropped — the per-peer handler should have done this + // already, but we belt-and-brace in case a future refactor + // bypasses the primary path. + report_dropped_bad_bindings(node, &bad_dropped_peers).await; } // Check already-stored: only count votes from the closest CLOSE_GROUP_SIZE peers. @@ -577,7 +659,7 @@ mod tests { let dropped = drop_quotes_with_bad_bindings(&mut quotes); - assert_eq!(dropped, 2, "two crossed-key quotes must be dropped"); + assert_eq!(dropped.len(), 2, "two crossed-key quotes must be dropped"); assert_eq!(quotes.len(), 3, "three real-key quotes must remain"); // Cross-checked invariant: every retained quote would be accepted by @@ -598,7 +680,7 @@ mod tests { let mut quotes: Vec<_> = (0..5).map(|_| good_quote_real()).collect(); let before = quotes.len(); let dropped = drop_quotes_with_bad_bindings(&mut quotes); - assert_eq!(dropped, 0); + assert!(dropped.is_empty()); assert_eq!(quotes.len(), before); for (peer_id, _, quote, _) in "es { assert!(storer_binding_would_accept(peer_id, quote)); @@ -615,7 +697,7 @@ mod tests { .map(|_| bad_quote_real()) .collect(); let dropped = drop_quotes_with_bad_bindings(&mut quotes); - assert_eq!(dropped, CLOSE_GROUP_SIZE * 2); + assert_eq!(dropped.len(), CLOSE_GROUP_SIZE * 2); assert!(quotes.is_empty()); } @@ -694,7 +776,11 @@ mod tests { // Step 2: run the patched filter. let dropped = drop_quotes_with_bad_bindings(&mut quotes); - assert_eq!(dropped, 1, "exactly the crossed-key quote must be filtered"); + assert_eq!( + dropped.len(), + 1, + "exactly the crossed-key quote must be filtered" + ); // Step 3: prove the storer would accept every survivor under the FULL spec. for (peer_id, _, quote, _) in "es { @@ -728,7 +814,7 @@ mod tests { .collect(); let dropped = drop_quotes_with_bad_bindings(&mut quotes); - assert_eq!(dropped, bad_count); + assert_eq!(dropped.len(), bad_count); assert!( quotes.len() < CLOSE_GROUP_SIZE, "this is the precondition for InsufficientPeers downstream" @@ -893,6 +979,174 @@ mod tests { } } + // ============================================================ + // Tests for the trust-event reporter wiring (plan-1 §B). + // + // The mock reporter records the peer IDs it was asked to penalise so + // we can assert exact-match behaviour: one event per bad peer, none + // for good peers, no spurious events on transport-class failures. + // ============================================================ + + /// In-memory `TrustReporter` that records the peers it was asked to + /// penalise. Tests inspect the recorded list to assert exact wiring. + #[derive(Default)] + struct MockReporter { + events: std::sync::Mutex>, + } + + impl MockReporter { + fn events(&self) -> Vec { + self.events.lock().expect("MockReporter mutex").clone() + } + } + + impl TrustReporter for MockReporter { + fn report_bad_binding<'a>( + &'a self, + peer_id: &'a PeerId, + ) -> futures::future::BoxFuture<'a, ()> { + Box::pin(async move { + self.events + .lock() + .expect("MockReporter mutex") + .push(*peer_id); + }) + } + } + + /// B1: A bad-binding response from a peer triggers exactly one + /// `report_bad_binding` call, and the classifier still returns the + /// pre-existing typed `BadQuoteBinding` error so the upstream collector + /// keeps counting it correctly. + #[tokio::test] + async fn classify_quote_response_reports_trust_event_on_bad_binding() { + let reporter = MockReporter::default(); + let (peer_id, _, quote, _) = bad_quote_real(); + let bytes = serialize_quote("e); + + let result = classify_quote_response(&peer_id, &bytes, false); + report_if_bad_binding(&reporter, &peer_id, &result).await; + + assert!( + matches!(result, Err(Error::BadQuoteBinding { .. })), + "classifier must still surface the typed error: {result:?}" + ); + let events = reporter.events(); + assert_eq!(events.len(), 1, "exactly one trust event must be reported"); + assert_eq!( + events[0].as_bytes(), + peer_id.as_bytes(), + "the reported peer must be the bad-binding offender" + ); + } + + /// B1 (negative): A self-consistent quote does NOT trigger a trust event. + /// Together with B1, this proves the reporter fires iff the binding fails. + #[tokio::test] + async fn classify_quote_response_does_not_report_on_good_binding() { + let reporter = MockReporter::default(); + let (peer_id, _, quote, _) = good_quote_real(); + let bytes = serialize_quote("e); + + let result = classify_quote_response(&peer_id, &bytes, false); + report_if_bad_binding(&reporter, &peer_id, &result).await; + + assert!(result.is_ok(), "classifier must accept good quote"); + assert!( + reporter.events().is_empty(), + "good quotes must not trigger trust events" + ); + } + + /// B1 (transport failures): A non-bad-binding error (network, timeout, + /// protocol, serialization, already-stored) must NOT trigger a trust + /// event from this reporter — those classes have their own diagnostics + /// in the AIMD cache and saorsa-core's connection-failure signal. + #[tokio::test] + async fn classify_quote_response_does_not_report_on_non_binding_failures() { + let reporter = MockReporter::default(); + let (peer_id, _, _quote, _) = good_quote_real(); + + let cases: Vec> = vec![ + Err(Error::AlreadyStored), + Err(Error::Network("net".into())), + Err(Error::Timeout("to".into())), + Err(Error::Protocol("proto".into())), + Err(Error::Serialization("ser".into())), + ]; + for case in &cases { + report_if_bad_binding(&reporter, &peer_id, case).await; + } + + assert!( + reporter.events().is_empty(), + "non-binding failures must not trigger trust events: {:?}", + reporter.events() + ); + } + + /// B2: The defensive bulk-drop path reports exactly one trust event + /// per dropped peer. + #[tokio::test] + async fn drop_quotes_with_bad_bindings_reports_one_event_per_dropped_peer() { + let reporter = MockReporter::default(); + let good_a = good_quote_real(); + let bad = bad_quote_real(); + let good_b = good_quote_real(); + let good_c = good_quote_real(); + let mut quotes = vec![good_a, bad.clone(), good_b, good_c]; + + let dropped = drop_quotes_with_bad_bindings(&mut quotes); + report_dropped_bad_bindings(&reporter, &dropped).await; + + assert_eq!(quotes.len(), 3, "three good quotes must remain"); + assert_eq!(dropped.len(), 1, "exactly one bad quote must be dropped"); + let events = reporter.events(); + assert_eq!(events.len(), 1, "one trust event per dropped peer"); + assert_eq!( + events[0].as_bytes(), + bad.0.as_bytes(), + "the reported peer must be the bad-binding offender" + ); + } + + /// B3: Trust events are recorded only for the bad peer; good peers in + /// the same batch are unaffected. Locks in the per-peer attribution. + #[tokio::test] + async fn bad_binding_does_not_affect_trust_for_other_peers() { + let reporter = MockReporter::default(); + let (good_a_peer, _, good_a_quote, _) = good_quote_real(); + let (bad_peer, _, bad_quote, _) = bad_quote_real(); + let (good_b_peer, _, good_b_quote, _) = good_quote_real(); + + for (peer_id, quote) in [ + (&good_a_peer, &good_a_quote), + (&bad_peer, &bad_quote), + (&good_b_peer, &good_b_quote), + ] { + let bytes = serialize_quote(quote); + let result = classify_quote_response(peer_id, &bytes, false); + report_if_bad_binding(&reporter, peer_id, &result).await; + } + + let events = reporter.events(); + assert_eq!(events.len(), 1, "only the bad peer must be penalised"); + assert_eq!(events[0].as_bytes(), bad_peer.as_bytes()); + // Explicit negative checks: neither good peer was reported. + assert!( + !events + .iter() + .any(|p| p.as_bytes() == good_a_peer.as_bytes()), + "first good peer must not be penalised" + ); + assert!( + !events + .iter() + .any(|p| p.as_bytes() == good_b_peer.as_bytes()), + "second good peer must not be penalised" + ); + } + /// Cross-validate the classifier's binding verdict against the /// independent storer-spec re-derivation across mixed responders. #[test]