2016-06-16 36 views
16

Nel documentation for RetryWhen l'esempio c'è questa:errore di cattura se retryWhen: s tentativi esaurisce

Observable.create((Subscriber<? super String> s) -> { 
    System.out.println("subscribing"); 
    s.onError(new RuntimeException("always fails")); 
}).retryWhen(attempts -> { 
    return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> { 
     System.out.println("delay retry by " + i + " second(s)"); 
     return Observable.timer(i, TimeUnit.SECONDS); 
    }); 
}).toBlocking().forEach(System.out::println); 

ma come faccio a propagare l'errore se i tentativi si esaurisce?

Aggiunta .doOnError(System.out::println)dopo la clausola retryWhen non cattura l'errore. È persino emesso?

Aggiunta di un .doOnError(System.out::println)prima del riprovaQuando si visualizza always fails per tutti i tentativi.

risposta

1

Un'opzione utilizza Observable.materialize() per convertire gli elementi Observable.range() in notifiche. Poi, una volta onCompleted() viene rilasciato, si può propagare l'errore a valle (nel campione sotto Pair viene utilizzato per avvolgere Observable.range() notifiche e ad eccezione di Observable)

@Test 
    public void retryWhen() throws Exception { 

    Observable.create((Subscriber<? super String> s) -> { 
     System.out.println("subscribing"); 
     s.onError(new RuntimeException("always fails")); 
    }).retryWhen(attempts -> { 
     return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new) 
      .flatMap(notifAndEx -> { 
      System.out.println("delay retry by " + notifAndEx + " second(s)"); 
      return notifAndEx.getRight().isOnCompleted() 
        ? Observable.<Integer>error(notifAndEx.getLeft()) 
        : Observable.timer(notifAndEx.getRight().getValue(), TimeUnit.SECONDS); 
     }); 
    }).toBlocking().forEach(System.out::println); 
} 

    private static class Pair<L,R> { 
     private final L left; 
     private final R right; 

     public Pair(L left, R right) { 
      this.left = left; 
      this.right = right; 
     } 

     public L getLeft() { 
      return left; 
     } 

     public R getRight() { 
      return right; 
     } 
    } 
0

È possibile ottenere il comportamento desiderato utilizzando il RetryWhen costruttore in rxjava-extras che si trova sulla Maven Central. Usa la versione più recente.

Observable.create((Subscriber<? super String> s) -> { 
    System.out.println("subscribing"); 
    s.onError(new RuntimeException("always fails")); 
}) 
.retryWhen(RetryWhen 
    .delays(Observable.range(1, 3) 
       .map(n -> (long) n), 
      TimeUnit.SECONDS).build()) 
.doOnError(e -> e.printStackTrace()) 
.toBlocking().forEach(System.out::println); 
+0

Ehm, perché il down-vote? Questa è una libreria testata da unità. –

7

I Javadoc per retryWhen afferma che:

If that Observable calls onComplete or onError then retry will call onCompleted or onError on the child subscription.

In parole povere, se si vuole diffondere l'eccezione, è necessario rigenerare l'eccezione originale una volta che hai avuto abbastanza di riprovare.

Un modo semplice è impostare Observable.range in modo che sia 1 maggiore del numero di volte in cui si desidera riprovare.

Quindi nella funzione zip testare il numero corrente di tentativi. Se è uguale a NUMBER_OF_RETRIES + 1, restituisci Observable.error(throwable) o reinvia la tua eccezione.

EG

Observable.create((Subscriber<? super String> s) -> { 
      System.out.println("subscribing"); 
      s.onError(new RuntimeException("always fails")); 
     }).retryWhen(attempts -> { 
      return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> { 
       if (attempt == NUMBER_OF_RETRIES + 1) { 
        throw Throwables.propagate(throwable); 
       } 
       else { 
        return attempt; 
       } 
      }).flatMap(i -> { 
       System.out.println("delaying retry by " + i + " second(s)"); 
       return Observable.timer(i, TimeUnit.SECONDS); 
      }); 
     }).toBlocking().forEach(System.out::println); 

Per inciso doOnError non influisce osservabile in qualsiasi modo - vi fornisce semplicemente con un gancio per eseguire alcune azioni in caso di errore. Un esempio comune è la registrazione.

-1

è necessario utilizzare onErrorResumeNext dopo la retryWhen

Nel tuo esempio

Observable.create((Subscriber<? super String> s) -> { 
     System.out.println("subscribing"); 
     s.onError(new RuntimeException("always fails")); 
    }).retryWhen(attempts -> { 
     return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (n, i) -> { 
      if (i == NUMBER_OF_RETRIES + 1) { 
       throw Throwables.propagate(n); 
      } 
      else { 
       return i; 
      } 
     }).flatMap(i -> { 
      System.out.println("delay retry by " + i + " second(s)"); 
      return Observable.timer(i, TimeUnit.SECONDS); 
     }); 
    }) 
    .onErrorResumeNext(t -> {System.out.println("Error after all retries:" + t.getMessage()); 
               return Observable.error(t); 
              }) 
    .toBlocking().forEach(System.out::println); 

Al fondo di questa classe è possibile vedere un esempio pratico per capire come funziona. https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java

-1

È possibile utilizzare la funzione di scansione, che restituisce una coppia con indice accumulata e decidere o meno di trasmettere l'errore:

.retryWhen(attempts -> 
    return .scan(Pair.create(0, null), (index, value) -> Pair.create(index.first + 1, value)) 
      .flatMap(pair -> { 
       if(pair.first > MAX_RETRY_COUNT) { 
        throw new RuntimeException(pair.second); 
       } 
       return Observable.timer(pair.first, TimeUnit.SECONDS); 
      }); 

Oppure si può attaccare con zipWith operatore, ma aumentare il numero di range Osservabile e restituire una coppia, invece dell'indice da solo.In questo modo, non perderai le informazioni sul precedente throwable.

attempts 
    .zipWith(Observable.range(1, MAX_RETRY_COUNT + 1), (throwable, i) -> Pair.create(i, throwable)) 
    .flatMap(pair -> { 
     if(pair.first > MAX_RETRY_COUNT) throw new RuntimeException(pair.second); 
     System.out.println("delay retry by " + pair.first + " second(s)"); 
     return Observable.timer(pair.first, TimeUnit.SECONDS); 
    }); 
9

Il doc per retryWhen dice che passi onError notifica ai suoi abbonati e termina. Quindi puoi fare qualcosa del genere:

final int ATTEMPTS = 3; 

    Observable.create((Subscriber<? super String> s) -> { 
     System.out.println("subscribing"); 
     s.onError(new RuntimeException("always fails")); 
    }).retryWhen(attempts -> attempts 
      .zipWith(Observable.range(1, ATTEMPTS), (n, i) -> 
        i < ATTEMPTS ? 
          Observable.timer(i, SECONDS) : 
          Observable.error(n)) 
      .flatMap(x -> x)) 
      .toBlocking() 
      .forEach(System.out::println);