Skip to content

Commit 6524b50

Browse files
authored
Switch Arti restart from flag based to version based approach
Switch Arti restart from flag based to version based approach
2 parents 13dad0e + 04299ca commit 6524b50

2 files changed

Lines changed: 61 additions & 47 deletions

File tree

p2p/src/tor/arti.rs

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use std::path::{Path, PathBuf};
3939
use std::pin::pin;
4040
use std::str::FromStr;
4141
use std::sync::atomic::Ordering;
42-
use std::sync::{mpsc, Arc};
42+
use std::sync::Arc;
4343
use std::time::{Duration, Instant};
4444
use std::{fs, thread};
4545
use tor_config::{BoolOrAuto, ExplicitOrAuto};
@@ -153,8 +153,10 @@ pub fn random_http_probe_url() -> &'static str {
153153
lazy_static! {
154154
// It is a tor server only running instance, in case of libraries can be shared by multiple nodes and wallets
155155
static ref TOR_ARTI_INSTANCE: std::sync::RwLock<Option<ArtiCore>> = std::sync::RwLock::new(None);
156-
// Tor service full restart request
157-
static ref TOR_RESTART_REQUEST: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
156+
// Tor instance ID. We can't store it with ArtiCore because of the access
157+
static ref TOR_ARTI_INSTANCE_ID: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
158+
// Tor service full restart request. Value 0 - not requsted. Otherwise next ArtiCore instance_id
159+
static ref TOR_RESTART_REQUEST: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(1);
158160
// Last restarting time (need to understand how long the tor was online without any issue)
159161
static ref TOR_RESTART_TIME: std::sync::RwLock<Option<Instant>> = std::sync::RwLock::new(None);
160162
// Monitoring thread. Only one instance is allowed
@@ -244,7 +246,16 @@ pub(crate) fn get_shutdown_arti_token() -> CancellationToken {
244246

245247
pub fn request_arti_restart(reason: &str) {
246248
info!("Requestion Arti restart. Reason: {}", reason);
247-
TOR_RESTART_REQUEST.store(true, Ordering::Relaxed);
249+
let next_id = TOR_ARTI_INSTANCE_ID.load(Ordering::SeqCst) + 1;
250+
TOR_RESTART_REQUEST.store(next_id, Ordering::SeqCst);
251+
}
252+
253+
pub fn get_next_arti_instance_id() -> u32 {
254+
TOR_RESTART_REQUEST.load(Ordering::SeqCst)
255+
}
256+
257+
pub fn get_current_arti_instance_id() -> u32 {
258+
TOR_ARTI_INSTANCE_ID.load(Ordering::SeqCst)
248259
}
249260

250261
pub fn is_arti_started() -> bool {
@@ -270,12 +281,15 @@ pub fn is_arti_healthy() -> bool {
270281
Ok(guard) => guard.is_some(),
271282
Err(_) => false,
272283
};
273-
let restart_requested = TOR_RESTART_REQUEST.load(Ordering::Relaxed);
274-
has_tor && !restart_requested
284+
let restart_requested = TOR_RESTART_REQUEST.load(Ordering::SeqCst);
285+
let tor_version = TOR_ARTI_INSTANCE_ID.load(Ordering::SeqCst);
286+
has_tor && (restart_requested == tor_version)
275287
}
276288

277289
pub fn is_arti_restarting() -> bool {
278-
TOR_RESTART_REQUEST.load(Ordering::Relaxed)
290+
let restart_requested = TOR_RESTART_REQUEST.load(Ordering::SeqCst);
291+
let tor_version = TOR_ARTI_INSTANCE_ID.load(Ordering::SeqCst);
292+
tor_version < restart_requested
279293
}
280294

281295
pub fn register_arti_active_object(obj_name: String) {
@@ -311,12 +325,15 @@ pub fn start_arti(
311325
let expiration_time = {
312326
let mut atri_writer = TOR_ARTI_INSTANCE.write().unwrap_or_else(|e| e.into_inner());
313327

328+
debug_assert!(atri_writer.is_none());
329+
314330
let create_arti_res =
315331
ArtiCore::new(config, base_dir, print_start_message, cleanup_arti_data);
316332
let (a, expiration_time) = create_arti_res?;
317-
TOR_RESTART_REQUEST.store(false, Ordering::Relaxed);
318-
*TOR_RESTART_TIME.write().unwrap_or_else(|e| e.into_inner()) = Some(Instant::now());
319333
*atri_writer = Some(a);
334+
TOR_RESTART_REQUEST.store(1, Ordering::SeqCst);
335+
TOR_ARTI_INSTANCE_ID.store(1, Ordering::SeqCst);
336+
*TOR_RESTART_TIME.write().unwrap_or_else(|e| e.into_inner()) = Some(Instant::now());
320337
expiration_time
321338
};
322339

@@ -371,6 +388,7 @@ pub fn start_arti(
371388
};
372389
need_arti_restart
373390
|| TOR_RESTART_REQUEST.load(Ordering::Relaxed)
391+
> TOR_ARTI_INSTANCE_ID.load(Ordering::SeqCst)
374392
|| Utc::now().timestamp() > expiration_time
375393
};
376394

@@ -440,7 +458,10 @@ pub fn stop_arti() {
440458

441459
// Stopping the arti
442460
stop_start_arti(false);
443-
TOR_RESTART_REQUEST.store(false, Ordering::Relaxed);
461+
TOR_RESTART_REQUEST.store(
462+
TOR_ARTI_INSTANCE_ID.load(Ordering::SeqCst),
463+
Ordering::SeqCst,
464+
);
444465
// Checking if nothing left alive
445466
debug_assert!(TOR_ARTI_INSTANCE
446467
.read()
@@ -505,7 +526,7 @@ where
505526
// return expiration time
506527
fn restart_arti(start_new_client: bool) -> i64 {
507528
error!("Stopping ARTI...");
508-
let (tor_runtime, config, base_dir, restart_senders) = {
529+
let (tor_runtime, config, base_dir) = {
509530
let context_ids = release_arti_cancelling_all();
510531
let mut guard = TOR_ARTI_INSTANCE.write().unwrap_or_else(|e| e.into_inner()); // ? converts PoisonError to E
511532
init_arti_cancelling_all(context_ids);
@@ -514,12 +535,7 @@ fn restart_arti(start_new_client: bool) -> i64 {
514535
Some(arti) => {
515536
drop(arti.tor_client);
516537
drop(guard);
517-
(
518-
arti.tor_runtime,
519-
arti.config,
520-
arti.base_dir,
521-
arti.restart_senders,
522-
)
538+
(arti.tor_runtime, arti.config, arti.base_dir)
523539
}
524540
None => {
525541
error!("restart_arti called for empty instance. Ignoring this call");
@@ -539,43 +555,34 @@ fn restart_arti(start_new_client: bool) -> i64 {
539555
Ok((arti_core, expiration_time)) => {
540556
info!("New Arti instance is successfully created.");
541557
*TOR_ARTI_INSTANCE.write().unwrap_or_else(|e| e.into_inner()) = Some(arti_core);
542-
TOR_RESTART_REQUEST.store(false, Ordering::Relaxed);
558+
let tor_id = TOR_ARTI_INSTANCE_ID.fetch_add(1, Ordering::SeqCst) + 1;
559+
debug_assert!(TOR_RESTART_REQUEST.load(Ordering::SeqCst) == tor_id);
560+
TOR_RESTART_REQUEST.store(tor_id, Ordering::SeqCst);
543561
*TOR_RESTART_TIME.write().unwrap_or_else(|e| e.into_inner()) = Some(Instant::now());
544562
network_status::update_network_outage_time(Utc::now().timestamp());
545-
for sender in restart_senders {
546-
let _ = sender.send(());
547-
}
548563
return expiration_time;
549564
}
550565
Err(e) => {
551566
error!("Unable to create Arti instance, {}", e);
552567
info!("Waiting for 60 seconds before retry to create Arti instance.");
553-
std::thread::sleep(Duration::from_secs(60));
568+
for _ in 0..60 {
569+
std::thread::sleep(Duration::from_secs(1));
570+
if is_shutdown_arti() {
571+
return Utc::now().timestamp() + 600;
572+
}
573+
}
554574
}
555575
}
556576
}
557577
}
558578

559-
pub fn register_arti_restart_event() -> Result<std::sync::mpsc::Receiver<()>, Error> {
560-
let mut arti_core = TOR_ARTI_INSTANCE.write().unwrap_or_else(|e| e.into_inner());
561-
match &mut *arti_core {
562-
Some(arti_core) => {
563-
let (tx, rx) = mpsc::channel::<()>();
564-
arti_core.restart_senders.push(tx);
565-
Ok(rx)
566-
}
567-
None => Err(Error::TorProcess("Arti is not running".into())),
568-
}
569-
}
570-
571579
/// Embedded tor server - Atri
572580
pub struct ArtiCore {
573581
// Using special runtime because it is the only reliable way to kill the tor_client.
574582
tor_runtime: Runtime,
575583
tor_client: TorClient<PreferredRuntime>,
576584
config: TorConfig,
577585
base_dir: PathBuf,
578-
restart_senders: Vec<std::sync::mpsc::Sender<()>>,
579586
}
580587

581588
const WEB: &str = "web";
@@ -676,7 +683,6 @@ impl ArtiCore {
676683
tor_client,
677684
config: config.clone(),
678685
base_dir: base_dir.into(),
679-
restart_senders: Vec::new(),
680686
},
681687
expiration_time,
682688
)),

p2p/src/tor/onion_service.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use crate::tor::arti;
16+
use crate::tor::arti::is_arti_restarting;
1617
use crate::tor::tcp_data_stream::TcpDataStream;
1718
use crate::{Error, PeerAddr};
1819
use async_std::stream::StreamExt;
@@ -29,7 +30,7 @@ use tor_proto::client::stream::IncomingStreamRequest;
2930
static SERVICE_COUNTER: AtomicU32 = AtomicU32::new(0);
3031

3132
// started_service_callback accepting onion address
32-
fn start_arti<F>(
33+
fn start_onion_service<F>(
3334
context_id: u32,
3435
onion_expanded_key: &[u8; 64],
3536
service_name: &str,
@@ -116,12 +117,15 @@ where
116117
};
117118

118119
loop {
120+
while is_arti_restarting() && !stop_state.is_stopped() {
121+
thread::sleep(Duration::from_millis(500));
122+
}
119123
if stop_state.is_stopped() {
120124
break;
121125
}
122126

123127
info!("Starting Arti service {}...", service_name);
124-
match start_arti(
128+
match start_onion_service(
125129
context_id,
126130
&onion_expanded_key,
127131
service_name,
@@ -135,8 +139,6 @@ where
135139
arti::register_arti_active_object(onion_service_object.clone());
136140
arti::register_arti_active_object(incoming_requests_object.clone());
137141

138-
let restarted_rc = arti::register_arti_restart_event()?;
139-
140142
let stop_state2 = stop_state.clone();
141143
let context_id2 = context_id;
142144
let service_name2 = String::from(service_name);
@@ -206,9 +208,7 @@ where
206208
if need_arti_restart || arti::is_arti_restarting() {
207209
drop(onion_service);
208210
arti::unregister_arti_active_object(&onion_service_object);
209-
if !arti::is_arti_restarting() {
210-
arti::request_arti_restart("Onion service is dead, restarting");
211-
}
211+
arti::request_arti_restart("Onion service is dead, restarting");
212212
break;
213213
}
214214

@@ -289,6 +289,7 @@ where
289289
if monitoring.join().is_err() {
290290
break;
291291
}
292+
let expected_tor_instance_id = arti::get_next_arti_instance_id();
292293
arti::unregister_arti_active_object(&incoming_requests_object);
293294

294295
warn!(
@@ -298,9 +299,10 @@ where
298299

299300
// Waiting while arti is started
300301
while !stop_state.is_stopped() {
301-
if restarted_rc.recv_timeout(Duration::from_secs(1)).is_ok() {
302+
if arti::get_current_arti_instance_id() == expected_tor_instance_id {
302303
break;
303304
}
305+
thread::sleep(Duration::from_millis(300));
304306
}
305307

306308
if stop_state.is_stopped() {
@@ -316,7 +318,6 @@ where
316318
if stop_state.is_stopped() {
317319
break;
318320
}
319-
thread::sleep(Duration::from_secs(1));
320321
}
321322
Err(e) => {
322323
if stop_state.is_stopped() {
@@ -329,9 +330,16 @@ where
329330
}
330331
}
331332

332-
error!("Unable to restart onion service. Will retry soon. {}", e);
333+
error!("Unable to restart onion service. Will restart Arti. {}", e);
333334
// restarting arti first
334-
arti::request_arti_restart(&format!("Unable to restart onion service, {}", e));
335+
arti::request_arti_restart(&format!("Unable to start onion service, {}", e));
336+
let expected_tor_instance_id = arti::get_next_arti_instance_id();
337+
while !stop_state.is_stopped() {
338+
if arti::get_current_arti_instance_id() == expected_tor_instance_id {
339+
break;
340+
}
341+
thread::sleep(Duration::from_millis(300));
342+
}
335343
}
336344
}
337345
}

0 commit comments

Comments
 (0)