Ho un produttore RX che crea un flusso di stringhe in questo modo (versione semplificata del reale flusso):Rx - flusso Divide in segmenti (liste) di condizione
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
il flusso è senza fine, ma ordinato. Quindi, dopo che le stringhe che iniziano con A
si esauriscono, inizia B
. Quando B
si avvia, quando Z
si avvia, ci spostiamo a AA1
ecc. C'è un numero sconosciuto di A
, B
ecc., Ma in genere è di 10-30 istanze per lettera.
Sto cercando un modo per dividere questo flusso in blocchi di tutto un di: A1 A2 A3
, tutti i B di: B1 B2
, tutti i C di: C1 C2 C3 C4 C5 C6
ecc Ogni blocco può essere sia un osservabile (che mi trasformerò in una lista) o semplicemente una lista.
Ho provato diversi approcci diversi usando RxJava, il tutto fallito. Tra le cose che non hanno funzionato sono:
Gruppo per: dal momento che il flusso è infinita, il per-lettera osservabile non completa. Quindi quando A si esaurisce e iniziano le B, A's Observable non si completa. Quindi c'è un numero sempre crescente di osservabili.
Finestra/Buffer con distinctUntilChanged - io uso "distinctUntilChanged" sul flusso originale per l'uscita del primo elemento di ogni gruppo (la prima A, prima B ecc). Quindi utilizzo tale stream come input per
window
o come operatore "buffer" da utilizzare come limite tra finestre/buffer. Non funzionava e tutto quello che ricevevo erano liste vuote.
Qual è la soluzione corretta utilizzando RX? Preferirei una soluzione Java, ma sono anche benvenute soluzioni in altre implementazioni di RX che possono essere facilmente convertite in Java.
grazie per aver menzionato la risorsa è stato utile per imparare altri – Drim
@ dave-moten. Ho provato questo, ma sto ottenendo un errore di compilazione. "Nessuna istanza di tipo variabile (s) R esiste in modo che Transformer> sia conforme a ObservableTransformer ' ". Sto usando 0.8.0.7 con JDK 1.8.0_131. –
melston
@ dave-moten, non importa. Stavo usando rxjava2 e questo ha causato il problema. – melston