diff --git a/.config/nextest.toml b/.config/nextest.toml index 7caa4a35e28..db99a2e3f95 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -17,7 +17,7 @@ filter = "package(solana-zk-elgamal-proof-program-tests) & test(/^test_batched_r threads-required = "num-cpus" [[profile.ci.overrides]] -filter = "package(solana-turbine) | package(solana-gossip) | package(solana-perf)" +filter = "package(solana-turbine) | package(solana-gossip) | package(solana-perf) | package(solana-quic-datagram)" retries = 0 [[profile.ci.overrides]] diff --git a/Cargo.lock b/Cargo.lock index e278ac5e0b1..18d59a98572 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -435,14 +435,19 @@ dependencies = [ "agave-math-utils", "agave-votor", "agave-votor-messages", + "arc-swap", "bitvec", + "bytes", "crossbeam-channel", "itertools 0.14.0", "lazy-lru", "log", "parking_lot 0.12.3", "qualifier_attr", + "quinn", + "quinn-proto", "rand 0.9.4", + "rustls", "serde", "serde_bytes", "solana-accounts-db", @@ -472,6 +477,7 @@ dependencies = [ "solana-signer-store", "solana-streamer", "solana-time-utils", + "solana-tls-utils", "solana-transaction", "solana-transaction-error", "solana-vote", @@ -479,6 +485,7 @@ dependencies = [ "tempfile", "test-case", "thiserror 2.0.18", + "tokio", "tokio-util 0.7.18", "wincode", ] @@ -639,21 +646,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" -[[package]] -name = "anstream" -version = "0.6.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" -dependencies = [ - "anstyle", - "anstyle-parse 0.2.6", - "anstyle-query", - "anstyle-wincon", - "colorchoice", - "is_terminal_polyfill", - "utf8parse", -] - [[package]] name = "anstream" version = "1.0.0" @@ -661,7 +653,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" dependencies = [ "anstyle", - "anstyle-parse 1.0.0", + "anstyle-parse", "anstyle-query", "anstyle-wincon", "colorchoice", @@ -675,15 +667,6 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" -[[package]] -name = "anstyle-parse" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" -dependencies = [ - "utf8parse", -] - [[package]] name = "anstyle-parse" version = "1.0.0" @@ -1804,9 +1787,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.31" +version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "027bb0d98429ae334a8698531da7077bdf906419543a35a55c2cb1b66437d767" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" dependencies = [ "clap_builder", "clap_derive", @@ -1814,21 +1797,21 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.31" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5589e0cba072e0f3d23791efac0fd8627b49c829c196a492e88168e6a669d863" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ - "anstream 0.6.18", + "anstream", "anstyle", - "clap_lex 0.7.4", + "clap_lex 1.1.0", "strsim 0.11.1", ] [[package]] name = "clap_derive" -version = "4.5.28" +version = "4.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4ced95c6f4a675af3da73304b9ac4ed991640c36374e4b46795c49e17cf1ed" +checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9" dependencies = [ "heck", "proc-macro2", @@ -1847,9 +1830,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.4" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" [[package]] name = "cmov" @@ -2051,7 +2034,7 @@ dependencies = [ "anes", "cast", "ciborium", - "clap 4.5.31", + "clap 4.6.1", "criterion-plot", "itertools 0.13.0", "num-traits", @@ -2753,7 +2736,7 @@ version = "0.11.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0621c04f2196ac3f488dd583365b9c09be011a4ab8b9f37248ffcc8f6198b56a" dependencies = [ - "anstream 1.0.0", + "anstream", "anstyle", "env_filter", "jiff", @@ -8949,7 +8932,9 @@ dependencies = [ "static_assertions", "strum", "tempfile", + "tokio", "tokio-util 0.7.18", + "wincode", ] [[package]] @@ -10502,7 +10487,7 @@ dependencies = [ "assert_matches", "bytes", "chrono", - "clap 4.5.31", + "clap 4.6.1", "crossbeam-channel", "futures 0.3.32", "histogram", diff --git a/Cargo.toml b/Cargo.toml index d588879bf19..157fa41ec30 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -308,6 +308,7 @@ protobuf-src = "1.1.0" protosol = "=8.2.0" qualifier_attr = { version = "0.2.2", default-features = false } quinn = "0.11.9" +quinn-proto = "0.11.14" rand = "0.9.4" rand_chacha = "0.9.0" rayon = "1.12.0" diff --git a/core/src/admin_rpc_post_init.rs b/core/src/admin_rpc_post_init.rs index 79259fc625c..5b4e417e9e2 100644 --- a/core/src/admin_rpc_post_init.rs +++ b/core/src/admin_rpc_post_init.rs @@ -34,6 +34,9 @@ pub enum KeyUpdaterType { Bls, /// BLS all-to-all connection cache key updater BlsConnectionCache, + /// Votor QUIC datagram endpoint key updater (single endpoint multiplexes + /// inbound consensus messages and outbound votes/certs). + VotorDatagram, } /// Responsible for managing the updaters for identity key change diff --git a/core/src/bls_sigverify/bls_cert_sigverify.rs b/core/src/bls_sigverify/bls_cert_sigverify.rs index 63aa9554024..655c637b7d5 100644 --- a/core/src/bls_sigverify/bls_cert_sigverify.rs +++ b/core/src/bls_sigverify/bls_cert_sigverify.rs @@ -1,5 +1,9 @@ use { - super::{bls_sigverifier::BAN_TIMEOUT, errors::SigVerifyCertError, stats::SigVerifyCertStats}, + super::{ + bls_sigverifier::{BAN_TIMEOUT, BlsBanlist}, + errors::SigVerifyCertError, + stats::SigVerifyCertStats, + }, crate::bls_sigverify::{bls_sigverifier::NUM_SLOTS_FOR_VERIFY, utils::send_certs_to_pool}, agave_bls_cert_verify::cert_verify::Error as BlsCertVerifyError, agave_votor_messages::{ @@ -16,7 +20,6 @@ use { solana_measure::measure::Measure, solana_pubkey::Pubkey, solana_runtime::bank::Bank, - solana_streamer::nonblocking::simple_qos::SimpleQosBanlist, std::{collections::HashSet, num::NonZeroU64}, thiserror::Error, }; @@ -53,7 +56,7 @@ pub(super) fn verify_and_send_certificates( certs: Vec, root_bank: &Bank, channel_to_pool: &Sender, - banlist: &SimpleQosBanlist, + banlist: &BlsBanlist, thread_pool: &ThreadPool, ) -> Result { for cert in certs.iter().map(|cert_payload| &cert_payload.cert) { @@ -95,7 +98,7 @@ fn verify_certs( root_bank: &Bank, verified_certs_set: &mut HashSet, stats: &mut SigVerifyCertStats, - banlist: &SimpleQosBanlist, + banlist: &BlsBanlist, thread_pool: &ThreadPool, ) -> SigVerifiedBatch { let verified = thread_pool.install(|| { diff --git a/core/src/bls_sigverify/bls_sigverifier.rs b/core/src/bls_sigverify/bls_sigverifier.rs index fb620beb366..92ca4ced7a6 100644 --- a/core/src/bls_sigverify/bls_sigverifier.rs +++ b/core/src/bls_sigverify/bls_sigverifier.rs @@ -12,7 +12,8 @@ use { cluster_info_vote_listener::VerifiedVoterSlotsSender, }, agave_votor::{ - consensus_metrics::ConsensusMetricsEventSender, generated_cert_types::GeneratedCertTypes, + consensus_metrics::ConsensusMetricsEventSender, datagram_endpoint::Datagram, + generated_cert_types::GeneratedCertTypes, }, agave_votor_messages::{ certificate::CertificateType, @@ -26,9 +27,11 @@ use { solana_gossip::cluster_info::ClusterInfo, solana_ledger::leader_schedule_cache::LeaderScheduleCache, solana_measure::measure_us, + solana_net_utils::banlist::Banlist, + solana_perf::packet::{BytesPacket, Meta, PacketBatch}, solana_pubkey::Pubkey, solana_runtime::{bank::Bank, bank_forks::SharableBanks}, - solana_streamer::{nonblocking::simple_qos::SimpleQosBanlist, packet::PacketBatch}, + solana_streamer::nonblocking::simple_qos::SimpleQosBanlist, std::{ collections::HashSet, sync::{ @@ -50,9 +53,31 @@ pub(super) const NUM_SLOTS_FOR_VERIFY: Slot = 90_000; /// We ban the sender for 2 days which roughly corresponds to an epoch pub(super) const BAN_TIMEOUT: Duration = Duration::from_hours(48); +pub(crate) enum BlsBanlist { + Stream(Arc), + Datagram(Arc>), +} + +impl BlsBanlist { + pub(crate) fn ban(&self, pubkey: Pubkey, timeout: Duration) -> bool { + match self { + Self::Stream(banlist) => banlist.ban(pubkey, timeout), + Self::Datagram(banlist) => banlist.ban(pubkey, timeout), + } + } + + #[cfg(test)] + fn is_banned(&self, pubkey: &Pubkey) -> bool { + match self { + Self::Stream(banlist) => banlist.is_banned(pubkey), + Self::Datagram(banlist) => banlist.is_banned(pubkey), + } + } +} + pub(crate) struct SigVerifierContext { pub(crate) migration_status: Arc, - pub(crate) banlist: Arc, + pub(crate) banlist: BlsBanlist, pub(crate) sharable_banks: SharableBanks, pub(crate) cluster_info: Arc, pub(crate) leader_schedule: Arc, @@ -60,8 +85,13 @@ pub(crate) struct SigVerifierContext { pub(crate) generated_cert_types: Arc, } +pub(crate) enum BlsPacketReceiver { + Stream(Receiver), + Datagram(Receiver), +} + pub(crate) struct SigVerifierChannels { - pub(crate) packet_receiver: Receiver, + pub(crate) packet_receiver: BlsPacketReceiver, pub(crate) channel_to_repair: VerifiedVoterSlotsSender, pub(crate) channel_to_reward: Sender, pub(crate) channel_to_pool: Sender, @@ -84,7 +114,7 @@ pub(crate) fn spawn_service( struct SigVerifier { migration_status: Arc, - banlist: Arc, + banlist: BlsBanlist, channels: SigVerifierChannels, /// Container to look up root banks from. sharable_banks: SharableBanks, @@ -294,15 +324,58 @@ impl SigVerifier { } } -/// Receives a `Vec` from the `receiver` while adhering to the `soft_receive_cap` limit. +/// Wraps a single inbound [`Datagram`] as a one-packet [`PacketBatch`] so the +/// rest of the sigverifier can keep operating on the `PacketBatch` abstraction. +/// The QUIC-authenticated sender pubkey is stored in the packet meta, where the +/// extract step reads it back via [`Meta::remote_pubkey`]. +fn datagram_to_batch(datagram: Datagram) -> PacketBatch { + let Datagram { + peer_pubkey, + peer_address, + message, + } = datagram; + let mut meta = Meta::default(); + meta.size = message.len(); + meta.set_socket_addr(&peer_address); + meta.set_remote_pubkey(peer_pubkey); + PacketBatch::Single(BytesPacket::new(message, meta)) +} + +/// Receives a batch of [`Datagram`]s from the `receiver`, each wrapped as a +/// single-packet [`PacketBatch`], up to the `soft_receive_cap` limit. /// /// Returns `Err(())` if the channel disconnected. fn recv_batches( + receiver: &BlsPacketReceiver, + soft_receive_cap: usize, +) -> Result, ()> { + match receiver { + BlsPacketReceiver::Stream(receiver) => recv_stream_batches(receiver, soft_receive_cap), + BlsPacketReceiver::Datagram(receiver) => recv_datagram_batches(receiver, soft_receive_cap), + } +} + +fn recv_stream_batches( receiver: &Receiver, soft_receive_cap: usize, +) -> Result, ()> { + recv_mapped_batches(receiver, soft_receive_cap, |batch| batch) +} + +fn recv_datagram_batches( + receiver: &Receiver, + soft_receive_cap: usize, +) -> Result, ()> { + recv_mapped_batches(receiver, soft_receive_cap, datagram_to_batch) +} + +fn recv_mapped_batches( + receiver: &Receiver, + soft_receive_cap: usize, + mut map: impl FnMut(T) -> PacketBatch, ) -> Result, ()> { let batch = match receiver.recv_timeout(Duration::from_secs(1)) { - Ok(b) => b, + Ok(b) => map(b), Err(e) => match e { RecvTimeoutError::Timeout => { return Ok(vec![]); @@ -317,7 +390,7 @@ fn recv_batches( while batches.len() < soft_receive_cap { match receiver.try_recv() { Ok(b) => { - batches.push(b); + batches.push(map(b)); } Err(e) => match e { TryRecvError::Empty => return Ok(batches), @@ -363,17 +436,16 @@ mod tests { solana_signer_store::encode_base2, }; - fn new_test_banlist() -> Arc { - let (banlist, _banlist_eviction_receiver) = SimpleQosBanlist::new(); - Arc::new(banlist) + fn new_test_banlist() -> Arc> { + Arc::new(Banlist::::default()) } struct TestContext { verifier: SigVerifier, validator_keypairs: Vec, - banlist: Arc, + banlist: Arc>, - _packet_sender: Sender, + _packet_sender: Sender, repair_receiver: VerifiedVoterSlotsReceiver, _reward_receiver: Receiver, pool_receiver: Receiver, @@ -427,7 +499,7 @@ mod tests { let verifier = SigVerifier::new( SigVerifierContext { migration_status: Arc::new(MigrationStatus::default()), - banlist: banlist.clone(), + banlist: BlsBanlist::Datagram(banlist.clone()), sharable_banks, cluster_info, leader_schedule, @@ -435,7 +507,7 @@ mod tests { generated_cert_types: generated_cert_types.clone(), }, SigVerifierChannels { - packet_receiver, + packet_receiver: BlsPacketReceiver::Datagram(packet_receiver), channel_to_repair, channel_to_reward, channel_to_pool, @@ -1385,7 +1457,7 @@ mod tests { let mut sig_verifier = SigVerifier::new( SigVerifierContext { migration_status: Arc::new(MigrationStatus::default()), - banlist: new_test_banlist(), + banlist: BlsBanlist::Datagram(new_test_banlist()), sharable_banks, cluster_info, leader_schedule, @@ -1393,7 +1465,7 @@ mod tests { generated_cert_types: Arc::new(GeneratedCertTypes::default()), }, SigVerifierChannels { - packet_receiver, + packet_receiver: BlsPacketReceiver::Datagram(packet_receiver), channel_to_repair: votes_for_repair_sender, channel_to_reward: reward_votes_sender, channel_to_pool: message_sender, diff --git a/core/src/bls_sigverify/bls_vote_sigverify.rs b/core/src/bls_sigverify/bls_vote_sigverify.rs index 6974e7867cf..f3229800073 100644 --- a/core/src/bls_sigverify/bls_vote_sigverify.rs +++ b/core/src/bls_sigverify/bls_vote_sigverify.rs @@ -5,7 +5,7 @@ use { crate::{ block_creation_loop::rewards::msg_types::AddVoteMessage, bls_sigverify::{ - bls_sigverifier::{BAN_TIMEOUT, NUM_SLOTS_FOR_VERIFY, SigVerifierChannels}, + bls_sigverifier::{BAN_TIMEOUT, BlsBanlist, NUM_SLOTS_FOR_VERIFY, SigVerifierChannels}, utils::{ send_votes_to_metrics, send_votes_to_pool, send_votes_to_repair, send_votes_to_rewards, @@ -32,7 +32,6 @@ use { solana_measure::{measure::Measure, measure_us}, solana_pubkey::Pubkey, solana_runtime::bank::Bank, - solana_streamer::nonblocking::simple_qos::SimpleQosBanlist, std::{collections::HashMap, sync::Arc}, }; @@ -76,7 +75,7 @@ pub(super) fn verify_and_send_votes( root_bank: &Bank, cluster_info: &ClusterInfo, leader_schedule: &LeaderScheduleCache, - banlist: &SimpleQosBanlist, + banlist: &BlsBanlist, thread_pool: &ThreadPool, channels: &SigVerifierChannels, ) -> Result { @@ -180,7 +179,7 @@ fn verify_votes( root_bank: &Bank, votes_to_verify: Vec, stats: &mut SigVerifyVoteStats, - banlist: &SimpleQosBanlist, + banlist: &BlsBanlist, thread_pool: &ThreadPool, ) -> Vec { // Filter votes too far in the future. diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 0469a562ea8..8022f74523e 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -30,11 +30,15 @@ use { }, agave_votor::{ consensus_metrics::MAX_IN_FLIGHT_CONSENSUS_EVENTS, + datagram_endpoint::{Datagram, StakedNodesAllowlist}, event::{LatestSwitchRequest, LeaderWindowInfo, VotorEventReceiver, VotorEventSender}, generated_cert_types::GeneratedCertTypes, vote_history::VoteHistory, vote_history_storage::VoteHistoryStorage, - voting_service::{VotingService as BLSVotingService, VotingServiceOverride}, + voting_service::{ + VotingService as BLSVotingService, VotingServiceOverride, VotorSendMode, + VotorTransportMode, + }, votor::{Votor, VotorConfig}, }, agave_votor_messages::consensus_message::Block, @@ -55,6 +59,7 @@ use { leader_schedule_cache::LeaderScheduleCache, shred::filter::TurbineMode, }, + solana_net_utils::banlist::Banlist, solana_poh::{poh_controller::PohController, poh_recorder::PohRecorder}, solana_pubkey::Pubkey, solana_rpc::{ @@ -117,11 +122,36 @@ pub struct Tvu { warm_quic_cache_service: Option, drop_bank_service: DropBankService, duplicate_shred_listener: DuplicateShredListener, - bls_sigverify_threads: Option<(JoinHandle<()>, JoinHandle<()>)>, + bls_transport_service: Option, votor: Votor, commitment_service: AggregateCommitmentService, } +enum BlsTransportService { + Stream { + streamer: JoinHandle<()>, + sigverifier: JoinHandle<()>, + }, + Datagram { + sigverifier: JoinHandle<()>, + }, +} + +impl BlsTransportService { + fn join(self) -> thread::Result<()> { + match self { + Self::Stream { + streamer, + sigverifier, + } => { + streamer.join()?; + sigverifier.join() + } + Self::Datagram { sigverifier } => sigverifier.join(), + } + } +} + pub struct TvuSockets { pub fetch: Vec, pub repair: UdpSocket, @@ -144,6 +174,7 @@ pub struct TvuConfig { pub shred_sigverify_threads: NonZeroUsize, pub bls_sigverify_threads: NonZeroUsize, pub turbine_xdp_sender: Option, + pub votor_transport_mode: VotorTransportMode, } impl Default for TvuConfig { @@ -159,6 +190,7 @@ impl Default for TvuConfig { shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), bls_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), turbine_xdp_sender: None, + votor_transport_mode: VotorTransportMode::default(), } } } @@ -184,8 +216,19 @@ pub struct AlpenglowInitializationState { pub staked_nodes: Arc>, pub key_notifiers: Arc>, - // For BLS voting service + // For BLS voting service stream mode pub bls_connection_cache: Arc, + + // Votor QUIC datagram transport handles. Egress goes to the BLS + // voting service; ingress feeds the BLS sigverifier; banlist is shared + // with the sigverifier so external triggers (e.g. signature failure) + // can soft-ban peers cluster-wide. `votor_allowlist` is owned by the + // voting service's StakedValidatorsCache, which republishes the + // staked-pubkey set on every epoch boundary. + pub votor_egress: tokio::sync::mpsc::Sender, + pub votor_ingress: Receiver, + pub votor_banlist: Arc>, + pub votor_allowlist: Option>, pub voting_service_test_override: Option, } @@ -268,6 +311,10 @@ impl Tvu { staked_nodes, key_notifiers, bls_connection_cache, + votor_egress, + votor_ingress, + votor_banlist, + votor_allowlist, voting_service_test_override, highest_finalized, } = votor_init; @@ -279,72 +326,102 @@ impl Tvu { bounded(MAX_IN_FLIGHT_CONSENSUS_EVENTS); let generated_cert_types = Arc::new(GeneratedCertTypes::default()); - // The BLS socket is currently only available on Testnet and Development clusters. - // Closer to release we will enable this for all clusters. - let bls_sigverify_threads = if let Some(bls_socket) = bls_socket { - let (bls_packet_sender, bls_packet_receiver) = bounded(MAX_ALPENGLOW_PACKET_NUM); + let bls_transport_service = match tvu_config.votor_transport_mode { + VotorTransportMode::QuicStream => bls_socket.map(|bls_socket| { + let (bls_packet_sender, bls_packet_receiver) = bounded(MAX_ALPENGLOW_PACKET_NUM); - let ( - SpawnServerResult { - endpoints: _, - thread: bls_streamer_t, - key_updater: bls_key_updater, - }, - banlist, - ) = { - let quic_server_params = QuicStreamerConfig { - num_threads: NonZeroUsize::new(4.min(num_cpus::get())).unwrap(), - ..Default::default() - }; - let qos_config = SimpleQosConfig { - max_streams_per_second: 30, - // Cap by # of active validators (some overhead for epoch boundaries) - max_staked_connections: MAX_ALPENGLOW_VOTE_ACCOUNTS * 2, - // Two staked connection per validator to account for hotspares - max_connections_per_peer: 2, - }; - spawn_simple_qos_server( - "solQuicBLS", - "quic_streamer_bls", - vec![bls_socket.into()], - &cluster_info.keypair(), - bls_packet_sender, - staked_nodes, - quic_server_params, - qos_config, - cancel, - ) - .unwrap() - }; - - // sigverifier - let sharable_banks = bank_forks.read().unwrap().sharable_banks(); - let bls_sigverifier_t = bls_sigverifier::spawn_service( - exit.clone(), - SigVerifierContext { - migration_status: migration_status.clone(), + let ( + SpawnServerResult { + endpoints: _, + thread: bls_streamer_t, + key_updater: bls_key_updater, + }, banlist, - sharable_banks, - cluster_info: cluster_info.clone(), - leader_schedule: leader_schedule_cache.clone(), - num_threads: tvu_config.bls_sigverify_threads.get(), - generated_cert_types: generated_cert_types.clone(), - }, - SigVerifierChannels { - packet_receiver: bls_packet_receiver, - channel_to_repair: verified_voter_slots_sender, - channel_to_reward: reward_votes_sender, - channel_to_pool: consensus_message_sender.clone(), - channel_to_metrics: consensus_metrics_sender.clone(), - }, - ); - - let mut key_notifiers = key_notifiers.write().unwrap(); - key_notifiers.add(KeyUpdaterType::Bls, bls_key_updater); + ) = { + let quic_server_params = QuicStreamerConfig { + num_threads: NonZeroUsize::new(4.min(num_cpus::get())).unwrap(), + ..Default::default() + }; + let qos_config = SimpleQosConfig { + max_streams_per_second: 30, + max_staked_connections: MAX_ALPENGLOW_VOTE_ACCOUNTS * 2, + max_connections_per_peer: 2, + }; + spawn_simple_qos_server( + "solQuicBLS", + "quic_streamer_bls", + vec![bls_socket.into()], + &cluster_info.keypair(), + bls_packet_sender, + staked_nodes, + quic_server_params, + qos_config, + cancel, + ) + .unwrap() + }; - Some((bls_streamer_t, bls_sigverifier_t)) - } else { - None + let sharable_banks = bank_forks.read().unwrap().sharable_banks(); + let bls_sigverifier_t = bls_sigverifier::spawn_service( + exit.clone(), + SigVerifierContext { + migration_status: migration_status.clone(), + banlist: bls_sigverifier::BlsBanlist::Stream(banlist), + sharable_banks, + cluster_info: cluster_info.clone(), + leader_schedule: leader_schedule_cache.clone(), + num_threads: tvu_config.bls_sigverify_threads.get(), + generated_cert_types: generated_cert_types.clone(), + }, + SigVerifierChannels { + packet_receiver: bls_sigverifier::BlsPacketReceiver::Stream( + bls_packet_receiver, + ), + channel_to_repair: verified_voter_slots_sender, + channel_to_reward: reward_votes_sender, + channel_to_pool: consensus_message_sender.clone(), + channel_to_metrics: consensus_metrics_sender.clone(), + }, + ); + + key_notifiers + .write() + .unwrap() + .add(KeyUpdaterType::Bls, bls_key_updater); + + BlsTransportService::Stream { + streamer: bls_streamer_t, + sigverifier: bls_sigverifier_t, + } + }), + VotorTransportMode::QuicDatagram => { + let _ = bls_socket; + let sharable_banks = bank_forks.read().unwrap().sharable_banks(); + let bls_sigverifier_t = bls_sigverifier::spawn_service( + exit.clone(), + SigVerifierContext { + migration_status: migration_status.clone(), + banlist: bls_sigverifier::BlsBanlist::Datagram(votor_banlist), + sharable_banks, + cluster_info: cluster_info.clone(), + leader_schedule: leader_schedule_cache.clone(), + num_threads: tvu_config.bls_sigverify_threads.get(), + generated_cert_types: generated_cert_types.clone(), + }, + SigVerifierChannels { + packet_receiver: bls_sigverifier::BlsPacketReceiver::Datagram( + votor_ingress, + ), + channel_to_repair: verified_voter_slots_sender, + channel_to_reward: reward_votes_sender, + channel_to_pool: consensus_message_sender.clone(), + channel_to_metrics: consensus_metrics_sender.clone(), + }, + ); + Some(BlsTransportService::Datagram { + sigverifier: bls_sigverifier_t, + }) + } }; let (fetch_sender, fetch_receiver) = EvictingSender::new_bounded(SHRED_FETCH_CHANNEL_SIZE); @@ -595,7 +672,15 @@ impl Tvu { bls_receiver, cluster_info.clone(), vote_history_storage, - bls_connection_cache, + match tvu_config.votor_transport_mode { + VotorTransportMode::QuicStream => VotorSendMode::Stream { + connection_cache: bls_connection_cache, + }, + VotorTransportMode::QuicDatagram => VotorSendMode::Datagram { + egress: votor_egress, + allowlist: votor_allowlist, + }, + }, bank_forks.clone(), voting_service_test_override, ); @@ -649,7 +734,7 @@ impl Tvu { warm_quic_cache_service, drop_bank_service, duplicate_shred_listener, - bls_sigverify_threads, + bls_transport_service, votor, commitment_service, }) @@ -671,9 +756,8 @@ impl Tvu { } self.drop_bank_service.join()?; self.duplicate_shred_listener.join()?; - if let Some((streamer, sigverifier)) = self.bls_sigverify_threads { - streamer.join()?; - sigverifier.join()?; + if let Some(bls_transport_service) = self.bls_transport_service { + bls_transport_service.join()?; } self.votor.join()?; self.commitment_service.join()?; @@ -731,10 +815,7 @@ pub mod tests { solana_runtime::{bank::Bank, bank_forks_controller::BankForksControllerHandle}, solana_signer::Signer, solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC}, - std::{ - sync::atomic::{AtomicU64, Ordering}, - time::Duration, - }, + std::sync::atomic::{AtomicU64, Ordering}, }; #[test] @@ -799,10 +880,12 @@ pub mod tests { DEFAULT_TPU_CONNECTION_POOL_SIZE, ) }; - let bls_connection_cache = ConnectionCache::new_quic_for_tests( - "connection_cache_bls_quic", - DEFAULT_TPU_CONNECTION_POOL_SIZE, - ); + // Stub the votor datagram channels — the test runs without an + // actual alpenglow endpoint. The egress sink and ingress source + // are never connected to each other; ingress simply never fires. + let (votor_egress, _votor_egress_rx) = tokio::sync::mpsc::channel(1024); + let (_votor_ingress_tx, votor_ingress) = bounded(1024); + let votor_banlist = Arc::new(Banlist::::default()); let replay_highest_frozen = Arc::new(ReplayHighestFrozen::default()); let (leader_window_info_sender, _leader_window_info_receiver) = unbounded(); let (optimistic_parent_sender, optimistic_parent_receiver) = unbounded(); @@ -815,20 +898,7 @@ pub mod tests { ))); let (votor_event_sender, votor_event_receiver): (VotorEventSender, VotorEventReceiver) = unbounded(); - let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default())); - let cancel = CancellationToken::new(); - thread::spawn({ - let cancel = cancel.clone(); - let exit = exit.clone(); - move || loop { - if exit.load(Ordering::Relaxed) { - cancel.cancel(); - break; - } - thread::sleep(Duration::from_secs(1)); - } - }); let (bank_forks_controller, bank_forks_controller_receiver) = BankForksControllerHandle::new(); let bank_forks_controller = Arc::new(bank_forks_controller); @@ -898,10 +968,17 @@ pub mod tests { highest_parent_ready, votor_event_sender, votor_event_receiver, - cancel, - staked_nodes, + cancel: CancellationToken::new(), + staked_nodes: Arc::new(RwLock::new(StakedNodes::default())), key_notifiers, - bls_connection_cache: Arc::new(bls_connection_cache), + bls_connection_cache: Arc::new(ConnectionCache::new_quic_for_tests( + "connection_cache_bls_quic", + 1, + )), + votor_egress, + votor_ingress, + votor_banlist, + votor_allowlist: None, voting_service_test_override: None, highest_finalized: Arc::new(RwLock::new(None)), bank_forks_controller, diff --git a/core/src/validator.rs b/core/src/validator.rs index 160eb923341..ff5f727441e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -36,7 +36,7 @@ use { agave_votor::{ vote_history::{VoteHistory, VoteHistoryError}, vote_history_storage::{NullVoteHistoryStorage, VoteHistoryStorage}, - voting_service::VotingServiceOverride, + voting_service::{VotingServiceOverride, VotorTransportMode}, }, agave_xdp::transmitter::{Transmitter, TransmitterBuilder}, anyhow::{Result, anyhow}, @@ -394,6 +394,7 @@ pub struct ValidatorConfig { pub tvu_bls_sigverify_threads: NonZeroUsize, pub delay_leader_block_for_pending_fork: bool, pub voting_service_test_override: Option, + pub votor_transport_mode: VotorTransportMode, pub repair_handler_type: RepairHandlerType, // Thread niceness adjustment for snapshot packager service pub snapshot_packager_niceness_adj: i8, @@ -479,6 +480,7 @@ impl ValidatorConfig { tvu_bls_sigverify_threads: NonZeroUsize::new(2).expect("2 is non-zero"), delay_leader_block_for_pending_fork: false, voting_service_test_override: None, + votor_transport_mode: VotorTransportMode::default(), repair_handler_type: RepairHandlerType::default(), snapshot_packager_niceness_adj: 0, } @@ -1189,9 +1191,7 @@ impl Validator { let bls_connection_cache = Arc::new(ConnectionCache::new_with_client_options( "connection_cache_bls_quic", - // BLS consensus messaging is extremely low throughput (5 PPS). Even during standstill operations - // we wouldn't expect more than a 100 PPS. 1 connection is enough. - 1, /* connection_pool_size */ + 1, Some(node.sockets.quic_alpenglow_client), Some(( &identity_keypair, @@ -1206,11 +1206,14 @@ impl Validator { )), Some((&staked_nodes, &identity_keypair.pubkey())), )); + let key_notifiers = Arc::new(RwLock::new(KeyUpdaters::default())); - key_notifiers.write().unwrap().add( - KeyUpdaterType::BlsConnectionCache, - bls_connection_cache.clone(), - ); + if config.votor_transport_mode == VotorTransportMode::QuicStream { + key_notifiers.write().unwrap().add( + KeyUpdaterType::BlsConnectionCache, + bls_connection_cache.clone(), + ); + } // test-validator crate may start the validator in a tokio runtime // context which forces us to use the same runtime because a nested @@ -1227,6 +1230,66 @@ impl Validator { .unwrap() }); + // Votor QUIC datagram endpoint. One UDP socket multiplexes + // outbound votes/certs (egress) and inbound consensus messages + // (ingress) per the lex-pubkey direction rule. Only constructed + // on Testnet/Development clusters; absent → stub channels are + // installed downstream so the BLS sigverifier task spawns but + // never receives anything. + let datagram_alpenglow_socket = if config.votor_transport_mode + == VotorTransportMode::QuicDatagram + && matches!( + genesis_config.cluster_type, + ClusterType::Testnet | ClusterType::Development, + ) { + node.sockets.alpenglow.take() + } else { + None + }; + let (votor_egress, votor_ingress, votor_banlist, votor_allowlist) = + if let Some(socket) = datagram_alpenglow_socket { + let votor_rt_handle = tpu_client_next_runtime + .as_ref() + .map(TokioRuntime::handle) + .unwrap_or_else(|| current_runtime_handle.as_ref().unwrap()); + let (ingress_tx, ingress_rx) = + crossbeam_channel::bounded(crate::tvu::MAX_ALPENGLOW_PACKET_NUM); + + let banlist = Arc::new(solana_net_utils::banlist::Banlist::::default()); + let allowlist = agave_votor::datagram_endpoint::build_allowlist( + &bank_forks.read().unwrap().sharable_banks(), + ); + + let endpoint = agave_votor::datagram_endpoint::VotorDatagramEndpoint::new( + votor_rt_handle, + &identity_keypair, + socket, + ingress_tx, + allowlist.clone(), + banlist.clone(), + ) + .map_err(|e| ValidatorError::Other(format!("alpenglow endpoint: {e:?}")))?; + key_notifiers + .write() + .unwrap() + .add(KeyUpdaterType::VotorDatagram, endpoint.key_updater.clone()); + let egress = endpoint.egress.clone(); + votor_rt_handle.spawn({ + let cancel = cancel.clone(); + async move { + cancel.cancelled().await; + endpoint.close(); + } + }); + (egress, ingress_rx, banlist, Some(allowlist)) + } else { + let (egress, _) = tokio::sync::mpsc::channel(1); + let (_, ingress) = + crossbeam_channel::bounded::(1); + let banlist = Arc::new(solana_net_utils::banlist::Banlist::::default()); + (egress, ingress, banlist, None) + }; + let rpc_override_health_check = Arc::new(AtomicBool::new(config.rpc_config.disable_health_check)); let ( @@ -1568,8 +1631,9 @@ impl Validator { }; // disable all2all tests if not allowed for a given cluster type - let alpenglow_socket = if genesis_config.cluster_type == ClusterType::Testnet - || genesis_config.cluster_type == ClusterType::Development + let alpenglow_socket = if config.votor_transport_mode == VotorTransportMode::QuicStream + && (genesis_config.cluster_type == ClusterType::Testnet + || genesis_config.cluster_type == ClusterType::Development) { node.sockets.alpenglow } else { @@ -1625,6 +1689,7 @@ impl Validator { shred_sigverify_threads: config.tvu_shred_sigverify_threads, bls_sigverify_threads: config.tvu_bls_sigverify_threads, turbine_xdp_sender: turbine_xdp_sender.clone(), + votor_transport_mode: config.votor_transport_mode, }, &max_slots, block_metadata_notifier, @@ -1651,6 +1716,10 @@ impl Validator { staked_nodes: staked_nodes.clone(), key_notifiers: key_notifiers.clone(), bls_connection_cache, + votor_egress, + votor_ingress, + votor_banlist, + votor_allowlist, voting_service_test_override: config.voting_service_test_override.clone(), highest_finalized, }, diff --git a/dev-bins/Cargo.lock b/dev-bins/Cargo.lock index 0c7a00690ca..5b45bc80461 100644 --- a/dev-bins/Cargo.lock +++ b/dev-bins/Cargo.lock @@ -350,6 +350,7 @@ dependencies = [ "agave-math-utils", "agave-votor-messages", "bitvec", + "bytes", "crossbeam-channel", "itertools 0.14.0", "lazy-lru", @@ -385,6 +386,7 @@ dependencies = [ "solana-vote", "solana-vote-program", "thiserror 2.0.18", + "tokio", "wincode", ] @@ -7570,7 +7572,9 @@ dependencies = [ "static_assertions", "strum", "tempfile", + "tokio", "tokio-util 0.7.18", + "wincode", ] [[package]] @@ -8075,6 +8079,7 @@ dependencies = [ "tokio", ] + [[package]] name = "solana-rayon-threadlimit" version = "4.2.0-alpha.0" diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml index cf9a9ab1c49..1927ba390c6 100644 --- a/local-cluster/Cargo.toml +++ b/local-cluster/Cargo.toml @@ -81,7 +81,9 @@ solana-vote-program = { workspace = true } static_assertions = { workspace = true } strum = { workspace = true, features = ["derive"] } tempfile = { workspace = true } +tokio = { workspace = true, features = ["sync", "rt", "rt-multi-thread"] } tokio-util = { workspace = true } +wincode = { workspace = true, features = ["alloc"] } [dev-dependencies] assert_matches = { workspace = true } diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 89c6cab12ba..3a1de1c566e 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -5,6 +5,7 @@ use log::*; use { crate::{cluster::QuicTpuClient, local_cluster::LocalCluster}, + agave_votor::datagram_endpoint::{Datagram, StakedNodesAllowlist, VotorDatagramEndpoint}, agave_votor_messages::consensus_message::ConsensusMessage, crossbeam_channel::bounded, rand::{Rng, rng}, @@ -26,16 +27,11 @@ use { solana_hash::Hash, solana_keypair::Keypair, solana_ledger::blockstore::Blockstore, - solana_net_utils::SocketAddrSpace, + solana_net_utils::{SocketAddrSpace, banlist::Banlist}, solana_poh_config::PohConfig, solana_pubkey::Pubkey, solana_rpc_client::rpc_client::RpcClient, solana_signer::Signer, - solana_streamer::{ - nonblocking::simple_qos::SimpleQosConfig, - quic::{QuicStreamerConfig, spawn_simple_qos_server}, - streamer::StakedNodes, - }, solana_system_transaction as system_transaction, solana_time_utils::timestamp, solana_tpu_client::tpu_client::{TpuClient, TpuClientConfig, TpuSenderError}, @@ -55,7 +51,6 @@ use { thread::{JoinHandle, sleep}, time::{Duration, Instant}, }, - tokio_util::sync::CancellationToken, }; #[cfg(feature = "dev-context-only-utils")] use { @@ -435,41 +430,43 @@ pub fn check_for_new_processed( ); } -/// Start a QUIC streamer to listen for votes and certificates. -/// Returns a cancellation token, the server thread handle, and a receiver for packet batches. -pub fn start_quic_streamer_to_listen_for_votes_and_certs( +/// Spawn a Votor datagram endpoint to sniff vote / cert +/// traffic for a local-cluster test. Validators in the cluster dial +/// this listener as an `AdditionalListener`; the lex-pubkey rule +/// requires `listener_keypair.pubkey() > max(validator_pubkeys)` and +/// the validators' admission must include `listener_keypair.pubkey()` +/// — both ensured by the caller (the test). +/// +/// Returns the endpoint (kept alive by the caller so its task does +/// not get dropped), the ingress receiver (one item per received +/// datagram), and a tokio runtime handle whose lifetime must outlive +/// the endpoint. +pub fn start_datagram_listener_for_votes_and_certs( vote_listener_socket: UdpSocket, - validator_keys: &[Arc], - node_stakes: &[u64], + listener_keypair: Keypair, ) -> ( - CancellationToken, - JoinHandle<()>, - crossbeam_channel::Receiver, + VotorDatagramEndpoint, + crossbeam_channel::Receiver, + tokio::runtime::Runtime, ) { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(2) + .thread_name("solAlpenglowListen") + .build() + .expect("tokio runtime"); let (sender, receiver) = bounded(1024); - let cancel = CancellationToken::new(); - let stakes = validator_keys - .iter() - .zip(node_stakes) - .map(|(keypair, stake)| (keypair.pubkey(), *stake)) - .collect(); - let staked_nodes: Arc> = Arc::new(RwLock::new(StakedNodes::new( - Arc::new(stakes), - HashMap::::default(), // overrides - ))); - let (result, _banlist) = spawn_simple_qos_server( - "solAlpenglowTest", - "alpenglow_local_cluster_test", - [vote_listener_socket.into()], - &Keypair::new(), + let banlist = Arc::new(Banlist::::default()); + let endpoint = VotorDatagramEndpoint::new( + rt.handle(), + &listener_keypair, + vote_listener_socket, sender, - staked_nodes, - QuicStreamerConfig::default(), - SimpleQosConfig::default(), - cancel.clone(), + Arc::new(StakedNodesAllowlist::allow_all_for_tests()), + banlist, ) - .unwrap(); - (cancel, result.thread, receiver) + .expect("alpenglow datagram listener"); + (endpoint, receiver, rt) } /// Check that all nodes in the cluster are producing notarized votes. @@ -478,8 +475,7 @@ pub fn check_for_new_notarized_votes( contact_infos: &[ContactInfo], test_name: &str, vote_listener_socket: UdpSocket, - validator_keys: &[Arc], - node_stakes: &[u64], + listener_keypair: Keypair, ) { let loop_start = Instant::now(); let loop_timeout = Duration::from_secs(180); @@ -500,11 +496,8 @@ pub fn check_for_new_notarized_votes( let contact_infos_owned: Vec = contact_infos.to_vec(); let test_name_owned = test_name.to_string(); - let (cancel, quic_server_thread, receiver) = start_quic_streamer_to_listen_for_votes_and_certs( - vote_listener_socket, - validator_keys, - node_stakes, - ); + let (_endpoint, receiver, _rt) = + start_datagram_listener_for_votes_and_certs(vote_listener_socket, listener_keypair); // Now start vote listener and wait for new notarized votes. let vote_listener = std::thread::spawn({ @@ -519,52 +512,45 @@ pub fn check_for_new_notarized_votes( move || { while !done { assert!(loop_start.elapsed() < loop_timeout); - let Ok(packet_batch) = receiver.recv_timeout(Duration::from_millis(100)) else { + let Ok(dg) = receiver.recv_timeout(Duration::from_millis(100)) else { continue; }; - for packet in packet_batch.iter() { - let Ok(ConsensusMessage::Vote(vote_message)) = packet.deserialize_slice(..) - else { - continue; - }; - let vote = vote_message.vote; - if !vote.is_notarization() { - continue; - } - let rank = vote_message.rank; - if rank >= contact_infos_owned.len() as u16 { - warn!( - "Received vote with rank {} which is greater than number of nodes {}", - rank, - contact_infos_owned.len() - ); - continue; - } - let slot = vote.slot(); - if slot <= last_notarized[rank as usize] { - continue; - } - last_notarized[rank as usize] = slot; - num_new_notarized_votes[rank as usize] += 1; - done = num_new_notarized_votes.iter().all(|&x| x > num_new_votes); - if done || last_print.elapsed().as_secs() > 3 { - info!( - "{test_name_owned} waiting for {num_new_votes} new notarized votes.. \ - observed: {num_new_notarized_votes:?}" - ); - last_print = Instant::now(); - } + let Ok(ConsensusMessage::Vote(vote_message)) = + wincode::deserialize::(&dg.message) + else { + continue; + }; + let vote = vote_message.vote; + if !vote.is_notarization() { + continue; + } + let rank = vote_message.rank; + if rank >= contact_infos_owned.len() as u16 { + warn!( + "Received vote with rank {} which is greater than number of nodes {}", + rank, + contact_infos_owned.len() + ); + continue; + } + let slot = vote.slot(); + if slot <= last_notarized[rank as usize] { + continue; } - if done { - cancel.cancel(); + last_notarized[rank as usize] = slot; + num_new_notarized_votes[rank as usize] += 1; + done = num_new_notarized_votes.iter().all(|&x| x > num_new_votes); + if done || last_print.elapsed().as_secs() > 3 { + info!( + "{test_name_owned} waiting for {num_new_votes} new notarized votes.. \ + observed: {num_new_notarized_votes:?}" + ); + last_print = Instant::now(); } } } }); vote_listener.join().expect("Vote listener thread panicked"); - quic_server_thread - .join() - .expect("QUIC server thread panicked"); } pub fn check_no_new_roots( diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index e0bf90f1628..195f13970c4 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -911,9 +911,8 @@ impl LocalCluster { num_new_notarized_votes: usize, test_name: &str, socket_addr_space: SocketAddrSpace, - vote_listener_addr: std::net::UdpSocket, - validator_keys: &[Arc], - node_stakes: &[u64], + vote_listener_socket: std::net::UdpSocket, + listener_keypair: Keypair, ) { let alive_node_contact_infos = self.discover_nodes(socket_addr_space, test_name); info!("{test_name} looking for new notarized votes on all nodes"); @@ -921,9 +920,8 @@ impl LocalCluster { num_new_notarized_votes, &alive_node_contact_infos, test_name, - vote_listener_addr, - validator_keys, - node_stakes, + vote_listener_socket, + listener_keypair, ); info!("{test_name} done waiting for notarized votes"); } diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index ebd76009044..68a4047869b 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -83,6 +83,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { tvu_bls_sigverify_threads: config.tvu_bls_sigverify_threads, delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork, voting_service_test_override: config.voting_service_test_override.clone(), + votor_transport_mode: config.votor_transport_mode, repair_handler_type: config.repair_handler_type.clone(), snapshot_packager_niceness_adj: config.snapshot_packager_niceness_adj, } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 94ab61b8af7..b7145729f35 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -4,7 +4,9 @@ use { SnapshotArchiveKind, SnapshotInterval, paths as snapshot_paths, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, }, - agave_votor::voting_service::{AlpenglowPortOverride, VotingServiceOverride}, + agave_votor::voting_service::{ + AdditionalListener, AlpenglowPortOverride, VotingServiceOverride, + }, agave_votor_messages::migration::MIGRATION_SLOT_OFFSET, assert_matches::assert_matches, crossbeam_channel::{Receiver, bounded}, @@ -5959,6 +5961,21 @@ fn test_restart_node_alpenglow() { /// We start 2 nodes, where the first node A holds 90% of the stake. /// We let A run by itself, and ensure that B can join and rejoin the network /// through repair. +/// Generate a fresh keypair whose pubkey is strictly greater than every +/// pubkey in `lower_bounds`. Alpenglow tests that attach a sniffer +/// endpoint via `AdditionalListener` need this so the lex-pubkey rule +/// routes every validator's dial of the sniffer correctly (lower +/// dials, higher listens). +fn keypair_above_all(lower_bounds: &[Pubkey]) -> Keypair { + let upper = lower_bounds.iter().max().copied().unwrap_or_default(); + loop { + let k = Keypair::new(); + if k.pubkey() > upper { + return k; + } + } +} + /// Verifies that a low-stake Alpenglow validator can fall behind, rejoin through repair, /// and resume notarized voting in an imbalanced-stake network. #[test] @@ -5984,23 +6001,31 @@ fn test_alpenglow_imbalanced_stakes_catchup() { leader_schedule: Arc::new(leader_schedule), }; + // Collect node pubkeys early so we can derive a listener identity + // whose pubkey is strictly greater than every validator's — the + // lex-pubkey rule then routes every validator's dial of the + // listener correctly (validator < listener). + let node_pubkeys = validator_keys + .iter() + .map(|key| key.node_keypair.pubkey()) + .collect::>(); + let listener_keypair = keypair_above_all(&node_pubkeys); + let listener_pubkey = listener_keypair.pubkey(); + // Create our UDP socket to listen to votes let vote_listener_addr = bind_to_localhost_unique().unwrap(); let mut validator_config = ValidatorConfig::default_for_test(); validator_config.fixed_leader_schedule = Some(leader_schedule); validator_config.voting_service_test_override = Some(VotingServiceOverride { - additional_listeners: vec![vote_listener_addr.local_addr().unwrap()], + additional_listeners: vec![AdditionalListener { + pubkey: listener_pubkey, + addr: vote_listener_addr.local_addr().unwrap(), + }], alpenglow_port_override: AlpenglowPortOverride::default(), }); validator_config.wait_for_supermajority = Some(0); - // Collect node pubkeys - let node_pubkeys = validator_keys - .iter() - .map(|key| key.node_keypair.pubkey()) - .collect::>(); - // Cluster config let mut cluster_config = ClusterConfig { mint_lamports: DEFAULT_MINT_LAMPORTS + node_stakes.iter().sum::(), @@ -6044,18 +6069,12 @@ fn test_alpenglow_imbalanced_stakes_catchup() { info!("restarting node B"); cluster.restart_node(&node_pubkeys[1], b_info, SocketAddrSpace::Unspecified); - // Ensure all nodes are voting - let validator_node_keypairs: Vec<_> = validator_keys - .iter() - .map(|k| k.node_keypair.clone()) - .collect(); cluster.check_for_new_notarized_votes( 16, "test_alpenglow_imbalanced_stakes_catchup", SocketAddrSpace::Unspecified, vote_listener_addr, - &validator_node_keypairs, - &node_stakes, + listener_keypair, ); } @@ -6173,17 +6192,26 @@ fn test_alpenglow_migration( ) { agave_logger::setup_with_default(AG_DEBUG_LOG_FILTER); + let (leader_schedule, keys) = create_custom_leader_schedule_with_random_keys(leader_schedule); + + // Listener pubkey must exceed every validator's pubkey so the + // lex-pubkey rule routes each validator's dial to the listener + // correctly. + let validator_pubkeys: Vec = keys.iter().map(|k| k.node_keypair.pubkey()).collect(); + let listener_keypair = keypair_above_all(&validator_pubkeys); + let vote_listener_socket = bind_to_localhost_unique().unwrap(); let vote_listener_addr = vote_listener_socket.try_clone().unwrap(); let mut validator_config = ValidatorConfig::default_for_test(); validator_config.voting_service_test_override = Some(VotingServiceOverride { - additional_listeners: vec![vote_listener_addr.local_addr().unwrap()], + additional_listeners: vec![AdditionalListener { + pubkey: listener_keypair.pubkey(), + addr: vote_listener_addr.local_addr().unwrap(), + }], alpenglow_port_override: AlpenglowPortOverride::default(), }); validator_config.wait_for_supermajority = Some(0); - let (leader_schedule, keys) = create_custom_leader_schedule_with_random_keys(leader_schedule); - validator_config.fixed_leader_schedule = Some(FixedSchedule { leader_schedule: Arc::new(leader_schedule), }); @@ -6215,12 +6243,8 @@ fn test_alpenglow_migration( // Create local cluster with alpenglow accounts but feature not activated let cluster = LocalCluster::new(&mut cluster_config, SocketAddrSpace::Unspecified); - let validator_keys: Vec> = cluster - .validators - .values() - .map(|v| v.info.keypair.clone()) - .collect(); - + // Send feature activation transaction + info!("Sending feature activation transaction"); let client = RpcClient::new_socket_with_commitment( cluster.entry_point_info.rpc().unwrap(), CommitmentConfig::processed(), @@ -6261,8 +6285,7 @@ fn test_alpenglow_migration( test_name, SocketAddrSpace::Unspecified, vote_listener_addr, - &validator_keys, - &node_stakes, + listener_keypair, ); // Additionally ensure that roots are being made diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 71ed14646f3..b811c8e3f1b 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -338,6 +338,7 @@ dependencies = [ "agave-math-utils", "agave-votor-messages", "bitvec", + "bytes", "crossbeam-channel", "itertools 0.14.0", "lazy-lru", @@ -373,6 +374,7 @@ dependencies = [ "solana-vote", "solana-vote-program", "thiserror 2.0.18", + "tokio", "wincode", ] @@ -7898,6 +7900,7 @@ dependencies = [ "tokio", ] + [[package]] name = "solana-rayon-threadlimit" version = "4.2.0-alpha.0" diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 90b1d334966..aaa804aba63 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -1372,8 +1372,7 @@ mod tests { KeyUpdaterType::TpuVote, KeyUpdaterType::Forward, KeyUpdaterType::RpcService, - KeyUpdaterType::Bls, - KeyUpdaterType::BlsConnectionCache, + KeyUpdaterType::VotorDatagram, ]) ); let mut io = MetaIoHandler::default(); diff --git a/votor/Cargo.toml b/votor/Cargo.toml index 50794e2696d..5029f24e34f 100644 --- a/votor/Cargo.toml +++ b/votor/Cargo.toml @@ -32,13 +32,18 @@ frozen-abi = [ agave-logger = { workspace = true } agave-math-utils = { workspace = true } agave-votor-messages = { workspace = true } +arc-swap = { workspace = true } bitvec = { workspace = true } +bytes = { workspace = true } crossbeam-channel = { workspace = true } itertools = { workspace = true } lazy-lru = { workspace = true } log = { workspace = true } parking_lot = { workspace = true } qualifier_attr = { workspace = true } +quinn = { workspace = true } +quinn-proto = { workspace = true } +rustls = { workspace = true } serde = { workspace = true } serde_bytes = { workspace = true } solana-accounts-db = { workspace = true } @@ -61,6 +66,7 @@ solana-leader-schedule = { workspace = true } solana-ledger = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } +solana-net-utils = { workspace = true } solana-pubkey = { workspace = true } solana-rpc = { workspace = true } solana-runtime = { workspace = true } @@ -68,25 +74,26 @@ solana-signature = { workspace = true } solana-signer = { workspace = true } solana-signer-store = { workspace = true } solana-streamer = { workspace = true } +solana-tls-utils = { workspace = true } solana-time-utils = { workspace = true } solana-transaction = { workspace = true } solana-transaction-error = { workspace = true } solana-vote = { workspace = true } solana-vote-program = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true, features = ["rt-multi-thread", "macros", "sync", "time"] } +tokio-util = { workspace = true } wincode = { workspace = true, features = ["alloc"] } [dev-dependencies] agave-votor = { path = ".", features = ["agave-unstable-api", "dev-context-only-utils"] } rand = { workspace = true } -solana-net-utils = { path = "../net-utils", features = ["agave-unstable-api"] } solana-perf = { path = "../perf", features = ["agave-unstable-api", "dev-context-only-utils"] } solana-runtime = { path = "../runtime", features = ["agave-unstable-api", "dev-context-only-utils"] } solana-sdk-ids = { workspace = true } solana-streamer = { path = "../streamer", features = ["agave-unstable-api", "dev-context-only-utils"] } tempfile = { workspace = true } test-case = { workspace = true } -tokio-util = { workspace = true } [lints] workspace = true diff --git a/votor/src/datagram_endpoint.rs b/votor/src/datagram_endpoint.rs new file mode 100644 index 00000000000..49f27dc3736 --- /dev/null +++ b/votor/src/datagram_endpoint.rs @@ -0,0 +1,1017 @@ +//! Votor-specific QUIC datagram transport. + +use { + arc_swap::ArcSwap, + bytes::Bytes, + crossbeam_channel::{Sender, TrySendError}, + quinn::{ + AckFrequencyConfig, ClientConfig, Connection, Endpoint, EndpointConfig, IdleTimeout, + Incoming, SendDatagramError, ServerConfig, TokioRuntime, TransportConfig, VarInt, + congestion::{Controller, ControllerFactory}, + crypto::rustls::{QuicClientConfig, QuicServerConfig}, + }, + quinn_proto::RttEstimator, + rustls::pki_types::{CertificateDer, PrivateKeyDer}, + solana_keypair::{Keypair, Signer}, + solana_metrics::datapoint_info, + solana_net_utils::{banlist::Banlist, token_bucket::TokenBucket}, + solana_pubkey::Pubkey, + solana_runtime::bank_forks::SharableBanks, + solana_tls_utils::{ + NotifyKeyUpdate, get_remote_pubkey, new_dummy_x509_certificate, + socket_addr_to_quic_server_name, tls_client_config_builder, tls_server_config_builder, + }, + std::{ + any::Any, + collections::{HashMap, hash_map::Entry}, + net::{SocketAddr, UdpSocket}, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::{Duration, Instant}, + }, + tokio::{ + sync::{mpsc, watch}, + time::MissedTickBehavior, + }, + tokio_util::sync::CancellationToken, +}; + +const ALPENGLOW_ALPN: &[u8] = b"alpenglow-v1"; + +const MAX_PEERS: u64 = 2000; +const EGRESS_CHANNEL_CAP: usize = 4 * MAX_PEERS as usize; +const CONN_EVENT_CHANNEL_CAP: usize = MAX_PEERS as usize; +const MAX_INBOUND_CONNECTIONS_PER_PEER: usize = 2; +const HANDSHAKE_GLOBAL_BURST: u64 = 200; +const HANDSHAKE_GLOBAL_REFILL_PER_SECOND: f64 = 40.0; +const MAX_DATAGRAMS_PER_SECOND_PER_PEER: f64 = 30.0; +const PEER_RATE_LIMIT_BURST: u64 = 100; +const PEER_RATE_LIMIT_BURST_DOS: u64 = 100_000; +const BAN_DURATION_DOS: Duration = Duration::from_secs(48 * 60 * 60); +const ALLOWLIST_CHECK_INTERVAL: Duration = Duration::from_secs(10); +const BANLIST_PRUNE_INTERVAL: Duration = Duration::from_secs(60 * 60); +const METRICS_INTERVAL: Duration = Duration::from_secs(2); +const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(2); +const MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(5); +const KEEP_ALIVE_INTERVAL: Duration = Duration::from_millis(600); +const MAX_ACK_DELAY: Duration = Duration::from_millis(400); +const INITIAL_MTU: u16 = 1280; +const MIN_MTU: u16 = 1280; +const NOP_CONGESTION_WINDOW: u64 = 8 * 1024 * 1024; +const DATAGRAM_RECEIVE_BUFFER: usize = MAX_PEERS as usize * 8; +const DATAGRAM_SEND_BUFFER: usize = MAX_PEERS as usize * 2; + +const CLOSE_INVALID_IDENTITY: VarInt = VarInt::from_u32(2); +const CLOSE_NOT_ADMITTED: VarInt = VarInt::from_u32(3); +const CLOSE_BANNED: VarInt = VarInt::from_u32(4); +const CLOSE_TABLE_FULL: VarInt = VarInt::from_u32(5); +const CLOSE_IDENTITY_ROTATED: VarInt = VarInt::from_u32(11); +const CLOSE_PEER_MOVED: VarInt = VarInt::from_u32(12); + +#[derive(Debug)] +pub struct Datagram { + pub peer_pubkey: Pubkey, + pub peer_address: SocketAddr, + pub message: Bytes, +} + +#[derive(Default)] +pub struct StakedNodesAllowlist { + inner: ArcSwap>, + allow_all: bool, +} + +impl StakedNodesAllowlist { + pub fn new(peers: HashMap) -> Self { + Self { + inner: ArcSwap::new(Arc::new(peers)), + allow_all: false, + } + } + + pub fn allow_all_for_tests() -> Self { + Self { + inner: ArcSwap::default(), + allow_all: true, + } + } + + pub fn swap(&self, peers: HashMap) { + self.inner.store(Arc::new(peers)); + } + + pub fn len(&self) -> usize { + self.inner.load().len() + } + + pub fn is_empty(&self) -> bool { + !self.allow_all && self.inner.load().is_empty() + } + + pub fn allow(&self, peer: &Pubkey) -> bool { + self.allow_all || self.inner.load().contains_key(peer) + } +} + +pub fn current_admit_set(banks: &SharableBanks) -> HashMap { + let bank = banks.working(); + let epoch = bank.epoch(); + bank.epoch_staked_nodes(epoch) + .map(|nodes| { + nodes + .iter() + .filter(|(_, stake)| **stake > 0) + .map(|(pubkey, stake)| (*pubkey, *stake)) + .collect() + }) + .unwrap_or_default() +} + +pub fn build_allowlist(banks: &SharableBanks) -> Arc { + Arc::new(StakedNodesAllowlist::new(current_admit_set(banks))) +} + +struct IdentitySnapshot { + pubkey: Pubkey, + cert: CertificateDer<'static>, + key: PrivateKeyDer<'static>, +} + +impl IdentitySnapshot { + fn from_keypair(keypair: &Keypair) -> Self { + let (cert, key) = new_dummy_x509_certificate(keypair); + Self { + pubkey: keypair.pubkey(), + cert, + key, + } + } +} + +pub struct KeyUpdater { + tx: watch::Sender>>, +} + +impl NotifyKeyUpdate for KeyUpdater { + fn update_key(&self, keypair: &Keypair) -> Result<(), Box> { + self.tx + .send(Some(Arc::new(IdentitySnapshot::from_keypair(keypair)))) + .map_err(|_| "votor datagram endpoint is shut down".into()) + } +} + +#[derive(Default)] +struct DatagramStats { + peak_connections: AtomicU64, + datagrams_sent: AtomicU64, + datagrams_received: AtomicU64, + connection_lost: AtomicU64, + connect_failed: AtomicU64, + egress_dropped_dial_in_progress: AtomicU64, + datagram_rate_limited: AtomicU64, + ingress_dropped_channel_full: AtomicU64, + handshake_rejected_global_limit: AtomicU64, + handshake_rejected_unauthorized: AtomicU64, + handshake_rejected_overload: AtomicU64, + connection_evicted_identity_rotated: AtomicU64, + connection_evicted_allowlist: AtomicU64, + connection_evicted_peer_moved: AtomicU64, +} + +impl DatagramStats { + fn record_connection_count(&self, count: u64) { + self.peak_connections.fetch_max(count, Ordering::Relaxed); + } +} + +fn add(metric: &AtomicU64) { + metric.fetch_add(1, Ordering::Relaxed); +} + +fn take_peak(stats: &DatagramStats, live_connections: u64) -> i64 { + stats + .peak_connections + .swap(live_connections, Ordering::Relaxed) + .max(live_connections) as i64 +} + +fn swap(metric: &AtomicU64) -> i64 { + metric.swap(0, Ordering::Relaxed) as i64 +} + +fn report_client(stats: &DatagramStats, live_connections: u64) { + datapoint_info!( + "votor_datagram_client", + ("connections_peak", take_peak(stats, live_connections), i64), + ("datagrams_sent", swap(&stats.datagrams_sent), i64), + ("connect_failed", swap(&stats.connect_failed), i64), + ("connection_lost", swap(&stats.connection_lost), i64), + ( + "egress_dropped_dial_in_progress", + swap(&stats.egress_dropped_dial_in_progress), + i64 + ), + ( + "connection_evicted_peer_moved", + swap(&stats.connection_evicted_peer_moved), + i64 + ), + ( + "connection_evicted_identity_rotated", + swap(&stats.connection_evicted_identity_rotated), + i64 + ), + ); +} + +fn report_server(stats: &DatagramStats, live_connections: u64) { + datapoint_info!( + "votor_datagram_server", + ("connections_peak", take_peak(stats, live_connections), i64), + ("datagrams_received", swap(&stats.datagrams_received), i64), + ("connect_failed", swap(&stats.connect_failed), i64), + ("connection_lost", swap(&stats.connection_lost), i64), + ( + "datagram_rate_limited", + swap(&stats.datagram_rate_limited), + i64 + ), + ( + "datagram_ingress_dropped_channel_full", + swap(&stats.ingress_dropped_channel_full), + i64 + ), + ( + "handshake_rejected_global_limit", + swap(&stats.handshake_rejected_global_limit), + i64 + ), + ( + "handshake_rejected_unauthorized", + swap(&stats.handshake_rejected_unauthorized), + i64 + ), + ( + "handshake_rejected_overload", + swap(&stats.handshake_rejected_overload), + i64 + ), + ( + "connection_evicted_allowlist", + swap(&stats.connection_evicted_allowlist), + i64 + ), + ( + "connection_evicted_identity_rotated", + swap(&stats.connection_evicted_identity_rotated), + i64 + ), + ); +} + +pub struct VotorDatagramEndpoint { + pub egress: mpsc::Sender, + pub key_updater: Arc, + shutdown: CancellationToken, +} + +impl VotorDatagramEndpoint { + #[allow(clippy::too_many_arguments)] + pub fn new( + runtime: &tokio::runtime::Handle, + keypair: &Keypair, + socket: UdpSocket, + ingress: Sender, + allowlist: Arc, + banlist: Arc>, + ) -> std::io::Result { + let local_pubkey = keypair.pubkey(); + let (cert, key) = new_dummy_x509_certificate(keypair); + let server_config = new_server_config(cert.clone(), key.clone_key()); + let client_config = new_client_config(cert, key); + + let mut endpoint = { + let _guard = runtime.enter(); + Endpoint::new( + EndpointConfig::default(), + Some(server_config), + socket, + Arc::new(TokioRuntime), + )? + }; + endpoint.set_default_client_config(client_config); + + let (egress_tx, egress_rx) = mpsc::channel(EGRESS_CHANNEL_CAP); + let (out_events_tx, out_events_rx) = mpsc::channel(CONN_EVENT_CHANNEL_CAP); + let (in_events_tx, in_events_rx) = mpsc::channel(CONN_EVENT_CHANNEL_CAP); + let (id_tx, identity_rx) = watch::channel(None); + let key_updater = Arc::new(KeyUpdater { tx: id_tx }); + let shutdown = CancellationToken::new(); + + runtime.spawn( + OutboundLoop { + endpoint: endpoint.clone(), + local_pubkey, + generation: 0, + egress_rx, + banlist: banlist.clone(), + identity_rx: identity_rx.clone(), + outgoing: HashMap::new(), + events_tx: out_events_tx, + events_rx: out_events_rx, + shutdown: shutdown.clone(), + stats: Arc::default(), + } + .run(), + ); + runtime.spawn( + InboundLoop { + endpoint, + generation: 0, + ingress, + allowlist, + banlist, + identity_rx, + incoming: HashMap::new(), + events_tx: in_events_tx, + events_rx: in_events_rx, + handshake_global_limiter: TokenBucket::new( + HANDSHAKE_GLOBAL_BURST, + HANDSHAKE_GLOBAL_BURST, + HANDSHAKE_GLOBAL_REFILL_PER_SECOND, + ), + stats: Arc::default(), + shutdown: shutdown.clone(), + } + .run(), + ); + + Ok(Self { + egress: egress_tx, + key_updater, + shutdown, + }) + } + + pub fn close(&self) { + self.shutdown.cancel(); + } +} + +enum OutgoingEntry { + Dialing, + Established(Connection), +} + +enum OutboundEvent { + Dialed { + peer: Pubkey, + generation: u64, + outcome: Option, + }, + Closed { + peer: Pubkey, + generation: u64, + stable_id: usize, + }, +} + +struct OutboundLoop { + endpoint: Endpoint, + local_pubkey: Pubkey, + generation: u64, + egress_rx: mpsc::Receiver, + banlist: Arc>, + identity_rx: watch::Receiver>>, + outgoing: HashMap, + events_tx: mpsc::Sender, + events_rx: mpsc::Receiver, + shutdown: CancellationToken, + stats: Arc, +} + +impl OutboundLoop { + async fn run(mut self) { + let mut metrics = tokio::time::interval(METRICS_INTERVAL); + metrics.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut identity_closed = false; + loop { + tokio::select! { + biased; + _ = self.shutdown.cancelled() => break, + changed = self.identity_rx.changed(), if !identity_closed => { + if changed.is_err() { + identity_closed = true; + continue; + } + let snapshot = self.identity_rx.borrow_and_update().clone(); + if let Some(snapshot) = snapshot { + self.apply_identity_change(snapshot); + } + } + Some(event) = self.events_rx.recv() => self.handle_event(event), + maybe_datagram = self.egress_rx.recv() => { + let Some(datagram) = maybe_datagram else { break }; + self.handle_datagram(datagram); + } + _ = metrics.tick() => report_client(&self.stats, self.outgoing.len() as u64), + } + } + } + + fn apply_identity_change(&mut self, snapshot: Arc) { + self.endpoint.set_default_client_config(new_client_config( + snapshot.cert.clone(), + snapshot.key.clone_key(), + )); + self.local_pubkey = snapshot.pubkey; + self.generation = self.generation.wrapping_add(1); + let evicted = self + .outgoing + .drain() + .filter_map(|(_, entry)| match entry { + OutgoingEntry::Established(conn) => Some(conn), + OutgoingEntry::Dialing => None, + }) + .inspect(|conn| conn.close(CLOSE_IDENTITY_ROTATED, b"IDENTITY_ROTATED")) + .count() as u64; + self.stats + .connection_evicted_identity_rotated + .fetch_add(evicted, Ordering::Relaxed); + } + + fn handle_datagram(&mut self, datagram: Datagram) { + let Datagram { + peer_pubkey: peer, + peer_address: addr, + message, + } = datagram; + if peer == self.local_pubkey || self.banlist.is_banned(&peer) { + return; + } + if self.send_outbound(peer, addr, &message) { + tokio::spawn( + OutboundDial { + endpoint: self.endpoint.clone(), + peer, + addr, + generation: self.generation, + trigger: message, + events: self.events_tx.clone(), + stats: self.stats.clone(), + } + .run(), + ); + } + } + + fn send_outbound(&mut self, peer: Pubkey, addr: SocketAddr, bytes: &Bytes) -> bool { + match self.outgoing.entry(peer) { + Entry::Vacant(slot) => { + slot.insert(OutgoingEntry::Dialing); + true + } + Entry::Occupied(mut slot) => match slot.get() { + OutgoingEntry::Dialing => { + add(&self.stats.egress_dropped_dial_in_progress); + false + } + OutgoingEntry::Established(conn) if conn.remote_address() == addr => { + match conn.send_datagram(bytes.clone()) { + Ok(()) => { + add(&self.stats.datagrams_sent); + false + } + Err(SendDatagramError::ConnectionLost(_)) => { + add(&self.stats.connection_lost); + *slot.get_mut() = OutgoingEntry::Dialing; + true + } + Err(_) => { + add(&self.stats.connect_failed); + false + } + } + } + OutgoingEntry::Established(_) => { + let old = std::mem::replace(slot.get_mut(), OutgoingEntry::Dialing); + if let OutgoingEntry::Established(conn) = old { + conn.close(CLOSE_PEER_MOVED, b"PEER_MOVED"); + add(&self.stats.connection_evicted_peer_moved); + } + true + } + }, + } + } + + fn handle_event(&mut self, event: OutboundEvent) { + let generation = match &event { + OutboundEvent::Dialed { generation, .. } | OutboundEvent::Closed { generation, .. } => { + *generation + } + }; + if generation != self.generation { + if let OutboundEvent::Dialed { + outcome: Some(conn), + .. + } = event + { + conn.close(CLOSE_IDENTITY_ROTATED, b"IDENTITY_ROTATED"); + add(&self.stats.connection_evicted_identity_rotated); + } + return; + } + + match event { + OutboundEvent::Dialed { + peer, + outcome: Some(conn), + .. + } => match self.outgoing.get_mut(&peer) { + Some(slot @ OutgoingEntry::Dialing) => { + *slot = OutgoingEntry::Established(conn.clone()); + self.stats + .record_connection_count(self.outgoing.len() as u64); + self.spawn_close_watcher(peer, conn); + } + _ => conn.close(CLOSE_IDENTITY_ROTATED, b"IDENTITY_ROTATED"), + }, + OutboundEvent::Dialed { + peer, + outcome: None, + .. + } => { + if let Entry::Occupied(slot) = self.outgoing.entry(peer) + && matches!(slot.get(), OutgoingEntry::Dialing) + { + slot.remove(); + } + } + OutboundEvent::Closed { + peer, stable_id, .. + } => { + if let Entry::Occupied(slot) = self.outgoing.entry(peer) + && matches!(slot.get(), OutgoingEntry::Established(conn) if conn.stable_id() == stable_id) + { + slot.remove(); + } + } + } + } + + fn spawn_close_watcher(&self, peer: Pubkey, conn: Connection) { + let events = self.events_tx.clone(); + let generation = self.generation; + tokio::spawn(async move { + let stable_id = conn.stable_id(); + conn.closed().await; + let _ = events + .send(OutboundEvent::Closed { + peer, + generation, + stable_id, + }) + .await; + }); + } +} + +struct OutboundDial { + endpoint: Endpoint, + peer: Pubkey, + addr: SocketAddr, + generation: u64, + trigger: Bytes, + events: mpsc::Sender, + stats: Arc, +} + +impl OutboundDial { + async fn run(self) { + let outcome = match self.dial().await { + Some(conn) => { + match conn.send_datagram(self.trigger) { + Ok(()) => add(&self.stats.datagrams_sent), + Err(SendDatagramError::ConnectionLost(_)) => add(&self.stats.connection_lost), + Err(_) => add(&self.stats.connect_failed), + } + Some(conn) + } + None => None, + }; + let _ = self + .events + .send(OutboundEvent::Dialed { + peer: self.peer, + generation: self.generation, + outcome, + }) + .await; + } + + async fn dial(&self) -> Option { + let server_name = socket_addr_to_quic_server_name(self.addr); + let connecting = self.endpoint.connect(self.addr, &server_name).ok()?; + let conn = match tokio::time::timeout(HANDSHAKE_TIMEOUT, connecting).await { + Ok(Ok(conn)) => conn, + Ok(Err(_)) | Err(_) => { + add(&self.stats.connect_failed); + return None; + } + }; + let Some(remote_pubkey) = get_remote_pubkey(&conn) else { + conn.close(CLOSE_INVALID_IDENTITY, b"INVALID_IDENTITY"); + add(&self.stats.connect_failed); + return None; + }; + if remote_pubkey != self.peer { + conn.close(CLOSE_INVALID_IDENTITY, b"INVALID_IDENTITY"); + add(&self.stats.connect_failed); + return None; + } + Some(conn) + } +} + +enum InboundEvent { + Accepted { + peer: Pubkey, + conn: Connection, + generation: u64, + }, + Closed { + peer: Pubkey, + generation: u64, + stable_id: usize, + }, +} + +struct InboundLoop { + endpoint: Endpoint, + generation: u64, + ingress: Sender, + allowlist: Arc, + banlist: Arc>, + identity_rx: watch::Receiver>>, + incoming: HashMap>, + events_tx: mpsc::Sender, + events_rx: mpsc::Receiver, + handshake_global_limiter: TokenBucket, + stats: Arc, + shutdown: CancellationToken, +} + +impl InboundLoop { + async fn run(mut self) { + let mut prune = tokio::time::interval(BANLIST_PRUNE_INTERVAL); + prune.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut metrics = tokio::time::interval(METRICS_INTERVAL); + metrics.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut identity_closed = false; + + loop { + tokio::select! { + biased; + _ = self.shutdown.cancelled() => break, + changed = self.identity_rx.changed(), if !identity_closed => { + if changed.is_err() { + identity_closed = true; + continue; + } + let snapshot = self.identity_rx.borrow_and_update().clone(); + if let Some(snapshot) = snapshot { + self.apply_identity_change(snapshot); + } + } + Some(event) = self.events_rx.recv() => self.handle_event(event), + maybe_incoming = self.endpoint.accept() => { + let Some(incoming) = maybe_incoming else { break }; + self.maybe_accept_connection(incoming); + } + _ = prune.tick() => self.banlist.prune(), + _ = metrics.tick() => report_server(&self.stats, self.incoming_len()), + } + } + } + + fn incoming_len(&self) -> u64 { + self.incoming.values().map(Vec::len).sum::() as u64 + } + + fn apply_identity_change(&mut self, snapshot: Arc) { + self.endpoint.set_server_config(Some(new_server_config( + snapshot.cert.clone(), + snapshot.key.clone_key(), + ))); + self.generation = self.generation.wrapping_add(1); + let evicted = self + .incoming + .drain() + .flat_map(|(_, conns)| conns) + .inspect(|conn| conn.close(CLOSE_IDENTITY_ROTATED, b"IDENTITY_ROTATED")) + .count() as u64; + self.stats + .connection_evicted_identity_rotated + .fetch_add(evicted, Ordering::Relaxed); + } + + fn maybe_accept_connection(&mut self, incoming: Incoming) { + let remote_addr = incoming.remote_address(); + if remote_addr.is_ipv6() || remote_addr.ip().is_multicast() { + incoming.ignore(); + return; + } + if !remote_addr.ip().is_loopback() + && self.handshake_global_limiter.consume_tokens(1).is_err() + { + add(&self.stats.handshake_rejected_global_limit); + incoming.ignore(); + return; + } + tokio::spawn( + InboundAccept { + incoming, + generation: self.generation, + events: self.events_tx.clone(), + stats: self.stats.clone(), + } + .run(), + ); + } + + fn handle_event(&mut self, event: InboundEvent) { + let generation = match &event { + InboundEvent::Accepted { generation, .. } | InboundEvent::Closed { generation, .. } => { + *generation + } + }; + if generation != self.generation { + if let InboundEvent::Accepted { conn, .. } = event { + conn.close(CLOSE_IDENTITY_ROTATED, b"IDENTITY_ROTATED"); + add(&self.stats.connection_evicted_identity_rotated); + } + return; + } + match event { + InboundEvent::Accepted { peer, conn, .. } => self.maybe_admit_inbound(peer, conn), + InboundEvent::Closed { + peer, stable_id, .. + } => self.reap_incoming(peer, stable_id), + } + } + + fn maybe_admit_inbound(&mut self, peer: Pubkey, conn: Connection) { + if self.banlist.is_banned(&peer) { + conn.close(CLOSE_BANNED, b"BANNED"); + add(&self.stats.handshake_rejected_unauthorized); + return; + } + if !self.allowlist.allow(&peer) { + conn.close(CLOSE_NOT_ADMITTED, b"NOT_ADMITTED"); + add(&self.stats.handshake_rejected_unauthorized); + return; + } + if self.insert_inbound(peer, conn.clone()).is_err() { + conn.close(CLOSE_TABLE_FULL, b"TABLE_FULL"); + add(&self.stats.handshake_rejected_overload); + return; + } + self.stats.record_connection_count(self.incoming_len()); + let remote_addr = conn.remote_address(); + tokio::spawn(read_datagram_loop( + conn, + peer, + remote_addr, + self.generation, + self.ingress.clone(), + self.allowlist.clone(), + self.banlist.clone(), + self.events_tx.clone(), + self.stats.clone(), + )); + } + + fn insert_inbound(&mut self, peer: Pubkey, conn: Connection) -> Result<(), ()> { + match self.incoming.entry(peer) { + Entry::Vacant(slot) => { + slot.insert(vec![conn]); + Ok(()) + } + Entry::Occupied(mut slot) => { + if slot.get().len() < MAX_INBOUND_CONNECTIONS_PER_PEER { + slot.get_mut().push(conn); + Ok(()) + } else { + Err(()) + } + } + } + } + + fn reap_incoming(&mut self, peer: Pubkey, stable_id: usize) { + if let Entry::Occupied(mut slot) = self.incoming.entry(peer) { + slot.get_mut().retain(|conn| conn.stable_id() != stable_id); + if slot.get().is_empty() { + slot.remove(); + } + } + } +} + +struct InboundAccept { + incoming: Incoming, + generation: u64, + events: mpsc::Sender, + stats: Arc, +} + +impl InboundAccept { + async fn run(self) { + let conn = match self.incoming.accept() { + Ok(connecting) => match tokio::time::timeout(HANDSHAKE_TIMEOUT, connecting).await { + Ok(Ok(conn)) => conn, + Ok(Err(_)) | Err(_) => { + add(&self.stats.connect_failed); + return; + } + }, + Err(_) => { + add(&self.stats.connect_failed); + return; + } + }; + let Some(peer) = get_remote_pubkey(&conn) else { + conn.close(CLOSE_INVALID_IDENTITY, b"INVALID_IDENTITY"); + add(&self.stats.connect_failed); + return; + }; + let _ = self + .events + .send(InboundEvent::Accepted { + peer, + conn, + generation: self.generation, + }) + .await; + } +} + +async fn read_datagram_loop( + connection: Connection, + peer: Pubkey, + remote_addr: SocketAddr, + generation: u64, + ingress: Sender, + allowlist: Arc, + banlist: Arc>, + events: mpsc::Sender, + stats: Arc, +) { + let stable_id = connection.stable_id(); + const RATE_LIMIT_WATERMARK: u64 = PEER_RATE_LIMIT_BURST_DOS - PEER_RATE_LIMIT_BURST; + let rate_limit = TokenBucket::new( + PEER_RATE_LIMIT_BURST_DOS, + PEER_RATE_LIMIT_BURST_DOS, + MAX_DATAGRAMS_PER_SECOND_PER_PEER, + ); + let mut allowlist_check = tokio::time::interval(ALLOWLIST_CHECK_INTERVAL); + allowlist_check.tick().await; + loop { + tokio::select! { + result = connection.read_datagram() => { + match result { + Ok(message) => { + if banlist.is_banned(&peer) { + connection.close(CLOSE_BANNED, b"BANNED"); + break; + } + match rate_limit.consume_tokens(1) { + Ok(remaining) if remaining > RATE_LIMIT_WATERMARK => {} + Ok(_) => { + add(&stats.datagram_rate_limited); + continue; + } + Err(_) => { + banlist.ban(peer, BAN_DURATION_DOS); + connection.close(CLOSE_BANNED, b"BANNED"); + break; + } + } + match ingress.try_send(Datagram { + peer_pubkey: peer, + peer_address: remote_addr, + message, + }) { + Ok(()) => add(&stats.datagrams_received), + Err(TrySendError::Full(_)) => add(&stats.ingress_dropped_channel_full), + Err(TrySendError::Disconnected(_)) => break, + } + } + Err(_) => { + add(&stats.connection_lost); + break; + } + } + } + _ = allowlist_check.tick() => { + if !allowlist.allow(&peer) { + connection.close(CLOSE_NOT_ADMITTED, b"NOT_ADMITTED"); + add(&stats.connection_evicted_allowlist); + break; + } + } + } + } + let _ = events + .send(InboundEvent::Closed { + peer, + generation, + stable_id, + }) + .await; +} + +#[derive(Clone)] +struct NopCongestion; + +impl Controller for NopCongestion { + fn on_congestion_event(&mut self, _: Instant, _: Instant, _: bool, _: u64) {} + + fn on_ack(&mut self, _: Instant, _: Instant, _: u64, _: bool, _: &RttEstimator) {} + + fn on_mtu_update(&mut self, _: u16) {} + + fn window(&self) -> u64 { + NOP_CONGESTION_WINDOW + } + + fn initial_window(&self) -> u64 { + NOP_CONGESTION_WINDOW + } + + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } + + fn into_any(self: Box) -> Box { + self + } +} + +impl ControllerFactory for NopCongestion { + fn build(self: Arc, _: Instant, _: u16) -> Box { + Box::new(NopCongestion) + } +} + +fn new_transport_config() -> TransportConfig { + let max_idle = + IdleTimeout::try_from(MAX_IDLE_TIMEOUT).expect("MAX_IDLE_TIMEOUT fits IdleTimeout"); + let mut ack_freq = AckFrequencyConfig::default(); + ack_freq.max_ack_delay(Some(MAX_ACK_DELAY)); + ack_freq.ack_eliciting_threshold(VarInt::from_u32(512)); + ack_freq.reordering_threshold(VarInt::from_u32(0)); + + let mut config = TransportConfig::default(); + config + .datagram_receive_buffer_size(Some(DATAGRAM_RECEIVE_BUFFER)) + .datagram_send_buffer_size(DATAGRAM_SEND_BUFFER) + .initial_mtu(INITIAL_MTU) + .min_mtu(MIN_MTU) + .mtu_discovery_config(None) + .keep_alive_interval(Some(KEEP_ALIVE_INTERVAL)) + .max_idle_timeout(Some(max_idle)) + .ack_frequency_config(Some(ack_freq)) + .congestion_controller_factory(Arc::new(NopCongestion)) + .max_concurrent_bidi_streams(VarInt::from(0u8)) + .max_concurrent_uni_streams(VarInt::from(0u8)); + config +} + +fn new_server_config(cert: CertificateDer<'static>, key: PrivateKeyDer<'static>) -> ServerConfig { + let mut tls = tls_server_config_builder() + .with_single_cert(vec![cert], key) + .expect("rustls accepts votor datagram cert/key"); + tls.alpn_protocols = vec![ALPENGLOW_ALPN.to_vec()]; + let quic = QuicServerConfig::try_from(tls).expect("server TLS config is valid"); + let mut config = ServerConfig::with_crypto(Arc::new(quic)); + config + .transport_config(Arc::new(new_transport_config())) + .migration(false); + config +} + +fn new_client_config(cert: CertificateDer<'static>, key: PrivateKeyDer<'static>) -> ClientConfig { + let mut tls = tls_client_config_builder() + .with_client_auth_cert(vec![cert], key) + .expect("rustls accepts votor datagram cert/key"); + tls.enable_early_data = true; + tls.alpn_protocols = vec![ALPENGLOW_ALPN.to_vec()]; + let quic = QuicClientConfig::try_from(tls).expect("client TLS config is valid"); + let mut config = ClientConfig::new(Arc::new(quic)); + config.transport_config(Arc::new(new_transport_config())); + config +} diff --git a/votor/src/lib.rs b/votor/src/lib.rs index e3cf0a52fa9..d5ed9188cb8 100644 --- a/votor/src/lib.rs +++ b/votor/src/lib.rs @@ -9,6 +9,7 @@ pub mod common; pub mod consensus_metrics; pub mod consensus_pool; mod consensus_pool_service; +pub mod datagram_endpoint; pub mod event; mod event_handler; pub mod generated_cert_types; diff --git a/votor/src/staked_validators_cache.rs b/votor/src/staked_validators_cache.rs index 0912b78690c..fa1507369ff 100644 --- a/votor/src/staked_validators_cache.rs +++ b/votor/src/staked_validators_cache.rs @@ -1,21 +1,29 @@ +#[cfg(any(test, feature = "dev-context-only-utils"))] +use std::collections::HashSet; use { - crate::voting_service::AlpenglowPortOverride, + crate::{ + datagram_endpoint::{self, StakedNodesAllowlist}, + voting_service::AlpenglowPortOverride, + }, lazy_lru::LruCache, solana_clock::{Epoch, Slot}, solana_gossip::cluster_info::ClusterInfo, solana_pubkey::Pubkey, - solana_runtime::bank_forks::BankForks, + solana_runtime::bank_forks::SharableBanks, std::{ collections::HashMap, net::SocketAddr, - sync::{Arc, RwLock}, + sync::Arc, time::{Duration, Instant}, }, }; struct StakedValidatorsCacheEntry { - /// Alpenglow Sockets associated with the staked validators - alpenglow_sockets: Vec, + /// (Pubkey, Alpenglow socket) pairs for the staked validators. The pubkey + /// is the validator's node identity (same one signing TLS certs on the + /// alpenglow datagram endpoint). The socket is overridden via + /// `AlpenglowPortOverride` if a test override matches the pubkey. + peers: Vec<(Pubkey, SocketAddr)>, /// The time at which this entry was created creation_time: Instant, @@ -35,8 +43,9 @@ pub struct StakedValidatorsCache { /// Time to live for cache entries ttl: Duration, - /// Bank forks - bank_forks: Arc>, + /// Lock-free handle to the root/working banks. Cheap to clone and read, + /// avoids acquiring the `BankForks` `RwLock`. + sharable_banks: SharableBanks, /// Whether to include the running validator's socket address in cache entries include_self: bool, @@ -46,32 +55,66 @@ pub struct StakedValidatorsCache { /// timestamp of the last alpenglow port override we read alpenglow_port_override_last_modified: Instant, + + /// Allowlist for the votor datagram endpoint. + /// + /// `None` ⇒ disabled (used by tests / paths that don't need + /// allowlist gating). + allowlist: Option>, + + /// Extra pubkeys that should always be allowed even when not in + /// the staked-nodes set. Used by tests. + #[cfg(any(test, feature = "dev-context-only-utils"))] + extra_admit: HashSet, } impl StakedValidatorsCache { + #[allow(clippy::too_many_arguments)] pub fn new( - bank_forks: Arc>, + sharable_banks: SharableBanks, ttl: Duration, target_cache_size: usize, include_self: bool, alpenglow_port_override: Option, + allowlist: Option>, + #[cfg(any(test, feature = "dev-context-only-utils"))] extra_admit: HashSet, ) -> Self { Self { cache: LruCache::new(target_cache_size), ttl, - bank_forks, + sharable_banks, include_self, alpenglow_port_override, alpenglow_port_override_last_modified: Instant::now(), + allowlist, + #[cfg(any(test, feature = "dev-context-only-utils"))] + extra_admit, } } + /// Republish the allowlist consumed by the votor datagram endpoint: the + /// pubkey map plus the gossip-advertised alpenglow IPs of those peers. + fn refresh_allowlist(&mut self) { + let Some(allowlist) = self.allowlist.as_ref() else { + return; + }; + #[cfg(not(any(test, feature = "dev-context-only-utils")))] + let map = datagram_endpoint::current_admit_set(&self.sharable_banks); + #[cfg(any(test, feature = "dev-context-only-utils"))] + let map = { + let mut map = datagram_endpoint::current_admit_set(&self.sharable_banks); + // extra_admit entries (test-only probes) carry stake 0 — they are + // allowed by key presence, not stake weight. + map.extend(self.extra_admit.iter().map(|pk| (*pk, 0u64))); + map + }; + allowlist.swap(map); + } + #[inline] fn cur_epoch(&self, slot: Slot) -> Epoch { - self.bank_forks - .read() - .unwrap() - .working_bank() + self.sharable_banks + .working() .epoch_schedule() .get_epoch(slot) } @@ -82,10 +125,11 @@ impl StakedValidatorsCache { cluster_info: &ClusterInfo, update_time: Instant, ) { - let banks = { - let bank_forks = self.bank_forks.read().unwrap(); - [bank_forks.root_bank(), bank_forks.working_bank()] - }; + // Drive the allowlist refresh from here — same cadence, same Bank + // reads, and `cluster_info` is in hand for gossip IP resolution. + self.refresh_allowlist(); + + let banks = [self.sharable_banks.root(), self.sharable_banks.working()]; let epoch_staked_nodes = banks .iter() @@ -123,10 +167,11 @@ impl StakedValidatorsCache { }) .collect(); + nodes.sort_unstable_by_key(|node| node.alpenglow_socket); nodes.dedup_by_key(|node| node.alpenglow_socket); nodes.sort_unstable_by_key(|a| a.stake); - let mut alpenglow_sockets = Vec::with_capacity(nodes.len()); + let mut peers = Vec::with_capacity(nodes.len()); let override_map = self .alpenglow_port_override .as_ref() @@ -142,12 +187,12 @@ impl StakedValidatorsCache { } else { alpenglow_socket }; - alpenglow_sockets.push(socket); + peers.push((node.pubkey, socket)); } self.cache.put( epoch, StakedValidatorsCacheEntry { - alpenglow_sockets, + peers, creation_time: update_time, }, ); @@ -158,7 +203,7 @@ impl StakedValidatorsCache { slot: Slot, cluster_info: &ClusterInfo, access_time: Instant, - ) -> (&[SocketAddr], bool) { + ) -> (&[(Pubkey, SocketAddr)], bool) { // Check if self.alpenglow_port_override has a different last_modified. // Immediately refresh the cache if it does. if let Some(alpenglow_port_override) = &self.alpenglow_port_override { @@ -183,7 +228,7 @@ impl StakedValidatorsCache { epoch: Epoch, cluster_info: &ClusterInfo, access_time: Instant, - ) -> (&[SocketAddr], bool) { + ) -> (&[(Pubkey, SocketAddr)], bool) { // For a given epoch, if we either: // // (1) have a cache entry that has expired @@ -203,10 +248,7 @@ impl StakedValidatorsCache { ( // Unwrapping is fine here, since update_cache guarantees that we push a cache entry to // self.cache[epoch]. - self.cache - .get(&epoch) - .map(|v| &*v.alpenglow_sockets) - .unwrap(), + self.cache.get(&epoch).map(|v| &*v.peers).unwrap(), refresh_cache, ) } @@ -225,8 +267,8 @@ impl StakedValidatorsCache { #[cfg(test)] mod tests { use { - super::StakedValidatorsCache, - crate::voting_service::AlpenglowPortOverride, + super::{HashSet, StakedValidatorsCache}, + crate::{datagram_endpoint::StakedNodesAllowlist, voting_service::AlpenglowPortOverride}, rand::Rng, solana_gossip::{ cluster_info::ClusterInfo, contact_info::ContactInfo, crds::GossipRoute, @@ -370,7 +412,15 @@ mod tests { create_bank_forks_and_cluster_info(num_nodes, num_zero_stake_nodes, slot_num); // Create our staked validators cache - let mut svc = StakedValidatorsCache::new(bank_forks, Duration::from_secs(5), 5, true, None); + let mut svc = StakedValidatorsCache::new( + bank_forks.read().unwrap().sharable_banks(), + Duration::from_secs(5), + 5, + true, + None, + None, + HashSet::new(), + ); let now = Instant::now(); @@ -445,7 +495,15 @@ mod tests { let (bank_forks, cluster_info, _) = create_bank_forks_and_cluster_info(50, 7, base_slot); // Create our staked validators cache - let mut svc = StakedValidatorsCache::new(bank_forks, Duration::from_secs(5), 5, true, None); + let mut svc = StakedValidatorsCache::new( + bank_forks.read().unwrap().sharable_banks(), + Duration::from_secs(5), + 5, + true, + None, + None, + HashSet::new(), + ); assert_eq!(0, svc.len()); assert!(svc.is_empty()); @@ -517,7 +575,15 @@ mod tests { create_bank_forks_and_cluster_info(num_nodes, num_zero_stake_nodes, slot_num); // Create our staked validators cache - let mut svc = StakedValidatorsCache::new(bank_forks, Duration::from_secs(5), 5, true, None); + let mut svc = StakedValidatorsCache::new( + bank_forks.read().unwrap().sharable_banks(), + Duration::from_secs(5), + 5, + true, + None, + None, + HashSet::new(), + ); let now = Instant::now(); @@ -546,23 +612,35 @@ mod tests { .unwrap(); // Create our staked validators cache - set include_self to true - let mut svc = - StakedValidatorsCache::new(bank_forks.clone(), Duration::from_secs(5), 5, true, None); + let mut svc = StakedValidatorsCache::new( + bank_forks.read().unwrap().sharable_banks(), + Duration::from_secs(5), + 5, + true, + None, + None, + HashSet::new(), + ); - let (sockets, _) = - svc.get_staked_validators_by_slot(slot_num, &cluster_info, Instant::now()); - assert_eq!(sockets.len(), num_nodes); - assert!(sockets.contains(&my_socket_addr)); + let (peers, _) = svc.get_staked_validators_by_slot(slot_num, &cluster_info, Instant::now()); + assert_eq!(peers.len(), num_nodes); + assert!(peers.iter().any(|(_, s)| s == &my_socket_addr)); // Create our staked validators cache - set include_self to false - let mut svc = - StakedValidatorsCache::new(bank_forks, Duration::from_secs(5), 5, false, None); + let mut svc = StakedValidatorsCache::new( + bank_forks.read().unwrap().sharable_banks(), + Duration::from_secs(5), + 5, + false, + None, + None, + HashSet::new(), + ); - let (sockets, _) = - svc.get_staked_validators_by_slot(slot_num, &cluster_info, Instant::now()); + let (peers, _) = svc.get_staked_validators_by_slot(slot_num, &cluster_info, Instant::now()); // We should have num_nodes - 1 sockets, since we exclude our own socket address. - assert_eq!(sockets.len(), num_nodes.checked_sub(1).unwrap()); - assert!(!sockets.contains(&my_socket_addr)); + assert_eq!(peers.len(), num_nodes.checked_sub(1).unwrap()); + assert!(!peers.iter().any(|(_, s)| s == &my_socket_addr)); } #[test] @@ -576,31 +654,72 @@ mod tests { // Create our staked validators cache - set include_self to false let mut svc = StakedValidatorsCache::new( - bank_forks, + bank_forks.read().unwrap().sharable_banks(), Duration::from_secs(5), 5, false, Some(alpenglow_port_override.clone()), + None, + HashSet::new(), ); // Nothing in the override, so we should get the original socket addresses. - let (sockets, _) = svc.get_staked_validators_by_slot(0, &cluster_info, Instant::now()); - assert_eq!(sockets.len(), 2); - assert!(!sockets.contains(&blackhole_addr)); + let (peers, _) = svc.get_staked_validators_by_slot(0, &cluster_info, Instant::now()); + assert_eq!(peers.len(), 2); + assert!(!peers.iter().any(|(_, s)| s == &blackhole_addr)); // Add an override for pubkey_B, and check that we get the overridden socket address. alpenglow_port_override.update_override(HashMap::from([(pubkey_b, blackhole_addr)])); - let (sockets, _) = svc.get_staked_validators_by_slot(0, &cluster_info, Instant::now()); - assert_eq!(sockets.len(), 2); - // Sort sockets to ensure the blackhole address is at index 0. - let mut sockets: Vec<_> = sockets.to_vec(); + let (peers, _) = svc.get_staked_validators_by_slot(0, &cluster_info, Instant::now()); + assert_eq!(peers.len(), 2); + // Sort peers by socket to ensure the blackhole address is at index 0. + let mut sockets: Vec = peers.iter().map(|(_, s)| *s).collect(); sockets.sort(); assert_eq!(sockets[0], blackhole_addr); assert_ne!(sockets[1], blackhole_addr); // Now clear the override, and check that we get the original socket addresses. alpenglow_port_override.clear(); - let (sockets, _) = svc.get_staked_validators_by_slot(0, &cluster_info, Instant::now()); - assert_eq!(sockets.len(), 2); - assert!(!sockets.contains(&blackhole_addr)); + let (peers, _) = svc.get_staked_validators_by_slot(0, &cluster_info, Instant::now()); + assert_eq!(peers.len(), 2); + assert!(!peers.iter().any(|(_, s)| s == &blackhole_addr)); + } + + /// A cache refresh must publish the staked-pubkey set into the allowlist. + #[test] + fn test_allowlist_populated_from_gossip() { + let slot_num = 325_000_000_u64; + let num_nodes = 10_usize; + let num_zero_stake_nodes = 3_usize; + let (bank_forks, cluster_info, pubkeys) = + create_bank_forks_and_cluster_info(num_nodes, num_zero_stake_nodes, slot_num); + + let allowlist = Arc::new(StakedNodesAllowlist::default()); + let mut svc = StakedValidatorsCache::new( + bank_forks.read().unwrap().sharable_banks(), + Duration::from_secs(5), + 5, + true, // include_self + None, + Some(allowlist.clone()), + HashSet::new(), + ); + + // Nothing published before the first refresh: the gate denies everyone. + assert!(allowlist.is_empty()); + + // A refresh republishes the allowlist from the bank + gossip. + let _ = svc.get_staked_validators_by_slot(slot_num, &cluster_info, Instant::now()); + + // Staked peers admitted, zero-stake peers rejected. + for (ix, pk) in pubkeys.iter().enumerate() { + if ix < num_zero_stake_nodes { + assert!( + !allowlist.allow(pk), + "zero-stake node {ix} must not be admitted" + ); + } else { + assert!(allowlist.allow(pk), "staked node {ix} must be admitted"); + } + } } } diff --git a/votor/src/voting_service.rs b/votor/src/voting_service.rs index 09cf5db9fa9..d388338d7ae 100644 --- a/votor/src/voting_service.rs +++ b/votor/src/voting_service.rs @@ -1,5 +1,8 @@ +#[cfg(any(test, feature = "dev-context-only-utils"))] +use std::collections::HashSet; use { crate::{ + datagram_endpoint::{Datagram, StakedNodesAllowlist}, staked_validators_cache::StakedValidatorsCache, vote_history_storage::{SavedVoteHistoryVersions, VoteHistoryStorage}, }, @@ -7,6 +10,7 @@ use { certificate::Certificate, consensus_message::{ConsensusMessage, VoteMessage}, }, + bytes::Bytes, crossbeam_channel::Receiver, solana_client::connection_cache::ConnectionCache, solana_clock::Slot, @@ -23,6 +27,7 @@ use { thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, }, + tokio::sync::mpsc, }; const STAKED_VALIDATORS_CACHE_TTL_S: u64 = 5; @@ -42,6 +47,37 @@ pub enum BLSOp { }, } +pub struct VotingService { + thread_hdl: JoinHandle<()>, +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub enum VotorTransportMode { + QuicStream, + #[default] + QuicDatagram, +} + +#[derive(Clone)] +pub enum VotorSendMode { + Stream { + connection_cache: Arc, + }, + Datagram { + egress: mpsc::Sender, + allowlist: Option>, + }, +} + +impl VotorSendMode { + fn datagram_allowlist(&self) -> Option> { + match self { + Self::Stream { .. } => None, + Self::Datagram { allowlist, .. } => allowlist.clone(), + } + } +} + fn send_message( buf: Vec, socket: &SocketAddr, @@ -52,10 +88,6 @@ fn send_message( client.send_data_async(Arc::new(buf)) } -pub struct VotingService { - thread_hdl: JoinHandle<()>, -} - /// Override for Alpenglow ports to allow testing with different ports /// The last_modified is used to determine if the override has changed so /// StakedValidatorsCache can refresh its cache. @@ -111,18 +143,29 @@ impl AlpenglowPortOverride { } } +/// Test-only additional listener: the voting service will fan out every +/// consensus message to this `(pubkey, addr)` peer alongside the live +/// staked-validators list. Used by unit tests and local-cluster fixtures +/// to attach a probe endpoint. +#[derive(Clone, Debug)] +pub struct AdditionalListener { + pub pubkey: Pubkey, + pub addr: SocketAddr, +} + #[derive(Clone)] pub struct VotingServiceOverride { - pub additional_listeners: Vec, + pub additional_listeners: Vec, pub alpenglow_port_override: AlpenglowPortOverride, } impl VotingService { + #[allow(clippy::too_many_arguments)] pub fn new( bls_receiver: Receiver, cluster_info: Arc, vote_history_storage: Arc, - connection_cache: Arc, + send_mode: VotorSendMode, bank_forks: Arc>, test_override: Option, ) -> Self { @@ -134,15 +177,26 @@ impl VotingService { }) => (additional_listeners, Some(alpenglow_port_override)), }; + // Additional-listener pubkeys are test-only sniffers / probes — + // they aren't in the staked-nodes set but we still want to dial + // them from this validator's client side. Union them into the + // allowlist via the cache. + #[cfg(any(test, feature = "dev-context-only-utils"))] + let extra_admit: HashSet = additional_listeners.iter().map(|l| l.pubkey).collect(); + let allowlist = send_mode.datagram_allowlist(); + let thread_hdl = Builder::new() .name("solVotorVoteSvc".to_string()) .spawn(move || { let mut staked_validators_cache = StakedValidatorsCache::new( - bank_forks.clone(), + bank_forks.read().unwrap().sharable_banks(), Duration::from_secs(STAKED_VALIDATORS_CACHE_TTL_S), STAKED_VALIDATORS_CACHE_NUM_EPOCH_TARGET, false, alpenglow_port_override, + allowlist, + #[cfg(any(test, feature = "dev-context-only-utils"))] + extra_admit, ); info!("AlpenglowVotingService has started"); @@ -151,7 +205,7 @@ impl VotingService { &cluster_info, vote_history_storage.as_ref(), bls_op, - &connection_cache, + &send_mode, &additional_listeners, &mut staked_validators_cache, ); @@ -166,8 +220,8 @@ impl VotingService { slot: Slot, cluster_info: &ClusterInfo, message: &ConsensusMessage, - connection_cache: &ConnectionCache, - additional_listeners: &[SocketAddr], + send_mode: &VotorSendMode, + additional_listeners: &[AdditionalListener], staked_validators_cache: &mut StakedValidatorsCache, ) { let buf = match wincode::serialize(message) { @@ -177,19 +231,51 @@ impl VotingService { return; } }; + let (staked_peers, _) = staked_validators_cache.get_staked_validators_by_slot( + slot, + cluster_info, + Instant::now(), + ); + + match send_mode { + VotorSendMode::Stream { connection_cache } => { + let sockets = additional_listeners + .iter() + .map(|listener| listener.addr) + .chain(staked_peers.iter().map(|(_, addr)| *addr)); - let (staked_validator_alpenglow_sockets, _) = staked_validators_cache - .get_staked_validators_by_slot(slot, cluster_info, Instant::now()); - let sockets = additional_listeners - .iter() - .chain(staked_validator_alpenglow_sockets.iter()); - - // We use send_message in a loop right now because we worry that sending packets too fast - // will cause a packet spike and overwhelm the network. If we later find out that this is - // not an issue, we can optimize this by using multi_targret_send or similar methods. - for socket in sockets { - if let Err(e) = send_message(buf.clone(), socket, connection_cache) { - warn!("Failed to send alpenglow message to {socket}: {e:?}"); + for socket in sockets { + if let Err(e) = send_message(buf.clone(), &socket, connection_cache) { + warn!("Failed to send alpenglow message to {socket}: {e:?}"); + } + } + } + VotorSendMode::Datagram { egress, .. } => { + let buf = Bytes::from(buf); + let peers = additional_listeners + .iter() + .map(|listener| (listener.pubkey, listener.addr)) + .chain(staked_peers.iter().copied()); + + // Drop on full / closed — votor consensus tolerates loss; we + // never want to backpressure into vote production. + for (pubkey, addr) in peers { + let result = egress.try_send(Datagram { + peer_pubkey: pubkey, + peer_address: addr, + message: buf.clone(), + }); + match result { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => { + warn!("alpenglow egress channel full; dropping vote/cert"); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + warn!("alpenglow egress channel closed; endpoint shutting down"); + return; + } + } + } } } } @@ -198,8 +284,8 @@ impl VotingService { cluster_info: &ClusterInfo, vote_history_storage: &dyn VoteHistoryStorage, bls_op: BLSOp, - connection_cache: &ConnectionCache, - additional_listeners: &[SocketAddr], + send_mode: &VotorSendMode, + additional_listeners: &[AdditionalListener], staked_validators_cache: &mut StakedValidatorsCache, ) { match bls_op { @@ -220,7 +306,7 @@ impl VotingService { slot, cluster_info, &msg, - connection_cache, + send_mode, additional_listeners, staked_validators_cache, ); @@ -233,7 +319,7 @@ impl VotingService { slot, cluster_info, &message, - connection_cache, + send_mode, additional_listeners, staked_validators_cache, ); @@ -248,9 +334,11 @@ impl VotingService { } #[cfg(test)] +#[allow(clippy::arithmetic_side_effects)] mod tests { use { super::*, + crate::datagram_endpoint::VotorDatagramEndpoint, crate::vote_history_storage::{ NullVoteHistoryStorage, SavedVoteHistory, SavedVoteHistoryVersions, }, @@ -259,11 +347,12 @@ mod tests { consensus_message::{ConsensusMessage, VoteMessage}, vote::Vote, }, - crossbeam_channel::bounded, + bytes::Bytes, solana_bls_signatures::{BLS_SIGNATURE_AFFINE_SIZE, Signature as BLSSignature}, - solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, + solana_gossip::contact_info::ContactInfo, solana_keypair::Keypair, - solana_net_utils::{SocketAddrSpace, sockets::bind_to_localhost_unique}, + solana_net_utils::{SocketAddrSpace, banlist::Banlist, sockets::bind_to_localhost_unique}, + solana_pubkey::Pubkey as PubkeyAlias, solana_runtime::{ bank::Bank, bank_forks::BankForks, @@ -272,24 +361,58 @@ mod tests { }, }, solana_signer::Signer, - solana_streamer::{ - nonblocking::swqos::SwQosConfig, - quic::{QuicStreamerConfig, SpawnServerResult, spawn_stake_weighted_qos_server}, - streamer::StakedNodes, - }, - std::{ - net::SocketAddr, - sync::{Arc, RwLock}, - }, test_case::test_case, - tokio_util::sync::CancellationToken, + tokio::runtime::Runtime, }; + /// Generate a keypair whose pubkey is strictly less than `upper`. + /// The voting-service test relies on the lex-pubkey direction rule: + /// the client (lower pubkey) dials the listener (higher pubkey). + fn keypair_below(upper: &PubkeyAlias) -> Keypair { + loop { + let k = Keypair::new(); + if &k.pubkey() < upper { + return k; + } + } + } + + /// Spin up an in-process quic-datagram endpoint with the given keypair + /// and allowlist. Returns (endpoint, ingress_rx, bound_addr, runtime). + fn spawn_endpoint( + keypair: Keypair, + allowlist: Arc, + ) -> ( + VotorDatagramEndpoint, + crossbeam_channel::Receiver, + SocketAddr, + Runtime, + ) { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("tokio runtime"); + let socket = bind_to_localhost_unique().expect("bind UDP"); + let addr = socket.local_addr().expect("local addr"); + let (ingress_tx, ingress_rx) = crossbeam_channel::bounded(4096); + let banlist = Arc::new(Banlist::::default()); + let endpoint = VotorDatagramEndpoint::new( + rt.handle(), + &keypair, + socket, + ingress_tx, + allowlist, + banlist, + ) + .expect("VotorDatagramEndpoint::new"); + (endpoint, ingress_rx, addr, rt) + } + fn create_voting_service( bls_receiver: Receiver, - listener: SocketAddr, + listener: AdditionalListener, + egress: mpsc::Sender, ) -> (VotingService, Vec) { - // Create 10 node validatorvotekeypairs vec let validator_keypairs = (0..10) .map(|_| ValidatorVoteKeypairs::new_rand()) .collect::>(); @@ -313,10 +436,10 @@ mod tests { bls_receiver, Arc::new(cluster_info), Arc::new(NullVoteHistoryStorage::default()), - Arc::new(ConnectionCache::new_quic( - "TestAlpenglowConnectionCache", - 10, - )), + VotorSendMode::Datagram { + egress, + allowlist: None, // no allowlist gating in this unit test + }, bank_forks, Some(VotingServiceOverride { additional_listeners: vec![listener], @@ -352,60 +475,76 @@ mod tests { }))] fn test_send_message(bls_op: BLSOp, expected_message: ConsensusMessage) { agave_logger::setup(); - let (bls_sender, bls_receiver) = bounded(1024); - // Create listener thread on a random port we allocated and return SocketAddr to create VotingService - - // Bind to a random UDP port - let socket = bind_to_localhost_unique().unwrap(); - let listener_addr = socket.local_addr().unwrap(); - - // Create VotingService with the listener address - let (_, validator_keypairs) = create_voting_service(bls_receiver, listener_addr); - - // Send a BLS message via the VotingService - assert!(bls_sender.send(bls_op).is_ok()); - - // Start a quick streamer to handle quick control packets - let (sender, receiver) = bounded(1024); - let stakes = validator_keypairs - .iter() - .map(|x| (x.node_keypair.pubkey(), 100)) - .collect(); - let staked_nodes: Arc> = Arc::new(RwLock::new(StakedNodes::new( - Arc::new(stakes), - HashMap::::default(), // overrides - ))); - let cancel = CancellationToken::new(); - let SpawnServerResult { - endpoints: _, - thread: quic_server_thread, - key_updater: _, - } = spawn_stake_weighted_qos_server( - "AlpenglowLocalClusterTest", - "voting_service_test", - [socket.into()], - &Keypair::new(), - sender, - staked_nodes, - QuicStreamerConfig::default_for_tests(), - SwQosConfig::default(), - cancel.clone(), - ) - .unwrap(); - - let packets = receiver.recv().unwrap(); - let packet = packets.first().expect("No packets received"); - let received_message = packet - .deserialize_slice::(..) - .unwrap_or_else(|err| { - panic!( - "Failed to deserialize BLSMessage: {:?} {:?}", - size_of::(), - err - ) + + // Listener identity is generated first; the client (lower pubkey) + // then dials it per the lex-pubkey direction rule. Use an allow-all test + // allowlist for both ends. The test is exercising the egress + // path, not the allowlist policy. + let listener_kp = Keypair::new(); + let listener_pubkey = listener_kp.pubkey(); + let client_kp = keypair_below(&listener_pubkey); + let (endpoint, ingress_rx, listener_addr, _rt) = spawn_endpoint( + listener_kp, + Arc::new(StakedNodesAllowlist::allow_all_for_tests()), + ); + + let (client_endpoint, _client_ingress_rx, _client_addr, _client_rt) = spawn_endpoint( + client_kp, + Arc::new(StakedNodesAllowlist::allow_all_for_tests()), + ); + let egress = client_endpoint.egress.clone(); + + let (bls_sender, bls_receiver) = crossbeam_channel::unbounded(); + let listener = AdditionalListener { + pubkey: listener_pubkey, + addr: listener_addr, + }; + let (_voting_service, _validator_keypairs) = + create_voting_service(bls_receiver, listener, egress.clone()); + + // Warm the QUIC connection. The voting service issues exactly one + // `try_send` per BLS op; the first send to a fresh peer is consumed + // as the dial trigger and dropped on the floor. Drive a poll-loop + // here so that by the time the real op arrives, the slot holds an + // `Established`. 500ms is generous for a localhost handshake. + let warmup = Bytes::from_static(b"warmup"); + let warmup_deadline = Instant::now() + Duration::from_millis(500); + loop { + let _ = egress.try_send(Datagram { + peer_pubkey: listener_pubkey, + peer_address: listener_addr, + message: warmup.clone(), }); - assert_eq!(received_message, expected_message); - cancel.cancel(); - quic_server_thread.join().unwrap(); + if ingress_rx.recv_timeout(Duration::from_millis(50)).is_ok() { + break; + } + assert!( + Instant::now() < warmup_deadline, + "warmup datagram did not reach listener within 500ms; dial never completed", + ); + } + + // Send the BLS op through. The cached `Established` carries it. + bls_sender.send(bls_op).expect("bls_sender.send"); + + // The listener endpoint should receive the serialized + // ConsensusMessage. Drain ingress until we see a match. + let deadline = Instant::now() + Duration::from_secs(5); + let received = loop { + let remaining = deadline.saturating_duration_since(Instant::now()); + if remaining.is_zero() { + panic!("listener never received the message"); + } + let recv_result = ingress_rx.recv_timeout(remaining); + if let Ok(dg) = recv_result { + if let Ok(msg) = wincode::deserialize::(&dg.message) { + break msg; + } + } + }; + assert_eq!(received, expected_message); + + endpoint.close(); + client_endpoint.close(); } } diff --git a/votor/tests/identity_rotation.rs b/votor/tests/identity_rotation.rs new file mode 100644 index 00000000000..f8e53480f91 --- /dev/null +++ b/votor/tests/identity_rotation.rs @@ -0,0 +1,167 @@ +#![allow(clippy::arithmetic_side_effects)] + +use { + agave_votor::datagram_endpoint::{Datagram, StakedNodesAllowlist, VotorDatagramEndpoint}, + bytes::Bytes, + crossbeam_channel::Receiver, + solana_keypair::{Keypair, Signer}, + solana_net_utils::{banlist::Banlist, sockets::bind_to_localhost_unique}, + solana_pubkey::Pubkey, + solana_tls_utils::NotifyKeyUpdate, + std::{ + collections::HashMap, + sync::Arc, + time::{Duration, Instant}, + }, + tokio::runtime::Runtime, +}; + +/// Pick a keypair whose pubkey is strictly less than `upper`. Used so +/// the client always plays the lex-correct dialer role. +fn keypair_below(upper: &Pubkey) -> Keypair { + loop { + let k = Keypair::new(); + if &k.pubkey() < upper { + return k; + } + } +} + +/// Re-send `payload` on `egress` every 50ms until `rx` observes an +/// item satisfying `cond`, or `timeout` elapses. Mirrors the helper in +/// `datagram_endpoint` tests: the first egress to a fresh peer +/// is consumed as the dial trigger and dropped; subsequent retries +/// ride the resulting `Established`. Identity rotation evicts the +/// table, so the post-rotation send also needs the same retry loop. +fn send_until_received( + rt: &Runtime, + egress: &tokio::sync::mpsc::Sender, + target_pk: Pubkey, + target_addr: std::net::SocketAddr, + payload: Bytes, + rx: &Receiver, + timeout: Duration, + mut cond: impl FnMut(&Datagram) -> Option, + msg: &str, +) -> T { + const POLL_INTERVAL: Duration = Duration::from_millis(50); + let deadline = Instant::now() + timeout; + while Instant::now() < deadline { + rt.block_on(async { + let _ = egress.try_send(Datagram { + peer_pubkey: target_pk, + peer_address: target_addr, + message: payload.clone(), + }); + }); + if let Ok(item) = rx.recv_timeout(POLL_INTERVAL) + && let Some(t) = cond(&item) + { + return t; + } + } + panic!("{msg}"); +} + +/// Identity rotation exercised through the votor wrapper. +/// +/// This test confirms that when we are rotating identity through +/// `key_updater.update_key(&new_kp)`, it evicts cached connections +/// with `IDENTITY_ROTATED`, swaps the local pubkey, +/// and accepts subsequent traffic attributed to the new identity. +#[test] +fn identity_rotation_via_votor_wrapper() { + let rt = Runtime::new().expect("tokio runtime"); + + // Server endpoint with an allow-all test allowlist so any client pubkey is + // accepted — we're testing the rotation path, not the allowlist. + let server_kp = Keypair::new(); + let server_pubkey = server_kp.pubkey(); + let server_socket = bind_to_localhost_unique().expect("server bind"); + let server_addr = server_socket.local_addr().expect("server addr"); + let (server_ingress_tx, server_ingress_rx) = crossbeam_channel::bounded(4096); + let server_banlist = Arc::new(Banlist::::default()); + let server = VotorDatagramEndpoint::new( + rt.handle(), + &server_kp, + server_socket, + server_ingress_tx, + Arc::new(StakedNodesAllowlist::allow_all_for_tests()), + server_banlist, + ) + .expect("server endpoint"); + + // Client endpoint with K1 — strictly below the server so the + // lex-correct dial goes client → server. + let k1 = keypair_below(&server_pubkey); + let k1_pubkey = k1.pubkey(); + let client_socket = bind_to_localhost_unique().expect("client bind"); + let (client_ingress_tx, _client_ingress_rx) = crossbeam_channel::bounded(4096); + let client_banlist = Arc::new(Banlist::::default()); + // Client allowlist must include server_pubkey so its dial-side + // allowlist check passes. Use a StakedNodesAllowlist populated + // with the server's pubkey (mirrors how the cache would seed it + // from BankForks's staked_nodes). + let admit: HashMap<_, _> = std::iter::once((server_pubkey, 100u64)).collect(); + let client = VotorDatagramEndpoint::new( + rt.handle(), + &k1, + client_socket, + client_ingress_tx, + Arc::new(StakedNodesAllowlist::new(admit)), + client_banlist, + ) + .expect("client endpoint"); + + // Send one message under K1. Server should observe it attributed + // to K1's pubkey. Retry until landed: the first send to a fresh + // peer is the dial trigger and is dropped. + let p1 = Bytes::from_static(b"under-K1"); + send_until_received( + &rt, + &client.egress, + server_pubkey, + server_addr, + p1.clone(), + &server_ingress_rx, + Duration::from_secs(5), + |d| (d.peer_pubkey == k1_pubkey && d.message == p1).then_some(()), + "server never received message attributed to K1", + ); + + // Rotate to K2 via the wrapper's KeyUpdater handle. K2 must also + // be lex-below the server so the dial after rotation still goes + // the right direction. + let k2 = keypair_below(&server_pubkey); + let k2_pubkey = k2.pubkey(); + assert_ne!(k1_pubkey, k2_pubkey, "K1 and K2 must differ"); + NotifyKeyUpdate::update_key(client.key_updater.as_ref(), &k2) + .expect("key updater accepts rotation"); + + // Give the control loop a beat to apply the rotation (rebuild TLS + // configs, swap, evict the K1 connection). + std::thread::sleep(Duration::from_millis(500)); + + // Send under K2. Server should now observe K2's pubkey. Same retry + // pattern: rotation wiped the table, so the post-rotation egress is + // also a dial trigger and is dropped on the first try. + let p2 = Bytes::from_static(b"under-K2"); + send_until_received( + &rt, + &client.egress, + server_pubkey, + server_addr, + p2.clone(), + &server_ingress_rx, + Duration::from_secs(5), + |d| (d.peer_pubkey == k2_pubkey && d.message == p2).then_some(()), + "server never received message attributed to K2 after rotation", + ); + + // Identity rotation is the local endpoint closing its own + // connections with IDENTITY_ROTATED — not a HANDOVER event for + // the peer. The peer's read loop reaps the entry without soft-ban + // and accepts the fresh K2 handshake on the next send. + drop(client); + drop(server); +}