2012-03-28 8 views
9

Ho lavorato con strategie diverse per il pooling di thread utilizzando ThreadPoolExecutor con JDK6. Ho una coda con priorità che funziona, ma non ero sicuro se mi piacesse il modo in cui il pool non ha ridimensionato dopo keepAliveTime (ciò che ottieni con una coda illimitata). Quindi, sto guardando un ThreadPoolExecutor utilizzando un LinkedBlockingQueue e il criterio CallerRuns.Perché ThreadPoolExecutor riduce i thread sotto corePoolSize dopo keepAliveTime?

Il problema che sto avendo con esso ora è che la piscina rampe su, come i documenti spiegano che dovrebbe, ma dopo i compiti completo e il KeepAliveTime entra in gioco getPoolSize mostra la piscina sempre ridotto a zero. Il codice di esempio riportato di seguito dovrebbe farvi vedere la base per la mia domanda:

public class ThreadPoolingDemo { 
    private final static Logger LOGGER = 
     Logger.getLogger(ThreadPoolingDemo.class.getName()); 

    public static void main(String[] args) throws Exception { 
     LOGGER.info("MAIN THREAD:starting"); 
     runCallerTestPlain(); 
    } 

    private static void runCallerTestPlain() throws InterruptedException { 
     //10 core threads, 
     //50 max pool size, 
     //100 tasks in queue, 
     //at max pool and full queue - caller runs task 
     ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50, 
      5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), 
      new ThreadPoolExecutor.CallerRunsPolicy()); 

     //dump 5000 tasks on the queue 
     for (int i = 0; i < 5000; i++) { 
      tpe.submit(new Runnable() { 
       @Override 
       public void run() { 
        //just to eat some time and give a little feedback 
        for (int j = 0; j < 20; j++) { 
         LOGGER.info("First-batch Task, looping:" + j + "[" 
           + Thread.currentThread().getId() + "]"); 
        } 
       } 
      }, null); 
     } 
     LOGGER.info("MAIN THREAD:!!Done queueing!!"); 

     //check tpe statistics forever 
     while (true) { 
      LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: " 
       + tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize()); 
      Thread.sleep(1000); 
     } 
    } 
} 

Ho trovato un vecchio bug che sembra essere il problema ma era chiuso: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6458662. Potrebbe essere ancora presente in 1.6 o mi manca qualcosa?

Sembra che io abbia gommato questo Ducati (http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html). Il bug che ho linkato sopra è relativo a questo: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792, dove il problema sembra essere risolto in 1.7 (Ho caricato 1.7 e verificato - risolto ...). Immagino che il mio problema principale fosse che un bug questo fondamentale è rimasto per quasi un decennio. Ho passato troppo tempo a scrivere questo per non postarlo ora, spero che aiuti qualcuno.

+0

+1 Bella scoperta, sono rimasto sorpreso di vedere questo comportamento. –

+1

Forse sarebbe meglio strutturare il tuo post come una domanda e quindi fornire ciò che hai imparato come risposta? –

risposta

6

... dopo che le attività sono state completate e il keepAliveTime entra in gioco, getPoolSize mostra il pool ridotto a zero.

Quindi questa sembra essere una condizione di competizione nel ThreadPoolExecutor. Immagino che stia funzionando secondo il design, anche se non è previsto. Nel metodo getTask() cui il ciclo thread di lavoro attraverso per ottenere compiti dalla coda di blocco, si vede questo codice:

if (state == SHUTDOWN) // Help drain queue 
    r = workQueue.poll(); 
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 
    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); 
else 
    r = workQueue.take(); 
if (r != null) 
    return r; 
if (workerCanExit()) { 
    if (runState >= SHUTDOWN) // Wake up others 
     interruptIdleWorkers(); 
    return null; 
} 

Se il poolSize cresce al di sopra del corePoolSize poi se i tempi del sondaggio fuori dopo keepAliveTime, il codice cade giù a workerCanExit() dal r è null. Tutti i fili può tornare true da tale metodo in quanto è solo testando lo stato di poolSize:

mainLock.lock(); 
    boolean canExit; 
    try { 
     canExit = runState >= STOP || 
      workQueue.isEmpty() || 
      (allowCoreThreadTimeOut && 
      poolSize > Math.max(1, corePoolSize)); << test poolSize here 
    } finally { 
     mainLock.unlock();       << race to workerDone() begins 
    } 

volta che restituisce true poi i thread di lavoro e quindi il poolSize viene decrementato. Se tutti i thread di lavoro eseguono questo test contemporaneamente, tutti usciranno a causa della competizione tra il test di poolSize e l'arresto del worker quando si verifica --poolSize.

Ciò che mi sorprende è la coerenza con le condizioni di gara. Se aggiungi qualche randomizzazione allo sleep() all'interno dello run() di seguito, puoi ottenere alcuni thread principali per non uscire, ma avrei pensato che la condizione della gara sarebbe stata più difficile da colpire.


Si può vedere questo comportamento nel seguente test:

@Test 
public void test() throws Exception { 
    int before = Thread.activeCount(); 
    int core = 10; 
    int max = 50; 
    int queueSize = 100; 
    ThreadPoolExecutor tpe = 
      new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(queueSize), 
        new ThreadPoolExecutor.CallerRunsPolicy()); 
    tpe.allowCoreThreadTimeOut(false); 
    assertEquals(0, tpe.getActiveCount()); 
    // if we start 1 more than can go into core or queue, poolSize goes to 0 
    int startN = core + queueSize + 1; 
    // if we only start jobs the core can take care of, then it won't go to 0 
    // int startN = core + queueSize; 
    for (int i = 0; i < startN; i++) { 
     tpe.submit(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 
    } 
    while (true) { 
     System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize() 
       + ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before)); 
     Thread.sleep(1000); 
    } 
} 

Se si modifica la linea sleep all'interno del metodo run() a qualcosa di simile:

private final Random random = new Random(); 
... 
    Thread.sleep(100 + random.nextInt(100)); 

Questo renderà la condizione della gara è più difficile da colpire, quindi alcuni thread chiave saranno ancora in circolazione.

+0

L'OP è più interessato a 'getPoolSize()' e non a 'getActiveCount()' Dopo il completamento di tutte le attività e la scadenza di keepAliveTime, tutti i thread terminano di fatto anche se il TPE non lo fa. –

+0

@Gray I miei parametri TPE sono importanti. Questo non sempre fallisce. Usando i parametri che hai usato vedremo che funziona come previsto. I parametri che ho specificato giocano con le regole di TPE in un modo unico ma anche realistico (vale a dire che un enorme lavoro batch viene scaricato sulla coda). Il problema principale con i parametri è che non si attiva mai la situazione che causa l'aggiunta di thread al pool sopra e oltre le dimensioni del core. Per superare/aggiungere oltre la dimensione del core, il pool deve essere al core size (10) e la coda deve superare il suo limite (100). Prova il tuo esempio con 'for (int i = 0; i andematt

+0

@andematt Viaggio interessante. Questa è una condizione di competizione in 'ThreadPoolExecutor'. Ho modificato la mia risposta. – Gray