diff --git a/Cargo.lock b/Cargo.lock
index e278ac5e0b1..3a48bffd89e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -479,6 +479,7 @@ dependencies = [
  "tempfile",
  "test-case",
  "thiserror 2.0.18",
+ "tokio",
  "tokio-util 0.7.18",
  "wincode",
 ]
diff --git a/votor/Cargo.toml b/votor/Cargo.toml
index 50794e2696d..86ff4fba1f8 100644
--- a/votor/Cargo.toml
+++ b/votor/Cargo.toml
@@ -74,6 +74,7 @@ solana-transaction-error = { workspace = true }
 solana-vote = { workspace = true }
 solana-vote-program = { workspace = true }
 thiserror = { workspace = true }
+tokio = { workspace = true, features = ["net", "rt", "sync", "time"] }
 wincode = { workspace = true, features = ["alloc"] }
 
 [dev-dependencies]
diff --git a/votor/src/voting_service.rs b/votor/src/voting_service.rs
index 09cf5db9fa9..d97cc602b4d 100644
--- a/votor/src/voting_service.rs
+++ b/votor/src/voting_service.rs
@@ -7,22 +7,23 @@ use {
         certificate::Certificate,
         consensus_message::{ConsensusMessage, VoteMessage},
     },
-    crossbeam_channel::Receiver,
+    crossbeam_channel::{Receiver, TryRecvError},
     solana_client::connection_cache::ConnectionCache,
     solana_clock::Slot,
-    solana_connection_cache::client_connection::ClientConnection,
+    solana_connection_cache::nonblocking::client_connection::ClientConnection,
     solana_gossip::cluster_info::ClusterInfo,
     solana_measure::measure::Measure,
     solana_pubkey::Pubkey,
     solana_runtime::bank_forks::BankForks,
-    solana_transaction_error::TransportError,
     std::{
-        collections::HashMap,
+        collections::{HashMap, hash_map::DefaultHasher},
+        hash::{Hash, Hasher},
         net::SocketAddr,
         sync::{Arc, RwLock},
         thread::{self, Builder, JoinHandle},
         time::{Duration, Instant},
     },
+    tokio::{sync::mpsc, time::timeout},
 };
 
 const STAKED_VALIDATORS_CACHE_TTL_S: u64 = 5;
@@ -30,6 +31,14 @@ const STAKED_VALIDATORS_CACHE_TTL_S: u64 = 5;
 /// semantics, the cache may hold up to `2 * STAKED_VALIDATORS_CACHE_NUM_EPOCH_TARGET` entries
 /// before evicting down to this target.
 const STAKED_VALIDATORS_CACHE_NUM_EPOCH_TARGET: usize = 3;
+/// Number of stream-send worker tasks; each destination socket hashes to one worker.
+const STREAM_SEND_WORKERS: usize = 64;
+/// Capacity of each worker's bounded queue; sends to a full queue are dropped.
+const STREAM_SEND_QUEUE_CAPACITY_PER_WORKER: usize = 256;
+/// Upper bound on how long a worker waits for a single send to complete.
+const STREAM_SEND_TIMEOUT: Duration = Duration::from_secs(10);
+/// Maximum number of `BLSOp`s drained from the receiver before yielding to the runtime.
+const BLS_OP_RECEIVE_BATCH_SIZE: usize = 64;
 
 #[derive(Debug)]
 pub enum BLSOp {
@@ -42,14 +51,75 @@ pub enum BLSOp {
     },
 }
 
