In primo luogo, mi permetta di riscrivere il codice con 2 funzioni statiche per rendere più facile vedere quello che sta succedendo:
// Make an executor equivalent to Executors.newFixedThreadPool(nThreads)
// that will trace to standard error when a task begins or ends
static ExecutorService loggingExecutor(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.err.println("Executor beginning task on thread: "
+ t.getName());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.err.println("Executor finishing task on thread: "
+ Thread.currentThread().getName());
}
};
}
E
// same as what you pass to thenComposeAsync
static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) {
return b -> {
System.err.println(Thread.currentThread().getName()
+ ": About to enqueue task");
CompletableFuture<Boolean> innerFuture = new CompletableFuture<>();
executor.execute(() -> {
System.err.println(Thread.currentThread().getName()
+ ": Inner task");
innerFuture.complete(true);
});
System.err.println(Thread.currentThread().getName()
+ ": Task enqueued");
return innerFuture;
};
}
Ora possiamo scrivere il banco di prova come segue:
ExecutorService e = loggingExecutor(1);
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.join();
e.shutdown();
/* Output before deadlock:
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/
Proviamo a concludere che il primo thread non viene rilasciato fino al risultato della s futuro econda viene calcolata:
ExecutorService e = loggingExecutor(2); // use 2 threads this time
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.join();
e.shutdown();
/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
Executor finishing task on thread: pool-1-thread-1
*/
In effetti, sembra che la discussione 1 si tiene fino a quando il thread 2 è fatto
Vediamo se siete di destra che thenComposeAsync
si blocca:
ExecutorService e = loggingExecutor(1);
CompletableFuture<Boolean> future =
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e);
System.err.println("thenComposeAsync returned");
future.join();
e.shutdown();
/*
thenComposeAsync returned
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
*/
thenComposeAsync
didn blocco. Ha restituito il CompletableFuture
subito e il deadlock si è verificato solo quando abbiamo provato a completarlo. Quindi, che cosa dovrebbe prendere per completare il futuro restituito da .thenComposeAsync(inner(e), e)
?
- L'API deve attendere innner (e) per tornare
CompletableFuture<Boolean>
- ha bisogno di aspettare la tornata
CompletableFuture<Boolean>
a anche completi. Solo allora il futuro sarà completo. Quindi, come puoi vedere, non può fare ciò che suggerisci e restituire il Futuro incompleto.
È un errore? Perché il CompletionStage mantiene il thread 1 mentre il task interno viene calcolato? Non è un errore perché, come hai notato, la documentazione è piuttosto vaga e non promette di rilasciare discussioni in un ordine particolare. Inoltre, nota che Thread1 verrà utilizzato per tutti i successivi metodi then*()
di CompletableFuture.Si consideri il seguente:
ExecutorService e = loggingExecutor(2);
CompletableFuture.completedFuture(true)
.thenComposeAsync(inner(e), e)
.thenRun(() -> System.err.println(Thread.currentThread().getName()
+ ": All done"))
.join();
e.shutdown();
/*
Executor beginning task on thread: pool-1-thread-1
pool-1-thread-1: About to enqueue task
pool-1-thread-1: Task enqueued
Executor beginning task on thread: pool-1-thread-2
pool-1-thread-2: Inner task
Executor finishing task on thread: pool-1-thread-2
pool-1-thread-1: All done
Executor finishing task on thread: pool-1-thread-1
*/
Come si può vedere, .thenRun (...) ottenuto eseguito sul filo 1. Credo che questo sia coerente con le altre * Async (..., esecutore exec) metodi di CompletableFuture.
Ma cosa succede se si desidera dividere la funzionalità di thenComposeAsync
in 2 passaggi controllabili separatamente anziché lasciarlo all'API per manipolare i thread? È sufficiente fare questo:
ExecutorService e = loggingExecutor(1);
completedFuture(true)
.thenApplyAsync(inner(e), e) // do the async part first
.thenCompose(x -> x) // compose separately
.thenRun(() -> System.err.println(Thread.currentThread().getName()
+ ": All done"))
.join();
e.shutdown();
Tutto funzionerà bene su 1 thread senza deadlock.
In conclusione, questo comportamento non è intuitivo come dici tu? Non lo so. Non riesco a immaginare perché esista anche thenComposeAsync
. Se un metodo restituisce CompletableFuture
, non dovrebbe bloccarlo e non ci dovrebbero essere motivi per chiamarlo in modo asincrono.
Grazie Misha per il dettaglio e lo sforzo. Il tuo esempio è molto più chiaro del mio. Ho bisogno di riprendermi un paio delle tue dichiarazioni. Nello specifico: "1. L'API deve attendere che innner (e) restituisca CompletableFuture; 2. deve attendere il completamento di CompletableFuture da completare. Solo allora è il futuro completo. Quindi, come puoi vedere, non posso fare ciò che suggerisci e restituire il Futuro incompleto. " Non sto facendo il salto mentale sottinteso da "come puoi vedere ..." Ci penserò un po 'poi postare qui quando lo farò meglio. –
sethwm