2014-09-03 7 views
5

Ho alcuni file di testo di grandi dimensioni che desidero elaborare raggruppando le sue linee.È possibile eseguire un gruppo pigro, restituendo un flusso, in java 8?

ho cercato di utilizzare le nuove funzionalità di streaming, come

return FileUtils.readLines(...) 
      .parallelStream() 
      .map(...) 
      .collect(groupingBy(pair -> pair[0])); 

Il problema è che, per quanto ne so, questo genera una mappa.

C'è un modo per avere un codice di alto livello come quello sopra che genera, ad esempio, un flusso di voci?

UPDATE: Quello che sto cercando è qualcosa come Python itertools.groupby. I miei file sono già ordinati (per coppia [0]), voglio solo caricare i gruppi uno per uno.

Ho già una soluzione iterativa. Mi sto solo chiedendo se c'è un modo più dichiarativo per farlo. Btw, usando guava o un'altra libreria di terze parti non sarebbe un grosso problema.

+2

Come può fai un pigro raggruppa per? Per raggruppare alcune proprietà dell'oggetto contenuto nel flusso, è necessario eseguire un'iterazione su tutti gli elementi nel flusso. – Eran

+0

Cosa intendi per "raggruppare le sue linee?" intendi il binning come il metodo Stream 'groupBy' o intendi la lettura di più righe alla volta alla rinfusa? – dkatzel

+0

Grazie per i commenti, aggiunto un UPDATE alla domanda. –

risposta

3

L'attività che si desidera raggiungere è molto diversa da ciò che fa il raggruppamento. groupingBy non si basa sull'ordine degli elementi dello Stream ma sull'algoritmo dello Map applicato al risultato del classificatore Function.

Ciò che si desidera è piegare articoli adiacenti aventi un valore di proprietà comune in un articolo List. Non è nemmeno necessario che lo Stream sia ordinato in base a tale proprietà, purché sia ​​possibile garantire che tutti gli articoli con lo stesso valore di proprietà siano raggruppati.

Forse è possibile formulare questa attività come una riduzione, ma per me la struttura risultante sembra troppo complicata.

Quindi, a meno che il supporto diretto per questa funzione viene aggiunto al Stream s, un approccio basato iteratore sembra più pragmatico per me:

class Folding<T,G> implements Spliterator<Map.Entry<G,List<T>>> { 
    static <T,G> Stream<Map.Entry<G,List<T>>> foldBy(
      Stream<? extends T> s, Function<? super T, ? extends G> f) { 
     return StreamSupport.stream(new Folding<>(s.spliterator(), f), false); 
    } 
    private final Spliterator<? extends T> source; 
    private final Function<? super T, ? extends G> pf; 
    private final Consumer<T> c=this::addItem; 
    private List<T> pending, result; 
    private G pendingGroup, resultGroup; 

    Folding(Spliterator<? extends T> s, Function<? super T, ? extends G> f) { 
     source=s; 
     pf=f; 
    } 
    private void addItem(T item) { 
     G group=pf.apply(item); 
     if(pending==null) pending=new ArrayList<>(); 
     else if(!pending.isEmpty()) { 
      if(!Objects.equals(group, pendingGroup)) { 
       if(pending.size()==1) 
        result=Collections.singletonList(pending.remove(0)); 
       else { 
        result=pending; 
        pending=new ArrayList<>(); 
       } 
       resultGroup=pendingGroup; 
      } 
     } 
     pendingGroup=group; 
     pending.add(item); 
    } 
    public boolean tryAdvance(Consumer<? super Map.Entry<G, List<T>>> action) { 
     while(source.tryAdvance(c)) { 
      if(result!=null) { 
       action.accept(entry(resultGroup, result)); 
       result=null; 
       return true; 
      } 
     } 
     if(pending!=null) { 
      action.accept(entry(pendingGroup, pending)); 
      pending=null; 
      return true; 
     } 
     return false; 
    } 
    private Map.Entry<G,List<T>> entry(G g, List<T> l) { 
     return new AbstractMap.SimpleImmutableEntry<>(g, l); 
    } 
    public int characteristics() { return 0; } 
    public long estimateSize() { return Long.MAX_VALUE; } 
    public Spliterator<Map.Entry<G, List<T>>> trySplit() { return null; } 
} 

La natura pigra del risultante piegato Stream può essere meglio dimostrata applicando ad un flusso infinito:

Folding.foldBy(Stream.iterate(0, i->i+1), i->i>>4) 
     .filter(e -> e.getKey()>5) 
     .findFirst().ifPresent(e -> System.out.println(e.getValue())); 
+1

leggera correzione: 'groupingBy' conserva effettivamente l'ordinamento del flusso originale, se il collettore a valle coopera (la maggior parte lo fa, eccetto quelli con la caratteristica UNORDERED); il sottoinsieme di elementi in un determinato bucket viene presentato al collector downstream nello stesso ordine in cui erano presenti nell'input. –

+1

@Brian Goetz: sì, * mantiene * l'ordine, ma tutto ciò che ho detto nella mia risposta è che non * si basa * sull'ordine per formare i gruppi. Btw. quello era uno dei casi di test che ho fatto per la mia soluzione: la raccolta di uno stream restituito dalla mia soluzione in una 'Map' deve produrre esattamente la stessa' Map' come 'groupingBy' usando lo stesso classificatore. – Holger

1

cyclops-react, ho libreria un contributo alla, offre sia sharding e il raggruppamento funcitonality che potrebbero fare quello che vuoi.

ReactiveSeq<ListX<TYPE>> grouped = ReactiveSeq.fromCollection(FileUtils.readLines(...)) 
      .groupedStatefullyWhile((batch,next) -> batch.size()==0 ? true : next.equals(batch.get(0))); 

L'operatore groupedStatefullyWhile consente elementi di essere raggruppati in base allo stato corrente del batch. ReactiveSeq è un flusso sequenziale a thread singolo.

Map<Key, Stream<Value> sharded = 
        new LazyReact() 
       .fromCollection(FileUtils.readLines(...)) 
       .map(..) 
       .shard(shards, pair -> pair[0]); 

Questo creerà un LazyFutureStream (che implementa java.util.stream.Stream), che elaborerà i dati nel file in modo asincrono e in parallelo. È pigro e non inizierà l'elaborazione fino a quando i dati non saranno trascinati.

L'unica avvertenza è che è necessario definire i frammenti in anticipo. Cioè il parametro 'shards' sopra il quale si trova una mappa di async.Queue digitato dalla chiave per lo shard (probabilmente qualunque coppia [0] è?).

ad es.

Map<Integer,Queue<String>> shards; 

There is a sharding example with video here e test code here

0

Esso può essere fatto da collapse con StreamEx

final int[][] aa = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 4, 4 } }; 

StreamEx.of(aa) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .forEach(System.out::println); 

Possiamo aggiungere peek e limit per verificare se si tratta di calcolo pigro:

StreamEx.of(aa) 
     .peek(System.out::println) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .limit(1) 
     .forEach(System.out::println);