2012-05-26 12 views
5

Ho iniziato a leggere di più su ThreadPoolExecutor da Java Doc mentre lo sto usando in uno dei miei progetti. Quindi qualcuno può spiegarmi cosa significa in realtà questa linea? - So cosa significa ciascun parametro, ma volevo comprenderlo in un modo più generale/personale da alcuni degli esperti qui.ThreadPoolExecutor con ArrayBlockingQueue

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L, 
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10, true), new 
ThreadPoolExecutor.CallerRunsPolicy()); 

Aggiornato: - problema dichiarazione è: -

Ogni thread utilizza ID univoco tra 1 e 1000 e il programma è di correre per 60 minuti o più, quindi in questo 60 minuti è possibile che tutti gli ID saranno finiti quindi ho bisogno di riutilizzare quegli ID di nuovo. Quindi questo è il programma qui sotto che ho scritto usando sopra l'esecutore.

class IdPool { 
    private final LinkedList<Integer> availableExistingIds = new LinkedList<Integer>(); 

    public IdPool() { 
     for (int i = 1; i <= 1000; i++) { 
      availableExistingIds.add(i); 
     } 
    } 

    public synchronized Integer getExistingId() { 
     return availableExistingIds.removeFirst(); 
    } 

    public synchronized void releaseExistingId(Integer id) { 
     availableExistingIds.add(id); 
    } 
} 


class ThreadNewTask implements Runnable { 
    private IdPool idPool; 

    public ThreadNewTask(IdPool idPool) { 
     this.idPool = idPool; 
    } 

    public void run() { 
     Integer id = idPool.getExistingId(); 
     someMethod(id); 
     idPool.releaseExistingId(id); 
    } 

// This method needs to be synchronized or not? 
    private synchronized void someMethod(Integer id) { 
     System.out.println("Task: " +id); 
// and do other calcuations whatever you need to do in your program 
    } 
} 

public class TestingPool { 
    public static void main(String[] args) throws InterruptedException { 
     int size = 10; 
     int durationOfRun = 60; 
     IdPool idPool = new IdPool(); 
     // create thread pool with given size 
     ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), new ThreadPoolExecutor.CallerRunsPolicy()); 

     // queue some tasks 
     long startTime = System.currentTimeMillis(); 
     long endTime = startTime + (durationOfRun * 60 * 1000L); 

     // Running it for 60 minutes 
     while(System.currentTimeMillis() <= endTime) { 
      service.submit(new ThreadNewTask(idPool)); 
     } 

     // wait for termination   
     service.shutdown(); 
     service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    } 
} 

Le mie domande sono: - Questo codice è giusto per quanto riguarda la Performance è considerato o no? E cos'altro posso farlo qui per renderlo più preciso? Qualsiasi aiuto sarà apprezzato.

+0

Non vedo niente di sbagliato con il codice, diverso forse avere un try/finally nel metodo "run" per garantire che id sempre ottenere rilasciato (per quando si dispone di un codice più complicato "someMethod "). – Matt

+0

@Matt, Grazie per aver commentato, Quindi someMethod deve essere sincronizzato? o no? Come nel mio caso l'ho reso sincronizzato. – ferhan

+0

E 'idPool.releaseExistingId (id);' dovrebbe venire nel blocco finally giusto? – ferhan

risposta

8

[Per prima cosa, mi scuso, questa è una risposta a una risposta precedente, ma volevo la formattazione].

Tranne che in realtà, NON si blocca quando un articolo viene inviato a un ThreadPoolExecutor con una coda completa. Il motivo è che ThreadPoolExecutor chiama il metodo BlockingQueue.offer (T item) che per definizione è un metodo non bloccante. Aggiunge l'elemento e restituisce true, oppure non aggiunge (quando è pieno) e restituisce false. ThreadPoolExecutor chiama quindi RejectedExecutionHandler per far fronte a questa situazione.

Dal javadoc:

Esegue il compito affidato in futuro. L'attività può eseguire in un nuovo thread o in un thread in pool esistente. Se l'attività non può essere inoltrata per l'esecuzione, sia perché questo esecutore è stato arrestato o perché la sua capacità è stata raggiunta, l'attività viene gestita da dall'attuale RejectedExecutionHandler.

Per impostazione predefinita, il ThreadPoolExecutor.AbortPolicy() viene utilizzato che getta un RejectedExecutionException dal metodo "Invia" o "esecuzione" del ThreadPoolExecutor.

try { 
    executorService.execute(new Runnable() { ... }); 
} 
catch (RejectedExecutionException e) { 
    // the queue is full, and you're using the AbortPolicy as the 
    // RejectedExecutionHandler 
} 

Tuttavia, è possibile utilizzare altri gestori di fare qualcosa di diverso, come ad esempio ignorare l'errore (DiscardPolicy), o eseguirlo nel thread che chiama il metodo "esecuzione" o "submit" (CallerRunsPolicy). Questo esempio consente a qualsiasi thread che chiama il metodo "submit" o "execute" di eseguire l'operazione richiesta quando la coda è piena. (Questo significa che in un dato momento, si potrebbe 1 cosa aggiuntiva che gira sopra a quello che c'è nella piscina stessa):

ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy()); 

