2015-09-22 8 views
7

Supponiamo che io ho il seguente codice RxJava (che accede a un DB, ma il caso esatto uso è irrilevante):Il thread di annullamento della sottoscrizione è sicuro in RxJava?

public Observable<List<DbPlaceDto>> getPlaceByStringId(final List<String> stringIds) { 
    return Observable.create(new Observable.OnSubscribe<List<DbPlaceDto>>() { 
     @Override 
     public void call(Subscriber<? super List<DbPlaceDto>> subscriber) { 
      try { 
       Cursor c = getPlacseDb(stringIds); 

       List<DbPlaceDto> dbPlaceDtoList = new ArrayList<>(); 
       while (c.moveToNext()) { 
        dbPlaceDtoList.add(getDbPlaceDto(c)); 
       } 
       c.close(); 

       if (!subscriber.isUnsubscribed()) { 
        subscriber.onNext(dbPlaceDtoList); 
        subscriber.onCompleted(); 
       } 
      } catch (Exception e) { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onError(e); 
       } 
      } 
     } 
    }); 
} 

Dato questo codice, ho le seguenti domande:

  1. se qualcuno l'annullamento dell'iscrizione dall'osservabile restituito da questo metodo (dopo un precedente abbonamento), è tale operazione thread-safe? Quindi i miei controlli 'isUnsubscribed()' sono corretti in questo senso, indipendentemente dalla pianificazione?

  2. Esiste un modo più pulito con un codice inferiore per verificare la presenza di stati non sottoscritti rispetto a quello che sto utilizzando qui? Non ho trovato nulla nel framework. Ho pensato che SafeSubscriber risolva il problema di non inoltrare eventi quando l'iscritto non è iscritto, ma a quanto pare non lo fa.

risposta

8

è che il funzionamento thread-safe?

Sì. Si sta ricevendo un rx.Subscriber che (eventualmente) verifica contro un booleano volatile impostato su true quando l'abbonamento dell'iscritto è annullato.

modo pulito riducendo codice standard per controllare gli stati sottoscritti

Il SyncOnSubscribe e AsyncOnSubscribe (disponibile come @Experimental api dalla versione 1.0.15) è stato creato per questo caso d'uso. Funzionano come alternativa sicura alla chiamata Observable.create. Ecco un esempio (forzato) del caso sincrono.

public static class FooState { 
    public Integer next() { 
     return 1; 
    } 
    public void shutdown() { 

    } 
    public FooState nextState() { 
     return new FooState(); 
    } 
} 
public static void main(String[] args) { 
    OnSubscribe<Integer> sos = SyncOnSubscribe.createStateful(FooState::new, 
      (state, o) -> { 
       o.onNext(state.next()); 
       return state.nextState(); 
      }, 
      state -> state.shutdown()); 
    Observable<Integer> obs = Observable.create(sos); 
} 

noti che la funzione successiva SyncOnSubscribe non è consentito richiamare più observer.onNext di una volta per iterazione né può rimettere in quel osservatore contemporaneamente. Ecco un paio di link allo SyncOnSubscribeimplementation e tests sulla testata del ramo 1.x. L'utilizzo principale è semplificare la scrittura di osservabili che eseguono l'iterazione o l'analisi dei dati in modo sincrono e onNext in downstream, ma lo fanno in un framework che supporta la contropressione e controlla se non è stato sottoscritto. Essenzialmente si creerebbe una funzione next che verrebbe richiamata ogni volta che gli operatori downstream necessitano di un nuovo elemento dati su Next. La tua prossima funzione può chiamare su Avanti 0 o 1 volta.

Il AsyncOnSubscribe è progettato per suonare bene con contropressione per sorgenti osservabili che operano in modo asincrono (come le chiamate off-box). Gli argomenti per la tua prossima funzione includono il conteggio delle richieste e l'osservabile fornito dovrebbe fornire un osservabile che soddisfi i dati fino all'ammontare richiesto. Un esempio di questo comportamento sono le query impaginate da un'origine dati esterna.

In precedenza era una pratica sicura trasformare il tuo OnSubscribe in un Iterable e utilizzare Observable.from(Iterable). Questa implementazione ottiene un iteratore e controlla subscriber.isUnsubscribed() per te.

+0

Grazie, hai effettivamente risposto ad un'altra mia domanda riguardante la creazione di osservabili 'personalizzati' con un adeguato supporto per la contropressione! Ho verificato SyncSubscriber e sembra davvero buono.In molti casi vedrei la conversione di un'operazione in un Iterable un po 'imbarazzante semanticamente, ma è comunque bene sapere che possiamo ottenere un semplice supporto per la retropressione in questo modo. Vedo, però, che questa classe è ancora contrassegnata come @Experimental, quando pensi che possa essere considerata approssimativamente la produzione? –

+0

Buono a sapersi! Il passo successivo è quello di metterlo in una versione (che dovrebbe essere presto disponibile nella versione 1.0.15). Dopodiché, generalmente si promuoverà in stato '@ Beta' o direttamente in stato pubblico man mano che acquisiamo sicurezza. – Aaron