Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions mavlink-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ crc-any = { workspace = true, default-features = false }
embedded-hal-02 = { version = "0.2", optional = true, package = "embedded-hal" }
embedded-io = { version = "0.7", optional = true }
embedded-io-async = { version = "0.7", optional = true }
futures = { version = "0.3", default-features = false, optional = true }
futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
nb = { version = "1.0", optional = true }
rand = { version = "0.9", optional = true, default-features = false, features = ["std", "std_rng"] }
serde = { version = "1.0.115", optional = true, features = ["derive"] }
Expand All @@ -35,6 +35,7 @@ serialport = { version = "4.7.2", default-features = false, optional = true }
sha2 = { version = "0.10", optional = true }
tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "fs"], optional = true }
tokio-serial = { version = "5.4.4", default-features = false, optional = true }
tokio-util = { version = "0.7", default-features = false, features = ["compat"], optional = true }

[features]
default = ["std", "tcp", "udp", "direct-serial", "serde"]
Expand All @@ -49,7 +50,7 @@ direct-serial = ["serialport"]
embedded = ["dep:embedded-io", "dep:embedded-io-async"]
embedded-hal-02 = ["dep:nb", "dep:embedded-hal-02"]
serde = ["dep:serde", "dep:serde_arrays"]
tokio-1 = ["dep:tokio", "dep:async-trait", "dep:tokio-serial", "dep:futures"]
tokio-1 = ["dep:tokio", "dep:async-trait", "dep:tokio-serial", "dep:futures", "dep:tokio-util"]
signing = ["dep:sha2"]
arbitrary = ["dep:arbitrary", "dep:rand"]

