Skip to content

Commit f63eb95

Browse files
committed
Using Arc Mutex seems more appropriate
1 parent 6b08973 commit f63eb95

5 files changed

Lines changed: 26 additions & 33 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

base_agent/rs/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "tether-agent"
33
description = "Standardised use of MQTT and MessagePack for inter-process communication"
4-
version = "0.14.1"
4+
version = "0.14.2"
55
edition = "2021"
66
license = "MIT"
77
repository = "https://github.com/RandomStudio/tether"

base_agent/rs/src/agent/mod.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use rmp_serde::to_vec_named;
44
use rumqttc::tokio_rustls::rustls::ClientConfig;
55
use rumqttc::{Client, Event, MqttOptions, Packet, QoS, Transport};
66
use serde::Serialize;
7+
use std::sync::{Arc, Mutex};
78
use std::{sync::mpsc, thread, time::Duration};
89
use uuid::Uuid;
910

@@ -29,6 +30,7 @@ pub struct TetherAgent {
2930
pub(crate) client: Option<Client>,
3031
message_sender: mpsc::Sender<(TetherOrCustomTopic, Vec<u8>)>,
3132
message_receiver: mpsc::Receiver<(TetherOrCustomTopic, Vec<u8>)>,
33+
is_connected: Arc<Mutex<bool>>,
3234
}
3335

3436
#[derive(Clone)]
@@ -148,6 +150,7 @@ impl TetherAgentOptionsBuilder {
148150
message_sender,
149151
message_receiver,
150152
mqtt_client_id: self.mqtt_client_id,
153+
is_connected: Arc::new(Mutex::new(false)),
151154
};
152155

153156
if self.auto_connect {
@@ -283,19 +286,18 @@ impl TetherAgent {
283286

284287
let message_tx = self.message_sender.clone();
285288

286-
let (connected_tx, connected_rx) = mpsc::channel();
289+
let connection_state = Arc::clone(&self.is_connected);
287290

288291
thread::spawn(move || {
289-
let send_connected = connected_tx.clone();
290292
for event in connection.iter() {
291293
match event {
292294
Ok(e) => match e {
293295
Event::Incoming(incoming) => match incoming {
294296
Packet::ConnAck(_) => {
295297
info!("(Connected) ConnAck received!");
296-
send_connected
297-
.send(true)
298-
.expect("failed to push connected status form thread");
298+
let mut is_c =
299+
connection_state.lock().expect("failed to lock mutex");
300+
*is_c = true;
299301
}
300302
Packet::Publish(p) => {
301303
debug!("Incoming Publish packet (message received), {:?}", &p);
@@ -334,11 +336,15 @@ impl TetherAgent {
334336
let mut is_ready = false;
335337

336338
while !is_ready {
337-
std::thread::sleep(Duration::from_millis(100));
339+
debug!("Check whether connected...");
340+
std::thread::sleep(Duration::from_millis(1));
338341
trace!("Is ready? {}", is_ready);
339-
if let Ok(is_connected) = connected_rx.try_recv() {
340-
is_ready = is_connected;
341-
trace!("Is connected? {}", is_connected);
342+
let get_state = *self.is_connected.lock().expect("failed to lock mutex");
343+
if get_state {
344+
info!("Connection status confirmed");
345+
is_ready = true;
346+
} else {
347+
debug!("Not connected yet...");
342348
}
343349
}
344350

utilities/tether-utils/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "tether-utils"
33
description = "Utilities for Tether Systems"
4-
version = "0.11.3"
4+
version = "0.11.4"
55
edition = "2021"
66
license = "MIT"
77
repository = "https://github.com/RandomStudio/tether"
@@ -14,8 +14,8 @@ name = "tether"
1414

1515

1616
[dependencies]
17-
# tether-agent = { path = "../../base_agent/rs" }
18-
tether-agent = "0.14.1"
17+
tether-agent = { path = "../../base_agent/rs" }
18+
# tether-agent = "0.14.1"
1919
serde = { version = "1.0", features = ["derive"] }
2020
serde_json = "1.0.91"
2121
rmp-serde = "1.1.1"

utilities/tether-utils/src/bin/tether/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ fn main() {
9999
Commands::Send(options) => {
100100
tether_send::send(options, &mut tether_agent)
101101
.unwrap_or_else(|e| error!("Failed to send: {}", e));
102+
// This silliness (below) is only needed because the command might
103+
// finish before the message is even sent!
104+
// Less of an issue with long-running applications, and should
105+
// be properly solved with an async version.
102106
std::thread::sleep(std::time::Duration::from_millis(1000));
103107
}
104108
Commands::Topics(options) => {

0 commit comments

Comments
 (0)