2015-03-10 6 views
16

Mi piacerebbe sapere come ignorare le eccezioni e continuare lo streaming infinito (nel mio caso flusso di posizioni)?Come ignorare l'errore e continuare lo streaming infinito?

Sto recuperando la posizione dell'utente corrente (utilizzando Android-ReactiveLocation) e quindi inviandole alla mia API (utilizzando Retrofit).

Nel mio caso, quando si verifica un'eccezione durante una chiamata di rete (ad es. Timeout) viene invocato il metodo onError e il flusso si arresta automaticamente. Come evitarlo?

attività:

private RestService mRestService; 
private Subscription mSubscription; 
private LocationRequest mLocationRequest = LocationRequest.create() 
      .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY) 
      .setInterval(100); 
... 
private void start() { 
    mRestService = ...; 
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this); 
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest) 
      .buffer(50) 
      .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception 
      .subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(); 
} 

RestService:

public interface RestService { 
    @POST("/.../") 
    Observable<Response> postLocations(@Body List<Location> locations); 
} 
+0

vedere la risposta simile con eccezioni di eventi click: http://stackoverflow.com/questions/26154236/how-to-subscribe-to-click-events-so-exceptions-dont-unsubscribe/26229584#26229584 –

risposta

7

mRestService.postLocations(locations) emettere un elemento, quindi completare. Se si verifica un errore, emette l'errore, che completa il flusso.

Come si chiama questo metodo in un flatMap, l'errore continua al flusso "principale" e quindi il flusso si interrompe.

Quello che puoi fare è trasformare il tuo errore in un altro oggetto (come descritto qui: https://stackoverflow.com/a/28971140/476690), ma non nel tuo stream principale (come presumo tu abbia già provato) ma sul mRestService.postLocations(locations).

In questo modo, questa chiamata emetterà un errore, che verrà trasformato in un oggetto/un altro osservabile e quindi completato. (senza chiamare onError).

In una vista consumer, mRestService.postLocations(locations) emetterà un elemento, quindi verrà completato, come se tutto avesse successo.

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest) 
     .buffer(50) 
     .flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can throw exception 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(); 
34

Si consiglia di utilizzare uno dei error handling operators.

  • onErrorResumeNext( ) - incarica un'osservabile ad emettere una sequenza di elementi, se si verifica un errore
  • onErrorReturn( ) - incarica un'osservabile per emettere una voce particolare quando incontra un errore
  • onExceptionResumeNext( ) - incarica un'osservabile per continuare emettere elementi dopo che incontra un'eccezione (ma non un'altra varietà di gettabili)
  • retry( ) - se una fonte osservabile emette un errore, iscriversi ad essa nella speranza che si completi senza errori
  • retryWhen( ) - se una fonte osservabile emette un errore, passare che l'errore ad un altro Observable per determinare se sottoscrivere nuovamente alla fonte

Especialy retry e onExceptionResumeNext sembrano promettenti nel tuo caso.

+1

Quando I aggiungi 'onExceptionResumeNext (Observable.empty())' dopo il 'flatMap (posizioni -> mRestService.postLocations (posizioni))' 'onCompleted' viene richiamato e il flusso termina. – Ziem

+15

Non aggiungerlo dopo 'flatMap', ma all'interno di' flatMap'. –

+0

Grazie! onErrorResumeNext() - costruzione molto utile per fallback. – Levor

1

Prova a chiamare il servizio di riposo in una chiamata Observable.defer. In questo modo per ogni chiamata avrai la possibilità di utilizzare il proprio 'onErrorResumeNext' e gli errori non causeranno il completamento del flusso principale.

reactiveLocationProvider.getUpdatedLocation(mLocationRequest) 
    .buffer(50) 
    .flatMap(locations -> 
    Observable.defer(() -> mRestService.postLocations(locations)) 
     .onErrorResumeNext(<SOME_DEFAULT_TO_REACT_TO>) 
) 
........ 

Tale soluzione è originario di questa discussione ->RxJava Observable and Subscriber for skipping exception?, ma penso che funzionerà nel tuo caso troppo.

6

Basta incollare il collegamento da informazioni @ risposta di Miken in caso di smarrimento:

import rx.Observable.Operator; 
import rx.functions.Action1; 

public final class OperatorSuppressError<T> implements Operator<T, T> { 
    final Action1<Throwable> onError; 

    public OperatorSuppressError(Action1<Throwable> onError) { 
     this.onError = onError; 
    } 

    @Override 
    public Subscriber<? super T> call(final Subscriber<? super T> t1) { 
     return new Subscriber<T>(t1) { 

      @Override 
      public void onNext(T t) { 
       t1.onNext(t); 
      } 

      @Override 
      public void onError(Throwable e) { 
       onError.call(e); 
      } 

      @Override 
      public void onCompleted() { 
       t1.onCompleted(); 
      } 

     }; 
    } 
} 

e utilizzarlo vicino alla fonte osservabile perché altri operatori possono cancellarsi con entusiasmo prima.

Observerable.create(connectToUnboundedStream()).lift(new OperatorSuppressError(log()).doOnNext(someStuff()).subscribe(); 

Si noti, tuttavia, che questo sopprime la consegna errore dalla fonte . Se uno qualsiasi su Next nella catena dopo che genera un'eccezione, è ancora probabile che la fonte verrà annullata.

+0

Funziona ma dopo che un errore è stato eliminato, la mia sorgente osservabile sembra smettere di funzionare. C'è un modo per riavviarlo? – Matthias

0

Una leggera modifica della soluzione (@MikeN) per consentire i flussi finiti per completare:

import rx.Observable.Operator; 
import rx.functions.Action1; 

public final class OperatorSuppressError<T> implements Operator<T, T> { 
    final Action1<Throwable> onError; 

    public OperatorSuppressError(Action1<Throwable> onError) { 
     this.onError = onError; 
    } 

    @Override 
    public Subscriber<? super T> call(final Subscriber<? super T> t1) { 
     return new Subscriber<T>(t1) { 

      @Override 
      public void onNext(T t) { 
       t1.onNext(t); 
      } 

      @Override 
      public void onError(Throwable e) { 
       onError.call(e); 
       //this will allow finite streams to complete 
       t1.onCompleted(); 
      } 

      @Override 
      public void onCompleted() { 
       t1.onCompleted(); 
      } 

     }; 
    } 
} 
1

aggiungere la mia soluzione per questo problema:

privider 
    .compose(ignoreErrorsTransformer) 
    .subscribe() 

private final Observable.Transformer<ResultType, ResultType> ignoreErrorsTransformer = 
     new Observable.Transformer<ResultType, ResultType>() { 
      @Override 
      public Observable<ResultType> call(Observable<ResultType> resultTypeObservable) { 
       return resultTypeObservable 
         .materialize() 
         .filter(new Func1<Notification<ResultType>, Boolean>() { 
          @Override 
          public Boolean call(Notification<ResultType> resultTypeNotification) { 
           return !resultTypeNotification.isOnError(); 
          } 
         }) 
         .dematerialize(); 

      } 
     }; 
+0

Cosa fa l'operatore materialize? –

+0

Non lavoro, il flusso finisce –

1

Se si desidera semplicemente ignorare l'errore all'interno di flatMapsenza restituire un elemento fare ciò:

flatMap(item -> 
    restService.getSomething(item).onErrorResumeNext(Observable.empty()) 
);