-fn send_message(
-    buf: Vec<u8>,
-    socket: &SocketAddr,
-    connection_cache: &ConnectionCache,
-) -> Result<(), TransportError> {
-    let client = connection_cache.get_connection(socket);
+/// A serialized message queued for delivery to one destination socket.
+struct StreamSendRequest {
+    socket: SocketAddr,
+    payload: Arc<Vec<u8>>,
+}
 
-    client.send_data_async(Arc::new(buf))
+/// Hands messages to a fixed pool of asynchronous send workers over bounded queues.
+struct StreamSender {
+    workers: Vec<mpsc::Sender<StreamSendRequest>>,
+}
+
+impl StreamSender {
+    /// Spawns `STREAM_SEND_WORKERS` worker tasks with `tokio::spawn`, so this must be
+    /// called from within a Tokio runtime.
+    fn new(connection_cache: Arc<ConnectionCache>) -> Self {
+        let mut workers = Vec::with_capacity(STREAM_SEND_WORKERS);
+        for _ in 0..STREAM_SEND_WORKERS {
+            let (sender, receiver) = mpsc::channel(STREAM_SEND_QUEUE_CAPACITY_PER_WORKER);
+            tokio::spawn(stream_send_worker(connection_cache.clone(), receiver));
+            workers.push(sender);
+        }
+        Self { workers }
+    }
+
+    /// Queues `payload` for `socket` on that socket's worker without blocking.
+    ///
+    /// Returns an error when the worker's queue is full or its receiver is gone.
+    fn try_send(
+        &self,
+        socket: SocketAddr,
+        payload: Arc<Vec<u8>>,
+    ) -> Result<(), mpsc::error::TrySendError<StreamSendRequest>> {
+        let worker_index = socket_worker_index(&socket, self.workers.len());
+        self.workers[worker_index].try_send(StreamSendRequest { socket, payload })
+    }
+}
+
+/// Maps `socket` to a worker index in `0..workers`, so every message for a given
+/// destination is handled by the same worker. Panics if `workers` is zero.
+fn socket_worker_index(socket: &SocketAddr, workers: usize) -> usize {
+    let mut hasher = DefaultHasher::new();
+    socket.hash(&mut hasher);
+    // Reduce in u64 first so the cast back to usize cannot truncate.
+    (hasher.finish() % workers as u64) as usize
+}
+
+/// Sends queued requests one at a time until every sender has been dropped, logging
+/// (but otherwise ignoring) failed and timed-out sends.
+async fn stream_send_worker(
+    connection_cache: Arc<ConnectionCache>,
+    mut receiver: mpsc::Receiver<StreamSendRequest>,
+) {
+    while let Some(StreamSendRequest { socket, payload }) = receiver.recv().await {
+        let client = connection_cache.get_nonblocking_connection(&socket);
+        match timeout(
+            STREAM_SEND_TIMEOUT,
+            client.send_data(payload.as_ref().as_slice()),
+        )
+        .await
+        {
+            Ok(Ok(())) => {}
+            Ok(Err(err)) => {
+                warn!("Failed to send alpenglow message to {socket}: {err:?}");
+            }
+            Err(_) => {
+                warn!("Timed out sending alpenglow message to {socket}");
+            }
+        }
+    }
 }
 
 pub struct VotingService {
@@ -138,25 +208,53 @@ impl VotingService {
             .name("solVotorVoteSvc".to_string())
             .spawn(move || {
                 let mut staked_validators_cache = StakedValidatorsCache::new(
-                    bank_forks.clone(),
+                    bank_forks,
                     Duration::from_secs(STAKED_VALIDATORS_CACHE_TTL_S),
                     STAKED_VALIDATORS_CACHE_NUM_EPOCH_TARGET,
                     false,
                     alpenglow_port_override,
                 );
+                // The nonblocking connections perform socket I/O on this runtime, so the
+                // I/O driver has to be enabled alongside the timer.
+                let runtime = tokio::runtime::Builder::new_current_thread()
+                    .thread_name("solVotorVoteRt")
+                    .enable_all()
+                    .build()
+                    .unwrap();
 
-                info!("AlpenglowVotingService has started");
-                while let Ok(bls_op) = bls_receiver.recv() {
-                    Self::handle_bls_op(
-                        &cluster_info,
-                        vote_history_storage.as_ref(),
-                        bls_op,
-                        &connection_cache,
-                        &additional_listeners,
-                        &mut staked_validators_cache,
-                    );
-                }
-                info!("AlpenglowVotingService has stopped");
+                runtime.block_on(async move {
+                    let stream_sender = StreamSender::new(connection_cache);
+                    info!("AlpenglowVotingService has started");
+                    loop {
+                        let mut handled_message = false;
+                        for _ in 0..BLS_OP_RECEIVE_BATCH_SIZE {
+                            match bls_receiver.try_recv() {
+                                Ok(bls_op) => {
+                                    handled_message = true;
+                                    Self::handle_bls_op(
+                                        &cluster_info,
+                                        vote_history_storage.as_ref(),
+                                        bls_op,
+                                        &stream_sender,
+                                        &additional_listeners,
+                                        &mut staked_validators_cache,
+                                    );
+                                }
+                                Err(TryRecvError::Empty) => break,
+                                Err(TryRecvError::Disconnected) => {
+                                    info!("AlpenglowVotingService has stopped");
+                                    return;
+                                }
+                            }
+                        }
+
+                        if handled_message {
+                            tokio::task::yield_now().await;
+                        } else {
+                            tokio::time::sleep(Duration::from_millis(1)).await;
+                        }
+                    }
+                });
             })
             .unwrap();
         Self { thread_hdl }
@@ -166,7 +264,7 @@ impl VotingService {
         slot: Slot,
         cluster_info: &ClusterInfo,
         message: &ConsensusMessage,
-        connection_cache: &ConnectionCache,
+        sender: &StreamSender,
         additional_listeners: &[SocketAddr],
         staked_validators_cache: &mut StakedValidatorsCache,
     ) {
@@ -184,12 +282,17 @@ impl VotingService {
             .iter()
             .chain(staked_validator_alpenglow_sockets.iter());
 
-        // We use send_message in a loop right now because we worry that sending packets too fast
-        // will cause a packet spike and overwhelm the network. If we later find out that this is
-        // not an issue, we can optimize this by using multi_targret_send or similar methods.
+        let buf = Arc::new(buf);
         for socket in sockets {
-            if let Err(e) = send_message(buf.clone(), socket, connection_cache) {
-                warn!("Failed to send alpenglow message to {socket}: {e:?}");
+            match sender.try_send(*socket, buf.clone()) {
+                Ok(()) => {}
+                Err(mpsc::error::TrySendError::Full(_)) => {
+                    warn!("alpenglow stream send queue full; dropping message to {socket}");
+                }
+                Err(mpsc::error::TrySendError::Closed(_)) => {
+                    warn!("alpenglow stream send queue closed; dropping message to {socket}");
+                    return;
+                }
             }
         }
     }
@@ -198,7 +301,7 @@ impl VotingService {
         cluster_info: &ClusterInfo,
         vote_history_storage: &dyn VoteHistoryStorage,
         bls_op: BLSOp,
-        connection_cache: &ConnectionCache,
+        sender: &StreamSender,
         additional_listeners: &[SocketAddr],
         staked_validators_cache: &mut StakedValidatorsCache,
     ) {
@@ -220,7 +323,7 @@ impl VotingService {
                     slot,
                     cluster_info,
                     &msg,
-                    connection_cache,
+                    sender,
                     additional_listeners,
                     staked_validators_cache,
                 );
@@ -233,7 +336,7 @@ impl VotingService {
                     slot,
                     cluster_info,
                     &message,
-                    connection_cache,
+                    sender,
                     additional_listeners,
                     staked_validators_cache,
                 );