Sto cercando un soggetto (o qualcosa di simile) che può:coda come Soggetto a RxJava
- potrebbe ricevere articoli e tenerli in una coda o buffer se non ci sono abbonati
- Una volta che abbiamo avere un abbonato tutti gli elementi sono consumati e mai emesso di nuovo
- posso iscriversi/ritirarsi da/per Soggetto
BehaviorSubject
quasi sarebbe fare il lavoro, ma conserva ultimo elemento osservato.
UPDATE
Sulla base di risposta accettata Ho lavorato soluzione simile di un singolo articolo osservato. Aggiunta anche la parte di disiscrizione per evitare perdite di memoria.
class LastEventObservable private constructor(
private val onSubscribe: OnSubscribe<Any>,
private val state: State
) : Observable<Any>(onSubscribe) {
fun emit(value: Any) {
if (state.subscriber.hasObservers()) {
state.subscriber.onNext(value)
} else {
state.lastItem = value
}
}
companion object {
fun create(): LastEventObservable {
val state = State()
val onSubscribe = OnSubscribe<Any> { subscriber ->
just(state.lastItem)
.filter { it != null }
.doOnNext { subscriber.onNext(it) }
.doOnCompleted { state.lastItem = null }
.subscribe()
val subscription = state.subscriber.subscribe(subscriber)
subscriber.add(Subscriptions.create { subscription.unsubscribe() })
}
return LastEventObservable(onSubscribe, state)
}
}
private class State {
var lastItem: Any? = null
val subscriber = PublishSubject.create<Any>()
}
}
Hai visto la mia risposta? –
Chiarisci cosa intendi con "Una volta che abbiamo un abbonato tutti gli oggetti sono consumati e mai più emessi" - se hai qualcosa come: yourSource.take (1) .subscribe(), dovrebbe cancellare tutti gli elementi da yourSource? –
No, questo dovrebbe consumare solo quell'elemento. –