2016-06-09 29 views
7

Sto cercando un soggetto (o qualcosa di simile) che può:coda come Soggetto a RxJava

  1. potrebbe ricevere articoli e tenerli in una coda o buffer se non ci sono abbonati
  2. Una volta che abbiamo avere un abbonato tutti gli elementi sono consumati e mai emesso di nuovo
  3. posso iscriversi/ritirarsi da/per Soggetto

BehaviorSubject quasi sarebbe fare il lavoro, ma conserva ultimo elemento osservato.

UPDATE

Sulla base di risposta accettata Ho lavorato soluzione simile di un singolo articolo osservato. Aggiunta anche la parte di disiscrizione per evitare perdite di memoria.

class LastEventObservable private constructor(
     private val onSubscribe: OnSubscribe<Any>, 
     private val state: State 
) : Observable<Any>(onSubscribe) { 

    fun emit(value: Any) { 
     if (state.subscriber.hasObservers()) { 
      state.subscriber.onNext(value) 
     } else { 
      state.lastItem = value 
     } 
    } 

    companion object { 
     fun create(): LastEventObservable { 
      val state = State() 

      val onSubscribe = OnSubscribe<Any> { subscriber -> 
       just(state.lastItem) 
         .filter { it != null } 
         .doOnNext { subscriber.onNext(it) } 
         .doOnCompleted { state.lastItem = null } 
         .subscribe() 

       val subscription = state.subscriber.subscribe(subscriber) 

       subscriber.add(Subscriptions.create { subscription.unsubscribe() }) 
      } 

      return LastEventObservable(onSubscribe, state) 
     } 
    } 

    private class State { 
     var lastItem: Any? = null 
     val subscriber = PublishSubject.create<Any>() 
    } 
} 
+0

Hai visto la mia risposta? –

+0

Chiarisci cosa intendi con "Una volta che abbiamo un abbonato tutti gli oggetti sono consumati e mai più emessi" - se hai qualcosa come: yourSource.take (1) .subscribe(), dovrebbe cancellare tutti gli elementi da yourSource? –

+0

No, questo dovrebbe consumare solo quell'elemento. –

risposta

7

ho raggiungere il risultato previsto la creazione di un osservabile personalizzato che avvolge una pubblicano soggetto e gestisce la cache delle emissioni se non ci sono abbonati collegati. Controlla.

public class ExampleUnitTest { 
    @Test 
    public void testSample() throws Exception { 
     MyCustomObservable myCustomObservable = new MyCustomObservable(); 

     myCustomObservable.emit("1"); 
     myCustomObservable.emit("2"); 
     myCustomObservable.emit("3"); 

     Subscription subscription = myCustomObservable.subscribe(System.out::println); 

     myCustomObservable.emit("4"); 
     myCustomObservable.emit("5"); 

     subscription.unsubscribe(); 

     myCustomObservable.emit("6"); 
     myCustomObservable.emit("7"); 
     myCustomObservable.emit("8"); 

     myCustomObservable.subscribe(System.out::println); 
    } 
} 

class MyCustomObservable extends Observable<String> { 
    private static PublishSubject<String> publishSubject = PublishSubject.create(); 
    private static List<String> valuesCache = new ArrayList<>(); 

    protected MyCustomObservable() { 
     super(subscriber -> { 
      Observable.from(valuesCache) 
        .doOnNext(subscriber::onNext) 
        .doOnCompleted(valuesCache::clear) 
        .subscribe(); 

      publishSubject.subscribe(subscriber); 
     }); 
    } 

    public void emit(String value) { 
     if (publishSubject.hasObservers()) { 
      publishSubject.onNext(value); 
     } else { 
      valuesCache.add(value); 
     } 
    } 
} 

Spero che sia d'aiuto!

I migliori saluti.

+0

Ha funzionato come un fascino.Non ero troppo appassionato di quelle variabili statiche però :) –

+0

Io neanche @MartynasJurkus. Come possiamo migliorare questa soluzione? –

+0

Vedere la mia domanda aggiornata. Anche se il mio codice è in Kotlin. –

1

Se si desidera attendere un solo utente, utilizzare UnicastSubject ma si noti che se si annulla l'iscrizione nel mezzo, tutti gli elementi in coda successivi andranno persi.

Edit:

Una volta che abbiamo un abbonato tutti gli elementi sono consumati e mai emessi di nuovo

Per più sottoscrittori, utilizzare ReplaySubject.

+0

Ci dovrebbe essere la possibilità di avere più abbonati. –

2

ho avuto problema simile, i miei requisiti erano:

  • dovrebbe supportare la riproduzione di valori quando nessun osservatore ha sottoscritto
  • dovrebbe consentire solo un osservatore sottoscritto alla volta
  • dovrebbe permettere un altro osservatore di sottoscrivere quando primo osservatore è disposto

ho implementato come RxRelay ma l'implementazione per l'argomento sarà simile:

public final class CacheRelay<T> extends Relay<T> { 

    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>(); 
    private final PublishRelay<T> relay = PublishRelay.create(); 

    private CacheRelay() { 
    } 

    public static <T> CacheRelay<T> create() { 
     return new CacheRelay<>(); 
    } 

    @Override 
    public void accept(T value) { 
     if (relay.hasObservers()) { 
      relay.accept(value); 
     } else { 
      queue.add(value); 
     } 
    } 

    @Override 
    public boolean hasObservers() { 
     return relay.hasObservers(); 
    } 

    @Override 
    protected void subscribeActual(Observer<? super T> observer) { 
     if (hasObservers()) { 
      EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer); 
     } else { 
      for (T element; (element = queue.poll()) != null;) { 
       observer.onNext(element); 
      } 
      relay.subscribeActual(observer); 
     } 
    } 
} 

Guardate questa Gist for more