@@ -212,8 +212,13 @@ private void runAS(ParseAssignments.HostEntry remote) throws IOException {
212212 if (msgs [0 ] != null && bestPath .get () != null && REPEAT > 1 ) {
213213 try (ScionProvider .Sync sender = service .getSync ()) {
214214 for (int i = 1 ; i < msgs .length ; i ++) {
215- List <Scmp .TracerouteMessage > messages = sender .sendTracerouteRequest (bestPath .get ());
216- msgs [i ] = messages .get (messages .size () - 1 );
215+ try {
216+ List <Scmp .TracerouteMessage > messages = sender .sendTracerouteRequest (bestPath .get ());
217+ msgs [i ] = messages .get (messages .size () - 1 );
218+ } catch (IOException e ) {
219+ msgs [i ] = Scmp .TracerouteMessage .createEmpty (bestPath .get ());
220+ msgs [i ].setTimedOut (1_000_000_000 );
221+ }
217222 }
218223 }
219224 }
@@ -284,7 +289,7 @@ private Scmp.TimedMessage findPaths(List<Path> paths, Ref<Path> bestOut, long is
284289 case FASTEST_TR :
285290 return findFastestTR (paths , bestOut , isdAs );
286291 case FASTEST_TR_ASYNC :
287- return findFastestTRasync (paths , bestOut , isdAs );
292+ return findFastestTraceAsync (paths , bestOut , isdAs );
288293 case SHORTEST_TR :
289294 return findShortestTR (paths , bestOut , isdAs );
290295 case SHORTEST_ECHO :
@@ -374,37 +379,9 @@ private Scmp.TracerouteMessage findFastestTR(List<Path> paths, Ref<Path> refBest
374379 }
375380 }
376381
377- private Scmp .TracerouteMessage findFastestTRasync (
382+ private Scmp .TracerouteMessage findFastestTraceAsync (
378383 List <Path > paths , Ref <Path > refBest , long isdAs ) {
379- ConcurrentHashMap <Integer , Scmp .TimedMessage > messages = new ConcurrentHashMap <>();
380- CountDownLatch barrier = new CountDownLatch (paths .size ());
381- AtomicInteger errors = new AtomicInteger ();
382- ScmpSenderAsync .ResponseHandler handler =
383- new ScmpSenderAsync .ResponseHandler () {
384- @ Override
385- public void onResponse (Scmp .TimedMessage msg ) {
386- barrier .countDown ();
387- messages .put (msg .getSequenceNumber (), msg );
388- }
389-
390- @ Override
391- public void onTimeout (Scmp .TimedMessage msg ) {
392- barrier .countDown ();
393- messages .put (msg .getSequenceNumber (), msg );
394- }
395-
396- @ Override
397- public void onError (Scmp .ErrorMessage msg ) {
398- errors .incrementAndGet ();
399- barrier .countDown ();
400- }
401-
402- @ Override
403- public void onException (Throwable t ) {
404- errors .incrementAndGet ();
405- barrier .countDown ();
406- }
407- };
384+ PingResponseHandler handler = new PingResponseHandler (paths .size ());
408385
409386 // Send all requests
410387 try (ScionProvider .Async sender = service .getAsync (handler )) {
@@ -414,26 +391,20 @@ public void onException(Throwable t) {
414391 }
415392
416393 // Wait for all messages to be received, BEFORE closing the "sender".
417- if (!barrier .await (1100 , TimeUnit .MILLISECONDS )) {
418- throw new IllegalStateException (
419- "Missing messages: " + barrier .getCount () + "/" + paths .size ());
420- }
394+ handler .await ();
421395 } catch (IOException e ) {
422396 println ("ERROR: " + e .getMessage ());
423397 summary .incAsError (isdAs );
424398 return null ;
425- } catch (InterruptedException e ) {
426- Thread .currentThread ().interrupt ();
427- throw new IllegalStateException (e );
428399 }
429400
430- if (errors . get () > 0 && messages .isEmpty ()) {
401+ if (handler . hasErrors () && handler . messages .isEmpty ()) {
431402 summary .incAsError (isdAs );
432403 return null ;
433404 }
434405
435406 Scmp .TracerouteMessage best = null ;
436- for (Scmp .TimedMessage tm : messages .values ()) {
407+ for (Scmp .TimedMessage tm : handler . messages .values ()) {
437408 Scmp .TracerouteMessage msg = (Scmp .TracerouteMessage ) tm ;
438409 summary .checkTotalMax (msg .getIsdAs (), msg );
439410
@@ -455,4 +426,55 @@ public void onException(Throwable t) {
455426 }
456427 return best ;
457428 }
429+
430+ private static class PingResponseHandler implements ScmpSenderAsync .ResponseHandler {
431+ private final Map <Integer , Scmp .TimedMessage > messages = new ConcurrentHashMap <>();
432+ private final CountDownLatch barrier ;
433+ private final AtomicInteger errors = new AtomicInteger ();
434+ private final int nPaths ;
435+
436+ private PingResponseHandler (int nPaths ) {
437+ this .nPaths = nPaths ;
438+ barrier = new CountDownLatch (nPaths );
439+ }
440+
441+ @ Override
442+ public void onResponse (Scmp .TimedMessage msg ) {
443+ barrier .countDown ();
444+ messages .put (msg .getSequenceNumber (), msg );
445+ }
446+
447+ @ Override
448+ public void onTimeout (Scmp .TimedMessage msg ) {
449+ barrier .countDown ();
450+ messages .put (msg .getSequenceNumber (), msg );
451+ }
452+
453+ @ Override
454+ public void onError (Scmp .ErrorMessage msg ) {
455+ errors .incrementAndGet ();
456+ barrier .countDown ();
457+ }
458+
459+ @ Override
460+ public void onException (Throwable t ) {
461+ errors .incrementAndGet ();
462+ barrier .countDown ();
463+ }
464+
465+ void await () {
466+ try {
467+ if (!barrier .await (1100 , TimeUnit .MILLISECONDS )) {
468+ throw new IllegalStateException ("Missing messages: " + barrier .getCount () + "/" + nPaths );
469+ }
470+ } catch (InterruptedException e ) {
471+ Thread .currentThread ().interrupt ();
472+ throw new IllegalStateException (e );
473+ }
474+ }
475+
476+ public boolean hasErrors () {
477+ return errors .get () > 0 ;
478+ }
479+ }
458480}
0 commit comments