2012-01-23 7 views
59

Ho un produttore di thread singolo che crea alcuni oggetti task che vengono quindi aggiunti in un ArrayBlockingQueue (che è di dimensioni fisse).Utente bloccante e destinatario multi-thread, come sapere quando fermarsi

Avvio anche un utente multi-thread. Questo è compilato come un pool di thread fisso (Executors.newFixedThreadPool(threadCount);). Quindi invio alcuni intances ConsumerWorker a questo threadPool, ciascun ConsumerWorker che ha una refferenza all'istanza ArrayBlockingQueue sopra menzionata.

Ciascuno di questi Worker eseguirà uno take() sulla coda e gestirà l'attività.

Il mio problema è, qual è il modo migliore per far sapere a un lavoratore quando non ci sarà più lavoro da fare. In altre parole, come faccio a dire ai Lavoratori che il produttore ha finito di aggiungere alla coda, e da questo momento in poi, ogni lavoratore dovrebbe fermarsi quando vedrà che la coda è vuota.

Quello che ho ora è un setup in cui il mio Producer è inizializzato con un callback che viene attivato quando termina il suo lavoro (di aggiungere cose alla coda). Tengo anche un elenco di tutti i ConsumerWorkers che ho creato e inviato al ThreadPool. Quando il Callback del produttore mi dice che il produttore ha finito, posso dirlo a ciascuno dei lavoratori. A questo punto dovrebbero semplicemente continuare a controllare se la coda non è vuota, e quando diventa vuota dovrebbero fermarsi, permettendomi così di chiudere con grazia il pool di thread di ExecutorService. E 'qualcosa di simile

public class ConsumerWorker implements Runnable{ 

private BlockingQueue<Produced> inputQueue; 
private volatile boolean isRunning = true; 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) { 
    this.inputQueue = inputQueue; 
} 

@Override 
public void run() { 
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty: 
    while(isRunning || !inputQueue.isEmpty()) { 
     System.out.println("Consumer "+Thread.currentThread().getName()+" START"); 
     try { 
      Object queueElement = inputQueue.take(); 
      //process queueElement 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
} 

//this is used to signal from the main thread that he producer has finished adding stuff to the queue 
public void setRunning(boolean isRunning) { 
    this.isRunning = isRunning; 
} 

}

Il problema qui è che ho una condizione di competizione evidente dove a volte il produttore si concluderà, segnalarlo, e le ConsumerWorkers fermerò prima di consumare tutto nella coda.

La mia domanda è qual è il modo migliore per sincronizzarlo in modo che tutto funzioni correttamente? Devo sincronizzare l'intera parte in cui controlla se il produttore è in esecuzione più se la coda è vuota più prendere qualcosa dalla coda in un blocco (sull'oggetto coda)? Devo sincronizzare l'aggiornamento del valore booleano isRunning nell'istanza ConsumerWorker? Qualche altro suggerimento?

UPDATE, ECCO LA REALIZZAZIONE DI LAVORO CHE HO finito per usare:

public class ConsumerWorker implements Runnable{ 

private BlockingQueue<Produced> inputQueue; 

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) { 
    this.inputQueue = inputQueue; 
} 

