2009-02-11 26 views
37

Prima di tutto, devo dire che sono abbastanza nuovo per l'API java.util.concurrent, quindi forse quello che sto facendo è completamente sbagliato.È un buon modo per usare java.util.concurrent.FutureTask?

Che cosa voglio fare?

Ho un'applicazione Java che gira fondamentalmente 2 elaborazione separata (chiamata myFirstProcess, mySecondProcess), ma questi l'elaborazione deve essere eseguita allo stesso tempo.

Così, ho provato a farlo:

public void startMyApplication() { 
    ExecutorService executor = Executors.newFixedThreadPool(2); 
    FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess); 
    FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess); 
    executor.execute(futureOne); 
    executor.execute(futureTwo); 
    while (!(futureOne.isDone() && futureTwo.isDone())) { 
     try { 
      // I wait until both processes are finished. 
      Thread.sleep(1000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
    logger.info("Processing finished"); 
    executor.shutdown(); 
    // Do some processing on results 
    ... 
} 

myFirstProcess e mySecondProcess sono classi che implementa Callable<Object>, e dove tutti loro trattamento è fatto nel metodo() chiamata.

Sta funzionando abbastanza bene, ma non sono sicuro che sia il modo corretto per farlo. È un buon modo per fare ciò che voglio? In caso contrario, puoi darmi qualche suggerimento per migliorare il mio codice (e mantenerlo il più semplice possibile).

+3

Le condizioni del tuo tempo mancano una parentesi chiusa :) –

risposta

43

Faresti meglio a usare il metodo get().

futureOne.get(); 
futureTwo.get(); 

Entrambi i quali attendere la notifica dal thread che terminata l'elaborazione, Ciò consente di risparmiare il occupato-attendere-con-timer ora si sta utilizzando, che non è efficiente né elegante.

Come bonus, si dispone dell'API get(long timeout, TimeUnit unit) che consente di definire un tempo massimo per il thread per la sospensione e attendere una risposta, e altrimenti continua a essere in esecuzione.

Vedere Java API per ulteriori informazioni.

+0

Quindi invece di Callable dovrebbe implementare Future o dovrebbe implementare entrambi? –

+1

Oh scusa, ero confuso dal tuo link. FutureTask ha il metodo .get(). –

+1

La risposta che parla dell'utilizzo di un CompletionService è molto meglio di questa. –

5

È possibile utilizzare uno CyclicBarrier se si desidera avviare i thread contemporaneamente o attendere che vengano completati e quindi eseguire ulteriori operazioni. Vedere javadoc per ulteriori informazioni.

+0

Non sono sicuro che avrò bisogno di usare le specificità di CyclicBarrier, ma è una classe abbastanza interessante ... – romaintaz

17

La soluzione di Yuval va bene. In alternativa puoi anche farlo:

ExecutorService executor = Executors.newFixedThreadPool(); 
FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess); 
FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess); 
executor.execute(futureOne); 
executor.execute(futureTwo); 
executor.shutdown(); 
try { 
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
} catch (InterruptedException e) { 
    // interrupted 
} 

Qual è il vantaggio di questo approccio? Non c'è molta differenza, tranne che in questo modo si impedisce all'executor di accettare ulteriori compiti (lo si può fare anche nell'altro modo). Tendo a preferire questo idioma a quello però.

Inoltre, se get() genera un'eccezione, si può finire in una parte del codice che presuppone che entrambe le attività siano state eseguite, il che potrebbe essere negativo.

+0

"se get() genera un'eccezione potresti finire in una parte del tuo codice che presuppone sia le attività sono fatte "sicuramente è solo se si ingeriscono l'eccezione? – artbristol

+0

sì, use awaitTermination è più conveniente di Thread.sleep() –

+0

newFixedThreadPool(); il metodo senza parametro non esiste – Kachna

18

Gli usi di FutureTask sopra sono tollerabili, ma sicuramente non idiomatici. In realtà stai avvolgendo unFutureTask in più intorno a quello che hai inviato allo ExecutorService. Il FutureTask viene considerato come Runnable dallo ExecutorService. Internamente, avvolge il tuo FutureTask -as- in un nuovo FutureTask e te lo restituisce come Future<?>.

Invece, è necessario inviare le istanze Callable<Object> a CompletionService. Si rilasciano due Callable s in via submit(Callable<V>), quindi si gira e chiama lo CompletionService#take() due volte (una per ogni inviato Callable). Quelle chiamate verranno bloccate fino a quando una e le altre attività inoltrate saranno completate.

Dato che hai già un Executor in mano, costruisci un nuovo ExecutorCompletionService attorno ad esso e lascia cadere le attività lì. Non girare e dormire in attesa; CompletionService#take() si bloccherà fino a quando una delle attività non sarà completata (terminata in esecuzione o annullata) o il thread in attesa su take() viene interrotto.

7

È possibile utilizzare invokeall (colelction ....) Metodo

package concurrent.threadPool; 

import java.util.Arrays; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class InvokeAll { 

    public static void main(String[] args) throws Exception { 
     ExecutorService service = Executors.newFixedThreadPool(5); 
     List<Future<java.lang.String>> futureList = service.invokeAll(Arrays.asList(new Task1<String>(),new Task2<String>())); 

     System.out.println(futureList.get(1).get()); 
     System.out.println(futureList.get(0).get()); 
    } 

    private static class Task1<String> implements Callable<String>{ 

     @Override 
     public String call() throws Exception { 
      Thread.sleep(1000 * 10); 
      return (String) "1000 * 5"; 
     } 

    } 

    private static class Task2<String> implements Callable<String>{ 

     @Override 
     public String call() throws Exception { 
      Thread.sleep(1000 * 2); 
      int i=3; 
      if(i==3) 
       throw new RuntimeException("Its Wrong"); 
      return (String) "1000 * 2"; 
     } 

    } 
} 
2

Se i futureTasks sono più di 2, perche [ListenableFuture][1].

Quando più operazioni dovrebbero iniziare al più presto un'altra operazione inizia - "fan-out" - ListenableFuture funziona: si innesca tutti i callback richiesti. Con un po 'più di lavoro, possiamo "fan-in," o innescare un ListenableFuture per essere computati non appena molti altri futures hanno tutti finito.