2015-10-01 11 views
9

Normalmente quando si utilizza parallelStream() di Java 8, il risultato è l'esecuzione tramite il pool comune di fork-join predefinito (ad esempio ForkJoinPool.commonPool()).Java parallelStream() con pool personalizzato con il furto del lavoro del chiamante?

Ciò è chiaramente indesiderabile, tuttavia, se si ha un lavoro che è lontano dal limite della CPU, ad es. potrebbe essere in attesa su IO per la maggior parte del tempo. In questi casi, si vorrà utilizzare un pool separato, dimensionato in base ad altri criteri (ad esempio, quanto tempo è probabile che le attività utilizzino effettivamente la CPU).

Non c'è ovvio mezzo di ottenere parallelStream() per utilizzare un pool diverso, ma esiste un modo come dettagliato here.

Sfortunatamente, questo approccio comporta il richiamo dell'operazione terminale sul flusso parallelo da un thread del pool di fork-join. Lo svantaggio di questo è che se il pool di join di fork di destinazione è completamente occupato con il lavoro esistente, l'intera esecuzione lo attende senza fare assolutamente nulla. Pertanto, il pool può diventare un collo di bottiglia peggiore rispetto all'esecuzione a thread singolo. Al contrario, quando si utilizza parallelStream() nella modalità "normale", ForkJoinPool.common.externalHelpComplete() o ForkJoinPool.common.tryExternalUnpush() vengono utilizzati per consentire al thread chiamante dall'esterno del pool di aiutare nell'elaborazione.

Qualcuno sa di un modo per sia ottenere parallelStream() per utilizzare un pool non predefinito fork-join e hanno un thread chiamante da fuori piscina aiuto fork-join nella lavorazione di questo lavoro (ma non il resto del lavoro di fork-join pool)?

+3

Non capisco il tuo _Il lato negativo di questo è che se il pool di join del fork di destinazione è completamente occupato con il lavoro esistente_. Non creeresti un nuovo pool solo per questa invocazione di stream parallelo? –

+1

È ancora peggio. Quando chiami 'get' sulle tue attività che non sono nel pool comune, chiamerà ancora' ForkJoinPool.common.tryExternalUnpush() ', ma, ovviamente, non troverà l'attività nella coda del pool comune. – Holger

+0

Per rispondere alla domanda, no, non vorrei creare un nuovo pool di thread solo per questa chiamata. Piuttosto condividerei questo altro gruppo di thread attraverso molte invocazioni simili, alcune delle quali potrebbero sovrapporsi, alcune delle quali potrebbero avere compiti molto più lunghi/più grandi di altre, ecc. –

risposta

2

È possibile utilizzare awaitQuiescence sulla piscina per dare una mano. Tuttavia, non è possibile selezionare le attività che verranno aiutate, sarà solo il prossimo in attesa dal pool, quindi, se ci sono più attività in sospeso, si potrebbe finire per eseguirle prima di arrivare al proprio.

ForkJoinPool forkJoinPool = new ForkJoinPool(1); 
// make all threads busy: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// submit our task (may contain your stream operation) 
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread()); 
// help out 
while(!task.isDone()) // use zero timeout to execute one task only 
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS); 
System.out.println(Thread.currentThread()==task.get()); 

stampa true.

mentre

ForkJoinPool forkJoinPool = new ForkJoinPool(1); 
// make all threads busy: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// overload: 
forkJoinPool.submit(() -> LockSupport.parkNanos(Long.MAX_VALUE)); 
// submit our task (may contain your stream operation) 
ForkJoinTask<Thread> task = forkJoinPool.submit(() -> Thread.currentThread()); 
// help out 
while(!task.isDone()) 
    forkJoinPool.awaitQuiescence(0, TimeUnit.NANOSECONDS); 
System.out.println(Thread.currentThread()==task.get()); 

si bloccherà per sempre come si tenta di eseguire la seconda operazione di blocco.

Tuttavia, consentirà al thread di avvio di elaborare le attività in sospeso del pool che aumenteranno la possibilità che il proprio task venga eseguito fino a quando non ci sono attività infinite (l'esempio sopra è estremo e scelto solo per la dimostrazione).


ma nota che l'intero rapporto tra la forcella/Accedi quadro e l'API Stream è un dettaglio di implementazione in ogni caso.

+0

Avevo concluso che potevo farlo, ma come si nota, potrebbe benissimo significare che finirò per dare una mano ad altri thread con i suoi compiti, piuttosto che aiutare a lavorare da solo. È una specie di antipasto. Inoltre, ho capito che fork-join è un dettaglio di implementazione, ma è necessario un controllo migliore su parallelStream(), ad es. qualcosa di semplice come parallelStream (forkJoinPool). –

+1

Beh, con un metodo come 'parallelStream (ForkJoinPool)', non sarebbe più un dettaglio di implementazione ... – Holger

+0

No, ma quale parallelStream() con no-argument sarebbe ancora un dettaglio di implementazione. Questo ti darebbe solo un'opzione per prendere un po 'di controllo nei casi in cui ne hai bisogno. –