Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ name = "nitox"
readme = "README.md"
repository = "https://github.com/YellowInnovation/nitox"
version = "0.1.10"
edition = "2018"

[[bench]]
harness = false
Expand All @@ -40,16 +41,13 @@ parking_lot = "0.7"
rand = "0.6"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
tokio-codec = "0.1"
tokio-executor = "0.1"
tokio-tcp = "0.1"
tokio-tls = "0.2"
url = "1.7"

[dependencies.serde_json]
features = ["preserve_order"]
version = "1.0"

[dev-dependencies]
criterion = "0.2"
env_logger = "0.6"
Expand Down
36 changes: 20 additions & 16 deletions benches/nitox_parser_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use nitox::commands::*;

fn benchmark_parser(c: &mut Criterion) {
c.bench_function("connect_parse", |b| {
let cmd = b"CONNECT\t{\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"nitox\",\"lang\":\"rust\",\"version\":\"1.0.0\"}\r\n";
b.iter(|| ConnectCommand::try_parse(cmd))
let cmd = "CONNECT\t{\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"nitox\",\"lang\":\"rust\",\"version\":\"1.0.0\"}\r\n";
b.iter(|| ConnectCommand::try_parse(cmd.into()))
});

c.bench_function("connect_write", |b| b.iter(|| ConnectCommand::default().into_vec()));

c.bench_function("pub_parse", |b| {
let cmd = b"PUB\tFOO\t11\r\nHello NATS!\r\n";
b.iter(|| PubCommand::try_parse(cmd))
let cmd = "PUB\tFOO\t11\r\nHello NATS!\r\n";
b.iter(|| PubCommand::try_parse(cmd.into()))
});

c.bench_function("pub_write", |b| {
Expand All @@ -25,13 +25,14 @@ fn benchmark_parser(c: &mut Criterion) {
subject: String::new(),
payload: bytes::Bytes::new(),
reply_to: None,
}.into_vec()
}
.into_vec()
})
});

c.bench_function("sub_parse", |b| {
let cmd = b"SUB\tFOO\tpouet\r\n";
b.iter(|| SubCommand::try_parse(cmd))
let cmd = "SUB\tFOO\tpouet\r\n";
b.iter(|| SubCommand::try_parse(cmd.into()))
});

c.bench_function("sub_write", |b| {
Expand All @@ -40,27 +41,29 @@ fn benchmark_parser(c: &mut Criterion) {
queue_group: None,
sid: String::new(),
subject: String::new(),
}.into_vec()
}
.into_vec()
})
});

c.bench_function("unsub_parse", |b| {
let cmd = b"UNSUB\tpouet\r\n";
b.iter(|| UnsubCommand::try_parse(cmd))
let cmd = "UNSUB\tpouet\r\n";
b.iter(|| UnsubCommand::try_parse(cmd.into()))
});

c.bench_function("unsub_write", |b| {
b.iter(|| {
UnsubCommand {
max_msgs: None,
sid: String::new(),
}.into_vec()
}
.into_vec()
})
});

c.bench_function("info_parse", |b| {
let cmd = b"INFO\t{\"server_id\":\"test\",\"version\":\"1.3.0\",\"go\":\"go1.10.3\",\"host\":\"0.0.0.0\",\"port\":4222,\"max_payload\":4000,\"proto\":1,\"client_id\":1337}\r\n";
b.iter(|| ServerInfo::try_parse(cmd))
let cmd = "INFO\t{\"server_id\":\"test\",\"version\":\"1.3.0\",\"go\":\"go1.10.3\",\"host\":\"0.0.0.0\",\"port\":4222,\"max_payload\":4000,\"proto\":1,\"client_id\":1337}\r\n";
b.iter(|| ServerInfo::try_parse(cmd.into()))
});

