2014-12-20 9 views
8

Il mio scenario esatto è l'inserimento di dati nel database in lotti, quindi voglio accumulare oggetti DOM ogni 1000, svuotarli.Esiste un modo elegante per elaborare un flusso in blocchi?

L'ho implementato inserendo il codice nell'accumulatore per rilevare la pienezza, quindi lo scarico, ma ciò sembra errato - il controllo di flusso dovrebbe provenire dal chiamante.

Potrei convertire il flusso in una lista quindi utilizzare la sottolista in modo iterativo, ma anche questo sembra goffo.

Esiste un modo pulito per eseguire ogni n elementi, quindi continuare con lo stream mentre si elabora il flusso solo una volta?

+2

Per un caso d'uso simile ho fatto questo: https://bitbucket.org/assylias/bigblue-utils/src/3f56d19777a0ebc5dc3b53d3c2ec8dc64fd2b28e/src/main/java/com/assylias/bigblue/utils/SplitProcessing.java?at= maestro - non esattamente quello che stai chiedendo però. – assylias

risposta

4

L'eleganza è negli occhi di chi guarda. Se non ti dispiace utilizzando una funzione stateful in groupingBy, si può fare questo:

AtomicInteger counter = new AtomicInteger(); 

stream.collect(groupingBy(x->counter.getAndIncrement()/chunkSize)) 
    .values() 
    .forEach(database::flushChunk); 

Questo non vince tutti i punti di prestazioni o di utilizzo della memoria sopra la vostra soluzione originale perché sarà ancora materializzerà l'intero flusso prima di fare nulla.

Se vuoi evitare di materializzare l'elenco, l'API di streaming non ti aiuterà. Si dovrà ottenere iteratore o spliterator del torrente e fare qualcosa del genere:

Spliterator<Integer> split = stream.spliterator(); 
int chunkSize = 1000; 

while(true) { 
    List<Integer> chunk = new ArrayList<>(size); 
    for (int i = 0; i < chunkSize && split.tryAdvance(chunk::add); i++){}; 
    if (chunk.isEmpty()) break; 
    database.flushChunk(chunk); 
} 
4

Utilizzando biblioteca StreamEx soluzione sarebbe simile

Stream<Integer> stream = IntStream.iterate(0, i -> i + 1).boxed().limit(15); 
AtomicInteger counter = new AtomicInteger(0); 
int chunkSize = 4; 

StreamEx.of(stream) 
     .groupRuns((prev, next) -> counter.incrementAndGet() % chunkSize != 0) 
     .forEach(chunk -> System.out.println(chunk)); 

uscita:

[0, 1, 2, 3] 
[4, 5, 6, 7] 
[8, 9, 10, 11] 
[12, 13, 14] 

groupRuns accetta predicato che decide se 2 elementi dovrebbero essere nello stesso gruppo.

Produce un gruppo non appena trova il primo elemento che non gli appartiene.

+0

Questo non funziona per un singolo record. Ad esempio, un flusso intero di semplicemente [1] fallirebbe. –

+0

Lo streaming di un singolo elemento funziona per me. Che tipo di errore vedi? Potresti pubblicare il codice che hai provato? –

+0

Il contatore restituisce un valore errato nel caso in cui sia presente un record. –