2013-03-03 21 views
13

Sto cercando un modo per bloccare fino a quando uno BlockingQueue è vuoto.Come bloccare fino a quando un BlockingQueue è vuoto?

So che, in un ambiente con multithreading, finché ci sono produttori che inseriscono articoli nello BlockingQueue, ci possono essere situazioni in cui la coda diventa vuota e dopo pochi nanosecondi è piena di elementi.

Tuttavia, se c'è solo un produttore, potrebbe essere necessario attendere (e bloccarlo) finché la coda non è vuota dopo che ha interrotto l'inserimento degli elementi nella coda.

Java/Pseudocodice:

// Producer code 
BlockingQueue queue = new BlockingQueue(); 

while (having some tasks to do) { 
    queue.put(task); 
} 

queue.waitUntilEmpty(); // <-- how to do this? 

print("Done"); 

Avete qualche idea?

EDIT: so che il confezionamento BlockingQueue e l'utilizzo di una condizione in più farebbe il trucco, sto solo chiedendo se ci sono alcune soluzioni pre-fatto e/o alternative migliori.

+0

Chiaramente si potrebbe chiamare 'peek' fino a quando non ritorna' null'. Che cosa si tratta del blocco che rende questa una soluzione inaccettabile? – OldCurmudgeon

+0

La risposta alla tua modifica, AFAIK, è no. Nota che, come ho scritto nella mia risposta, il tuo caso d'uso è molto particolare ... assicurati di aver davvero bisogno di fare ciò che stai chiedendo. Non dichiari perché ** hai bisogno ** di un simile comportamento. –

+0

@ JoãoFernandes: Non ne ho assolutamente bisogno ora, è solo per curiosità. Mi piace leggere opinioni su problemi di programmazione. – gd1

risposta

12

Una soluzione semplice utilizzando wait() e notify():

// Producer: 
synchronized(queue) { 
    while (!queue.isEmpty()) 
     queue.wait(); //wait for the queue to become empty 
    queue.put(); 
} 

//Consumer: 
synchronized(queue) { 
    queue.get(); 
    if (queue.isEmpty()) 
     queue.notify(); // notify the producer 
} 
+0

Ciao @niculare, questo codice causerebbe un deadlock? Due snippet di codice si bloccano sulla stessa 'coda ', che impedisce al produttore e al consumatore di eseguire i loro lavori contemporaneamente. Quindi la coda potrebbe non avere mai la possibilità di essere vuota quando il produttore è in attesa che sia. – liuyaodong

+3

@liuyaodong No, leggi Javadoc per 'Object.wait()'. Rilascia il blocco fino a quando un altro thread chiama 'notify'. Dice anche esplicitamente che "il thread corrente deve possedere il monitor di questo oggetto". – Timmos

+0

Come funzionerebbe? Dì prima all'esecuzione dei consumi e viene bloccato su queue.get() perché non c'è ancora alcun elemento nella coda. Ora il produttore non può produrre perché non può ottenere la serratura. – hankduan

3

Non è proprio quello che si vuole fare, ma utilizzando un SynchronousQueue avrebbe un effetto molto simile come Java/Pseudocodice, vale a dire il produttore di blocco fino a quando tutti dei dati è stato recuperato da qualche consumatore.

L'unica differenza è che il produttore blocca su ogni put fino a quando un consumatore non viene a recuperare i dati, anziché solo una volta alla fine. Non sono sicuro se ciò potrebbe fare la differenza nel tuo caso. Mi aspetto che faccia una differenza evidente, se l'attività svolta dal produttore è piuttosto costosa.

1

tuo caso d'uso dovrebbe essere abbastanza speciale perché più tipicamente si sarebbe solo desidera bloccare il produttore quando la coda è piena, ma non aspettare fino a quando è vuota.

In ogni caso, questo è fattibile. Credo che gira fino isEmpty ritorna vero non è così inefficiente perché il produttore sarà localmente filatura, vale a dire, sarà l'accesso a una propria cache, non sbattere il bus. Sarà comunque consumare tempo CPU in quanto il thread rimane programmabile. Ma la rotazione locale è sicuramente il modo più semplice. In caso contrario, io vedo due opzioni:

  1. Uso wait + notify come @niculare suggerito
  2. In qualche modo rendono il primo consumatore che rileva la coda vuota di notificare al produttore in modo libero-lock; questo sarà più lento ma degradare "di più" grazia
6

ho capito si potrebbe già avere fascio di fili attivamente polling o prendere la coda, ma ancora non si sentono abbastanza di destra circa il flusso di/disegno.

La coda diventa vuota non significa i compiti precedentemente aggiunti sono finiti, somes degli elementi potrebbero richiedere molto tempo per elaborare, quindi non è troppo utile per verificare la presenza di vuoto.

Che cosa si deve fare è dimenticare la BlockingQueue, è possibile utilizzarlo come qualsiasi altre collezioni. Tradurre le voci in un Collections di Callable e fare uso del ExecutorService.invokeAll().

Collection<Item> queue = ... 
    Collection<Callable<Result>> tasks = new ArrayList<Callable<Result>>(); 

    for (Item item : queue) { 
     tasks.add(new Callable<Result>() { 

      @Override 
      public Result call() throws Exception { 
       // process the item ... 

       return result; 
      } 
     }); 
    } 

    // look at the results, add timeout for invokeAll if necessary 
    List<Future<Result>> results = executorService.invokeAll(tasks); 

    // done 

Questo approccio vi darà il pieno controllo di quanto tempo il produttore potrebbe aspettare e la corretta gestione delle eccezioni.

-1

Secondo il documentation, BlockingQueue possono essere bloccati su "Rimuovi" con metodi: take() o poll(time, unit) invece di poll()