Ho un produttore di thread singolo che crea alcuni oggetti task che vengono quindi aggiunti in un ArrayBlockingQueue
(che è di dimensioni fisse).Utente bloccante e destinatario multi-thread, come sapere quando fermarsi
Avvio anche un utente multi-thread. Questo è compilato come un pool di thread fisso (Executors.newFixedThreadPool(threadCount);
). Quindi invio alcuni intances ConsumerWorker a questo threadPool, ciascun ConsumerWorker che ha una refferenza all'istanza ArrayBlockingQueue sopra menzionata.
Ciascuno di questi Worker eseguirà uno take()
sulla coda e gestirà l'attività.
Il mio problema è, qual è il modo migliore per far sapere a un lavoratore quando non ci sarà più lavoro da fare. In altre parole, come faccio a dire ai Lavoratori che il produttore ha finito di aggiungere alla coda, e da questo momento in poi, ogni lavoratore dovrebbe fermarsi quando vedrà che la coda è vuota.
Quello che ho ora è un setup in cui il mio Producer è inizializzato con un callback che viene attivato quando termina il suo lavoro (di aggiungere cose alla coda). Tengo anche un elenco di tutti i ConsumerWorkers che ho creato e inviato al ThreadPool. Quando il Callback del produttore mi dice che il produttore ha finito, posso dirlo a ciascuno dei lavoratori. A questo punto dovrebbero semplicemente continuare a controllare se la coda non è vuota, e quando diventa vuota dovrebbero fermarsi, permettendomi così di chiudere con grazia il pool di thread di ExecutorService. E 'qualcosa di simile
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
Il problema qui è che ho una condizione di competizione evidente dove a volte il produttore si concluderà, segnalarlo, e le ConsumerWorkers fermerò prima di consumare tutto nella coda.
La mia domanda è qual è il modo migliore per sincronizzarlo in modo che tutto funzioni correttamente? Devo sincronizzare l'intera parte in cui controlla se il produttore è in esecuzione più se la coda è vuota più prendere qualcosa dalla coda in un blocco (sull'oggetto coda)? Devo sincronizzare l'aggiornamento del valore booleano isRunning
nell'istanza ConsumerWorker? Qualche altro suggerimento?
UPDATE, ECCO LA REALIZZAZIONE DI LAVORO CHE HO finito per usare:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Questo è stato ispirato pesantemente dalla risposta di JohnVint di seguito, con solo alcune piccole modifiche.
=== Aggiornamento dovuto al commento di @ vendhan.
Grazie per l'obesità. Hai ragione, il primo snippet di codice in questa domanda ha (tra gli altri aspetti) quello in cui lo while(isRunning || !inputQueue.isEmpty())
non ha molto senso.
Nella mia effettiva implementazione finale di questo, faccio qualcosa che è più vicino al tuo suggerimento di sostituire "||" (o) con "& &" (e), nel senso che ogni lavoratore (consumatore) ora controlla solo se l'elemento che ha ricevuto dalla lista è una pillola avvelenata, e in tal caso si ferma (quindi in teoria possiamo dire che il lavoratore deve essere in esecuzione E la coda non deve essere vuota).
Un executorService ha già una coda, quindi non è necessario un altro. È possibile avviare l'intero servizio executor con un arresto(). –
@PeterLawrey Mi dispiace ma non capisco il tuo commento ... –
Come un ExecutorService ha già una coda, puoi semplicemente aggiungere delle attività e non avrai bisogno di una coda aggiuntiva, né dovrai lavorare fuori come fermarli poiché questo è già implementato. –