diff --git a/Cargo.toml b/Cargo.toml
index 70668b02..ca216bef 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,9 +20,15 @@ autobenches = false
 edition = "2018"
 
 [dependencies]
+async-channel = "1.6"
+async-net = "1.6.0"
+futures-timer = "3.0"
+bevy_tasks = "0.5"
+tracing = "0.1"
 byteorder = "1.4.3"
 crc = "1.8"
 crossbeam-channel = "0.5"
+dashmap = "4.0"
 lazy_static = "1.4"
 log = "0.4"
 rand = "0.8"
@@ -38,6 +44,9 @@ serde = "1.0"
 serde_derive = "1.0"
 quickcheck = "1.0"
 quickcheck_macros = "1.0"
+static_assertions = "1.1"
+futures = "0.3"
+serial_test = "0.5"
 
 [features]
 tester = [
@@ -52,4 +61,4 @@ harness = false
 
 [[bin]]
 name = "laminar-tester"
-required-features = ["tester"]
\ No newline at end of file
+required-features = ["tester"]
diff --git a/src/net.rs b/src/net.rs
index 7b7a5949..c7365496 100644
--- a/src/net.rs
+++ b/src/net.rs
@@ -18,4 +18,5 @@ mod quality;
 mod socket;
 mod virtual_connection;
 
+pub mod aio;
 pub mod constants;
diff --git a/src/net/aio/channel.rs b/src/net/aio/channel.rs
new file mode 100644
index 00000000..e47025eb
--- /dev/null
+++ b/src/net/aio/channel.rs
@@ -0,0 +1,224 @@
+use async_channel::{TryRecvError, TrySendError};
+
+#[derive(Clone)]
+pub struct BidirectionalAsyncChannel<T> {
+    incoming: async_channel::Receiver<T>,
+    outgoing: async_channel::Sender<T>,
+}
+
+impl<T> BidirectionalAsyncChannel<T> {
+    /// Creates a pair of connected Peers without limitations on how many messages can be
+    /// buffered.
+    pub fn create_unbounded_pair() -> (Self, Self) {
+        Self::create_pair(async_channel::unbounded(), async_channel::unbounded())
+    }
+
+    /// Creates a pair of connected Peers with a limited capacity for how many messages can be
+    /// buffered in either direction.
+    pub fn create_bounded_pair(capacity: usize) -> (Self, Self) {
+        Self::create_pair(
+            async_channel::bounded(capacity),
+            async_channel::bounded(capacity),
+        )
+    }
+
+    /// Sends a message to the connected peer.
+    ///
+    /// If the send buffer is full, this method waits until there is space for a message.
+    ///
+    /// If the peer is disconnected, this method returns an error.
+    #[inline]
+    pub fn send(&self, message: T) -> async_channel::Send<'_, T> {
+        self.outgoing.send(message)
+    }
+
+    /// Receives a message from the connected peer.
+    ///
+    /// If there are no pending messages, this method waits until there is a message.
+    ///
+    /// If the peer is disconnected, this method receives a message or returns an error if there
+    /// are no more messages.
+    #[inline]
+    pub fn recv(&self) -> async_channel::Recv<'_, T> {
+        self.incoming.recv()
+    }
+
+    /// Attempts to send a message to the connected peer.
+    #[inline]
+    pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
+        self.outgoing.try_send(message)
+    }
+
+    /// Attempts to receive a message from the connected peer.
+    #[inline]
+    pub fn try_recv(&self) -> Result<T, TryRecvError> {
+        self.incoming.try_recv()
+    }
+
+    /// Returns true if the associated peer is still connected.
+    pub fn is_connected(&self) -> bool {
+        !self.incoming.is_closed() && !self.outgoing.is_closed()
+    }
+
+    /// Disconnects the paired Peers from either end. Any future attempts to send messages in
+    /// either direction will fail, but any messages not yet received can still be read.
+    ///
+    /// If the Peer or its constituent channels were cloned, all of the cloned instances will
+    /// appear disconnected.
+    pub fn disconnect(&self) {
+        self.outgoing.close();
+        self.incoming.close();
+    }
+
+    /// Gets the raw sender for the peer.
+    pub fn sender(&self) -> async_channel::Sender<T> {
+        self.outgoing.clone()
+    }
+
+    /// Gets the raw receiver for the peer.
+    pub fn reciever(&self) -> async_channel::Receiver<T> {
+        self.incoming.clone()
+    }
+
+    /// The number of messages that are currently buffered in the send queue. Returns 0 if the
+    /// channel is closed.
+    pub fn pending_send_count(&self) -> usize {
+        self.outgoing.len()
+    }
+
+    /// The number of messages that are currently buffered in the receive queue. Returns 0 if the
+    /// channel is closed.
+    pub fn pending_recv_count(&self) -> usize {
+        self.incoming.len()
+    }
+
+    fn create_pair(
+        a: (async_channel::Sender<T>, async_channel::Receiver<T>),
+        b: (async_channel::Sender<T>, async_channel::Receiver<T>),
+    ) -> (Self, Self) {
+        let (a_send, a_recv) = a;
+        let (b_send, b_recv) = b;
+        let a = Self {
+            incoming: a_recv,
+            outgoing: b_send,
+        };
+        let b = Self {
+            incoming: b_recv,
+            outgoing: a_send,
+        };
+        (a, b)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    static_assertions::assert_impl_all!(BidirectionalAsyncChannel<i32>: Clone);
+
+    #[test]
+    pub fn send_works_both_ways() {
+        let (a, b) = BidirectionalAsyncChannel::<i32>::create_unbounded_pair();
+
+        assert!(a.try_send(1).is_ok());
+        assert!(b.try_send(4).is_ok());
+        assert!(a.try_send(2).is_ok());
+        assert!(b.try_send(5).is_ok());
+        assert!(a.try_send(3).is_ok());
+        assert!(b.try_send(6).is_ok());
+
+        assert_eq!(a.pending_send_count(), 3);
+        assert_eq!(b.pending_send_count(), 3);
+        assert_eq!(a.pending_recv_count(), 3);
+        assert_eq!(b.pending_recv_count(), 3);
+
+        assert_eq!(a.try_recv(), Ok(4));
+        assert_eq!(a.try_recv(), Ok(5));
+        assert_eq!(a.try_recv(), Ok(6));
+
+        assert_eq!(b.try_recv(), Ok(1));
+        assert_eq!(b.try_recv(), Ok(2));
+        assert_eq!(b.try_recv(), Ok(3));
+    }
+
+    #[test]
+    pub fn bounded_pairs_error_on_being_full() {
+        let (a, b) = BidirectionalAsyncChannel::<i32>::create_bounded_pair(2);
+
+        assert!(a.try_send(1).is_ok());
+        assert!(a.try_send(2).is_ok());
+        assert!(matches!(a.try_send(3), Err(TrySendError::Full(3))));
+        assert!(b.try_send(4).is_ok());
+        assert!(b.try_send(5).is_ok());
+        assert!(matches!(b.try_send(6), Err(TrySendError::Full(6))));
+
+        assert_eq!(a.try_recv(), Ok(4));
+        assert_eq!(a.try_recv(), Ok(5));
+        assert_eq!(a.try_recv(), Err(TryRecvError::Empty));
+
+        assert_eq!(b.try_recv(), Ok(1));
+        assert_eq!(b.try_recv(), Ok(2));
+        assert_eq!(b.try_recv(), Err(TryRecvError::Empty));
+    }
+
+    #[test]
+    pub fn disconnecting_closes_both_sides() {
+        let (a, b) = BidirectionalAsyncChannel::<i32>::create_bounded_pair(2);
+
+        a.disconnect();
+        assert!(!a.is_connected());
+        assert!(!b.is_connected());
+
+        let (a, b) = BidirectionalAsyncChannel::<i32>::create_bounded_pair(2);
+
+        b.disconnect();
+        assert!(!a.is_connected());
+        assert!(!b.is_connected());
+    }
+
+    #[test]
+    pub fn disconnecting_stop_any_future_sends() {
+        let (a, b) = BidirectionalAsyncChannel::<i32>::create_bounded_pair(2);
+
+        a.disconnect();
+        assert!(!a.is_connected());
+        assert!(!b.is_connected());
+
+        assert!(matches!(a.try_send(1), Err(TrySendError::Closed(1))));
+        assert!(matches!(b.try_send(1), Err(TrySendError::Closed(1))));
+        assert!(matches!(a.try_recv(), Err(TryRecvError::Closed)));
+        assert!(matches!(b.try_recv(), Err(TryRecvError::Closed)));
+    }
+
+    #[test]
+    pub fn disconnecting_allows_existing_items_to_be_flushed() {
+        let (a, b) = BidirectionalAsyncChannel::<i32>::create_unbounded_pair();
+
+        assert!(a.try_send(1).is_ok());
+        assert!(a.try_send(2).is_ok());
+        a.disconnect();
+        assert!(matches!(a.try_send(3), Err(TrySendError::Closed(3))));
+
+        assert_eq!(b.try_recv(), Ok(1));
+        assert_eq!(b.try_recv(), Ok(2));
+        assert_eq!(b.try_recv(), Err(TryRecvError::Closed));
+    }
+
+    #[test]
+    pub fn dropping_leads_to_disconnect() {
+        let (a, b) = BidirectionalAsyncChannel::<i32>::create_unbounded_pair();
+
+        assert!(a.is_connected());
+        drop(b);
+        assert!(!a.is_connected());
+
+        let (a, b) = BidirectionalAsyncChannel::<i32>::create_unbounded_pair();
+        let c = b.clone();
+
+        assert!(a.is_connected());
+        drop(b);
+        assert!(a.is_connected());
+        drop(c);
+        assert!(!a.is_connected());
+    }
+}
diff --git a/src/net/aio/mod.rs b/src/net/aio/mod.rs
new file mode 100644
index 00000000..f4ca5304
--- /dev/null
+++ b/src/net/aio/mod.rs
@@ -0,0 +1,21 @@
+mod channel;
+mod peer;
+mod peers;
+
+pub mod udp;
+pub use channel::*;
+pub use peer::*;
+pub use peers::*;
+
+/// Forwards all messages from one receiver to a sender until either the sender or receiver is
+/// closed.
+pub async fn forward<T>(
+    input: async_channel::Receiver<T>,
+    output: async_channel::Sender<T>,
+) {
+    while let Ok(message) = input.recv().await {
+        if output.send(message).await.is_err() {
+            break;
+        }
+    }
+}
diff --git a/src/net/aio/peer.rs b/src/net/aio/peer.rs
new file mode 100644
index 00000000..8e9d712f
--- /dev/null
+++ b/src/net/aio/peer.rs
@@ -0,0 +1,84 @@
+use crate::net::LinkConditioner;
+use super::channel::BidirectionalAsyncChannel;
+use futures_timer::Delay;
+use bevy_tasks::TaskPool;
+use std::fmt;
+use std::ops::Deref;
+
+/// A bidirectional channel for binary messages.
+#[derive(Clone)]
+pub struct Peer(BidirectionalAsyncChannel<Box<[u8]>>);
+
+impl Peer {
+    /// Creates a pair of connected Peers without limitations on how many messages can be
+    /// buffered.
+    pub fn create_unbounded_pair() -> (Self, Self) {
+        let (a, b) = BidirectionalAsyncChannel::create_unbounded_pair();
+        (Self(a), Self(b))
+    }
+
+    /// Creates a pair of connected Peers with a limited capacity for how many messages can be
+    /// buffered in either direction.
+    pub fn create_bounded_pair(capacity: usize) -> (Self, Self) {
+        let (a, b) = BidirectionalAsyncChannel::create_bounded_pair(capacity);
+        (Self(a), Self(b))
+    }
+
+    /// Converts the peer into a conditioned one. All outgoing sends will be randomly dropped
+    /// and have additional latency added based on the provided LinkConditioner.
+    ///
+    /// Useful for locally testing high latency or packet loss conditions.
+    ///
+    /// It is strongly advised not to use this in a release build as it might introduce
+    /// unnecessary packet loss and latency.
+    pub fn with_link_conditioner(self, pool: &TaskPool, conditioner: LinkConditioner) -> Self {
+        let (a, b) = Self::create_unbounded_pair();
+        pool.spawn(Self::conditioned_send(
+            pool.clone(),
+            b.reciever(),
+            conditioner,
+            self.sender(),
+        ))
+        .detach();
+        pool.spawn(super::forward(self.reciever(), b.sender())).detach();
+        a
+    }
+
+    async fn conditioned_send(
+        pool: TaskPool,
+        input: async_channel::Receiver<Box<[u8]>>,
+        mut conditioner: LinkConditioner,
+        output: async_channel::Sender<Box<[u8]>>,
+    ) {
+        while let Ok(message) = input.recv().await {
+            if !conditioner.should_send() {
+                continue;
+            }
+
+            if output.is_closed() {
+                break;
+            }
+
+            let latency = conditioner.sample_latency();
+            let output = output.clone();
+            pool.spawn(async move {
+                Delay::new(latency).await;
+                let _ = output.send(message).await;
+            })
+            .detach();
+        }
+    }
+}
+
+impl fmt::Debug for Peer {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
+        write!(f, "Peer {{ connected: {} }}", self.is_connected())
+    }
+}
+
+impl Deref for Peer {
+    type Target = BidirectionalAsyncChannel<Box<[u8]>>;
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    static_assertions::assert_impl_all!(Peer: Deref, Clone, Send, Sync);
+}
diff --git a/src/net/aio/peers.rs b/src/net/aio/peers.rs
new file mode 100644
index 00000000..9023b61d
--- /dev/null
+++ b/src/net/aio/peers.rs
@@ -0,0 +1,159 @@
+use super::Peer;
+use core::hash::Hash;
+use dashmap::DashMap;
+
+/// A keyed mapping of [Peer]s with ownership semantics.
+///
+/// Dropping will disconnect all owned peers.
+///
+/// [Peer]: crate::Peer
+#[derive(Debug)]
+pub struct Peers<T>(DashMap<T, Peer>)
+where
+    T: Eq + Hash;
+
+impl<T: Eq + Hash> Peers<T> {
+    /// Gets a [Peer] by its ID, if available.
+    ///
+    /// [Peer]: crate::Peer
+    pub fn get(&self, id: &T) -> Option<Peer> {
+        self.0.get(id).and_then(|kv| {
+            let peer = kv.value().clone();
+            if peer.is_connected() {
+                Some(peer)
+            } else {
+                None
+            }
+        })
+    }
+
+    /// Gets the number of active connections managed by this store.
+    pub fn len(&self) -> usize {
+        self.0.iter().filter(|kv| kv.value().is_connected()).count()
+    }
+
+    /// Checks if the store has a connection to the given ID.
+    pub fn contains(&self, id: &T) -> bool {
+        self.0
+            .get(id)
+            .map(|kv| kv.value().is_connected())
+            .unwrap_or(false)
+    }
+
+    /// Creates a new unbounded peer pair and stores one end, mapping it to the provided ID,
+    /// returning the other end.
+    ///
+    /// If a peer was previously stored at the given ID, it will be replaced and disconnected.
+    #[must_use]
+    pub fn create_unbounded(&self, id: T) -> Peer {
+        let (a, b) = Peer::create_unbounded_pair();
+        if let Some(prior) = self.0.insert(id, a) {
+            prior.disconnect();
+        }
+        b
+    }
+
+    /// Creates a bounded peer pair and stores one end, mapping it to the provided ID, returning
+    /// the other end.
+    ///
+    /// If a peer was previously stored at the given ID, it will be dropped and replaced.
+    #[must_use]
+    pub fn create_bounded(&self, id: T, capacity: usize) -> Peer {
+        let (a, b) = Peer::create_bounded_pair(capacity);
+        self.0.insert(id, a);
+        b
+    }
+
+    /// Disconnects and removes a connection by its ID.
+    ///
+    /// A no-op if there is no Peer with the given ID.
+    pub fn disconnect(&self, id: &T) {
+        if let Some((_, peer)) = self.0.remove(id) {
+            peer.disconnect();
+        }
+    }
+
+    /// Removes all peers that are disconnected.
+    pub fn flush_disconnected(&self) {
+        self.0.retain(|_, peer| peer.is_connected())
+    }
+}
+
+impl<T: Eq + Hash> Default for Peers<T> {
+    fn default() -> Self {
+        Self(DashMap::<T, Peer>::new())
+    }
+}
+
+impl<T: Eq + Hash> Drop for Peers<T> {
+    fn drop(&mut self) {
+        for kv in self.0.iter() {
+            kv.value().disconnect();
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    static_assertions::assert_impl_all!(Peers<i32>: Default, Drop, Send, Sync);
+
+    #[test]
+    pub fn test_contains_works() {
+        const ID: i32 = 420;
+        let peers = Peers::<i32>::default();
+        let _peer = peers.create_unbounded(ID);
+        assert!(peers.contains(&ID));
+        assert!(peers.get(&ID).is_some());
+    }
+
+    #[test]
+    pub fn disconnecting_removes_peer() {
+        const ID: i32 = 420;
+        let peers = Peers::<i32>::default();
+        let peer = peers.create_unbounded(ID);
+        assert!(peers.contains(&ID));
+        assert!(peers.get(&ID).is_some());
+        peer.disconnect();
+        assert!(!peers.contains(&ID));
+        assert!(peers.get(&ID).is_none());
+    }
+
+    #[test]
+    pub fn disconnecting_via_drop_removes_peer() {
+        const ID: i32 = 420;
+        let peers = Peers::<i32>::default();
+        let peer = peers.create_unbounded(ID);
+        assert!(peers.contains(&ID));
+        assert!(peers.get(&ID).is_some());
+        drop(peer);
+        assert!(!peers.contains(&ID));
+        assert!(peers.get(&ID).is_none());
+    }
+
+    #[test]
+    pub fn disconnecting_local_disconnects_remote() {
+        const ID: i32 = 420;
+        let peers = Peers::<i32>::default();
+        let peer_remote = peers.create_unbounded(ID);
+        peers.disconnect(&ID);
+        assert!(!peer_remote.is_connected());
+    }
+
+    #[test]
+    pub fn dropping_disconnects_all_remotes() {
+        let peers = Peers::<i32>::default();
+        let a = peers.create_unbounded(1);
+        let b = peers.create_unbounded(2);
+        let c = peers.create_unbounded(3);
+
+        assert!(a.is_connected());
+        assert!(b.is_connected());
+        assert!(c.is_connected());
+        drop(peers);
+        assert!(!a.is_connected());
+        assert!(!b.is_connected());
+        assert!(!c.is_connected());
+    }
+}
diff --git a/src/net/aio/udp.rs b/src/net/aio/udp.rs
new file mode 100644
index 00000000..b91e499c
--- /dev/null
+++ b/src/net/aio/udp.rs
@@ -0,0 +1,266 @@
+use crate::Config;
+use async_channel::TrySendError;
+use async_net::{SocketAddr, UdpSocket};
+use super::{Peer, Peers};
+use bevy_tasks::TaskPool;
+use std::convert::TryFrom;
+use std::net::{Ipv4Addr, SocketAddrV4, ToSocketAddrs, UdpSocket as BlockingUdpSocket};
+use std::sync::{Arc, Weak};
+use std::time::{Duration, Instant};
+use tracing::{debug, error};
+
+const CLEANUP_INTERVAL: Duration = Duration::from_millis(1000);
+
+#[derive(Debug)]
+pub struct UdpManager {
+    peers: Arc<Peers<SocketAddr>>,
+    config: Config,
+    socket: UdpSocket,
+    task_pool: TaskPool,
+}
+
+impl UdpManager {
+    /// Binds a [UdpSocket] and starts listening on it.
+    ///
+    /// # Errors
+    /// Returns a [std::io::Error] if it fails to bind to the provided socket addresses
+    /// or start an async poll on the socket.
+    ///
+    /// [UdpSocket]: async_net::UdpSocket
+    pub fn bind(pool: TaskPool, addrs: impl ToSocketAddrs) -> std::io::Result<Self> {
+        Self::bind_with_config(pool, addrs, Config::default())
+    }
+
+    /// Binds to any local port on the system, if available.
+    ///
+    /// # Errors
+    /// Returns a [std::io::Error] if it fails to bind to the provided socket addresses
+    /// or start an async poll on the socket.
+    pub fn bind_any(pool: TaskPool) -> std::io::Result<Self> {
+        Self::bind_any_with_config(pool, Config::default())
+    }
+
+    /// Binds to any local port on the system, if available, with a given config.
+    ///
+    /// # Errors
+    /// Returns a [std::io::Error] if it fails to bind to the provided socket addresses
+    /// or start an async poll on the socket.
+    pub fn bind_any_with_config(
+        pool: TaskPool,
+        config: Config,
+    ) -> std::io::Result<Self> {
+        let loopback = Ipv4Addr::new(127, 0, 0, 1);
+        let address = SocketAddrV4::new(loopback, 0);
+        let blocking = BlockingUdpSocket::bind(address)?;
+        let socket = UdpSocket::try_from(blocking)?;
+        Ok(Self::bind_internal(pool, socket, config))
+    }
+
+    /// Binds to the socket and then sets up `ActiveConnections` to manage the "connections".
+    /// Because UDP connections are not persistent, we can only infer the status of the remote
+    /// endpoint by looking to see if they are still sending packets or not.
+    ///
+    /// This function allows you to configure the socket with the passed configuration.
+    ///
+    /// # Errors
+    /// Returns a [std::io::Error] if it fails to bind to the provided socket addresses
+    /// or start an async poll on the socket.
+    pub fn bind_with_config(
+        pool: TaskPool,
+        addrs: impl ToSocketAddrs,
+        config: Config,
+    ) -> std::io::Result<Self> {
+        let blocking = BlockingUdpSocket::bind(addrs)?;
+        let socket = UdpSocket::try_from(blocking)?;
+        Ok(Self::bind_internal(pool, socket, config))
+    }
+
+    fn bind_internal(pool: TaskPool, socket: UdpSocket, config: Config) -> Self {
+        let peers = Arc::new(Peers::default());
+        let read_buffer_len = config.receive_buffer_max_size;
+        let manager = Self {
+            peers: peers.clone(),
+            config,
+            socket: socket.clone(),
+            task_pool: pool.clone(),
+        };
+
+        pool.spawn(Self::recv(
+            Arc::downgrade(&peers),
+            socket,
+            read_buffer_len,
+        ))
+        .detach();
+
+        manager
+    }
+
+    /// Creates a [Peer] bound to a specific target [SocketAddr].
+    ///
+    /// Note this does not block or perform any I/O. It simply creates the tasks for reading and
+    /// sending.
+    ///
+    /// [Peer]: super::Peer
+    /// [SocketAddr]: std::net::SocketAddr
+    pub fn connect(&self, remote: SocketAddr) -> Peer {
+        let peer = self
+            .peers
+            .create_bounded(remote, self.config.socket_event_buffer_size);
+        let other = self.peers.get(&remote).unwrap().clone();
+        let socket = self.socket.clone();
+        let task = Self::send(other, remote, socket);
+        self.task_pool.spawn(task).detach();
+        peer
+    }
+
+    /// Disconnects the connection to a given [SocketAddr] if available.
+    ///
+    /// [SocketAddr]: std::net::SocketAddr
+    pub fn disconnect(&self, addr: SocketAddr) {
+        self.peers.disconnect(&addr);
+    }
+
+    async fn send(peer: Peer, target_addr: SocketAddr, socket: UdpSocket) {
+        while let Ok(message) = peer.recv().await {
+            if let Err(err) = socket.send_to(message.as_ref(), target_addr).await {
+                error!(
+                    "Error while sending message to {:?}: {:?}",
+                    target_addr, err
+                );
+            }
+        }
+
+        if let Ok(addr) = socket.local_addr() {
+            debug!(
+                "Stopping sender to {} from UDP socket on {}",
+                target_addr, addr
+            );
+        }
+    }
+
+    async fn recv(
+        peers: Weak<Peers<SocketAddr>>,
+        socket: UdpSocket,
+        read_buffer_len: usize,
+    ) {
+        let mut read_buf = vec![0u8; read_buffer_len];
+        let mut last_flush = Instant::now();
+
+        while let Some(peers) = peers.upgrade() {
+            match socket.recv_from(&mut read_buf).await {
+                Ok((len, addr)) => {
+                    debug_assert!(len < read_buffer_len);
+                    if let Some(peer) = peers.get(&addr) {
+                        Self::forward_packet(addr, peer, &read_buf[0..len]);
+                    }
+                }
+                Err(err) => {
+                    error!("Error while receiving UDP packets: {:?}", err);
+                }
+            }
+
+            // Periodically clean up disconnected peers.
+            if Instant::now() - last_flush > CLEANUP_INTERVAL {
+                peers.flush_disconnected();
+                last_flush = Instant::now();
+            }
+        }
+
+        if let Ok(addr) = socket.local_addr() {
+            debug!("Stopping receiver for UDP socket on {}", addr);
+        }
+    }
+
+    fn forward_packet(addr: SocketAddr, peer: Peer, data: &[u8]) {
+        match peer.try_send(data.into()) {
+            Ok(()) => {}
+            Err(TrySendError::Full(_)) => {
+                debug!(
+                    "Dropped packet due to the packet queue for {} being full",
+                    addr
+                );
+            }
+            Err(TrySendError::Closed(_)) => {
+                debug!("Dropped packet for disconnected packet queue: {}", addr);
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    #[serial_test::serial]
+    pub fn test_basic_connect() {
+        const ADDR_A: &str = "127.0.0.1:10000";
+        const ADDR_B: &str = "127.0.0.1:10001";
+        let pool = TaskPool::new();
+
+        let socket_a = UdpManager::bind(pool.clone(), ADDR_A).unwrap();
+        let socket_b = UdpManager::bind(pool.clone(), ADDR_B).unwrap();
+
+        let peer_a = socket_b.connect(ADDR_A.parse().unwrap());
+        let peer_b = socket_a.connect(ADDR_B.parse().unwrap());
+
+        let msg_a: Box<[u8]> = b"Hello A!"[0..].into();
+        let msg_b: Box<[u8]> = b"Hello B!"[0..].into();
+
+        peer_a.try_send(msg_b.clone()).unwrap();
+        peer_b.try_send(msg_a.clone()).unwrap();
+
+        let recv_msg_a = futures::executor::block_on(peer_a.recv()).unwrap();
+        let recv_msg_b = futures::executor::block_on(peer_b.recv()).unwrap();
+
+        assert_eq!(msg_a, recv_msg_a);
+        assert_eq!(msg_b, recv_msg_b);
+    }
+
+    #[test]
+    #[serial_test::serial]
+    pub fn test_multiple_send() {
+        const ADDR_A: &str = "127.0.0.1:10000";
+        const ADDR_B: &str = "127.0.0.1:10001";
+        let pool = TaskPool::new();
+
+        let socket_a = UdpManager::bind(pool.clone(), ADDR_A).unwrap();
+        let socket_b = UdpManager::bind(pool.clone(), ADDR_B).unwrap();
+
+        let peer_a = socket_b.connect(ADDR_A.parse().unwrap());
+        let peer_b = socket_a.connect(ADDR_B.parse().unwrap());
+
+        peer_a.try_send(b"100"[0..].into()).unwrap();
+        peer_a.try_send(b"101"[0..].into()).unwrap();
+        peer_a.try_send(b"102"[0..].into()).unwrap();
+        peer_a.try_send(b"103"[0..].into()).unwrap();
+        peer_a.try_send(b"104"[0..].into()).unwrap();
+        peer_a.try_send(b"105"[0..].into()).unwrap();
+
+        assert_eq!(
+            futures::executor::block_on(peer_b.recv()),
+            Ok(b"100"[0..].into())
+        );
+        assert_eq!(
+            futures::executor::block_on(peer_b.recv()),
+            Ok(b"101"[0..].into())
+        );
+        assert_eq!(
+            futures::executor::block_on(peer_b.recv()),
+            Ok(b"102"[0..].into())
+        );
+        assert_eq!(
+            futures::executor::block_on(peer_b.recv()),
+            Ok(b"103"[0..].into())
+        );
+        assert_eq!(
+            futures::executor::block_on(peer_b.recv()),
+            Ok(b"104"[0..].into())
+        );
+        assert_eq!(
+            futures::executor::block_on(peer_b.recv()),
+            Ok(b"105"[0..].into())
+        );
+    }
+}
diff --git a/src/net/link_conditioner.rs b/src/net/link_conditioner.rs
index a3fd3d6c..14f0a6ac 100644
--- a/src/net/link_conditioner.rs
+++ b/src/net/link_conditioner.rs
@@ -4,6 +4,7 @@
 //! becomes more sophisticated.
 
 use std::time::Duration;
+use std::convert::TryFrom;
 
 use rand::Rng;
 use rand_pcg::Pcg64Mcg as Random;
 
@@ -15,7 +16,7 @@ pub struct LinkConditioner {
     // Value between 0 and 1, representing the % change a packet will be dropped on sending
     packet_loss: f64,
     // Duration of the delay imposed between packets
-    latency: Duration,
+    max_additional_latency_millis: f64,
     // Random number generator
     random: Random,
 }
@@ -26,7 +27,7 @@ impl LinkConditioner {
     pub fn new() -> LinkConditioner {
         LinkConditioner {
             packet_loss: 0.0,
-            latency: Duration::default(),
+            max_additional_latency_millis: 0.0,
             random: Random::new(0),
         }
     }
@@ -37,16 +38,23 @@
         self.packet_loss = rate;
     }
 
-    /// Sets the latency the link conditioner should apply to each packet
+    /// Sets the maximum additional latency the link conditioner should apply to each packet.
     #[allow(dead_code)]
     pub fn set_latency(&mut self, latency: Duration) {
-        self.latency = latency
+        self.max_additional_latency_millis = f64::try_from(latency.as_millis() as u32).unwrap();
    }
 
     /// Function that checks to see if a packet should be dropped or not
     pub fn should_send(&mut self) -> bool {
         self.random.gen_range(0.0..1.0) >= self.packet_loss
     }
+
+    /// Samples a random amount of additional latency to apply to a packet, up to the configured maximum.
+    pub fn sample_latency(&mut self) -> Duration {
+        let wait_millis =
+            (self.random.gen_range(0.0..1.0) * self.max_additional_latency_millis) as u64;
+        Duration::from_millis(wait_millis)
+    }
 }
 
 impl Default for LinkConditioner {
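
For context on how the pieces in this patch fit together, the following is a minimal usage sketch, not part of the diff itself. It wires a LinkConditioner into a Peer pair the way `Peer::with_link_conditioner` expects, written as if it lived inside the crate. The helper name `conditioned_pair`, the 20% loss and 50 ms latency figures, and the `set_packet_loss` setter (implied by the `self.packet_loss = rate;` context above but not shown in full) are assumptions for illustration only.

use bevy_tasks::TaskPool;
use std::time::Duration;

use crate::net::aio::Peer;
use crate::net::LinkConditioner;

/// Hypothetical helper: returns a Peer pair where everything sent from the
/// first end passes through a lossy, delayed link.
fn conditioned_pair(pool: &TaskPool) -> (Peer, Peer) {
    let mut conditioner = LinkConditioner::new();
    // Illustrative settings: drop roughly 20% of messages and add up to ~50 ms
    // of random delay per message (sample_latency picks a value in [0, max)).
    conditioner.set_packet_loss(0.2);
    conditioner.set_latency(Duration::from_millis(50));

    let (local, remote) = Peer::create_unbounded_pair();
    // Wrapping one end spawns the conditioning and forwarding tasks on the TaskPool;
    // the returned Peer replaces the original, which is consumed here.
    let local = local.with_link_conditioner(pool, conditioner);
    (local, remote)
}

Because the conditioner only intercepts the wrapped end's outgoing sends, tests that need loss in both directions would wrap both ends of the pair.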