2015-12-17 21 views
17

Il seguente frammento di codice fa parte di un metodo che ottiene un elenco di directory, chiama un metodo di estrazione su ciascun file e serializza l'oggetto farmaco risultante in xml.Perché lo streaming parallelo di Files.list() è molto più lento rispetto all'utilizzo di Collection.parallelStream()?

try(Stream<Path> paths = Files.list(infoDir)) { 
    paths 
     .parallel() 
     .map(this::extract) 
     .forEachOrdered(drug -> { 
      try { 
       marshaller.write(drug); 
      } catch (JAXBException ex) { 
       ex.printStackTrace(); 
      } 
     }); 
} 

Qui è lo stesso codice esatto facendo la stessa identica cosa, ma utilizzando una pianura .list() chiamata per ottenere l'elenco di directory e chiamando .parallelStream() sulla lista risultante.

La mia macchina è un MacBook Pro quad core, Java v 1.8.0_60 (build 1.8.0_60-b27).

Sto elaborando ~ 7000 file. Le medie di 3 serie:

Prima versione: Con .parallel(): 20 secondi. Senza .parallel(): 41 secondi

Seconda versione: Con .parallelStream(): 12 secondi. Con .stream(): 41 secondi.

Questi 8 secondi in modalità parallela sembrano un'enorme differenza dato che il metodo extract che legge dallo streaming e fa tutto il lavoro pesante e la chiamata write alle scritture finali sono invariate.

+0

come funziona il tuo codice senza parallelStream? – sidgate

+0

Dubito che File.list() sia "lo stesso identico codice che fa la stessa identica cosa" che attraversa un DirectoryStream. – VGR

+0

@sidgate Ho aggiornato la domanda. Senza parallelizzazione vengono eseguite quasi esattamente nello stesso tempo. – kliron

risposta

18

Il problema è che l'attuale implementazione di Stream API insieme all'implementazione corrente di IteratorSpliterator per sorgenti di dimensioni sconosciute divide in modo errato tali sorgenti in attività parallele. Sei stato fortunato ad avere più di 1024 file, altrimenti non avresti alcun beneficio di parallelizzazione. L'implementazione dell'API Stream corrente tiene conto del valore estimateSize() restituito da Spliterator. Il IteratorSpliterator di dimensioni sconosciute restituisce Long.MAX_VALUE prima di dividere e il suo suffisso restituisce sempre Long.MAX_VALUE pure. La sua strategia di divisione è la seguente:

  1. Definire la dimensione del lotto corrente. La formula attuale è di iniziare con 1024 elementi e aumentare aritmeticamente (2048, 3072, 4096, 5120 e così via) fino al raggiungimento della dimensione MAX_BATCH (che è 33554432 elementi).
  2. Consuma elementi di input (nel tuo caso Paths) in array fino a quando la dimensione del batch non viene raggiunta o l'input è esaurito.
  3. Restituisce un ArraySpliterator iterando sull'array creato come prefisso, lasciandosi come suffisso.

Supponiamo di avere 7000 file. L'API Stream richiede dimensioni stimate, IteratorSpliterator restituisce Long.MAX_VALUE. Ok, l'API Stream chiede allo IteratorSpliterator di dividere, raccoglie 1024 elementi dal sottostante DirectoryStream all'array e si divide in ArraySpliterator (con una dimensione stimata 1024) e stesso (con una dimensione stimata che è ancora Long.MAX_VALUE). Poiché Long.MAX_VALUE è molto più di 1024, Stream API decide di continuare a dividere la parte più grande senza nemmeno tentare di dividere la parte più piccola. Così l'albero complessiva splitting va in questo modo:

     IteratorSpliterator (est. MAX_VALUE elements) 
          |     | 
ArraySpliterator (est. 1024 elements) IteratorSpliterator (est. MAX_VALUE elements) 
              |  | 
          /---------------/  | 
          |      | 
ArraySpliterator (est. 2048 elements) IteratorSpliterator (est. MAX_VALUE elements) 
              |  | 
          /---------------/  | 
          |      | 
ArraySpliterator (est. 3072 elements) IteratorSpliterator (est. MAX_VALUE elements) 
              |  | 
          /---------------/  | 
          |      | 
ArraySpliterator (est. 856 elements) IteratorSpliterator (est. MAX_VALUE elements) 
                | 
             (split returns null: refuses to split anymore) 

Così, dopo che avete cinque attività parallele da eseguire: in realtà contenente 1024, 2048, 3072, 856 e 0 elementi.Tieni presente che anche se l'ultimo blocco contiene 0 elementi, riporta comunque che sono stimati gli elementi Long.MAX_VALUE, quindi Stream API lo invierà anche allo ForkJoinPool. La cosa cattiva è che Stream API pensa che un'ulteriore suddivisione delle prime quattro attività sia inutile in quanto la loro dimensione stimata è molto inferiore. Quindi quello che ottieni è una divisione molto irregolare dell'input che utilizza quattro CPU core max (anche se ne hai di più). Se l'elaborazione per elemento impiega all'incirca lo stesso tempo per qualsiasi elemento, l'intero processo attende la parte più grande (3072 elementi) da completare. Quindi la massima velocità che si può avere è 7000/3072 = 2,28x. Quindi se l'elaborazione sequenziale richiede 41 secondi, il flusso parallelo richiederà circa 41/2.28 = 18 secondi (che è vicino ai numeri reali).

