Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions votor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ solana-transaction-error = { workspace = true }
solana-vote = { workspace = true }
solana-vote-program = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt", "sync", "time"] }
wincode = { workspace = true, features = ["alloc"] }

[dev-dependencies]
Expand Down
151 changes: 118 additions & 33 deletions votor/src/voting_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,34 @@ use {
certificate::Certificate,
consensus_message::{ConsensusMessage, VoteMessage},
},
crossbeam_channel::Receiver,
crossbeam_channel::{Receiver, TryRecvError},
solana_client::connection_cache::ConnectionCache,
solana_clock::Slot,
solana_connection_cache::client_connection::ClientConnection,
solana_connection_cache::nonblocking::client_connection::ClientConnection,
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure::Measure,
solana_pubkey::Pubkey,
solana_runtime::bank_forks::BankForks,
solana_transaction_error::TransportError,
std::{
collections::HashMap,
collections::{HashMap, hash_map::DefaultHasher},
hash::{Hash, Hasher},
net::SocketAddr,
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
tokio::{sync::mpsc, time::timeout},
};

/// TTL, in seconds, handed to `StakedValidatorsCache::new` as a `Duration`.
const STAKED_VALIDATORS_CACHE_TTL_S: u64 = 5;
/// Target number of epochs to keep in the staked validators cache. Due to lazy-lru eviction
/// semantics, the cache may hold up to `2 * STAKED_VALIDATORS_CACHE_NUM_EPOCH_TARGET` entries
/// before evicting down to this target.
const STAKED_VALIDATORS_CACHE_NUM_EPOCH_TARGET: usize = 3;
/// Number of send worker tasks (each with its own queue) created by `StreamSender::new`.
const STREAM_SEND_WORKERS: usize = 64;
/// Capacity of each worker's bounded request queue; once a queue is full, `try_send` reports
/// `Full` and the caller drops the message.
const STREAM_SEND_QUEUE_CAPACITY_PER_WORKER: usize = 256;
/// Maximum time `stream_send_worker` waits on a single `send_data` call before logging a
/// timeout and moving on to the next request.
const STREAM_SEND_TIMEOUT: Duration = Duration::from_secs(10);
/// Maximum number of `BLSOp`s pulled from the receiver in one pass of the service loop before
/// the loop yields (or sleeps, if nothing was received).
const BLS_OP_RECEIVE_BATCH_SIZE: usize = 64;

#[derive(Debug)]
pub enum BLSOp {
Expand All @@ -42,14 +47,63 @@ pub enum BLSOp {
},
}