@Override 
public void run() { 
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty: 
    while(true) { 
     System.out.println("Consumer "+Thread.currentThread().getName()+" START"); 
     try { 
      Produced queueElement = inputQueue.take(); 
      Thread.sleep(new Random().nextInt(100)); 
      if(queueElement==POISON) { 
       break; 
      } 
      //process queueElement 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     System.out.println("Consumer "+Thread.currentThread().getName()+" END"); 
    } 
} 

//this is used to signal from the main thread that he producer has finished adding stuff to the queue 
public void stopRunning() { 
    try { 
     inputQueue.put(POISON); 
    } catch (InterruptedException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
} 

}

Questo è stato ispirato pesantemente dalla risposta di JohnVint di seguito, con solo alcune piccole modifiche.

=== Aggiornamento dovuto al commento di @ vendhan.

Grazie per l'obesità. Hai ragione, il primo snippet di codice in questa domanda ha (tra gli altri aspetti) quello in cui lo while(isRunning || !inputQueue.isEmpty()) non ha molto senso.

Nella mia effettiva implementazione finale di questo, faccio qualcosa che è più vicino al tuo suggerimento di sostituire "||" (o) con "& &" (e), nel senso che ogni lavoratore (consumatore) ora controlla solo se l'elemento che ha ricevuto dalla lista è una pillola avvelenata, e in tal caso si ferma (quindi in teoria possiamo dire che il lavoratore deve essere in esecuzione E la coda non deve essere vuota).

+1

Un executorService ha già una coda, quindi non è necessario un altro. È possibile avviare l'intero servizio executor con un arresto(). –

+0

@PeterLawrey Mi dispiace ma non capisco il tuo commento ... –

+8

Come un ExecutorService ha già una coda, puoi semplicemente aggiungere delle attività e non avrai bisogno di una coda aggiuntiva, né dovrai lavorare fuori come fermarli poiché questo è già implementato. –

risposta

78

Si dovrebbe continuare a take() dalla coda. Puoi usare una pillola velenosa per dire al lavoratore di fermarsi.Per esempio:

private final Object POISON_PILL = new Object(); 

@Override 
public void run() { 
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty: 
    while(isRunning) { 
     System.out.println("Consumer "+Thread.currentThread().getName()+" START"); 
     try { 
      Object queueElement = inputQueue.take(); 
      if(queueElement == POISON_PILL) { 
       inputQueue.add(POISON_PILL);//notify other threads to stop 
       return; 
      } 
      //process queueElement 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
} 

//this is used to signal from the main thread that he producer has finished adding stuff to the queue 
public void finish() { 
    //you can also clear here if you wanted 
    isRunning = false; 
    inputQueue.add(POISON_PILL); 
} 
+37

+1 per il mio dizionario tecnico ha appena ottenuto un termine più ricco con "poison pill" – Kiril

+2

Ho provato questo e funziona bene, tranne che per un litle: devo fare inputQueue .put (POISON_PILL); non offrire, perché se offro() e la coda è a pieno regime in quel momento (cioè i Lavoratori erano veramente pigri) non aggiungerà l'elemento POISON PILL ad esso. È corretto per favore, o sto dicendo cose stupide? –

+0

@AndreiBodnarescu Hai ragione non è un'offerta. orlo ma solo uno fisso :) –

0

Ci sono una serie di strategie che si potrebbero utilizzare, ma un semplice è quello di avere una sottoclasse di compito che segnala la fine del lavoro. Il produttore non invia questo segnale direttamente. Invece, accoda un'istanza di questa sottoclasse di attività. Quando uno dei tuoi consumatori estrae questa attività e la esegue, ciò fa sì che il segnale venga inviato.

14

mi avrebbero mandato gli operai uno speciale pacchetto di lavoro per segnalare che dovrebbero spegnere:

public class ConsumerWorker implements Runnable{ 

private static final Produced DONE = new Produced(); 

private BlockingQueue<Produced> inputQueue; 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) { 
    this.inputQueue = inputQueue; 
} 

@Override 
public void run() { 
    for (;;) { 
     try { 
      Produced item = inputQueue.take(); 
      if (item == DONE) { 
       inputQueue.add(item); // keep in the queue so all workers stop 
       break; 
      } 
      // process `item` 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 
} 

}

per fermare i lavoratori, è sufficiente aggiungere ConsumerWorker.DONE alla coda.

1

Nel blocco di codice in cui si tenta di recuperare elementi dalla coda, utilizzare poll(time,unit) anziché take().

try { 
    Object queueElement = inputQueue.poll(timeout,unit); 
    //process queueElement   
} catch (InterruptedException e) { 
     if(!isRunning && queue.isEmpty()) 
     return ; 
} 

Specificando valori appropriati di timeout, si assicura che le discussioni solito tenere bloccando in caso v'è una spiacevole sequela di

  1. isRunning è vero
  2. coda si svuota, quindi fili entrano bloccati attesa (se si utilizza take()
  3. isRunning è impostata su false
+0

Sì, questa è una delle alternative che ho provato, ma ha alcuni svantaggi: prima faccio molte chiamate inutili al poll(), poi quando faccio questo (! IsRunning e& queue.isEmpty()) oltre a prendere roba dalla coda Devo sincronizzarli tutti con un blocco sincronizzato e questo è ridondante dato che BlockingQueue si occupa già di tutto da solo. –

+0

Per evitare il primo punto, perché non è sufficiente disporre il thread che imposta 'isRunning' su false per inviare un interrupt sul thread in attesa sulla chiamata' take() '? Il blocco catch funziona ancora allo stesso modo - questo non ha bisogno di sincronizzazione separata - a meno che non si preveda di impostare 'isRunning' su true da false .. – Bhaskar

+0

Ho provato anche quello, inviando interrupt ai thread. Il problema è che, se i tuoi thread sono avviati da un ExecutorService (Like a FixedThreadPool), quando esegui executorService.shutDown() tutti i thread riceveranno InterruptedException che li farà fermare a metà strada nel compito (dal momento che ora sono truccati per trattare InterruptedException come un tappo). Inoltre, comunicare tramite eccezioni generate come quella non è molto efficace. –

0

Ho dovuto utilizzare un produttore multi-threaded e un consumer multi-thread. Ho finito con uno schema Scheduler -- N Producers -- M Consumers, ognuno dei due comunica tramite una coda (due code totali). Lo Scheduler riempie la prima coda con le richieste di produzione di dati, quindi la riempie con N "pillole veleno". C'è un contatore di produttori attivi (atomic int), e l'ultimo produttore che riceve l'ultima pillola velenosa manda le pillole veleno M alla coda dei consumatori.

0

Impossibile farlo utilizzando uno CountDownLatch, in cui la dimensione è il numero di record nel produttore. E ogni consumatore sarà countDown dopo aver elaborato un record. E attraversa il metodo awaits() quando tutte le attività sono terminate. Quindi interrompere tutti i tuoi consumatori. Come tutti i record vengono elaborati.