Expand Down
21 changes: 9 additions & 12 deletions mavlink-core/src/async_connection/direct_serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use async_trait::async_trait;
use futures::lock::Mutex;
use tokio::io::{BufReader, ReadHalf, WriteHalf};
use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use super::AsyncConnectable;
use crate::connection::direct_serial::config::SerialConfig;
Expand All @@ -28,8 +29,8 @@ use crate::{
use super::AsyncMavConnection;

pub struct AsyncSerialConnection {
read_port: Mutex<AsyncPeekReader<BufReader<ReadHalf<SerialStream>>>>,
write_port: Mutex<WriteHalf<SerialStream>>,
read_port: Mutex<AsyncPeekReader<Compat<BufReader<ReadHalf<SerialStream>>>>>,
write_port: Mutex<Compat<WriteHalf<SerialStream>>>,
sequence: AtomicU8,
protocol_version: MavlinkVersion,
recv_any_version: bool,
Expand All @@ -54,10 +55,8 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
.await;
match result {
Ok(message) => return Ok(message),
Err(MessageReadError::Io(e)) => {
if e.kind() == io::ErrorKind::UnexpectedEof {
return Err(MessageReadError::Io(e));
}
Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Err(MessageReadError::Io(e));
}
_ => {}
}
Expand All @@ -79,10 +78,8 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
.await;
match result {
Ok(message) => return Ok(message),
Err(MessageReadError::Io(e)) => {
if e.kind() == io::ErrorKind::UnexpectedEof {
return Err(MessageReadError::Io(e));
}
Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Err(MessageReadError::Io(e));
}
_ => {}
}
Expand Down Expand Up @@ -187,8 +184,8 @@ impl AsyncConnectable for SerialConfig {
let buf_reader = BufReader::with_capacity(read_buffer_capacity, reader);

Ok(Box::new(AsyncSerialConnection {
read_port: Mutex::new(AsyncPeekReader::new(buf_reader)),
write_port: Mutex::new(writer),
read_port: Mutex::new(AsyncPeekReader::new(buf_reader.compat())),
write_port: Mutex::new(writer.compat_write()),
sequence: AtomicU8::new(0),
protocol_version: MavlinkVersion::V2,
recv_any_version: false,
Expand Down
17 changes: 7 additions & 10 deletions mavlink-core/src/async_connection/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{
use async_trait::async_trait;
use futures::lock::Mutex;
use tokio::fs::File;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};

#[cfg(not(feature = "signing"))]
use crate::{read_versioned_msg_async, read_versioned_raw_message_async};
Expand All @@ -27,7 +28,7 @@ use crate::{
pub async fn open(file_path: &PathBuf) -> io::Result<AsyncFileConnection> {
let file = File::open(file_path).await?;
Ok(AsyncFileConnection {
file: Mutex::new(AsyncPeekReader::new(file)),
file: Mutex::new(AsyncPeekReader::new(file.compat())),
protocol_version: MavlinkVersion::V2,
recv_any_version: false,
#[cfg(feature = "signing")]
Expand All @@ -36,7 +37,7 @@ pub async fn open(file_path: &PathBuf) -> io::Result<AsyncFileConnection> {
}

pub struct AsyncFileConnection {
file: Mutex<AsyncPeekReader<File>>,
file: Mutex<AsyncPeekReader<Compat<File>>>,
protocol_version: MavlinkVersion,
recv_any_version: bool,
#[cfg(feature = "signing")]
Expand All @@ -62,10 +63,8 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncFileConnection {
ok @ Ok(..) => {
return ok;
}
Err(MessageReadError::Io(e)) => {
if e.kind() == io::ErrorKind::UnexpectedEof {
return Err(MessageReadError::Io(e));
}
Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Err(MessageReadError::Io(e));
}
_ => {}
}
Expand All @@ -89,10 +88,8 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncFileConnection {
ok @ Ok(..) => {
return ok;
}
Err(MessageReadError::Io(e)) => {
if e.kind() == io::ErrorKind::UnexpectedEof {
return Err(MessageReadError::Io(e));
}
Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Err(MessageReadError::Io(e));
}
_ => {}
}
Expand Down
14 changes: 8 additions & 6 deletions mavlink-core/src/async_connection/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use core::ops::DerefMut;
use futures::{lock::Mutex, FutureExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::Compat;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

#[cfg(not(feature = "signing"))]
use crate::{
Expand All @@ -31,9 +33,9 @@ pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncT
let (reader, writer) = socket.into_split();

Ok(AsyncTcpConnection {
reader: Mutex::new(AsyncPeekReader::new(reader)),
reader: Mutex::new(AsyncPeekReader::new(reader.compat())),
writer: Mutex::new(TcpWrite {
socket: writer,
socket: writer.compat_write(),
sequence: 0,
}),
protocol_version: MavlinkVersion::V2,
Expand All @@ -52,9 +54,9 @@ pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTc
Ok((socket, _)) => {
let (reader, writer) = socket.into_split();
return Ok(AsyncTcpConnection {
reader: Mutex::new(AsyncPeekReader::new(reader)),
reader: Mutex::new(AsyncPeekReader::new(reader.compat())),
writer: Mutex::new(TcpWrite {
socket: writer,
socket: writer.compat_write(),
sequence: 0,
}),
protocol_version: MavlinkVersion::V2,
Expand All @@ -75,7 +77,7 @@ pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTc
}

pub struct AsyncTcpConnection {
reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
reader: Mutex<AsyncPeekReader<Compat<OwnedReadHalf>>>,
writer: Mutex<TcpWrite>,
protocol_version: MavlinkVersion,
recv_any_version: bool,
Expand All @@ -84,7 +86,7 @@ pub struct AsyncTcpConnection {
}

struct TcpWrite {
socket: OwnedWriteHalf,
socket: Compat<OwnedWriteHalf>,
sequence: u8,
}

Expand Down
38 changes: 13 additions & 25 deletions mavlink-core/src/async_connection/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ use std::io;
use std::{collections::VecDeque, io::Read, sync::Arc};

use async_trait::async_trait;
use futures::io::AsyncRead;
use futures::lock::Mutex;
use tokio::{
io::{AsyncRead, ReadBuf},
net::UdpSocket,
};
use tokio::net::UdpSocket;

use crate::connection::udp::config::{UdpConfig, UdpMode};
use crate::MAVLinkMessageRaw;
Expand Down Expand Up @@ -38,36 +36,26 @@ impl AsyncRead for UdpRead {
fn poll_read(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
if self.buffer.is_empty() {
let mut read_buffer = [0u8; MTU_SIZE];
let mut read_buffer = ReadBuf::new(&mut read_buffer);
let mut read_buf = tokio::io::ReadBuf::new(&mut read_buffer);

match self.socket.poll_recv_from(cx, &mut read_buffer) {
match self.socket.poll_recv_from(cx, &mut read_buf) {
Poll::Ready(Ok(address)) => {
let n_buffer = read_buffer.filled().len();

let n = (&read_buffer.filled()[0..n_buffer]).read(buf.initialize_unfilled())?;
buf.advance(n);

self.buffer.extend(&read_buffer.filled()[n..n_buffer]);
let filled = read_buf.filled();
let n = (&filled[..]).read(buf)?;
self.buffer.extend(&filled[n..]);
self.last_recv_address = Some(address);
Poll::Ready(Ok(()))
Poll::Ready(Ok(n))
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => Poll::Pending,
}
} else {
let read_result = self.buffer.read(buf.initialize_unfilled());
let result = match read_result {
Ok(n) => {
buf.advance(n);
Ok(())
}
Err(err) => Err(err),
};
Poll::Ready(result)
let n = self.buffer.read(buf)?;
Poll::Ready(Ok(n))
}
}
}
Expand Down Expand Up @@ -278,7 +266,7 @@ impl AsyncConnectable for UdpConfig {
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncReadExt;
use futures::io::AsyncReadExt;

#[tokio::test]
async fn test_datagram_buffering() {
Expand Down
26 changes: 13 additions & 13 deletions mavlink-core/src/async_peek_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,26 @@
//! The purpose of the buffered/peekable reader is to allow for backtracking parsers.
//!
//! This is the async version of [`crate::peek_reader::PeekReader`].
//! A reader implementing the tokio library's [`tokio::io::AsyncBufRead`]/[`tokio::io::AsyncBufReadExt`] traits seems like a good fit, but
//! A reader implementing the [`futures::io::AsyncBufRead`]/[`futures::io::AsyncBufReadExt`] traits seems like a good fit, but
//! it does not allow for peeking a specific number of bytes, so it provides no way to request
//! more data from the underlying reader without consuming the existing data.
//!
//! This API still tries to adhere to the [`tokio::io::AsyncBufRead`]'s trait philosophy.
//! This API still tries to adhere to the [`futures::io::AsyncBufRead`]'s trait philosophy.
//!
//! The main type [`AsyncPeekReader`] does not implement [`tokio::io::AsyncBufReadExt`] itself, as there is no added benefit
//! The main type [`AsyncPeekReader`] does not implement [`futures::io::AsyncBufReadExt`] itself, as there is no added benefit
//! in doing so.
//!

#[cfg(doc)]
use std::io::ErrorKind;

use tokio::io::AsyncReadExt;
use futures::io::AsyncReadExt;

use crate::error::MessageReadError;

/// A buffered/peekable reader
///
/// This reader wraps a type implementing [`tokio::io::AsyncRead`] and adds buffering via an internal buffer.
/// This reader wraps a type implementing [`futures::io::AsyncRead`] and adds buffering via an internal buffer.
///
/// It allows the user to `peek` a specified number of bytes (without consuming them),
/// to `read` bytes (consuming them), or to `consume` them after `peek`ing.
Expand All @@ -41,8 +41,8 @@ pub struct AsyncPeekReader<R, const BUFFER_SIZE: usize = 280> {
reader: R,
}

impl<R: AsyncReadExt + Unpin, const BUFFER_SIZE: usize> AsyncPeekReader<R, BUFFER_SIZE> {
/// Instantiates a new [`AsyncPeekReader`], wrapping the provided [`tokio::io::AsyncReadExt`] and using the default chunk size
impl<R: futures::io::AsyncRead + Unpin, const BUFFER_SIZE: usize> AsyncPeekReader<R, BUFFER_SIZE> {
/// Instantiates a new [`AsyncPeekReader`], wrapping the provided [`futures::io::AsyncRead`] and using the default chunk size
pub fn new(reader: R) -> Self {
Self {
buffer: [0; BUFFER_SIZE],
Expand All @@ -55,14 +55,14 @@ impl<R: AsyncReadExt + Unpin, const BUFFER_SIZE: usize> AsyncPeekReader<R, BUFFE
/// Peeks an exact amount of bytes from the internal buffer
///
/// If the internal buffer does not contain enough data, this function will read
/// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
/// from the underlying [`futures::io::AsyncRead`] until it does, an error occurs or no more data can be read (EOF).
///
/// This function does not consume data from the buffer, so subsequent calls to `peek` or `read` functions
/// will still return the peeked data.
///
/// # Errors
///
/// - If any error occurs while reading from the underlying [`tokio::io::AsyncReadExt`] it is returned
/// - If any error occurs while reading from the underlying [`futures::io::AsyncRead`] it is returned
/// - If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
///
/// # Panics
Expand All @@ -75,13 +75,13 @@ impl<R: AsyncReadExt + Unpin, const BUFFER_SIZE: usize> AsyncPeekReader<R, BUFFE
/// Reads a specified amount of bytes from the internal buffer
///
/// If the internal buffer does not contain enough data, this function will read
/// from the underlying [`tokio::io::AsyncReadExt`] until it does, an error occurs or no more data can be read (EOF).
/// from the underlying [`futures::io::AsyncRead`] until it does, an error occurs or no more data can be read (EOF).
///
/// This function consumes the data from the buffer, unless an error occurs, in which case no data is consumed.
///
/// # Errors
///
/// - If any error occurs while reading from the underlying [`tokio::io::AsyncReadExt`] it is returned
/// - If any error occurs while reading from the underlying [`futures::io::AsyncRead`] it is returned
/// - If an EOF occurs and the specified amount could not be read, this function will return an [`ErrorKind::UnexpectedEof`].
///
/// # Panics
Expand Down Expand Up @@ -121,14 +121,14 @@ impl<R: AsyncReadExt + Unpin, const BUFFER_SIZE: usize> AsyncPeekReader<R, BUFFE
amount
}

/// Returns an immutable reference to the underlying [`tokio::io::AsyncRead`]
/// Returns an immutable reference to the underlying [`futures::io::AsyncRead`]
///
/// Reading directly from the underlying reader will cause data loss
pub fn reader_ref(&mut self) -> &R {
&self.reader
}

/// Returns a mutable reference to the underlying [`tokio::io::AsyncRead`]
/// Returns a mutable reference to the underlying [`futures::io::AsyncRead`]
///
/// Reading directly from the underlying reader will cause data loss
pub fn reader_mut(&mut self) -> &mut R {
Expand Down
12 changes: 4 additions & 8 deletions mavlink-core/src/connection/direct_serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ impl<M: Message> MavConnection<M> for SerialConnection {
ok @ Ok(..) => {
return ok;
}
Err(MessageReadError::Io(e)) => {
if e.kind() == io::ErrorKind::UnexpectedEof {
return Err(MessageReadError::Io(e));
}
Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Err(MessageReadError::Io(e));
}
_ => {}
}
Expand All @@ -78,10 +76,8 @@ impl<M: Message> MavConnection<M> for SerialConnection {
ok @ Ok(..) => {
return ok;
}
Err(MessageReadError::Io(e)) => {
if e.kind() == io::ErrorKind::UnexpectedEof {
return Err(MessageReadError::Io(e));
}
Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Err(MessageReadError::Io(e));
}
_ => {}
}
Expand Down
12 changes: 4 additions & 8 deletions mavlink-core/src/connection/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,8 @@ impl<M: Message> MavConnection<M> for FileConnection {
ok @ Ok(..) => {
return ok;
}
Err(MessageReadError::Io(e)) => {
if e.kind() == io::ErrorKind::UnexpectedEof {
return Err(MessageReadError::Io(e));
}
Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Err(MessageReadError::Io(e));
}
_ => {}
}
Expand All @@ -84,10 +82,8 @@ impl<M: Message> MavConnection<M> for FileConnection {
ok @ Ok(..) => {
return ok;
}
Err(MessageReadError::Io(e)) => {
if e.kind() == io::ErrorKind::UnexpectedEof {
return Err(MessageReadError::Io(e));
}
Err(MessageReadError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => {
return Err(MessageReadError::Io(e));
}
_ => {}
}
Expand Down
Loading