2015-10-18 11 views
7

possiamo usare la cache dell'operatore() per evitare l'esecuzione di un lungo lavoro (richiesta HTTP) più volte, e riutilizzare il suo risultato:osservabile, riprovare in caso di errore e la cache solo se conseguiti

Observable apiCall = createApiCallObservable().cache(); // notice the .cache() 

--------------------------------------------- 
// the first time we need it 
apiCall.andSomeOtherStuff() 
       .subscribe(subscriberA); 

--------------------------------------------- 
//in the future when we need it again 
apiCall.andSomeDifferentStuff() 
       .subscribe(subscriberB); 

La prima volta, la richiesta http viene eseguita, ma la seconda volta, poiché abbiamo utilizzato l'operatore cache(), la richiesta non verrà eseguita, ma riusciamo a riutilizzare il primo risultato.

Questo funziona correttamente quando la prima richiesta viene completata correttamente. Ma se onError viene chiamato al primo tentativo, la prossima volta che un nuovo utente si iscrive allo stesso osservabile, l'onError verrà richiamato senza tentare nuovamente la richiesta http.

Quello che stiamo cercando di fare è che se onError viene chiamato la prima volta, la prossima volta che qualcuno si iscrive allo stesso osservabile, la richiesta http verrà tentata da zero. cioè l'osservabile memorizzerà solo le chiamate api riuscite, cioè quelle per le quali è stato chiamato onCompleted.

Qualche idea su come procedere? Abbiamo provato a utilizzare gli operatori retry() e cache() senza troppa fortuna.

risposta

4

Questa è la soluzione che abbiamo finito con, dopo l'estensione soluzione akarnokd:

public class OnErrorRetryCache<T> { 

    public static <T> Observable<T> from(Observable<T> source) { 
     return new OnErrorRetryCache<>(source).deferred; 
    } 

    private final Observable<T> deferred; 
    private final Semaphore singlePermit = new Semaphore(1); 

    private Observable<T> cache = null; 
    private Observable<T> inProgress = null; 

    private OnErrorRetryCache(Observable<T> source) { 
     deferred = Observable.defer(() -> createWhenObserverSubscribes(source)); 
    } 

    private Observable<T> createWhenObserverSubscribes(Observable<T> source) 
    { 
     singlePermit.acquireUninterruptibly(); 

     Observable<T> cached = cache; 
     if (cached != null) { 
      singlePermit.release(); 
      return cached; 
     } 

     inProgress = source 
       .doOnCompleted(this::onSuccess) 
       .doOnTerminate(this::onTermination) 
       .replay() 
       .autoConnect(); 

     return inProgress; 
    } 

    private void onSuccess() { 
     cache = inProgress; 
    } 

    private void onTermination() { 
     inProgress = null; 
     singlePermit.release(); 
    } 
} 

avevamo bisogno di memorizzare nella cache il risultato di una richiesta HTTP dal retrofit. Quindi questo è stato creato, con un osservabile che emette un singolo elemento in mente.

Se un osservatore si è iscritto mentre era in esecuzione la richiesta http, volevamo che esso attendesse e non eseguisse la richiesta due volte, a meno che non fosse in corso uno in corso. Per fare ciò il semaforo consente l'accesso singolo al blocco che crea o restituisce l'osservabile memorizzato nella cache, e se viene creato un nuovo osservabile, aspettiamo che si concluda. I test per quanto sopra possono essere trovati here

3

Devi fare un po 'di gestione dello stato. Ecco come farei:

public class CachedRetry { 

    public static final class OnErrorRetryCache<T> { 
     final AtomicReference<Observable<T>> cached = 
       new AtomicReference<>(); 

     final Observable<T> result; 

     public OnErrorRetryCache(Observable<T> source) { 
      result = Observable.defer(() -> { 
       for (;;) { 
        Observable<T> conn = cached.get(); 
        if (conn != null) { 
         return conn; 
        } 
        Observable<T> next = source 
          .doOnError(e -> cached.set(null)) 
          .replay() 
          .autoConnect(); 

        if (cached.compareAndSet(null, next)) { 
         return next; 
        } 
       } 
      }); 
     } 

     public Observable<T> get() { 
      return result; 
     } 
    } 

