2009-10-28 12 views
23

Ho riscontrato un problema due volte in cui un thread del produttore produce N elementi di lavoro, li invia a un ExecutorService e quindi deve attendere fino a quando tutti gli elementi N sono stati elaborati.CountDownLatch flessibile?

Caveats

  • N non è noto in anticipo. Se lo fosse vorrei semplicemente creare un CountDownLatch e quindi avere il thread di produzione await() fino al completamento di tutti i lavori.
  • utilizzando un CompletionService non è appropriato perché anche se il mio thread produttore ha bisogno di bloccare (cioè chiamando take()) non c'è alcun modo di segnalando che tutto il lavoro è completo, per causare il filo produttore a smettere di aspettare.

mia soluzione preferita attuale è quella di utilizzare un contatore intero, e incremento questo ogni volta che viene presentata una voce di lavoro e decremento quando un elemento di lavoro viene elaborato. Dopo la sottomissione di tutti gli N task, il thread del mio produttore dovrà attendere un lock, controllando se counter == 0 ogni volta che viene notificato. Il/i thread/i del consumatore dovrà notificare al produttore se ha diminuito il contatore e il nuovo valore è 0.

C'è un approccio migliore a questo problema o c'è un costrutto adatto in java.util.concurrent Dovrei usare piuttosto di "rotolare il mio"?

Grazie in anticipo.

+0

In che momento il produttore sa quanti oggetti di lavoro ci sono? Quando l'ultimo oggetto è stato prodotto? –

+0

La tua soluzione attuale potrebbe soffrire di una condizione di competizione: produrre elemento 1 -> contatore ++ -> processo articolo 1 -> contatore-- -> produrre elemento 2. Poiché il contatore è stato decrementato prima che il produttore abbia prodotto l'articolo successivo, il produttore pensa di essere pronto. –

+0

@rwwilden: Hai ragione nel senso che questo scenario potrebbe verificarsi. Tuttavia, il mio produttore ispezionerà/aspetterà sul bancone solo dopo aver inviato * tutti * gli articoli di lavoro e quindi non rappresenta una condizione di competizione in questo caso particolare. – Adamski

risposta

24

java.util.concurrent.Phaser sembra che funzioni bene per voi. È previsto il rilascio in Java 7, ma la versione più stabile può essere trovata sul sito web del gruppo di interesse jsr166.

Il phaser è una barriera ciclica glorificata. Puoi registrare N numero di feste e quando sei pronto attendi il loro avanzamento nella fase specifica.

Un esempio veloce di come avrebbe funzionato:

final Phaser phaser = new Phaser(); 

public Runnable getRunnable(){ 
    return new Runnable(){ 
     public void run(){ 
      ..do stuff... 
      phaser.arriveAndDeregister(); 
     } 
    }; 
} 
public void doWork(){ 
    phaser.register();//register self 
    for(int i=0 ; i < N; i++){ 
     phaser.register(); // register this task prior to execution 
     executor.submit(getRunnable()); 
    } 
    phaser.arriveAndAwaitAdvance(); 
} 
+3

Cool - Grazie; Questo sembra esattamente quello che sto cercando ... Non posso credere che lo abbiano chiamato Phaser. – Adamski

+2

Vorrei poter passare oltre x100 volte. Molte grazie. –

+2

Se il punto di 'getRunnable()' è quello di rappresentare un'attività che segnala solo il phaser e quindi termina, si desidera richiamare 'phaser.arriveAndDeregister()' e ** not ** 'phaser.arrive()' come altrimenti quando l'attività genitore invoca 'phaser.arriveAndAwaitAdvance()' una seconda volta sarà deadlock, in quanto l'attività sarà in attesa di attività finite che sono ancora registrate nel phaser. –

2

Si potrebbe naturalmente utilizzare un CountDownLatch protetto da un AtomicReference in modo che le attività vengono avvolte in tal modo:

public class MyTask extends Runnable { 
    private final Runnable r; 
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; } 

    public void run() { 
     r.run(); 
     while (l.get() == null) Thread.sleep(1000L); //handle Interrupted 
     l.get().countDown(); 
    } 
} 

