2015-09-17 10 views
8

Ho un produttore RX che crea un flusso di stringhe in questo modo (versione semplificata del reale flusso):Rx - flusso Divide in segmenti (liste) di condizione

A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....

il flusso è senza fine, ma ordinato. Quindi, dopo che le stringhe che iniziano con A si esauriscono, inizia B. Quando B si avvia, quando Z si avvia, ci spostiamo a AA1 ecc. C'è un numero sconosciuto di A, B ecc., Ma in genere è di 10-30 istanze per lettera.

Sto cercando un modo per dividere questo flusso in blocchi di tutto un di: A1 A2 A3, tutti i B di: B1 B2, tutti i C di: C1 C2 C3 C4 C5 C6 ecc Ogni blocco può essere sia un osservabile (che mi trasformerò in una lista) o semplicemente una lista.

Ho provato diversi approcci diversi usando RxJava, il tutto fallito. Tra le cose che non hanno funzionato sono:

  • Gruppo per: dal momento che il flusso è infinita, il per-lettera osservabile non completa. Quindi quando A si esaurisce e iniziano le B, A's Observable non si completa. Quindi c'è un numero sempre crescente di osservabili.

  • Finestra/Buffer con distinctUntilChanged - io uso "distinctUntilChanged" sul flusso originale per l'uscita del primo elemento di ogni gruppo (la prima A, prima B ecc). Quindi utilizzo tale stream come input per window o come operatore "buffer" da utilizzare come limite tra finestre/buffer. Non funzionava e tutto quello che ricevevo erano liste vuote.

Qual è la soluzione corretta utilizzando RX? Preferirei una soluzione Java, ma sono anche benvenute soluzioni in altre implementazioni di RX che possono essere facilmente convertite in Java.

risposta

2

Ecco un modo mi piacerebbe risolvere questo:

Observable<String> source = Observable.from(
     "A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1"); 

Observable<List<String>> output = Observable.defer(() -> { 
    List<String> buffer = new ArrayList<>(); 
    return 
      Observable.concat(
       source.concatMap(new Function<String, Observable<List<String>>>() { 
        String lastKey; 
        @Override 
        public Observable<List<String>> apply(String t) { 
         String key = t.substring(0, 1); 
         if (lastKey != null && !key.equals(lastKey)) { 
          List<String> b = new ArrayList<>(buffer); 
          buffer.clear(); 
          buffer.add(t); 
          lastKey = key; 
          return Observable.just(b); 
         } 
         lastKey = key; 
         buffer.add(t); 
         return Observable.empty(); 
        } 
       }), 
       Observable.just(1) 
       .flatMap(v -> { 
        if (buffer.isEmpty()) { 
         return Observable.empty(); 
        } 
        return Observable.just(buffer); 
       }) 
      ); 
    } 
); 

output.subscribe(System.out::println); 

Questo è come funziona:

  • sto usando rimandare perché abbiamo bisogno di un buffer per abbonato, non un globale uno
  • i concatenare il buffer con l'emissione dell'ultimo tampone se la fonte sembra essere finita
  • uso concatMap e aggiungi ad un buffer fino a quando il tasto, fino ad allora, emetto obse vuoto rvables. Una volta cambiata la chiave, emetto il contenuto del buffer e ne avvio uno nuovo.
5

È possibile utilizzare rxjava-extras.toListWhile:

Observable<String> source = 
    Observable.just("A1", "A2", "A3", "B1", "B2", "B3", "C1", "D1"); 
source.compose(Transformers.<String> toListWhile(
      (list, t) -> list.isEmpty() 
         || list.get(0).charAt(0) == t.charAt(0))) 
     .forEach(System.out::println); 

Si fa quello @akarnokd ha fatto sotto le coperte ed è testato unità.

+0

grazie per aver menzionato la risorsa è stato utile per imparare altri – Drim

+0

@ dave-moten. Ho provato questo, ma sto ottenendo un errore di compilazione. "Nessuna istanza di tipo variabile (s) R esiste in modo che Transformer > sia conforme a ObservableTransformer ' ". Sto usando 0.8.0.7 con JDK 1.8.0_131. – melston

+0

@ dave-moten, non importa. Stavo usando rxjava2 e questo ha causato il problema. – melston

0

Dopo aver esaminato le risposte di akarnokd's e Dave, ho trovato la mia soluzione implementando un operatore Rx personalizzato chiamato BufferWhile.Sembra funzionare altrettanto bene come le altre soluzioni (per favore qualcuno mi corregga se sbaglio), ma sembra più semplice:

public class RxBufferWhileOperator<T, U> implements Operator<List<T>, T>{ 

    private final Func1<? super T, ? extends U> keyGenerator; 

    public RxBufferWhileOperator(Func1<? super T, ? extends U> keyGenerator) { 
     this.keyGenerator = keyGenerator; 
    } 

    @Override 
    public Subscriber<? super T> call(final Subscriber<? super List<T>> s) { 
     return new Subscriber<T>(s) { 

      private ArrayList<T> buffer = null; 
      private U currentKey = null; 

      @Override 
      public void onCompleted() { 
       submitAndClearBuffer(); 
       s.onCompleted(); 
      } 

      @Override 
      public void onError(Throwable e) { 
       submitAndClearBuffer(); //Optional, remove if submitting partial buffers doesn't make sense in your case 
       s.onError(e); 
      } 

      @Override 
      public void onNext(T t) { 
       if (currentKey == null || !currentKey.equals(keyGenerator.call(t))) { 
        currentKey = keyGenerator.call(t); 
        submitAndClearBuffer(); 
        buffer.add(t); 
       } else { 
        buffer.add(t); 
        request(1); // Request additional T since we "swallowed" the incoming result without calling subsequent subscribers 
       } 
      } 

      private void submitAndClearBuffer() { 
       if (buffer != null && buffer.size() > 0) { 
        s.onNext(buffer); 
       } 
       buffer = new ArrayList<>(); 
      } 
     }; 
    } 
} 

posso applicare questo operatore sull'originale osservabile utilizzando lift, e ottenere un osservabile che emette liste di stringhe.

+1

Pochi commenti: 1) Di solito non inviamo buffer parziali su onError, ma dipende da te. 2) l'operatore non gestisce correttamente la contropressione e soffre del cosiddetto effetto drop-value; per ogni T non si emette un elenco e quindi il downstream non può richiedere di più se c'è una mancata corrispondenza dei conteggi. Dovresti chiamare la richiesta (1) in onNext nella clausola else per chiedere il riapprovvigionamento. – akarnokd

+0

@akarnokd grazie per l'input. Modificherò il codice. – Malt

+0

@akarnokd Ho gestito entrambe le osservazioni. Ora faccio il buffer e poi richiedo nella clausola else, e ho aggiunto un commento che dice che l'invio di un buffer parziale è facoltativo – Malt

1

Supponiamo di avere un flusso di sourcestring e una funzione key che estrae la chiave per ogni string, come il seguente:

IObservable<string> source = ...; 
Func<string, string> key = s => new string(s.TakeWhile(char.IsLetter).ToArray()); 

Quindi possiamo usare Buffer con un selettore di chiusura personalizzato.

var query = source.Publish(o => o.Buffer(() => 
{ 
    var keys = o.Select(key); 
    return Observable 
     .CombineLatest(
      keys.Take(1), 
      keys.Skip(1), 
      (a, b) => a != b) 
     .Where(x => x); 
})); 

Ogni buffer termina quando il primo elemento nel buffer e l'elemento corrente che consideriamo aggiunta al buffer non hanno la stessa chiave.