    public static void main(String[] args) { 
     AtomicInteger calls = new AtomicInteger(); 
     Observable<Integer> source = Observable 
       .just(1) 
       .doOnSubscribe(() -> 
        System.out.println("Subscriptions: " + (1 + calls.get()))) 
       .flatMap(v -> { 
        if (calls.getAndIncrement() == 0) { 
         return Observable.error(new RuntimeException()); 
        } 
        return Observable.just(42); 
       }); 

     Observable<Integer> o = new OnErrorRetryCache<>(source).get(); 

     o.subscribe(System.out::println, 
       Throwable::printStackTrace, 
       () -> System.out.println("Done")); 

     o.subscribe(System.out::println, 
       Throwable::printStackTrace, 
       () -> System.out.println("Done")); 

     o.subscribe(System.out::println, 
       Throwable::printStackTrace, 
       () -> System.out.println("Done")); 
    } 
} 

Funziona memorizzando nella cache una sorgente di successo e restituisce a tutti. Altrimenti, una fonte (parzialmente) fallita crearà la cache e l'osservatore della chiamata successiva attiverà una riscrittura.

+0

Grazie akarnokd, questo sembra buono. Ho solo alcuni problemi quando l'origine è una richiesta HTTP di lunga durata (alcuni secondi), e la seconda, i terzi iscritti si iscrivono mentre i primi sono ancora in corso. In tal caso falliscono tutti e la richiesta non viene tentata più di una volta. L'operatore cache() si comporta diversamente. Vedrò di più e tenterò di replicare il problema che sto menzionando usando il tuo esempio, e ti ricontatteremo presto. – Plato

0

Avete considerato l'utilizzo di AsyncSubject per implementare la cache per la richiesta di rete? Ho creato un'applicazione di esempio RxApp per verificare come potrebbe funzionare. Io uso un modello singleton per ottenere la risposta dalla rete. Ciò rende possibile memorizzare le risposte nella cache, accedere ai dati da più frammenti, sottoscrivere richieste in sospeso e anche fornire dati fittizi per i test dell'interfaccia utente automatizzati.

+0

Se abbiamo usato un AsyncSubject e la prima richiesta http non è riuscita, a tutti i futuri abbonati verrebbe notificato l'errore, invece della chiamata http tentata di nuovo, giusto? – Plato

+0

Questo è corretto ma il modello dovrebbe fornire ad es. metodo reset() che l'interfaccia utente può chiamare dopo aver gestito l'errore mostrando un errore all'utente. – pmellaaho

5

Bene, per chiunque sia ancora interessato, penso di avere un modo migliore per raggiungerlo con rx.

La nota fondamentale è di usare onErrorResumeNext, che consente di sostituire l'Osservabile in caso di errore. quindi dovrebbe essere simile a questa:

Observable<Object> apiCall = createApiCallObservable().cache(1); 
//future call 
apiCall.onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() { 
    public Observable<? extends Object> call(Throwable throwable) { 
     return createApiCallObservable(); 
     } 
    }); 

questo modo, se la prima chiamata non è riuscita la chiamata futuro sarà solo richiamarlo (solo una volta).

ma ogni altro chiamante che cercherà di utilizzare il primo osservabile avrà esito negativo e farà una richiesta diversa.

hai fatto riferimento all'originale osservabile, aggiorniamolo.

così, un getter pigro:

Observable<Object> apiCall; 
private Observable<Object> getCachedApiCall() { 
    if (apiCall == null){ 
     apiCall = createApiCallObservable().cache(1); 
    } 
    return apiCall; 
} 

ora, un getter che riprovare se il precedente era fallito:

private Observable<Object> getRetryableCachedApiCall() { 
    return getCachedApiCall().onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() { 
     public Observable<? extends Object> call(Throwable throwable) { 
      apiCall = null; 
      return getCachedApiCall(); 
     } 
    }); 
} 

Si prega di notare che sarà ripetere solo una volta per ogni volta che è chiamato.

Così ora il vostro codice sarà simile a questo:

--------------------------------------------- 
// the first time we need it - this will be without a retry if you want.. 
getCachedApiCall().andSomeOtherStuff() 
       .subscribe(subscriberA); 

--------------------------------------------- 
//in the future when we need it again - for any other call so we will have a retry 
getRetryableCachedApiCall().andSomeDifferentStuff() 
       .subscribe(subscriberB);