Avviso che i compiti eseguiti il ​​loro lavoro e poi rotazione fino a quando il conto alla rovescia è set (ovvero il numero totale di attività è noto). Non appena viene impostato il conto alla rovescia, lo contano ed escono. Questi vengono presentati come segue:

AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>(); 
executor.submit(new MyTask(r, l)); 

Dopo il punto della creazione/invio del vostro lavoro, quando si sa quanti esercizi sono stati creati:

latch.set(new CountDownLatch(nTasks)); 
latch.get().await(); 
+0

Perché il wrapping del CountDownLatch con AtomicReference ti aiuta in questo caso? Il riferimento non ha bisogno di essere protetto, poiché viene passato al MyTask nel suo costruttore, e non cambia mai dopo. – Avi

+0

Inoltre, assicurati di gestire i throwables arbitrari (RuntimeExceptions ed errori) nel metodo run(), almeno inserendo coundDown() in un blocco finally {}. – Avi

+0

Ovviamente, la domanda riguardava specificamente il caso in cui non si conosce il numero di attività al momento della creazione dell'attività. – Avi

0

presumo vostro produttore fa Non è necessario sapere quando la coda è vuota ma è necessario sapere quando è stata completata l'ultima attività.

Vorrei aggiungere un metodo waitforWorkDone(producer) al consumatore. Il produttore può aggiungere le sue attività N e chiamare il metodo wait. Il metodo wait blocca il thread in entrata se la coda di lavoro non è vuota e al momento nessuna attività è in esecuzione.

Le thread del cliente notifyAll() sul blocco waitfor se la sua attività è stata completata, la coda è vuota e non è in esecuzione nessun'altra attività.

1

ho usato un ExecutorCompletionService per qualcosa di simile:

ExecutorCompletionService executor = ...; 
int count = 0; 
while (...) { 
    executor.submit(new Processor()); 
    count++; 
} 

//Now, pull the futures out of the queue: 
for (int i = 0; i < count; i++) { 
    executor.take().get(); 
} 

Si tratta di mantenere una coda di compiti che sono state presentate, quindi se il vostro elenco è arbitrariamente lunga, il metodo potrebbe essere migliore.

Tuttavia, assicurarsi di utilizzare uno AtomicInteger per il coordinamento, in modo da poterlo incrementare in un thread e decrementarlo nei thread di lavoro.

+0

Come si può 'attendereTerminazione'? Cosa sta spegnendo il tuo servizio? Inoltre, non è possibile riutilizzare il servizio di completamento nella risposta in ogni caso, perché si potrebbe essere in attesa di attività che erano logicamente parte di * un altro set *. E (come menzionato nei commenti alla mia risposta), è ancora necessario conoscere 'nTasks' ad un certo punto –

+0

@oxbow_lakes: True. Questo è il motivo per cui l'ho solo come opzione (che ho modificato, poiché non aiuta in questo caso). Suppongo che questo servizio di completamento non verrà utilizzato per un altro insieme di attività contemporaneamente (sovrapposte) con questo. Può essere riutilizzato più tardi, dallo stesso thread. – Avi

+0

Il vantaggio di questo modo di tenere traccia del numero di attività piuttosto che utilizzare un AtomicBoolean, è che non devi gestire manualmente wait() e notify() - la coda si prenderà cura di questo. Tutto ciò che devi tracciare è il numero di elementi in coda. – Avi

0

Quello che descritto è abbastanza simile all'utilizzo di un semaforo standard, ma usato "all'indietro".

  • tuo semaforo inizia con 0 permessi
  • Ogni unità di lavoro rilascia un permesso di quando completa
  • È blocco per in attesa di acquisire N permette

Inoltre si ha la flessibilità necessaria per acquisire solo M < N permessi che è utile se si desidera verificare lo stato intermedio. Ad esempio sto testando una coda di messaggi con limiti asincroni quindi mi aspetto che la coda sia piena per alcuni M < N così posso acquisire M e controllare che la coda sia effettivamente piena quindi acquisire i permessi N-M rimanenti dopo aver consumato i messaggi dal coda.