2010-08-10 4 views
42

Sto cercando di eseguire molte attività utilizzando un ThreadPoolExecutor. Di seguito è riportato un esempio ipotetico:ThreadPoolExecutor Block quando la coda è piena?

def workQueue = new ArrayBlockingQueue<Runnable>(3, false) 
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue) 
for(int i = 0; i < 100000; i++) 
    threadPoolExecutor.execute(runnable) 

Il problema è che ho subito ottenere un java.util.concurrent.RejectedExecutionException dal momento che il numero di attività supera le dimensioni della coda di lavoro. Tuttavia, il comportamento desiderato che sto cercando è quello di avere il blocco thread principale fino a quando non c'è spazio nella coda. Qual è il modo migliore per farlo?

+2

Dai un'occhiata a questa domanda: http://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated – Kiril

+2

[Questa risposta] (http : //stackoverflow.com/a/4522411/394431) a un'altra domanda suggerisce di utilizzare una sottoclasse 'BlockingQueue' personalizzata che blocca su' offerta() 'delegando a' put() '. Penso che finisca per funzionare più o meno come il 'RejectedExecutionHandler' che chiama' getQueue(). Put() '. –

+2

L'inserimento diretto in coda sarebbe errato, come spiegato in questa risposta http://stackoverflow.com/a/3518588/585903 –

risposta

45

In alcune circostanze molto strette, è possibile implementare un java.util.concurrent.RejectedExecutionHandler che esegue ciò che è necessario.

RejectedExecutionHandler block = new RejectedExecutionHandler() { 
    rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
    executor.getQueue().put(r); 
    } 
}; 

ThreadPoolExecutor pool = new ... 
pool.setRejectedExecutionHandler(block); 

Ora. Questo è una pessima idea per i seguenti motivi

  • E 'incline a un punto morto perché tutti i thread nel pool possono morire prima che la cosa si mette in coda è visibile. Mitigare questo impostando un tempo di mantenimento in tempo ragionevole.
  • L'attività non è delimitata dal modo in cui il tuo Executor può aspettarsi. Molte implementazioni di esecutori eseguono il wrapping delle loro attività in una sorta di oggetto di tracciamento prima dell'esecuzione. Guarda la tua fonte.
  • L'aggiunta tramite getQueue() è fortemente sconsigliata dall'API e potrebbe essere vietata a un certo punto.

Una strategia quasi sempre migliore consiste nell'installare ThreadPoolExecutor.CallerRunsPolicy che limiterà la tua app eseguendo l'attività sul thread che sta chiamando execute().

Tuttavia, a volte una strategia di blocco, con tutti i suoi rischi intrinseci, è davvero ciò che si desidera. Direi che in queste condizioni

  • Hai solo un thread chiamando execute()
  • Devi (o vuole) avere una molto piccola lunghezza della coda
  • È assolutamente necessario limitare il numero di i thread che eseguono questo lavoro (di solito per motivi esterni) e una strategia di esecuzione del chiamante lo infrangono.
  • Le tue attività sono di dimensioni imprevedibili, quindi le chiamate dei chiamanti potrebbero causare l'inedia se il pool fosse momentaneamente occupato con 4 brevi attività e il tuo singolo thread di chiamata execute si è bloccato con uno più grande.

Quindi, come ho detto. Raramente è necessario e può essere pericoloso, ma ci vai.

Buona fortuna.

+2

Una risposta molto ben congegnata. Prendo un piccolo problema con le tue condizioni che> "Devi (o volere) avere una coda di coda molto piccola". Potrebbe non essere in grado di prevedere il numero di attività che un dato lavoro accoderà. Forse stai facendo un lavoro quotidiano che elabora i dati da qualche DB e lunedì ci sono 500 record da elaborare, ma martedì ce ne sono 50.000. Devi impostare un limite superiore sulla tua coda in modo da non far saltare il tuo mucchio quando arriva un grosso lavoro. In tal caso non c'è nulla di male nell'attesa che alcune attività vengano completate prima di fare la coda di più. – skelly

+0

Risposta fantastica, grazie –

+0

"È soggetto a deadlock perché tutti i thread nella piscina possono morire prima che la cosa che si mette in coda sia visibile. Mitigare questo impostando un tempo di mantenimento in tempo ragionevole." Non è possibile evitare completamente il deadlock impostando la dimensione min del pool su un valore maggiore di zero? Ogni altro motivo è il fallout di Java che non ha il supporto integrato per il blocco delle code di executor. Il che è interessante, perché sembra essere una strategia abbastanza ragionevole. Mi chiedo quale sia la logica. –

1

Ecco il mio frammento di codice in questo caso:

public void executeBlocking(Runnable command) { 
    if (threadPool == null) { 
     logger.error("Thread pool '{}' not initialized.", threadPoolName); 
     return; 
    } 
    ThreadPool threadPoolMonitor = this; 
    boolean accepted = false; 
    do { 
     try { 
      threadPool.execute(new Runnable() { 
       @Override 
       public void run() { 
        try { 
         command.run(); 
        } 
        // to make sure that the monitor is freed on exit 
        finally { 
         // Notify all the threads waiting for the resource, if any. 
         synchronized (threadPoolMonitor) { 
          threadPoolMonitor.notifyAll(); 
         } 
        } 
       } 
      }); 
      accepted = true; 
     } 
     catch (RejectedExecutionException e) { 
      // Thread pool is full 
      try { 
       // Block until one of the threads finishes its job and exits. 
       synchronized (threadPoolMonitor) { 
        threadPoolMonitor.wait(); 
       } 
      } 
      catch (InterruptedException ignored) { 
       // return immediately 
       break; 
      } 
     } 
    } while (!accepted); 
} 

ThreadPool è un'istanza locale di java.util.concurrent.ExecutorService che è stato inizializzato già.

0

ho risolto questo problema utilizzando un RejectedExecutionHandler personalizzato, che semplicemente blocca il thread chiamante per un po 'e poi tenta di inviare nuovamente l'attività:

public class BlockWhenQueueFull implements RejectedExecutionHandler { 

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 

     // The pool is full. Wait, then try again. 
     try { 
      long waitMs = 250; 
      Thread.sleep(waitMs); 
     } catch (InterruptedException interruptedException) {} 

     executor.execute(r); 
    } 
} 

Questa classe può solo essere utilizzato nel thread-pool esecutore come RejectedExecutionHandler come qualsiasi altro. In questo esempio:

executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull()) 

L'unico aspetto negativo che vedo è che il thread chiamante potrebbe avere un po 'più a lungo bloccato strettamente necessario (fino a 250 ms). Per molte attività di breve durata, è possibile ridurre il tempo di attesa a 10 ms circa. Inoltre, poiché questo executor viene chiamato in modo ricorsivo, attese molto lunghe affinché un thread diventi disponibile (ore) potrebbe causare un overflow dello stack.

Tuttavia, personalmente mi piace questo metodo. È compatto, facile da capire e funziona bene. Mi sto perdendo qualcosa di importante?