2015-06-12 9 views
7

Ho un ForkJoinPool personalizzato creato con il parallelismo di 25.Perché parallelStream non utilizza l'intero parallelismo disponibile?

customForkJoinPool = new ForkJoinPool(25); 

Ho una lista di 700 nomi di file e ho usato il codice come questo per scaricare i file da S3 in parallelo e lanciare loro di oggetti Java:

customForkJoinPool.submit(() -> { 
    return fileNames 
    .parallelStream() 
    .map((fileName) -> { 
     Logger log = Logger.getLogger("ForkJoinTest"); 
     long startTime = System.currentTimeMillis(); 
     log.info("Starting job at Thread:" + Thread.currentThread().getName()); 
     MyObject obj = readObjectFromS3(fileName); 
     long endTime = System.currentTimeMillis(); 
     log.info("completed a job with Latency:" + (endTime - startTime)); 
     return obj; 
    }) 
    .collect(Collectors.toList); 
    }); 
}); 

Quando guardo i log, vedo solo 5 thread in uso. Con un parallelismo di 25, mi aspettavo che usasse 25 thread. La latenza media per scaricare e convertire il file in un oggetto è di circa 200 ms. Cosa mi manca?

Potrebbe essere una domanda migliore: come può un parallelismo vedere quanto dividere l'elenco originale prima di creare i thread per esso? In questo caso, sembra che abbia deciso di dividerlo 5 volte e fermarsi.

+0

Questo problema è un male in forma per fork-join. Per i principianti, dovresti usare 'ManagedBlocker' dato che il tuo compito sta bloccando. Ma poiché è I/O e non è divisibile in modo ricorsivo, non è ancora una buona idea. – erickson

risposta

5

Perché stai facendo questo con ForkJoinPool? È pensato per attività legate alla CPU con attività secondarie troppo veloci per garantire la pianificazione individuale. Il carico di lavoro è legato all'IO e con una latenza di 200 ms l'overhead di pianificazione individuale è trascurabile.

Utilizzare un Executor:

import static java.util.stream.Collectors.toList; 
import static java.util.concurrent.CompletableFuture.supplyAsync; 

ExecutorService threads = Executors.newFixedThreadPool(25); 

List<MyObject> result = fileNames.stream() 
     .map(fn -> supplyAsync(() -> readObjectFromS3(fn), threads)) 
     .collect(toList()).stream() 
     .map(CompletableFuture::join) 
     .collect(toList()); 
+0

Grazie. Questo risponde al mio problema reale. Anche se mi chiedo ancora come parallelStream calcoli quanto dividere e quando smettere :) – Aishwar

+0

@Aishwar - In questo caso, non .... e questo è il problema. Vedi la mia risposta. –

3

Penso che la risposta sia in questo ... dal ForkJoinPool javadoc.

"La piscina cerca di mantenere abbastanza thread attivi (o disponibili) con l'aggiunta in modo dinamico, sospensione, o la ripresa thread di lavoro interni, anche se alcune attività sono in fase di stallo in attesa di unirsi ad altri. Tuttavia, tali aggiustamenti sono garantiti di fronte a I/O bloccati o altre sincronizzazioni non gestite. "

Nel tuo caso, i download eseguiranno operazioni di I/O di blocco.