2014-12-18 11 views
5

Ho scritto un esempio di codice forzato, e potrebbe non essere un codice che qualcuno dovrebbe usare, ma credo che lo dovrebbe funzionare. Tuttavia invece si blocca. Ho letto le risposte descritte here, ma le ho trovate insufficienti.Perché thenComposeAsync attende il riscatto per essere riscattabile

Ecco l'esempio di codice:

import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.Executor; 
import java.util.concurrent.Executors; 

public class Test { 

    public static void main(String argv[]) throws Exception { 

     int nThreads = 1; 
     Executor executor = Executors.newFixedThreadPool(nThreads); 



     CompletableFuture.completedFuture(true) 
      .thenComposeAsync((unused)->{ 

       System.err.println("About to enqueue task"); 
       CompletableFuture<Boolean> innerFuture = new CompletableFuture<>(); 
       executor.execute(() -> { 

        // pretend this is some really expensive computation done asynchronously 

        System.err.println("Inner task"); 
        innerFuture.complete(true); 
       }); 
       System.err.println("Task enqueued"); 

       return innerFuture; 
      }, executor).get(); 

     System.err.println("All done"); 
     System.exit(0); 

    } 

} 

Questo stampa:

intorno a accodare compito

Task accodato

E poi si blocca. È un deadlock perché l'esecutore ha solo un singolo thread, ed è in attesa che l'innerFuture diventi riscattabile. Perché il blocco "thenComposeAsync" per il suo valore restituito può essere riscattato, invece di restituire il futuro ancora incompleto e liberare il thread nell'executor?

Questo sembra completamente non intuitivo e le javadoc non aiutano veramente. Sto fondamentalmente fraintendendo il funzionamento di CompletionStages? O si tratta di un bug nell'implementazione?

risposta

2

Quindi, dopo molte conversazioni interessanti, ho deciso di inviare un'email a uno degli autori di JDK. Ho scoperto che questo comportamento non era inteso, ed è davvero un bug presente in 1.8u25. C'è una correzione che verrà rilasciata con una versione patch successiva di Java 8. Non so quale. Per tutti coloro che vogliono testare il nuovo comportamento, è possibile scaricare l'ultima vaso jsr166 qui:

http://gee.cs.oswego.edu/dl/concurrency-interest/index.html

1

In primo luogo, mi permetta di riscrivere il codice con 2 funzioni statiche per rendere più facile vedere quello che sta succedendo:

// Make an executor equivalent to Executors.newFixedThreadPool(nThreads) 
// that will trace to standard error when a task begins or ends 
static ExecutorService loggingExecutor(int nThreads) { 
    return new ThreadPoolExecutor(nThreads, nThreads, 
      0L, TimeUnit.MILLISECONDS, 
      new LinkedBlockingQueue<>()) { 

       @Override 
       protected void beforeExecute(Thread t, Runnable r) { 
        System.err.println("Executor beginning task on thread: " 
         + t.getName()); 
       } 

       @Override 
       protected void afterExecute(Runnable r, Throwable t) { 
        System.err.println("Executor finishing task on thread: " 
         + Thread.currentThread().getName()); 
       } 

      }; 
} 

E

// same as what you pass to thenComposeAsync 
static Function<Boolean, CompletableFuture<Boolean>> inner(Executor executor) { 
    return b -> { 
     System.err.println(Thread.currentThread().getName() 
        + ": About to enqueue task"); 
     CompletableFuture<Boolean> innerFuture = new CompletableFuture<>(); 
     executor.execute(() -> { 
      System.err.println(Thread.currentThread().getName() 
        + ": Inner task"); 
      innerFuture.complete(true); 
     }); 
     System.err.println(Thread.currentThread().getName() 
        + ": Task enqueued"); 

     return innerFuture; 
    }; 
} 

Ora possiamo scrivere il banco di prova come segue:

ExecutorService e = loggingExecutor(1); 

CompletableFuture.completedFuture(true) 
     .thenComposeAsync(inner(e), e) 
     .join(); 

e.shutdown(); 

/* Output before deadlock: 
Executor beginning task on thread: pool-1-thread-1 
pool-1-thread-1: About to enqueue task 
pool-1-thread-1: Task enqueued 
*/ 

Proviamo a concludere che il primo thread non viene rilasciato fino al risultato della s futuro econda viene calcolata:

ExecutorService e = loggingExecutor(2); // use 2 threads this time 

CompletableFuture.completedFuture(true) 
     .thenComposeAsync(inner(e), e) 
     .join(); 