fn send_message(
buf: Vec<u8>,
socket: &SocketAddr,
connection_cache: &ConnectionCache,
) -> Result<(), TransportError> {
let client = connection_cache.get_connection(socket);
/// A single outbound message queued for a send worker: the serialized bytes and where to
/// deliver them.
struct StreamSendRequest {
// Destination address; also the key used to pick the worker queue.
socket: SocketAddr,
// Serialized message bytes, reference-counted so one buffer can be queued for many
// destinations without copying.
payload: Arc<Vec<u8>>,
}

client.send_data_async(Arc::new(buf))
/// Handle used to enqueue outbound messages onto a fixed set of background send workers.
struct StreamSender {
// One bounded channel sender per worker task; indexed by `socket_worker_index`.
workers: Vec<mpsc::Sender<StreamSendRequest>>,
}

impl StreamSender {
    /// Spawns `STREAM_SEND_WORKERS` worker tasks, each draining its own bounded queue of
    /// `STREAM_SEND_QUEUE_CAPACITY_PER_WORKER` requests, and returns a handle holding the
    /// queue senders.
    ///
    /// # Panics
    ///
    /// Uses `tokio::spawn`, so it must be called from within a Tokio runtime context.
    fn new(connection_cache: Arc<ConnectionCache>) -> Self {
        let workers = (0..STREAM_SEND_WORKERS)
            .map(|_| {
                let (queue_tx, queue_rx) = mpsc::channel(STREAM_SEND_QUEUE_CAPACITY_PER_WORKER);
                // The task is detached; it exits once its sender half is dropped.
                tokio::spawn(stream_send_worker(connection_cache.clone(), queue_rx));
                queue_tx
            })
            .collect();
        Self { workers }
    }

    /// Enqueues `payload` for delivery to `socket` without blocking.
    ///
    /// The worker is selected by hashing `socket`, so every request for a given address lands
    /// on the same queue. Returns the channel's `TrySendError` (carrying the rejected request)
    /// when that queue is full or closed.
    fn try_send(
        &self,
        socket: SocketAddr,
        payload: Arc<Vec<u8>>,
    ) -> Result<(), mpsc::error::TrySendError<StreamSendRequest>> {
        let request = StreamSendRequest { socket, payload };
        let slot = socket_worker_index(&request.socket, self.workers.len());
        self.workers[slot].try_send(request)
    }
}

/// Maps `socket` to a worker slot in `0..workers` by hashing the address with
/// `DefaultHasher` and reducing the digest modulo `workers`.
///
/// # Panics
///
/// Panics if `workers` is zero (remainder by zero).
fn socket_worker_index(socket: &SocketAddr, workers: usize) -> usize {
    let digest = {
        let mut state = DefaultHasher::new();
        socket.hash(&mut state);
        state.finish()
    };
    // Narrow to usize first, then reduce, so the mapping is the same on every target width.
    let narrowed = digest as usize;
    narrowed % workers
}

/// Worker loop: pulls queued requests one at a time and sends each payload to its socket
/// over a non-blocking connection obtained from `connection_cache`.
///
/// Each send is bounded by `STREAM_SEND_TIMEOUT`. Send failures and timeouts are logged at
/// warn level and the worker continues with the next request. The loop ends when `recv`
/// yields `None`.
async fn stream_send_worker(
    connection_cache: Arc<ConnectionCache>,
    mut receiver: mpsc::Receiver<StreamSendRequest>,
) {
    while let Some(request) = receiver.recv().await {
        let StreamSendRequest { socket, payload } = request;
        let connection = connection_cache.get_nonblocking_connection(&socket);
        let bounded_send = timeout(
            STREAM_SEND_TIMEOUT,
            connection.send_data(payload.as_ref().as_slice()),
        );
        match bounded_send.await {
            // The deadline elapsed before the send completed.
            Err(_) => {
                warn!("Timed out sending alpenglow message to {socket}");
            }
            // The send finished in time but reported a transport error.
            Ok(Err(err)) => {
                warn!("Failed to send alpenglow message to {socket}: {err:?}");
            }
            Ok(Ok(())) => {}
        }
    }
}

pub struct VotingService {
Expand Down Expand Up @@ -138,25 +192,51 @@ impl VotingService {
.name("solVotorVoteSvc".to_string())
.spawn(move || {
let mut staked_validators_cache = StakedValidatorsCache::new(
bank_forks.clone(),
bank_forks,
Duration::from_secs(STAKED_VALIDATORS_CACHE_TTL_S),
STAKED_VALIDATORS_CACHE_NUM_EPOCH_TARGET,
false,
alpenglow_port_override,
);
let runtime = tokio::runtime::Builder::new_current_thread()
.thread_name("solVotorVoteRt")
.enable_time()
.build()
.unwrap();

info!("AlpenglowVotingService has started");
while let Ok(bls_op) = bls_receiver.recv() {
Self::handle_bls_op(
&cluster_info,
vote_history_storage.as_ref(),
bls_op,
&connection_cache,
&additional_listeners,
&mut staked_validators_cache,
);
}
info!("AlpenglowVotingService has stopped");
runtime.block_on(async move {
let stream_sender = StreamSender::new(connection_cache);
info!("AlpenglowVotingService has started");
loop {
let mut handled_message = false;
for _ in 0..BLS_OP_RECEIVE_BATCH_SIZE {
match bls_receiver.try_recv() {
Ok(bls_op) => {
handled_message = true;
Self::handle_bls_op(
&cluster_info,
vote_history_storage.as_ref(),
bls_op,
&stream_sender,
&additional_listeners,
&mut staked_validators_cache,
);
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
info!("AlpenglowVotingService has stopped");
return;
}
}
}

if handled_message {
tokio::task::yield_now().await;
} else {
tokio::time::sleep(Duration::from_millis(1)).await;
}
}
});
})
.unwrap();
Self { thread_hdl }
Expand All @@ -166,7 +246,7 @@ impl VotingService {
slot: Slot,
cluster_info: &ClusterInfo,
message: &ConsensusMessage,
connection_cache: &ConnectionCache,
sender: &StreamSender,
additional_listeners: &[SocketAddr],
staked_validators_cache: &mut StakedValidatorsCache,
) {
Expand All @@ -184,12 +264,17 @@ impl VotingService {
.iter()
.chain(staked_validator_alpenglow_sockets.iter());

// We use send_message in a loop right now because we worry that sending packets too fast
// will cause a packet spike and overwhelm the network. If we later find out that this is
// not an issue, we can optimize this by using multi_target_send or similar methods.
let buf = Arc::new(buf);
for socket in sockets {
if let Err(e) = send_message(buf.clone(), socket, connection_cache) {
warn!("Failed to send alpenglow message to {socket}: {e:?}");
match sender.try_send(*socket, buf.clone()) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
warn!("alpenglow stream send queue full; dropping message to {socket}");
}
Err(mpsc::error::TrySendError::Closed(_)) => {
warn!("alpenglow stream send queue closed; dropping message to {socket}");
return;
}
}
}
}
Expand All @@ -198,7 +283,7 @@ impl VotingService {
cluster_info: &ClusterInfo,
vote_history_storage: &dyn VoteHistoryStorage,
bls_op: BLSOp,
connection_cache: &ConnectionCache,
sender: &StreamSender,
additional_listeners: &[SocketAddr],
staked_validators_cache: &mut StakedValidatorsCache,
) {
Expand All @@ -220,7 +305,7 @@ impl VotingService {
slot,
cluster_info,
&msg,
connection_cache,
sender,
additional_listeners,
staked_validators_cache,
);
Expand All @@ -233,7 +318,7 @@ impl VotingService {
slot,
cluster_info,
&message,
connection_cache,
sender,
additional_listeners,
staked_validators_cache,
);
Expand Down