2009-07-31 8 views
26

Ho il classico problema di un thread che spinge gli eventi sulla coda in arrivo di un secondo thread. Solo che questa volta sono molto interessato alle prestazioni. Quello che voglio ottenere è:Coda concurrent e blocking in Java

  • voglio l'accesso simultaneo alla coda, spingendo il produttore, il Poping ricevitore.
  • Quando la coda è vuota, voglio che il consumatore blocchi in coda, in attesa del produttore.

La mia prima idea era quella di utilizzare un LinkedBlockingQueue, ma mi sono presto reso conto che non è concorrente e le prestazioni hanno sofferto. D'altra parte, io ora uso un ConcurrentLinkedQueue, ma sto pagando ancora il costo di wait()/notify() su ogni pubblicazione. Dal momento che il consumatore, dopo aver trovato una coda vuota, non blocca, devo sincronizzarlo e wait() su un lucchetto. Dall'altra parte, il produttore deve ottenere quel lucchetto e notify() su ogni singola pubblicazione. Il risultato complessivo è che sto pagando il costo di sycnhronized (lock) {lock.notify()} in ogni singola pubblicazione, anche quando non è necessario.

Ciò che immagino sia necessario qui, è una coda che è allo stesso tempo bloccante e concomitante. Immagino che l'operazione push() funzioni come ConcurrentLinkedQueue, con un ulteriore notify() sull'oggetto quando l'elemento spinto è il primo della lista. Tale controllo ritengo di esistere già nello ConcurrentLinkedQueue, poiché la spinta richiede il collegamento con l'elemento successivo. Quindi, questo sarebbe molto più veloce della sincronizzazione ogni volta sul blocco esterno.

È qualcosa di simile disponibile/ragionevole?

+0

Perché pensi che java.util.concurrent.LinkedBlockingQueue non sia simultaneo? Penso che sia perfettamente concorrente, vedendo il suo javadoc e il codice sorgente. Ma non ho idea delle prestazioni. – Rorick

+0

Vedere anche http://stackoverflow.com/questions/1301691/java-queue-implementations-which-one – Vadzim

risposta

11

Penso che si possa attenersi a java.util.concurrent.LinkedBlockingQueue indipendentemente dai propri dubbi. È simultaneo. Tuttavia, non ho idea delle sue prestazioni. Probabilmente, altre implementazioni di BlockingQueue ti andranno bene. Non ce ne sono troppi, quindi esegui test e misurazioni delle prestazioni.

+0

Dico questo solo osservando il mio throughput riducendo di circa 8 volte rispetto a ConcurrentLinkedQueue. Immaginavo che stesse bloccando tutto internamente solo per offrire la sicurezza del thread. Hai ragione, però, potrebbe anche essere solo una prestazione peggiore rispetto a ConcurrentLinkedQueue. Che tipo di battere la causa ;-) – Yiannis

+1

Quale coda si utilizza, si finirà per utilizzare un blocco per supportare l'attesa per una nuova voce. (A meno che non si attende occupato) Il blocco non è poi così costoso (circa 0,5 microsecondi per Lock), quindi se il suo un problema di prestazioni si può avere un problema con il vostro disegno, per esempio crea meno attività/trova un modo per aggiungere meno oggetti alla coda/batch del tuo lavoro. –

+6

Come nota, se si vuole un throughput più elevato, utilizzare drainTo() sul tuo LinkedBlcokingQueue, abbiamo misurato quasi il 500% di migliorare il throughput su una delle nostre app, invece di prendere() 'ing uno ed un solo elemento dalla coda – nos

2

Ecco uno list of classes implementing BlockingQueue.

Si consiglia di verificare SynchronousQueue.

Come @Rorick menzionato nel suo commento, credo che tutte queste implementazioni siano concomitanti. Penso che le tue preoccupazioni con LinkedBlockingQueue potrebbero essere fuori luogo.

+1

SynchronousQueue non è quello che voglio, in quanto bloccherà il mio produttore ogni volta che sta tentando di pubblicare. – Yiannis

+0

SynchronousQueue sembra più una pipa che una coda. Sembra che non possa contenere attività in attesa di elaborazione, ma solo "spingere" singole attività dal produttore al consumatore. – Rorick

+0

Hehe, mi dispiace. – jjnguy

5

Vorrei suggerire di guardare ThreadPoolExecutor newSingleThreadExecutor. Gestirà il mantenimento dei compiti ordinati per te e, se invii Callables all'esecutore, sarai in grado di ottenere anche il comportamento di blocco che stai cercando.

3

Uso ArrayBlockingQueue ogni volta che è necessario passare i dati da un thread a un altro. Utilizzando i metodi put and take (che bloccherà se pieno/vuoto).

4

Si può provare LinkedTransferQueue da jsr166: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/

Esso soddisfa le vostre esigenze e hanno meno spese generali per le operazioni di offerta/sondaggio. Come posso vedere dal codice, quando la coda non è vuota, utilizza le operazioni atomiche per gli elementi di polling. E quando la coda è vuota, gira per un po 'di tempo e parcheggia il thread se non ha successo. Penso che possa aiutarti nel tuo caso.

6

Simile a questa risposta https://stackoverflow.com/a/1212515/1102730 ma un po 'diverso .. Ho finito per utilizzare un ExecutorService. È possibile creare un'istanza utilizzando Executors.newSingleThreadExecutor(). Avevo bisogno di una coda concorrente per leggere/scrivere BufferedImages in file, così come l'atomicità con letture e scritture. Ho solo bisogno di un singolo thread perché il file I/O è di ordine di grandezza più veloce dell'origine, rete IO. Inoltre, ero più preoccupato per l'atomicità delle azioni e la correttezza delle prestazioni, ma questo approccio può essere fatto anche con più thread nel pool per accelerare le cose.

per ottenere un'immagine (try-catch-Infine omesso):

Future<BufferedImage> futureImage = executorService.submit(new Callable<BufferedImage>() { 
    @Override 
     public BufferedImage call() throws Exception { 
      ImageInputStream is = new FileImageInputStream(file); 
      return ImageIO.read(is); 
     } 
    }) 

image = futureImage.get(); 

Per salvare un'immagine (try-catch-Infine omesso):

Future<Boolean> futureWrite = executorService.submit(new Callable<Boolean>() { 
    @Override 
    public Boolean call() { 
     FileOutputStream os = new FileOutputStream(file); 
     return ImageIO.write(image, getFileFormat(), os); 
    } 
}); 

boolean wasWritten = futureWrite.get(); 

E 'importante notare che si dovrebbe lavare e chiudere i tuoi flussi in un blocco finale. Non so come funzioni rispetto ad altre soluzioni, ma è piuttosto versatile.