c.bench_function("info_write", |b| {
Expand All @@ -79,8 +82,8 @@ fn benchmark_parser(c: &mut Criterion) {
});

c.bench_function("message_parse", |b| {
let cmd = b"MSG\tFOO\tpouet\t4\r\ntoto\r\n";
b.iter(|| Message::try_parse(cmd))
let cmd = "MSG\tFOO\tpouet\t4\r\ntoto\r\n";
b.iter(|| Message::try_parse(cmd.into()))
});

c.bench_function("message_write", |b| {
Expand All @@ -90,7 +93,8 @@ fn benchmark_parser(c: &mut Criterion) {
sid: String::new(),
reply_to: None,
payload: bytes::Bytes::new(),
}.into_vec()
}
.into_vec()
})
});
}
Expand Down
16 changes: 8 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::{
use tokio_executor;
use url::Url;

use error::NatsError;
use net::*;
use protocol::{commands::*, Op};
use crate::error::NatsError;
use crate::net::*;
use crate::protocol::{commands::*, Op};

/// Sink (write) part of a TCP stream
type NatsSink = stream::SplitSink<NatsConnection>;
Expand Down Expand Up @@ -113,10 +113,10 @@ impl NatsClientMultiplexer {
(NatsClientMultiplexer { subs_tx, other_tx }, other_rx)
}

pub fn for_sid(&self, sid: NatsSubscriptionId) -> impl Stream<Item = Message, Error = NatsError> + Send + Sync {
pub fn for_sid(&self, sid: &str) -> impl Stream<Item = Message, Error = NatsError> + Send + Sync {
let (tx, rx) = mpsc::unbounded();
(*self.subs_tx.write()).insert(
sid,
sid.into(),
SubscriptionSink {
tx,
max_count: None,
Expand Down Expand Up @@ -330,7 +330,7 @@ impl NatsClient {
let inner_rx = self.rx.clone();
let sid = cmd.sid.clone();
self.tx.send(Op::SUB(cmd)).and_then(move |_| {
let stream = inner_rx.for_sid(sid.clone()).and_then(move |msg| {
let stream = inner_rx.for_sid(&sid).and_then(move |msg| {
{
let mut stx = inner_rx.subs_tx.write();
let mut delete = None;
Expand Down Expand Up @@ -390,7 +390,7 @@ impl NatsClient {
let sid = sub_cmd.sid.clone();

let unsub_cmd = UnsubCommand {
sid: sub_cmd.sid.clone(),
sid: sid.clone(),
max_msgs: Some(1),
};

Expand All @@ -400,7 +400,7 @@ impl NatsClient {

let stream = self
.rx
.for_sid(sid.clone())
.for_sid(&sid)
.inspect(|msg| debug!(target: "nitox", "Request saw msg in multiplexed stream {:#?}", msg))
.take(1)
.into_future()
Expand Down
30 changes: 18 additions & 12 deletions src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::NatsError;
use crate::protocol::Op;
use bytes::{BufMut, BytesMut};
use error::NatsError;
use protocol::{CommandError, Op};
use tokio_codec::{Decoder, Encoder};

/// `tokio-codec` implementation of the protocol parsing
Expand Down Expand Up @@ -41,15 +41,24 @@ impl Decoder for OpCodec {
return Ok(None);
}

debug!(target: "nitox", "next index: {}", self.next_index);
debug!(target: "nitox", "codec buffer is {:?}", buf);

// Let's check if we find a blank space at the beginning
if let Some(command_offset) = buf[self.next_index..]
.iter()
.position(|b| *b == b' ' || *b == b'\t' || *b == b'\r')
{
let command_end = self.next_index + command_offset;

debug!(target: "nitox", "command end: {}", command_end);
debug!(target: "nitox", "codec detected command name {:?}", &buf[..command_end]);

if !Op::command_exists(&buf[..command_end]) {
debug!(target: "nitox", "command was incomplete");
return Ok(None);
}

if let Some(command_body_offset) = buf[command_end..].windows(2).position(|w| w == b"\r\n") {
let mut end_buf_pos = command_end + command_body_offset + 2;

Expand All @@ -65,26 +74,23 @@ impl Decoder for OpCodec {
}

debug!(target: "nitox", "codec detected command body {:?}", &buf[..end_buf_pos]);
match Op::from_bytes(&buf[..command_end], &buf[..end_buf_pos]) {
Err(CommandError::IncompleteCommandError) => {
debug!(target: "nitox", "command was incomplete");
self.next_index = buf.len();
Ok(None)
}

let cmd_buf = buf.split_to(end_buf_pos);
debug!(target: "nitox", "buffer now contains {:?}", buf);
self.next_index = 0;

match Op::from_bytes(cmd_buf.freeze(), command_end) {
Ok(op) => {
debug!(target: "nitox", "codec parsed command {:#?}", op);
let _ = buf.split_to(end_buf_pos);
debug!(target: "nitox", "buffer now contains {:?}", buf);
self.next_index = 0;
Ok(Some(op))
}
Err(e) => {
debug!(target: "nitox", "command couldn't be parsed {}", e);
self.next_index = 0;
Err(e.into())
}
}
} else {
debug!(target: "nitox", "command was incomplete");
Ok(None)
}
} else {
Expand Down
9 changes: 5 additions & 4 deletions src/net/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use parking_lot::RwLock;
use std::{net::SocketAddr, sync::Arc};
use tokio_executor;

use error::NatsError;
use protocol::Op;
use crate::error::NatsError;
use crate::protocol::Op;

use super::connection_inner::NatsConnectionInner;

Expand Down Expand Up @@ -54,7 +54,7 @@ impl NatsConnection {
let inner_arc = Arc::clone(&self.inner);
let inner_state = Arc::clone(&self.state);
let is_tls = self.is_tls;
let maybe_host = self.host.clone();
let maybe_host: Option<String> = self.host.clone();
NatsConnectionInner::connect_tcp(&self.addr)
.and_then(move |socket| {
if is_tls {
Expand All @@ -66,7 +66,8 @@ impl NatsConnection {
} else {
Either::B(future::ok(NatsConnectionInner::from(socket)))
}
}).and_then(move |inner| {
})
.and_then(move |inner| {
{
*inner_arc.write() = inner;
*inner_state.write() = NatsConnectionState::Connected;
Expand Down
6 changes: 3 additions & 3 deletions src/net/connection_inner.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use codec::OpCodec;
use crate::codec::OpCodec;
use crate::protocol::Op;
use futures::{
future::{self, Either},
prelude::*,
};
use native_tls::TlsConnector as NativeTlsConnector;
use protocol::Op;
use std::net::SocketAddr;
use tokio_codec::{Decoder, Framed};
use tokio_tcp::TcpStream;
use tokio_tls::{TlsConnector, TlsStream};

use error::NatsError;
use crate::error::NatsError;

/// Inner raw stream enum over TCP and TLS/TCP
#[derive(Debug)]
Expand Down
5 changes: 3 additions & 2 deletions src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
pub(crate) mod connection;
mod connection_inner;

use error::NatsError;
use crate::error::NatsError;

use self::connection::NatsConnectionState;
use self::connection_inner::*;
Expand Down Expand Up @@ -34,7 +34,8 @@ pub(crate) fn connect_tls(host: String, addr: SocketAddr) -> impl Future<Item =
.and_then(move |socket| {
debug!(target: "nitox", "Connected through TCP, upgrading to TLS");
NatsConnectionInner::upgrade_tcp_to_tls(&host, socket)
}).map(move |socket| {
})
.map(move |socket| {
debug!(target: "nitox", "Connected through TCP over TLS");
NatsConnection {
is_tls: true,
Expand Down
29 changes: 16 additions & 13 deletions src/protocol/client/connect.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use protocol::{Command, CommandError};
use crate::protocol::{Command, CommandError};
use bytes::{BufMut, Bytes, BytesMut};
use serde_json as json;

/// The CONNECT message is the client version of the INFO message. Once the client has established a TCP/IP
Expand Down Expand Up @@ -58,10 +58,7 @@ impl ConnectCommandBuilder {
}

fn default_ver(&self) -> Result<String, String> {
match ::std::env::var("CARGO_PKG_VERSION") {
Ok(v) => Ok(v),
Err(_) => Ok("0.1.x".into()),
}
Ok(env!("CARGO_PKG_VERSION").into())
}

fn default_lang(&self) -> Result<String, String> {
Expand All @@ -73,10 +70,16 @@ impl Command for ConnectCommand {
const CMD_NAME: &'static [u8] = b"CONNECT";

fn into_vec(self) -> Result<Bytes, CommandError> {
Ok(format!("CONNECT\t{}\r\n", json::to_string(&self)?).as_bytes().into())
let json_cmd = json::to_vec(&self)?;
let mut cmd: BytesMut = BytesMut::with_capacity(10 + json_cmd.len());
cmd.put("CONNECT\t");
cmd.put(json_cmd);
cmd.put("\r\n");

Ok(cmd.freeze())
}

fn try_parse(buf: &[u8]) -> Result<ConnectCommand, CommandError> {
fn try_parse(buf: Bytes) -> Result<Self, CommandError> {
let len = buf.len();

if buf[len - 2..] != [b'\r', b'\n'] {
Expand All @@ -94,22 +97,22 @@ impl Command for ConnectCommand {
#[cfg(test)]
mod tests {
use super::{ConnectCommand, ConnectCommandBuilder};
use protocol::Command;
use crate::protocol::Command;

static DEFAULT_CONNECT: &'static str = "CONNECT\t{\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"nitox\",\"lang\":\"rust\",\"version\":\"1.0.0\"}\r\n";

#[test]
fn it_parses() {
let parse_res = ConnectCommand::try_parse(DEFAULT_CONNECT.as_bytes());
let parse_res = ConnectCommand::try_parse(DEFAULT_CONNECT.into());
assert!(parse_res.is_ok());
let cmd = parse_res.unwrap();
assert_eq!(cmd.verbose, false);
assert_eq!(cmd.pedantic, false);
assert_eq!(cmd.tls_required, false);
assert!(cmd.name.is_some());
assert_eq!(&cmd.name.unwrap(), "nitox");
assert_eq!(&cmd.lang, "rust");
assert_eq!(&cmd.version, "1.0.0");
assert_eq!(cmd.name.unwrap(), "nitox");
assert_eq!(cmd.lang, "rust");
assert_eq!(cmd.version, "1.0.0");
}

#[test]
Expand Down
Loading