2009-08-24 6 views
14

Devo inviare un numero di attività e quindi attendere fino a quando tutti i risultati sono disponibili. Ognuno di essi aggiunge uno String a Vector (che è sincronizzato per impostazione predefinita). Quindi ho bisogno di iniziare una nuova attività per ogni risultato in Vector, ma devo farlo solo quando tutte le attività precedenti hanno smesso di fare il loro lavoro.Esecutori Java: attendere la chiusura dell'attività.

Voglio utilizzare Java Executor, in particolare ho provato a utilizzare Executors.newFixedThreadPool(100) per utilizzare un numero fisso di thread (ho un numero variabile di attività che può essere 10 o 500) ma sono nuovo con gli esecutori e io non so come aspettare la fine dell'attività. Questo è qualcosa come un pseudocodice di ciò che il mio programma ha bisogno di fare:

ExecutorService e = Executors.newFixedThreadPool(100); 
while(true){ 

/*do something*/ 

for(...){ 
<start task> 
} 

<wait for all task termination> 

for each String in result{ 
<start task> 
} 

<wait for all task termination> 
} 

non posso fare un e.shutdown perché sono in un po '(vero) e ho bisogno di riutilizzare il executorService ..

Potete aiutarmi? Puoi suggerirmi una guida/un libro sugli esecutori java?

+0

Dai un'occhiata a http://stackoverflow.com/questions/1228433/java-parallel-work-iterator/1228445 – Tim

risposta

21

Il ExecutorService fornisce un meccanismo per eseguire più attività contemporaneamente e ottenere una raccolta di oggetti Future indietro (che rappresenta il calcolo asincrono dell'attività).

Collection<Callable<?>> tasks = new LinkedList<Callable<?>>(); 
//populate tasks 
for (Future<?> f : executorService.invokeAll(tasks)) { //invokeAll() blocks until ALL tasks submitted to executor complete 
    f.get(); 
} 

Se hai Runnable s invece di Callable s, si può facilmente trasformare un Runnable in un Callable<Object> utilizzando il metodo:

Callable<?> c = Executors.callable(runnable); 
+4

Scusa per aver recuperato un vecchio post, ma non vedo il punto nel chiamare f.get(); invokeAll() è a sua volta bloccato, quindi non è necessario chiamare get() a meno che non si sia interessati al risultato (che il proprio codice non è). – hooch

+0

@hooch: IMO dovresti sempre chiamare 'get()' nel caso in cui il tuo 'Callable' abbia lanciato un'eccezione. Altrimenti si perde semplicemente. –

2

Quando si invia a un servizio esecutore, si otterrà un oggetto Future indietro.

Archiviare gli oggetti in una raccolta, quindi chiamare a turno uno get(). get() blocca fino a quando il lavoro di base completa, e così il risultato è che chiamando get() su ogni completerà una volta tutti i lavori alla base hanno finito.

ad es.

Collection<Future> futures = ... 
for (Future f : futures) { 
    Object result = f.get(); 
    // maybe do something with the result. This could be a 
    // genericised Future<T> 
} 
System.out.println("Tasks completed"); 

Una volta completati tutti, avviare la seconda richiesta. Si noti che questo potrebbe non essere un utilizzo ottimale del vostro pool di thread, dal momento che diventare dormienti, e quindi si sta ri-popolato. Se possibile, prova a tenerlo occupato a fare cose.

+0

Qual è la differenza tra e.submit (penso di doverlo utilizzare seguendo il tuo esempio) ed e .eseguire?? – Raffo

+0

La differenza è che con submit si ottiene un futuro indietro e con esecuzione non lo si fa. Se usi il tuo 'ThreadFactory' con' UncaughtExceptionHandler', quindi 'execute' farà sì che il gestore riceva eventuali eccezioni non rilevate, mentre' submit' no - otterresti solo le eccezioni tramite 'Future''s 'get 'metodo –

14

Mi potete suggerire una guida/libro su java esecutori ??

posso rispondere a questa parte:

Java Concurrency in Practice da Brian Goetz (con Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes e Doug Lea) è più probabile che la soluzione migliore.

Non è solo su esecutori, però, ma invece copre java.util.concurrent pacchetto in generale, così come i concetti di concorrenza di base e tecniche, e alcuni argomenti avanzati come il modello di memoria di Java.

+0

Questo è un libro eccellente, anche se non proprio per i principianti. –

14

Anziché presentare Runnable s o Callable s ad un Executor direttamente e memorizzando i corrispondenti valori di ritorno Future Mi consiglia di utilizzare un'implementazione CompletionService per recuperare ogni Futurequando completa. Questo approccio disaccoppia la produzione di compiti dal consumo di attività completate, consentendo ad esempio a nuovi compiti di originare su un thread di produzione per un periodo di tempo.

Collection<Callable<Result>> workItems = ... 
ExecutorService executor = Executors.newSingleThreadExecutor(); 
CompletionService<Result> compService = new ExecutorCompletionService<Result>(executor); 

// Add work items to Executor. 
for (Callable<Result> workItem : workItems) { 
    compService.submit(workItem); 
} 

// Consume results as they complete (this would typically occur on a different thread). 
for (int i=0; i<workItems.size(); ++i) { 
    Future<Result> fut = compService.take(); // Will block until a result is available. 
    Result result = fut.get(); // Extract result; this will not block. 
} 
+0

In pratica questo comporta più LOC rispetto al mio esempio sopra. Un servizio di completamento è generalmente utile se si inviano attività da più località ma si desidera gestire in modo coerente i completamenti delle attività (ad esempio, per eseguire altri calcoli), che si desidera definire solo una volta –

+1

@oxbow: Oppure se si desidera iniziare l'elaborazione dei risultati non appena il primo compito termina! Altrimenti potresti aspettare il tuo compito più lento mentre gli altri sono già fatti .. (Adamski +1) – Tim

+1

@Tim - L'OP ha detto abbastanza chiaramente che voleva aspettare fino a quando tutti i compiti sono finiti, quindi non fa differenza (a parte pochi nanosecondi) il cui compito finisce prima. –

1
ExecutorService executor = ... 
//submit tasks 
executor.shutdown(); // previously submitted tasks are executed, 
        // but no new tasks will be accepted 
while(!executor.awaitTermination(1, TimeUnit.SECONDS)) 
    ; 

Non c'è un modo semplice per fare quello che vuoi senza creare ExecutorService personalizzato.