2016-05-11 31 views
6

Mi chiedo se esiste un modo per comporre gli operatori esistenti per eseguire l'opposto di switchMap().RxJava - Di fronte all'operatore switchMap()?

Il switchMap() insegue l'ultima emissione ricevuta e annulla qualsiasi Observable precedentemente in esecuzione. Diciamo che l'ho capovolto, e voglio ignorare tutte le emissioni che arrivano a un operatore xxxMap() mentre è occupato con la prima emissione ricevuta. Continuerà a ignorare le emissioni finché non avrà finito di emettere l'attuale Observable al suo interno. Quindi elaborerà la prossima emissione che riceve.

Observable.interval(1, TimeUnit.SECONDS) 
     .doOnNext(i -> System.out.println("Source Emitted Value: " + i)) 
     .ignoreWhileBusyMap(i -> doIntensiveProcess(i).subcribeOn(Schedulers.computation())) 
     .subscribe(i -> System.out.println("Subscriber received Value: " + i)); 

C'è un modo per realizzare questo? Nell'esempio di cui sopra, se intensiveProcess() dovesse durare tre secondi, il ignoreWhileBusyMap() sarebbe elaborare 0 ma probabilmente ignorano le emissioni 1 e 2 provenienti da interval() .E sarebbe quindi elaborare 3 ma probabilmente ignorare 4 e 5, e così via ...

risposta

5

Certo, porta l'elaborazione di un valore da un valore booleano impostato dopo la lavorazione finita:

AtomicBoolean gate = new AtomicBoolean(true); 

Observable.interval(200, TimeUnit.MILLISECONDS) 
.flatMap(v -> { 
    if (gate.get()) { 
     gate.set(false); 

     return Observable.just(v).delay(500, TimeUnit.MILLISECONDS) 
       .doAfterTerminate(() -> gate.set(true)); 
    } else { 
     return Observable.empty(); 
    } 
}) 
.take(10) 
.toBlocking() 
.subscribe(System.out::println, Throwable::printStackTrace); 

Modifica

Alternativa:

Observable.interval(200, TimeUnit.MILLISECONDS) 
.onBackpressureDrop() 
.flatMap(v -> { 
    return Observable.just(v).delay(500, TimeUnit.MILLISECONDS); 
}, 1) 
.take(10) 
.toBlocking() 
.subscribe(System.out::println, Throwable::printStackTrace); 

È possibile modificare onBackpressureDrop-onBackpressureLatest per proseguire immediatamente con l'ultimo valore.

+0

Impressionante, ho fatto qualcosa di simile con un 'semaforo' ma speravo di usare una composizione puramente reattiva con gli operatori esistenti. Suppongo di poter avvolgere tutto questo in un 'Transformer' però. – tmn

+0

Utilizzare un trasformatore differito per evitare la condivisione del gate tra più sottoscrittori finali. – akarnokd

+0

Ho appena realizzato che la tua soluzione non è block-y come la mia, quindi passerò a questa. Grazie! – tmn

0

Per rispondere allo stile Jeopardy: che cos'è concatMap?

concatMap sottoscriveranno il primo Observable e non sottoscriverà alle successive Observable s fino a quando le precedenti Observable chiamate onComplete().

In questo senso è l'"opposto" di switchMap che si annulla desiderosamente dai precedenti Observable s quando ne arriva uno nuovo.

concatMap vuole ascoltare tutto ciò che ogni osservabile ha da dire, mentre switchMap è una farfalla sociale e si sposta non appena è disponibile un altro osservabile.

+0

Non proprio, se leggete attentamente la domanda vedrete che sto perseguendo un comportamento che ha poco a che fare con' concatMap' o il suo contrario percepito.Per seguire la tua analogia, stavo cercando un operatore 'xxxMap' che si sarebbe concentrato a parlare con i primi incontri osservabili e dire agli osservabili successivi" non ora, sono impegnato a parlare con questo ragazzo ". Solo quando la sua conversazione sarà conclusa, permetterà ad un altro osservabile di impegnarsi con lui. – tmn

+0

Ecco cosa fa "concatMap'. – Andy

+0

Credo che 'concatMap()' sia lo stesso di 'flatMap()' ma non interleave. Garantisce che tutte le emissioni verranno emesse anche se le accoda. Ma l'operatore che stavo chiedendo semplicemente ignora le emissioni successive mentre è occupato. – tmn