A deadlock stazionamento thread si verifica in un normale pool di thread se tutti i thread nel pool sono in attesa di attività in coda nello stesso pool da completare. ForkJoinPool
evita questo problema rubando il lavoro da altri thread all'interno della chiamata join()
, piuttosto che semplicemente in attesa. Per esempio:È possibile utilizzare il comportamento di sottrazione del lavoro di ForkJoinPool per evitare un deadlock di inattività del thread?
private static class ForkableTask extends RecursiveTask<Integer> {
private final CyclicBarrier barrier;
ForkableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
protected Integer compute() {
try {
barrier.await();
return 1;
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
@Test
public void testForkJoinPool() throws Exception {
final int parallelism = 4;
final ForkJoinPool pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
for (int i = 0; i < parallelism; ++i) {
forkableTasks.add(new ForkableTask(barrier));
}
int result = pool.invoke(new RecursiveTask<Integer>() {
@Override
protected Integer compute() {
for (ForkableTask task : forkableTasks) {
task.fork();
}
int result = 0;
for (ForkableTask task : forkableTasks) {
result += task.join();
}
return result;
}
});
assertThat(result, equalTo(parallelism));
}
Ma quando si utilizza l'interfaccia ExecutorService
ad un ForkJoinPool
, work-furto non sembra verificarsi. Per esempio:
private static class CallableTask implements Callable<Integer> {
private final CyclicBarrier barrier;
CallableTask(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public Integer call() throws Exception {
barrier.await();
return 1;
}
}
@Test
public void testWorkStealing() throws Exception {
final int parallelism = 4;
final ExecutorService pool = new ForkJoinPool(parallelism);
final CyclicBarrier barrier = new CyclicBarrier(parallelism);
final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
int result = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
// Deadlock in invokeAll(), rather than stealing work
for (Future<Integer> future : pool.invokeAll(callableTasks)) {
result += future.get();
}
return result;
}
}).get();
assertThat(result, equalTo(parallelism));
}
Da un rapido sguardo ForkJoinPool
s' implementazione, tutti i regolari ExecutorService
API sono implementate utilizzando ForkJoinTask
s, quindi non sono sicuro del perché si verifica una situazione di stallo.
Non penso che il furto di lavoro eviti lo stallo. Quando sei in una situazione di stallo, non puoi fare progressi. Il furto di lavoro evita solo code non bilanciate consentendo ai thread di rubare da altre code se la loro coda è vuota. – markspace
@markspace Nell'implementazione di 'ForkJoinTask',' join() 'tenta di eseguire altri lavori dalla deque anziché dallo stallo, evitando così il deadlock. Poiché 'ForkJoinPool.invokeAll()' converte il 'Callable's in' ForkJoinTask's, mi aspettavo che funzionasse anche. –