2012-06-07 4 views
5

Ho un'applicazione multi thread che ha un thread di produzione e numerosi thread di consumo. I dati vengono archiviati in una raccolta condivisa e protetta da thread e scaricati su un database quando nel buffer sono presenti dati sufficienti.Buffering dei dati nell'applicazione java con multithreading

Dalle javadocs -

BlockingQueue<E> 

una coda che supporta anche operazioni che attendono la coda di diventare non vuoto quando si recuperano un elemento, e attendere spazio diventi disponibile nella coda quando si memorizza un elemento .

take() 

Recupera e rimuove il capo di questa coda, in attesa, se necessario, fino a quando un elemento diventa disponibile.

Le mie domande -

  1. C'è un'altra collezione che ha un metodo E [] prendere (int n)? Ad esempio, la coda di blocco attende fino a quando un elemento è disponibile. Quello che voglio è che dovrebbe aspettare fino a 100 o 200 elementi sono disponibili.
  2. In alternativa, esiste un altro metodo che è possibile utilizzare per risolvere il problema senza eseguire il polling?
+0

Gli elementi devono essere distribuiti equamente a ciascun consumatore o il primo consumatore al metodo take ottiene i primi elementi 'n', il secondo consumatore gli elementi successivi' n', ecc.? – SimonC

+0

Questo è davvero quello che vuoi fare?Ciò può introdurre una latenza quasi arbitrariamente ampia tra i dati prodotti e scaricati nel database se il ritmo di produzione rallenta sempre oltre ciò che si finisce per essere sintonizzati. Se hai davvero bisogno di fare questo buffering, la tua logica dovrebbe probabilmente essere più come "Aspetta di avere N elementi o X ms sono passati" – DRMacIver

+0

Perché vuoi aspettare? Perché non usare semplicemente 'drain()'? Scriverò tutti i dati che hai a disposizione fino ad un massimo e preferirei non perdere i dati. –

risposta

2

penso che l'unico modo è per estendere l'implementazione di BlockingQueue o creare una sorta di metodo di utilità utilizzando take:

public <E> void take(BlockingQueue<E> queue, List<E> to, int max) 
     throws InterruptedException { 

    for (int i = 0; i < max; i++) 
     to.add(queue.take()); 
} 
+0

In realtà, il tuo approccio è molto più sano del mio, assumendo che ci sarà sempre un solo consumatore. – Zarkonnen

+1

Questo approccio non soddisfa affatto InterruptedException perché si perdono elementi presi se interrotti. Deve davvero aggiungere elementi a una raccolta passata se non gestirà l'interrupt stesso, o catturare e restituire gli elementi vuoti fino a quel momento se lo desidera. – DRMacIver

+0

Oh, buon punto +1 sul commento. Aggiornato! – dacwe

1

Non sono sicuro se c'è una classe simile nella libreria standard che ha take(int n) metodo di tipo, ma si dovrebbe essere in grado di avvolgere il default BlockingQueue aggiungere quella funzione senza problemi troppo, non si pensare?

Scenario alternativo sarebbe l'attivazione di un'azione in cui si inseriscono elementi nella raccolta, in cui una soglia impostata dall'utente innescherebbe lo svuotamento.

2

Il metodo drainTo non è esattamente quello che stai cercando, ma servirebbe al tuo scopo?

http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#drainTo(java.util.Collection, int)

EDIT

Si potrebbe implementare un lotto leggermente più performante blocco takemin utilizzando una combinazione di take e drainTo:

public <E> void drainTo(final BlockingQueue<E> queue, final List<E> list, final int min) throws InterruptedException 
{ 
    int drained = 0; 
    do 
    { 
    if (queue.size() > 0) 
     drained += queue.drainTo(list, min - drained); 
    else 
    { 
     list.add(queue.take()); 
     drained++; 
    } 
    } 
    while (drained < min); 
} 
+1

, come menzionato in un'altra risposta (che ora viene rimossa apparentemente) questo è esattamente ciò che lui/lei vuole fare. – posdef

+0

Ho aggiornato la risposta per indicare che non risolve la domanda esatta. A volte l'OP non è a conoscenza di soluzioni alternative, quindi vale sempre la pena chiedere. – SimonC

+0

Non posso controbattere :) – posdef

1

Quindi questa dovrebbe essere una coda di sicurezza che consente di bloccare un numero arbitrario di elementi. Altri occhi per verificare che il codice di threading sia corretto sarebbero i benvenuti.

package mybq; 

import java.util.ArrayList; 
import java.util.LinkedList; 
import java.util.List; 

public class ChunkyBlockingQueue<T> { 
    protected final LinkedList<T> q = new LinkedList<T>(); 
    protected final Object lock = new Object(); 

    public void add(T t) { 
     synchronized (lock) { 
      q.add(t); 
      lock.notifyAll(); 
     } 
    } 

    public List<T> take(int numElements) { 
     synchronized (lock) { 
      while (q.size() < numElements) { 
       try { 
        lock.wait(); 
       } catch (InterruptedException e) { 
        Thread.currentThread().interrupt(); 
       } 
      } 
      ArrayList<T> l = new ArrayList<T>(numElements); 
      l.addAll(q.subList(0, numElements)); 
      q.subList(0, numElements).clear(); 
      return l; 
     } 
    } 
} 
+1

Il 'notifyAll' in' add' è un po 'dispendioso qui. Dovrebbe essere un singolo 'notify', quindi' take' può richiamare 'notify' di nuovo se ci sono ancora più elementi rimasti quando è finito. – SimonC