Se si desidera bloccare e aspettare, è possibile implementare il proprio RejectedExecutionHandler che avrebbe bloccato fino a quando non c'è uno slot disponibili sulla coda (si tratta di una stima approssimativa, non ho compilato o eseguire questo, ma si dovrebbe ottenere l'idea):

public class BlockUntilAvailableSlot implements RejectedExecutionHandler { 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
    if (e.isTerminated() || e.isShutdown()) { 
     return; 
    } 

    boolean submitted = false; 
    while (! submitted) { 
     if (Thread.currentThread().isInterrupted()) { 
      // be a good citizen and do something nice if we were interrupted 
      // anywhere other than during the sleep method. 
     } 

     try { 
      e.execute(r); 
      submitted = true; 
     } 
     catch (RejectedExceptionException e) { 
     try { 
      // Sleep for a little bit, and try again. 
      Thread.sleep(100L); 
     } 
     catch (InterruptedException e) { 
      ; // do you care if someone called Thread.interrupt? 
      // if so, do something nice here, and maybe just silently return. 
     } 
     } 
    } 
    } 
} 
+0

Non capisco molto quello che hai appena detto in quanto nuovo per la famiglia Executor, quindi il modo in cui sto facendo nella mia domanda è giusto o no? O ci può essere qualche miglioramento nel codice? – ferhan

+0

Stavo semplicemente commentando l'affermazione di qualcuno che "Usa un ArrayBlockingQueue per gestire le richieste di esecuzione con 10 slot, quindi quando la coda è piena (dopo 10 thread sono stati accodati), bloccherà il chiamante." Questo non è il caso, come ho spiegato sopra, a meno che non si faccia deliberatamente del codice per farlo accadere. L'utilizzo della politica di esecuzione del chiamante in realtà la esegue nel thread corrente, questa istruzione sopra implica che si bloccherebbe finché non ci sarà spazio nella coda. – Matt

2

Sta creando un ExecutorService che gestisce l'esecuzione di un pool di thread. In questo caso, il numero iniziale e massimo di thread nel pool è 10. Quando un thread nel pool diventa inattivo per 1 secondo (1000ms) lo ucciderà (il timer di inattività), tuttavia poiché il numero massimo e il numero di thread è lo stesso, questo non avverrà mai (mantiene sempre 10 thread intorno e mai eseguito più di 10 thread).

Utilizza un ArrayBlockingQueue per gestire le richieste di esecuzione con 10 slot, quindi quando la coda è piena (dopo 10 thread sono stati accodati), bloccherà il chiamante.

Se il thread viene rifiutato (che in questo caso sarebbe dovuto allo spegnimento del servizio, poiché i thread verranno accodati o verrà bloccato quando si accoda un thread se la coda è piena), verrà eseguito il Runnable offerto sul thread del chiamante.

+0

Grazie per il commento e cosa significa vero in ArrayBlockingQueue? – ferhan

+0

Ha un numero fisso di slot supportato da un array e bloccherà il chiamante quando si accoda se sono pieni. Esistono altri tipi di code che possono essere utilizzate. Guarda la descrizione di esso nel Javadoc per ulteriori informazioni. –

+0

Ho aggiornato la mia domanda in cui ho pubblicato il mio codice, e ho anche menzionato la mia affermazione sul problema, quindi c'è qualche problema in quel codice? se sì, come posso renderlo più preciso? Qualsiasi aiuto sarà apprezzato. – ferhan

0

consideri semafori. Questi sono pensati per lo stesso scopo. Si prega di controllare sotto per il codice utilizzando semaforo. Non sono sicuro se questo è quello che vuoi. Ma questo bloccherà se non ci sono più permessi da acquisire. Anche l'ID è importante per te?

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Semaphore; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

class ThreadNewTask implements Runnable { 
    private Semaphore idPool; 

    public ThreadNewTask(Semaphore idPool) { 
     this.idPool = idPool; 
    } 

    public void run() { 
//  Integer id = idPool.getExistingId(); 
     try { 
      idPool.acquire(); 
      someMethod(0); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } finally { 
      idPool.release(); 
     } 
//  idPool.releaseExistingId(id); 
    } 

    // This method needs to be synchronized or not? 
    private void someMethod(Integer id) { 
     System.out.println("Task: " + id); 
     // and do other calcuations whatever you need to do in your program 
    } 
} 

public class TestingPool { 
    public static void main(String[] args) throws InterruptedException { 
     int size = 10; 
     int durationOfRun = 60; 
     Semaphore idPool = new Semaphore(100); 
//  IdPool idPool = new IdPool(); 
     // create thread pool with given size 
     ExecutorService service = new ThreadPoolExecutor(size, size, 500L, 
       TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), 
       new ThreadPoolExecutor.CallerRunsPolicy()); 

     // queue some tasks 
     long startTime = System.currentTimeMillis(); 
     long endTime = startTime + (durationOfRun * 60 * 1000L); 

     // Running it for 60 minutes 
     while (System.currentTimeMillis() <= endTime) { 
      service.submit(new ThreadNewTask(idPool)); 
     } 

     // wait for termination 
     service.shutdown(); 
     service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    } 
} 
+0

Puoi darmi un esempio basato sulla mia affermazione sul problema? – ferhan

+0

Sì ID è importante per me e ho bisogno di passare l'id al metodo in modo che ogni thread utilizzi un ID univoco diverso. – ferhan

+0

OK. Il tuo approccio sembra buono allora? – Ravi