La soluzione di lavoro è completamente a posto. Si noti che utilizzando Files.list().parallel() si dispone anche di tutti gli elementi di input Path memorizzati nella memoria (negli oggetti ArraySpliterator). Quindi non sprecherai più memoria se li scarichi manualmente nello List. Implementazioni di liste supportate da array come ArrayList (che è attualmente creato da Collectors.toList()) possono essere divisi equamente senza problemi, il che si traduce in un'ulteriore accelerazione.

Perché questo caso non è ottimizzato? Ovviamente non è un problema impossibile (anche se l'implementazione potrebbe essere abbastanza complicata). Sembra che non sia un problema di alta priorità per gli sviluppatori JDK. Ci sono state diverse discussioni su questo argomento nelle mailing list. Puoi leggere il messaggio di Paul Sandoz here dove commenta il mio sforzo di ottimizzazione.

+3

Questa è un'ottima spiegazione !! +1 –

5

In alternativa, è possibile utilizzare questo spliterator personalizzato su misura per DirectoryStream:

public class DirectorySpliterator implements Spliterator<Path> { 
    Iterator<Path> iterator; 
    long est; 

    private DirectorySpliterator(Iterator<Path> iterator, long est) { 
     this.iterator = iterator; 
     this.est = est; 
    } 

    @Override 
    public boolean tryAdvance(Consumer<? super Path> action) { 
     if (iterator == null) { 
      return false; 
     } 
     Path path; 
     try { 
      synchronized (iterator) { 
       if (!iterator.hasNext()) { 
        iterator = null; 
        return false; 
       } 
       path = iterator.next(); 
      } 
     } catch (DirectoryIteratorException e) { 
      throw new UncheckedIOException(e.getCause()); 
     } 
     action.accept(path); 
     return true; 
    } 

    @Override 
    public Spliterator<Path> trySplit() { 
     if (iterator == null || est == 1) 
      return null; 
     long e = this.est >>> 1; 
     this.est -= e; 
     return new DirectorySpliterator(iterator, e); 
    } 

    @Override 
    public long estimateSize() { 
     return est; 
    } 

    @Override 
    public int characteristics() { 
     return DISTINCT | NONNULL; 
    } 

    public static Stream<Path> list(Path parent) throws IOException { 
     DirectoryStream<Path> ds = Files.newDirectoryStream(parent); 
     int splitSize = Runtime.getRuntime().availableProcessors() * 8; 
     DirectorySpliterator spltr = new DirectorySpliterator(ds.iterator(), splitSize); 
     return StreamSupport.stream(spltr, false).onClose(() -> { 
      try { 
       ds.close(); 
      } catch (IOException e) { 
       throw new UncheckedIOException(e); 
      } 
     }); 
    } 
} 

basta sostituire Files.list con DirectorySpliterator.list e sarà parallelizzare in modo uniforme senza buffer intermedio. Qui usiamo il fatto che DirectoryStream produce un elenco di directory senza un ordine specifico, quindi ogni thread parallelo ne prenderà solo una successiva (in modo sincronizzato, poiché abbiamo già operazioni IO sincrone, la sincronizzazione aggiuntiva ha un overhead quasi-nulla). L'ordine parallelo sarà diverso ogni volta (anche se si utilizza forEachOrdered), ma Files.list() non garantisce l'ordine.

L'unica parte non banale qui è il numero di attività parallele da creare. Dato che non sappiamo quanti file nella cartella fino a quando non lo attraversiamo, è opportuno utilizzare availableProcessors() come base. Creo circa 8 x availableProcessors() attività individuali, che sembrano essere un buon compromesso a grana fine/a grana grossa: se l'elaborazione per elemento è irregolare, disporre di più attività rispetto ai processori contribuirebbe a bilanciare il carico.

+1

Ricorda che il numero di CPU non ha bisogno di essere una potenza di due, non ha nemmeno bisogno di essere un numero pari. iirc, AMD fatto una volta CPU con tre core e bene, si potrebbe avere l'idea del perché non sono decollati, troppi software non sono riusciti a bilanciare il loro lavoro correttamente ... – Holger

+0

@Holger, sì ci ho pensato. Ho anche lavorato su CPU AMD a 3 core. È solo una macchina a 4 core con un core disabilitato (o ha fallito alcuni test di fabbrica o per ragioni di marketing, per vendere un po 'di CPU in meno). Non è molto difficile risolvere questo problema nel mio codice e il numero di sotto-attività è comunque una stima approssimativa. –

0

Un'altra alternativa per la vostra soluzione è quello di utilizzare .collect(Collectors.toList()).parallelStream() sul vostro flusso come

try(Stream<Path> paths = Files.list(infoDir)) { 
    paths 
     .collect(Collectors.toList()) 
     .parallelStream() 
     .map(this::extract) 
     .forEachOrdered(drug -> { 
      try { 
       marshaller.write(drug); 
      } catch (JAXBException ex) { 
       ex.printStackTrace(); 
      } 
     }); 
} 

Con questo non c'è bisogno di chiamare .map(f -> infoDir.resolve(f)) e le prestazioni dovrebbero essere simili alla tua seconda soluzione.