2015-12-14 8 views
6

Sto lavorando con un framework che chiama una funzione che implemento. Vorrei che il parametro di questa funzione fosse convertito in Osservabile e inviato attraverso una sequenza di Osservatori. Pensavo di poter usare un soggetto per questo, ma non si comporta come mi aspettavo.RxJS: Come avere un osservatore per elaborare più oggetti osservabili?

Per chiarire, ho qualcosa come il seguente codice. Pensavo che Option 1 di seguito avrebbe funzionato, ma finora mi sto accontentando di Option 2, che non sembra affatto idiomatico.

var eventSubject = new Rx.Subject(); 
var resultSource = eventSubject.map(processEvent); 
var subscription = resultSource.subscribe(
    function(event) { 
     console.log("got event", event); 
    }, 
    function(e) { 
     log.error(e); 
    }, 
    function() { 
     console.log('eventSubject onCompleted'); 
    } 
); 

// The framework calls this method 
function onEvent(eventArray) { 

    var eventSource = Rx.Observable.from(eventArray); 

    // Option 1: I thought this would work, but it doesn't 
    // eventSource.subscribe(eventSubject); 

    // Option 2: This does work, but its obviously clunky 
    eventSource.subscribe(
     function(event) { 
      log.debug("sending to subject"); 
      eventSubject.onNext(event); 
     }, 
     function(e) { 
      log.error(e); 
     }, 
     function() { 
      console.log('eventSource onCompleted'); 
     } 
); 
} 
+0

è che 'gestore onEvent' qualcosa ti registri? – user3743222

+0

In ogni caso, quello che posso pensare usa 'Subject.create (observer, observable) 'e non risulta in qualcosa di meno goffo dato che l'osservatore che tu passi farà esattamente lo stesso di quello che hai passato a' eventSource .subscribe', quindi vediamo le proposte di altre persone. – user3743222

+0

@ user3743222 - onEvent è una funzione che scrivo, il mio framework (loopback) conosce la funzione esistente e la chiama dal proprio codice. Non è possibile utilizzare i metodi fromEvent() o fromEventPattern() poiché non corrispondono al metodo di registrazione dei gestori del framework. – JBCP

risposta

2

E 'solo che quando si sottoscrive l'intero Subject a un osservabile, si finisce per la sottoscrizione caso onComplete di quella osservabile al soggetto. Il che significa che quando l'osservabile termina, completerà il tuo soggetto. Quindi, quando ottieni il prossimo set di eventi, non fanno nulla perché il soggetto è già completo.

Una soluzione è esattamente ciò che hai fatto. Basta iscrivere l'evento onNext all'oggetto.

Una seconda soluzione, e forse più "rx come" è quello di trattare i dati in arrivo come osservabile di osservabili e appiattire questo flusso osservabile con mergeAll:

var eventSubject = new Rx.Subject(); // observes observable sequences 
var resultSource = eventSubject 
    .mergeAll() // subscribe to each inner observable as it arrives 
    .map(processEvent); 
var subscription = resultSource.subscribe(
    function(event) { 
     console.log("got event", event); 
    }, 
    function(e) { 
     log.error(e); 
    }, 
    function() { 
     console.log('eventSubject onCompleted'); 
    } 
); 

// The framework calls this method 
function onEvent(eventArray) { 

    var eventSource = Rx.Observable.from(eventArray); 
    // send a new inner observable sequence 
    // into the subject 
    eventSubject.onNext(eventSource); 
} 

Aggiornamento: Here is a running example

+0

Domanda stupida: è possibile creare la combinazione onNext, onError, onComplete in un oggetto (possibilmente un Observer)? In modo che l'intera cosa possa essere spostata su un altro file per modularità? Mi piacerebbe fare qualcosa come 'resultSource.subscribe (mySink)' (che è più fluido), in questo momento devo fare 'mySink (resultSource)', dove mySink fa l'abbonamento internamente – JBCP

+0

Questo uso di '.merge () 'non sembra corrispondere alla documentazione: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/merge.md – JBCP

+0

Intendi' .mergeAll() ' ? Vedere i documenti: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/mergeall.md – JBCP

3

Come Brandon già spiegato, sottoscrivere l'eventSubject a un altro mezzo osservabile, sottoscrivendo eventSubjects onNext, onError e onComplete su osservable onNext, onError e onComplete. Dal tuo esempio ti sembra di voler solo iscriverti a onNext.

Il soggetto completa/errri una volta che il primo evento Source completa/errori - il tuo eventSubject ignora correttamente qualsiasi ulteriore onNext/onError attivato da eventSoures successivi.

Ci sono diversi modi per abbonarsi solo al onNext di qualsiasi EventSource:

  1. sottoscrivono manualmente solo onNext.

    resultSource = eventSubject 
        .map(processEvent); 
    
    eventSource.subscribe(
        function(event) { 
         eventSubject.onNext(event); 
        }, 
        function(error) { 
         // don't subscribe to onError 
        }, 
        function() { 
         // don't subscribe to onComplete 
        } 
    ); 
    
  2. Utilizzare un operatore che gestisce solo sottoscrivendo le eventSources onNext/onError per voi. Questo è ciò che Brandon ha suggerito. Tieni presente che anche questo si iscrive a eventSources onError, che non sembra volere dal tuo esempio.

    resultSource = eventSubject 
        .mergeAll() 
        .map(processEvent); 
    
    eventSubject.onNext(eventSource); 
    
  3. Utilizzare un osservatore che non chiamare i eventSubjects onError/onComplete per i eventSources onError/onComplete. Potresti semplicemente sovrascrivere eventSubjects onComplete come un trucco sporco, ma probabilmente è meglio creare un nuovo osservatore.

    resultSource = eventSubject 
        .map(processEvent); 
    
    var eventObserver = Rx.Observer.create(
        function (event) { 
        eventSubject.onNext(event); 
        } 
    ); 
    
    eventSubject.subscribe(eventObserver);