e.shutdown(); 

/* 
Executor beginning task on thread: pool-1-thread-1 
pool-1-thread-1: About to enqueue task 
pool-1-thread-1: Task enqueued 
Executor beginning task on thread: pool-1-thread-2 
pool-1-thread-2: Inner task 
Executor finishing task on thread: pool-1-thread-2 
Executor finishing task on thread: pool-1-thread-1 
*/ 

In effetti, sembra che la discussione 1 si tiene fino a quando il thread 2 è fatto

Vediamo se siete di destra che thenComposeAsync si blocca:

ExecutorService e = loggingExecutor(1); 

CompletableFuture<Boolean> future = 
     CompletableFuture.completedFuture(true) 
     .thenComposeAsync(inner(e), e); 

System.err.println("thenComposeAsync returned"); 

future.join(); 

e.shutdown(); 

/* 
thenComposeAsync returned 
Executor beginning task on thread: pool-1-thread-1 
pool-1-thread-1: About to enqueue task 
pool-1-thread-1: Task enqueued 
*/ 

thenComposeAsync didn blocco. Ha restituito il CompletableFuture subito e il deadlock si è verificato solo quando abbiamo provato a completarlo. Quindi, che cosa dovrebbe prendere per completare il futuro restituito da .thenComposeAsync(inner(e), e)?

  1. L'API deve attendere innner (e) per tornare CompletableFuture<Boolean>
  2. ha bisogno di aspettare la tornata CompletableFuture<Boolean> a anche completi. Solo allora il futuro sarà completo. Quindi, come puoi vedere, non può fare ciò che suggerisci e restituire il Futuro incompleto.

È un errore? Perché il CompletionStage mantiene il thread 1 mentre il task interno viene calcolato? Non è un errore perché, come hai notato, la documentazione è piuttosto vaga e non promette di rilasciare discussioni in un ordine particolare. Inoltre, nota che Thread1 verrà utilizzato per tutti i successivi metodi then*() di CompletableFuture.Si consideri il seguente:

ExecutorService e = loggingExecutor(2); 

CompletableFuture.completedFuture(true) 
     .thenComposeAsync(inner(e), e) 
     .thenRun(() -> System.err.println(Thread.currentThread().getName() 
         + ": All done")) 
     .join(); 

e.shutdown(); 

/* 
Executor beginning task on thread: pool-1-thread-1 
pool-1-thread-1: About to enqueue task 
pool-1-thread-1: Task enqueued 
Executor beginning task on thread: pool-1-thread-2 
pool-1-thread-2: Inner task 
Executor finishing task on thread: pool-1-thread-2 
pool-1-thread-1: All done 
Executor finishing task on thread: pool-1-thread-1 
*/ 

Come si può vedere, .thenRun (...) ottenuto eseguito sul filo 1. Credo che questo sia coerente con le altre * Async (..., esecutore exec) metodi di CompletableFuture.

Ma cosa succede se si desidera dividere la funzionalità di thenComposeAsync in 2 passaggi controllabili separatamente anziché lasciarlo all'API per manipolare i thread? È sufficiente fare questo:

ExecutorService e = loggingExecutor(1); 

completedFuture(true) 
     .thenApplyAsync(inner(e), e) // do the async part first 
     .thenCompose(x -> x)   // compose separately 
     .thenRun(() -> System.err.println(Thread.currentThread().getName() 
         + ": All done")) 
     .join(); 

e.shutdown(); 

Tutto funzionerà bene su 1 thread senza deadlock.

In conclusione, questo comportamento non è intuitivo come dici tu? Non lo so. Non riesco a immaginare perché esista anche thenComposeAsync. Se un metodo restituisce CompletableFuture, non dovrebbe bloccarlo e non ci dovrebbero essere motivi per chiamarlo in modo asincrono.

+0

Grazie Misha per il dettaglio e lo sforzo. Il tuo esempio è molto più chiaro del mio. Ho bisogno di riprendermi un paio delle tue dichiarazioni. Nello specifico: "1. L'API deve attendere che innner (e) restituisca CompletableFuture ; 2. deve attendere il completamento di CompletableFuture da completare. Solo allora è il futuro completo. Quindi, come puoi vedere, non posso fare ciò che suggerisci e restituire il Futuro incompleto. " Non sto facendo il salto mentale sottinteso da "come puoi vedere ..." Ci penserò un po 'poi postare qui quando lo farò meglio. – sethwm